314 lines
8.7 KiB
GDScript3
Raw Normal View History

2024-12-27 21:00:07 +01:00
static func do_action_(
on_next = null, on_error = null, on_completed = null
) -> Callable:
var do_action = func(source : Observable) -> Observable:
# """Invokes an action for each element in the observable
# sequence and invokes an action on graceful or exceptional
# termination of the observable sequence. This method can be used
# for debugging, logging, etc. of query behavior by intercepting
# the message stream to run arbitrary actions for messages on the
# pipeline.
#
# Examples:
# >>> do_action(send).call(observable)
# >>> do_action(on_next, on_error).call(observable)
# >>> do_action(on_next, on_error, on_completed).call(observable)
#
# Args:
# on_next: [Optional] Action to invoke for each element in
# the observable sequence.
# on_error: [Optional] Action to invoke on exceptional
# termination of the observable sequence.
# on_completed: [Optional] Action to invoke on graceful
# termination of the observable sequence.
#
# Returns:
# An observable source sequence with the side-effecting
# behavior applied.
# """
var subscribe = func(
observer : ObserverBase,
scheduler : SchedulerBase = null
) -> DisposableBase:
var _on_next = func(x):
if on_next == null:
observer.on_next(x)
else:
GDRx.try(func():
on_next.call(x)
).catch("Error", func(e):
observer.on_error(e)
).end_try_catch()
observer.on_next(x)
var _on_error = func(error):
if on_error == null:
observer.on_error(error)
else:
GDRx.try(func():
on_error.call(error)
).catch("Error", func(e):
observer.on_error(e)
).end_try_catch()
observer.on_error(error)
var _on_completed = func():
if on_completed == null:
observer.on_completed()
else:
GDRx.try(func():
on_completed.call()
).catch("Error", func(e):
observer.on_error(e)
).end_try_catch()
observer.on_completed()
return source.subscribe(
_on_next, _on_error, _on_completed,
scheduler
)
return Observable.new(subscribe)
return do_action
static func do_(observer : ObserverBase) -> Callable:
# """Invokes an action for each element in the observable sequence and
# invokes an action on graceful or exceptional termination of the
# observable sequence. This method can be used for debugging, logging,
# etc. of query behavior by intercepting the message stream to run
# arbitrary actions for messages on the pipeline.
#
# >>> do(observer)
#
# Args:
# observer: Observer
#
# Returns:
# An operator function that takes the source observable and
# returns the source sequence with the side-effecting behavior
# applied.
# """
return do_action_(observer.on_next, observer.on_error, observer.on_completed)
static func do_after_next(source : Observable, after_next : Callable) -> Observable:
# """Invokes an action with each element after it has been emitted downstream.
# This can be helpful for debugging, logging, and other side effects.
#
# after_next -- Action to invoke on each element after it has been emitted
# """
var subscribe = func(
observer : ObserverBase, _scheduler : SchedulerBase = null
) -> DisposableBase:
var on_next = func(value):
GDRx.try(func():
observer.on_next(value)
after_next.call(value)
).catch("Error", func(e):
observer.on_error(e)
).end_try_catch()
return source.subscribe(on_next, observer.on_error, observer.on_completed)
return Observable.new(subscribe)
static func do_on_subscribe(source : Observable, on_subscribe : Callable) -> Observable:
# """Invokes an action on subscription.
#
# This can be helpful for debugging, logging, and other side effects
# on the start of an operation.
#
# Args:
# on_subscribe: Action to invoke on subscription
# """
var subscribe = func(
observer : ObserverBase, scheduler : SchedulerBase = null
) -> DisposableBase:
on_subscribe.call()
return source.subscribe(
observer.on_next,
observer.on_error,
observer.on_completed,
scheduler
)
return Observable.new(subscribe)
class OnDispose extends DisposableBase:
var on_dispose : Callable
func _init(on_dispose_ : Callable):
super._init()
self.on_dispose = on_dispose_
func dispose():
this.on_dispose.call()
static func do_on_dispose(source : Observable, on_dispose : Callable) -> Observable:
# """Invokes an action on disposal.
#
# This can be helpful for debugging, logging, and other side effects
# on the disposal of an operation.
#
# Args:
# on_dispose: Action to invoke on disposal
# """
var subscribe = func(
observer : ObserverBase, scheduler : SchedulerBase = null
) -> DisposableBase:
var composite_disposable = CompositeDisposable.new()
composite_disposable.add(OnDispose.new(on_dispose))
var subscription = source.subscribe(
observer.on_next,
observer.on_error,
observer.on_completed,
scheduler
)
composite_disposable.add(subscription)
return composite_disposable
return Observable.new(subscribe)
static func do_on_terminate(source : Observable, on_terminate : Callable) -> Observable:
# """Invokes an action on an on_complete() or on_error() event.
# This can be helpful for debugging, logging, and other side effects
# when completion or an error terminates an operation.
#
#
# on_terminate -- Action to invoke when on_complete or throw is called
# """
var subscribe = func(
observer : ObserverBase, scheduler : SchedulerBase = null
) -> DisposableBase:
var on_completed = func():
if not GDRx.try(func():
on_terminate.call()
).catch("Error", func(err):
observer.on_error(err)
).end_try_catch():
observer.on_completed()
var on_error = func(error):
if not GDRx.try(func():
on_terminate.call()
).catch("Error", func(err):
observer.on_error(err)
).end_try_catch():
observer.on_error(error)
return source.subscribe(
observer.on_next, on_error, on_completed,
scheduler
)
return Observable.new(subscribe)
static func do_after_terminate(source : Observable, after_terminate : Callable) -> Observable:
# """Invokes an action after an on_complete() or on_error() event.
# This can be helpful for debugging, logging, and other side effects
# when completion or an error terminates an operation
#
#
# on_terminate -- Action to invoke after on_complete or throw is called
# """
var subscribe = func(
observer : ObserverBase, scheduler : SchedulerBase = null
) -> DisposableBase:
var on_completed = func():
observer.on_completed()
GDRx.try(func():
after_terminate.call()
).catch("Error", func(err):
observer.on_error(err)
).end_try_catch()
var on_error = func(error):
observer.on_error(error)
GDRx.try(func():
after_terminate.call()
).catch("Error", func(err):
observer.on_error(err)
).end_try_catch()
return source.subscribe(
observer.on_next, on_error, on_completed,
scheduler
)
return Observable.new(subscribe)
class FinallyOnDispose extends DisposableBase:
var finally_action : Callable
var was_invoked : Array[bool]
func _init(finally_action_ : Callable, was_invoked_ : Array[bool]):
super._init()
self.finally_action = finally_action_
self.was_invoked = was_invoked_
func dispose():
if not this.was_invoked[0]:
finally_action.call()
this.was_invoked[0] = true
static func do_finally(finally_action : Callable) -> Callable:
# """Invokes an action after an on_complete(), on_error(), or disposal
# event occurs.
#
# This can be helpful for debugging, logging, and other side effects
# when completion, an error, or disposal terminates an operation.
#
# Note this operator will strive to execute the finally_action once,
# and prevent any redudant calls
#
# Args:
# finally_action -- Action to invoke after on_complete, on_error,
# or disposal is called
# """
var partial = func(source : Observable) -> Observable:
var subscribe = func(
observer : ObserverBase, scheduler : SchedulerBase = null
) -> DisposableBase:
var was_invoked : Array[bool] = [false]
var on_completed = func():
observer.on_completed()
GDRx.try(func():
if not was_invoked[0]:
finally_action.call()
was_invoked[0] = true
) \
.catch("Error", func(err):
observer.on_error(err)
) \
.end_try_catch()
var on_error = func(error):
observer.on_error(error)
GDRx.try(func():
if not was_invoked[0]:
finally_action.call()
was_invoked[0] = true
) \
.catch("Error", func(err):
observer.on_error(err)
) \
.end_try_catch()
var composite_disposable = CompositeDisposable.new()
composite_disposable.add(FinallyOnDispose.new(finally_action, was_invoked))
var subscription = source.subscribe(
observer.on_next, on_error, on_completed,
scheduler
)
composite_disposable.add(subscription)
return composite_disposable
return Observable.new(subscribe)
return partial