55 lines
1.5 KiB
GDScript3
Raw Permalink Normal View History

2024-12-27 21:00:07 +01:00
static func exclusive_() -> Callable:
# """Performs a exclusive waiting for the first to finish before
# subscribing to another observable. Observables that come in between
# subscriptions will be dropped on the floor.
#
# Returns:
# An exclusive observable with only the results that
# happen when subscribed.
# """
var exclusive = func(source : Observable) -> Observable:
var subscribe = func(
observer : ObserverBase, scheduler : SchedulerBase = null
) -> DisposableBase:
var has_current = [false]
var is_stopped = [false]
var m = SingleAssignmentDisposable.new()
var g = CompositeDisposable.new()
g.add(m)
var on_next = func(inner_source : Observable):
if not has_current[0]:
has_current[0] = true
var inner_subscription = SingleAssignmentDisposable.new()
g.add(inner_subscription)
var on_completed_inner = func():
g.remove(inner_subscription)
has_current[0] = false
if is_stopped[0] and g.length == 1:
observer.on_completed()
inner_subscription.disposable = inner_source.subscribe(
observer.on_next,
observer.on_error,
on_completed_inner,
scheduler
)
var on_completed = func():
is_stopped[0] = true
if not has_current[0] and g.length == 1:
observer.on_completed()
m.disposable = source.subscribe(
on_next, observer.on_error, on_completed,
scheduler
)
return g
return Observable.new(subscribe)
return exclusive