EvilCocoGame1/addons/reactivex/operators/_timeoutwithmapper.gd

122 lines
3.3 KiB
GDScript3
Raw Permalink Normal View History

2025-01-11 00:25:52 +01:00
static func timeout_with_mapper_(
first_timeout : Observable = null,
timeout_duration_mapper : Callable = func(__) -> Observable: return GDRx.obs.never(),
other : Observable = null
) -> Callable:
# """Returns the source observable sequence, switching to the other
# observable sequence if a timeout is signaled.
#
# var res = GDRx.obs.timeout_with_mapper(GDRx.obs.timer(500))
# var res = GDRx.obs.timeout_with_mapper(GDRx.obs.timer(500), func(x): return GDRx.obs.timer(200))
# var res = GDRx.obs.timeout_with_mapper(
# GDRx.obs.timer(500),
# func(x): return GDRx.obs.timer(200)),
# GDRx.obs.return_value(42)
# )
#
# Args:
# first_timeout -- [Optional] Observable sequence that represents the
# timeout for the first element. If not provided, this defaults to
# reactivex.never().
# timeout_duration_mapper -- [Optional] Selector to retrieve an
# observable sequence that represents the timeout between the
# current element and the next element.
# other -- [Optional] Sequence to return in case of a timeout. If not
# provided, this is set to reactivex.throw().
#
# Returns:
# The source sequence switching to the other sequence in case
# of a timeout.
# """
var first_timeout_ = first_timeout if first_timeout != null else GDRx.obs.never()
var other_ = other if other != null else GDRx.obs.throw(RxBaseError.new("Timeout"))
var timeout_with_mapper = func(source : Observable) -> Observable:
var subscribe = func(
observer : ObserverBase,
scheduler : SchedulerBase = null
) -> DisposableBase:
var subscription = SerialDisposable.new()
var timer = SerialDisposable.new()
var original = SingleAssignmentDisposable.new()
subscription.disposable = original
var switched = false
var _id = [0]
var set_timer = func(timeout : Observable):
var my_id = _id[0]
var timer_wins = func():
return _id[0] == my_id
var d = SingleAssignmentDisposable.new()
timer.disposable = d
var on_next = func(__):
if timer_wins.call():
subscription.disposable = other_.subscribe(
observer, GDRx.basic.noop, GDRx.basic.noop, scheduler
)
d.dispose()
var on_error = func(e):
if timer_wins.call():
observer.on_error(e)
var on_completed = func():
if timer_wins.call():
subscription.disposable = other_.subscribe(
observer
)
d.disposable = timeout.subscribe(
on_next, on_error, on_completed,
scheduler
)
set_timer.call(first_timeout_)
var observer_wins = func():
var res = not switched
if res:
_id[0] += 1
return res
var on_next = func(x):
if observer_wins.call():
observer.on_next(x)
var timeout = RefValue.Null()
if GDRx.try(func():
timeout.v = timeout_duration_mapper.call(x)
) \
.catch("Error", func(e):
observer.on_error(e)
) \
.end_try_catch(): return
set_timer.call(timeout.v)
var on_error = func(error):
if observer_wins.call():
observer.on_error(error)
var on_completed = func():
if observer_wins.call():
observer.on_completed()
original.disposable = source.subscribe(
on_next, on_error, on_completed,
scheduler
)
return CompositeDisposable.new([subscription, timer])
return Observable.new(subscribe)
return timeout_with_mapper