EvilCocoGame1/addons/reactivex/operators/_takelastwithtime.gd

59 lines
1.8 KiB
GDScript3
Raw Normal View History

2025-01-11 00:25:52 +01:00
# Builds the "take last with time" operator.
#
# The returned Callable maps a source Observable to a new Observable that
# emits nothing while the source is running. When the source completes, it
# emits (in arrival order) only those elements that were received within
# the last `duration` before completion, and then completes itself.
#
# Example:
#     var op = take_last_with_time_(2.0)
#     var res = op.call(source)
#
# Args:
#     duration: Length of the trailing time window. It is compared against
#         differences of `scheduler.now()` values, so it must be expressed
#         in whatever unit that scheduler reports (presumably seconds for
#         the default scheduler - confirm against the scheduler in use).
#     scheduler: Optional scheduler used as the clock. When null, the
#         scheduler handed in at subscription time is used; when that is
#         null as well, SceneTreeTimeoutScheduler.singleton() is used.
#
# Returns:
#     A Callable taking an Observable and returning an Observable.
static func take_last_with_time_(
	duration : float,
	scheduler : SchedulerBase = null
) -> Callable:
	var take_last_with_time = func(source : Observable) -> Observable:
		var subscribe = func(
			observer : ObserverBase,
			scheduler_ : SchedulerBase = null
		) -> DisposableBase:
			# Clock selection: operator-level scheduler wins over the
			# subscription-level one; fall back to the shared singleton.
			var _scheduler : SchedulerBase
			if scheduler != null:
				_scheduler = scheduler
			elif scheduler_ != null:
				_scheduler = scheduler_
			else:
				_scheduler = SceneTreeTimeoutScheduler.singleton()

			# Buffer of {"interval": arrival time, "value": element}, oldest
			# first. Arrays are shared by reference, so both lambdas below
			# operate on the same buffer.
			var q : Array[Dictionary] = []

			var on_next = func(x):
				var now = _scheduler.now()
				q.append({"interval": now, "value": x})
				# Evict (discard, do not emit) entries that have already
				# fallen out of the trailing window. Usually only a few
				# entries expire per call, so front-popping is acceptable.
				while not q.is_empty() and now - q[0]["interval"] >= duration:
					q.pop_front()

			var on_completed = func():
				var now = _scheduler.now()
				# Flush in a single forward pass instead of popping the
				# front once per element: Array.pop_front() shifts every
				# remaining element, which made the drain quadratic.
				for entry in q:
					if now - entry["interval"] <= duration:
						observer.on_next(entry["value"])
				# Leave the buffer empty, exactly as the pop-based drain did.
				q.clear()
				observer.on_completed()

			# Errors from the source are forwarded to the observer untouched.
			return source.subscribe(
				on_next, observer.on_error, on_completed,
				scheduler_
			)

		return Observable.new(subscribe)

	return take_last_with_time