TestGame/addons/reactivex/operators/_delaywithmapper.gd
2024-12-27 21:00:07 +01:00

96 lines
2.4 KiB
GDScript

static func delay_with_mapper_(
subscription_delay = null,
delay_duration_mapper = null
) -> Callable:
var delay_with_mapper = func(source : Observable) -> Observable:
# """Time shifts the observable sequence based on a subscription
# delay and a delay mapper function for each element.
#
# Examples:
# >>> var obs = delay_with_selector.call(source)
#
# Args:
# subscription_delay: [Optional] Sequence indicating the
# delay for the subscription to the source.
# delay_duration_mapper: [Optional] Selector function to
# retrieve a sequence indicating the delay for each given
# element.
#
# Returns:
# Time-shifted observable sequence.
# """
var sub_delay : Observable = null
var mapper = null
if subscription_delay is ObservableBase:
mapper = delay_duration_mapper
sub_delay = subscription_delay
else:
mapper = subscription_delay
var subscribe = func(
observer : ObserverBase,
scheduler : SchedulerBase = null
) -> DisposableBase:
var delays = CompositeDisposable.new()
var at_end = [false]
var done = func():
if at_end[0] and delays.length == 0:
observer.on_completed()
var subscription = SerialDisposable.new()
var start = func():
var on_next = func(x):
var delay = RefValue.Null()
if GDRx.try(func():
if GDRx.assert_(mapper != null): return
delay.v = mapper.call(x)
) \
.catch("Error", func(error):
observer.on_error(error)
) \
.end_try_catch(): return
var d = SingleAssignmentDisposable.new()
delays.add(d)
var on_next = func(__):
observer.on_next(x)
delays.remove(d)
done.call()
var on_completed = func():
observer.on_next(x)
delays.remove(d)
done.call()
d.disposable = delay.v.subscribe(
on_next, observer.on_error, on_completed,
scheduler
)
var on_completed = func():
at_end[0] = true
subscription.dispose()
done.call()
subscription.set_disposable(source.subscribe(
on_next, observer.on_error, on_completed,
scheduler
))
if sub_delay == null:
start.call()
else:
subscription.set_disposable(sub_delay.subscribe(
func(__): start.call(), observer.on_error, start
))
return CompositeDisposable.new([subscription, delays])
return Observable.new(subscribe)
return delay_with_mapper