124 lines
3.3 KiB
GDScript3
Raw Permalink Normal View History

2025-01-11 00:25:52 +01:00
static func observable_delay_timespan(
source : Observable,
duetime : float,
scheduler : SchedulerBase = null
) -> Observable:
var _duetime : RefValue = RefValue.Set(duetime)
var subscribe = func(
observer : ObserverBase,
scheduler_ : SchedulerBase = null
) -> DisposableBase:
var _scheduler : SchedulerBase
if scheduler != null: _scheduler = scheduler
elif scheduler_ != null: _scheduler = scheduler_
else: _scheduler = SceneTreeTimeoutScheduler.singleton()
var duetime_ = _duetime.v
var cancelable = SerialDisposable.new()
var error : RefValue = RefValue.Null()
var active = [false]
var running = [false]
var queue : Array[Tuple] = []
@warning_ignore("shadowed_variable")
var on_next = func(notification : Tuple):
var should_run = false
if true:
var __ = LockGuard.new(source.lock)
if notification.at(0) is OnErrorNotification:
queue.clear()
queue.append(notification)
error.v = notification.at(0).err
should_run = not running[0]
else:
queue.append(
Tuple.new([
notification.at(0),
notification.at(1) + duetime_
])
)
should_run = not active[0]
active[0] = true
if should_run:
if error.v != null:
observer.on_error(error.v)
else:
var mad = MultipleAssignmentDisposable.new()
cancelable.disposable = mad
var action = func(scheduler : SchedulerBase, _state = null, __action_rec : Callable = func(__, ___, ____): return null):
if error.v != null:
return
var err
var should_continue : bool
var recurse_duetime : float
if true:
var __ = LockGuard.new(source.lock)
running[0] = true
while true:
var result : Notification = null
if not queue.is_empty() and queue[0].at(1) <= scheduler.now():
result = queue.pop_front().at(0)
if result != null:
result.accept(observer)
if result == null:
break
should_continue = false
recurse_duetime = 0
if not queue.is_empty():
should_continue = true
var diff = queue[0].at(1) - scheduler.now()
recurse_duetime = max(0, diff)
else:
active[0] = false
err = error.v
running[0] = false
if err != null:
observer.on_error(err)
elif should_continue:
mad.disposable = scheduler.schedule_relative(
recurse_duetime, __action_rec.bind(__action_rec)
)
mad.disposable = _scheduler.schedule_relative(duetime_, action.bind(action))
var subscription = source.pipe2(
GDRx.op.materialize(),
GDRx.op.timestamp()
).subscribe(on_next, GDRx.basic.noop, GDRx.basic.noop, _scheduler)
return CompositeDisposable.new([subscription, cancelable])
return Observable.new(subscribe)
static func delay_(
duetime : float, scheduler : SchedulerBase = null
) -> Callable:
var delay = func(source : Observable) -> Observable:
# """Time shifts the observable sequence.
#
# A partially applied delay operator function.
#
# Examples:
# >>> var res = delay.call(source)
#
# Args:
# source: The observable sequence to delay.
#
# Returns:
# A time-shifted observable sequence.
# """
return observable_delay_timespan(source, duetime, scheduler)
return delay