EvilCocoGame1/addons/reactivex/observer/scheduledobserver.gd
2025-01-11 00:25:52 +01:00

77 lines
1.6 KiB
GDScript

extends Observer
class_name ScheduledObserver
## Observer that buffers notifications and forwards them to a wrapped observer
## from work scheduled on a scheduler.
##
## The [code]_on_*_core[/code] callbacks only append a [Callable] to
## [member queue]; [method ensure_active] schedules [method run], which executes
## one queued callable per invocation and then schedules itself again.

## Scheduler on which [method run] is scheduled.
var scheduler : SchedulerBase
## Observer that the queued callables forward notifications to.
var observer : ObserverBase
## Lock taken (through LockGuard) by [method ensure_active] and [method run]
## while they inspect or update the state below.
var lock : RLock
## Set to true by [method ensure_active] when it finds pending work on a
## non-faulted observer; reset to false by [method run] once the queue is empty.
var is_acquired : bool
## Set to true by the error handler in [method run]. While true,
## [method ensure_active] does not schedule anything.
var has_faulted : bool
## Pending notifications, stored as callables and consumed from the front.
var queue : Array[Callable]
## Holds the value returned by [code]scheduler.schedule(...)[/code] when
## [method ensure_active] starts the drain.
var disposable : SerialDisposable
## Stores the scheduler and the target observer, then resets all internal
## state: nothing queued, not acquired, not faulted.
##
## [param scheduler_]: scheduler later used to schedule [method run].
## [param observer_]: observer that queued notifications are forwarded to.
func _init(scheduler_ : SchedulerBase, observer_ : ObserverBase):
	super._init()

	# Collaborators supplied by the caller.
	observer = observer_
	scheduler = scheduler_

	# Fresh synchronisation and bookkeeping state.
	disposable = SerialDisposable.new()
	lock = RLock.new()
	queue = []
	has_faulted = false
	is_acquired = false
## Queues delivery of the value [param i] to the wrapped observer.
##
## Nothing is forwarded here: a callable that calls [code]observer.on_next(i)[/code]
## is appended to [member queue]. The append is not performed under [member lock],
## and this method does not itself schedule [method run].
func _on_next_core(i):
	queue.append(func(): observer.on_next(i))
## Queues delivery of the error [param e] to the wrapped observer.
##
## Nothing is forwarded here: a callable that calls [code]observer.on_error(e)[/code]
## is appended to [member queue]. The append is not performed under [member lock],
## and this method does not itself schedule [method run].
func _on_error_core(e):
	queue.append(func(): observer.on_error(e))
## Queues delivery of the completion signal to the wrapped observer.
##
## Nothing is forwarded here: a callable that calls [code]observer.on_completed()[/code]
## is appended to [member queue]. The append is not performed under [member lock],
## and this method does not itself schedule [method run].
func _on_completed_core():
	queue.append(func(): observer.on_completed())
## Starts draining the queue unless a drain is already in progress.
##
## Under [member lock]: if the observer has not faulted and the queue is not
## empty, marks the observer as acquired. Only the call that flipped
## [member is_acquired] from false to true schedules [method run]; the value
## returned by the scheduler is stored in [member disposable].
func ensure_active():
	var became_owner := false

	# `if true:` exists only to give the guard its own block scope.
	# NOTE(review): presumably the lock is released when the guard leaves
	# that scope - confirm against LockGuard.
	if true:
		var _guard = LockGuard.new(lock)
		if not (has_faulted or queue.is_empty()):
			became_owner = not is_acquired
			is_acquired = true

	# Scheduling happens after the guarded block has ended.
	if became_owner:
		disposable.disposable = scheduler.schedule(self.run)
## Executes one queued notification, then schedules itself again.
##
## If the queue is empty, [member is_acquired] is cleared and the method
## returns without rescheduling. Otherwise the first queued callable is run
## through GDRx's try/catch helper; the "Error" handler empties the queue,
## sets [member has_faulted] and re-raises via [code]GDRx.raise[/code].
##
## [param _scheduler] and [param _state] are not used.
func run(_scheduler : SchedulerBase, _state = null):
	var me = self
	var next_action

	# `if true:` exists only to give the guard its own block scope.
	# NOTE(review): presumably the lock is released when the guard leaves
	# that scope (including via the early return) - confirm against LockGuard.
	if true:
		var _guard = LockGuard.new(self.lock)
		if me.queue.is_empty():
			me.is_acquired = false
			return
		next_action = me.queue.pop_front()

	# Fault handler: drop everything still pending and flag the fault while
	# holding the lock, then pass the error on.
	var on_fault = func(e):
		if true:
			var _guard = LockGuard.new(self.lock)
			me.queue = []
			me.has_faulted = true
		GDRx.raise(e)

	GDRx.try(next_action).catch("Error", on_fault).end_try_catch()

	# Schedule the next drain step; the returned value is not stored.
	self.scheduler.schedule(self.run)
## Disposes this observer together with its scheduled drain work.
##
## Calls the parent class's [code]dispose()[/code] first, then disposes
## [member disposable], which [method ensure_active] assigns from
## [code]scheduler.schedule(...)[/code].
func dispose():
	super.dispose()
	# GDScript names the current instance `self`; `this` is not a GDScript
	# identifier and is not declared in this script, so it must not be used here.
	self.disposable.dispose()