55 lines
1.5 KiB
GDScript
55 lines
1.5 KiB
GDScript
static func exclusive_() -> Callable:
|
|
# """Performs a exclusive waiting for the first to finish before
|
|
# subscribing to another observable. Observables that come in between
|
|
# subscriptions will be dropped on the floor.
|
|
#
|
|
# Returns:
|
|
# An exclusive observable with only the results that
|
|
# happen when subscribed.
|
|
# """
|
|
var exclusive = func(source : Observable) -> Observable:
|
|
var subscribe = func(
|
|
observer : ObserverBase, scheduler : SchedulerBase = null
|
|
) -> DisposableBase:
|
|
var has_current = [false]
|
|
var is_stopped = [false]
|
|
var m = SingleAssignmentDisposable.new()
|
|
var g = CompositeDisposable.new()
|
|
|
|
g.add(m)
|
|
|
|
var on_next = func(inner_source : Observable):
|
|
if not has_current[0]:
|
|
has_current[0] = true
|
|
|
|
var inner_subscription = SingleAssignmentDisposable.new()
|
|
g.add(inner_subscription)
|
|
|
|
var on_completed_inner = func():
|
|
g.remove(inner_subscription)
|
|
has_current[0] = false
|
|
if is_stopped[0] and g.length == 1:
|
|
observer.on_completed()
|
|
|
|
inner_subscription.disposable = inner_source.subscribe(
|
|
observer.on_next,
|
|
observer.on_error,
|
|
on_completed_inner,
|
|
scheduler
|
|
)
|
|
|
|
var on_completed = func():
|
|
is_stopped[0] = true
|
|
if not has_current[0] and g.length == 1:
|
|
observer.on_completed()
|
|
|
|
m.disposable = source.subscribe(
|
|
on_next, observer.on_error, on_completed,
|
|
scheduler
|
|
)
|
|
return g
|
|
|
|
return Observable.new(subscribe)
|
|
|
|
return exclusive
|