2024-12-27 21:00:07 +01:00

172 lines
4.9 KiB
GDScript

static func group_join_(
right : Observable,
left_duration_mapper : Callable,
right_duration_mapper : Callable
) -> Callable:
# """Correlates the elements of two sequences based on overlapping
# durations, and groups the results.
#
# Args:
# right: The right observable sequence to join elements for.
# left_duration_mapper: A function to select the duration (expressed
# as an observable sequence) of each element of the left observable
# sequence, used to determine overlap.
# right_duration_mapper: A function to select the duration (expressed
# as an observable sequence) of each element of the right observable
# sequence, used to determine overlap.
#
# Returns:
# An observable sequence that contains elements combined into a tuple
# from source elements that have an overlapping duration.
# """
var nothing = func(__):
return null
var group_join = func(left : Observable) -> Observable:
var subscribe = func(
observer : ObserverBase,
scheduler : SchedulerBase = null
) -> DisposableBase:
var group = CompositeDisposable.new()
var rcd = RefCountDisposable.new(group)
var left_map : Dictionary = {} # TO DO OrderedDict()
var right_map : Dictionary = {}
var left_id = [0]
var right_id = [0]
var on_next_left = func(value):
var subject : Subject = Subject.new()
var _id
if true:
var __ = LockGuard.new(left.lock)
_id = left_id[0]
left_id[0] += 1
left_map[_id] = subject
var result = RefValue.Null()
if GDRx.try(func():
result.v = Tuple.new([value, GDRx.util.add_ref(subject.as_observable(), rcd)])
) \
.catch("Error", func(e):
push_error("*** Error: ", e)
for left_value in left_map.values():
left_value.on_error(e)
observer.on_error(e)
) \
.end_try_catch(): return
observer.on_next(result.v)
for right_value in right_map.values():
subject.on_next(right_value)
var md = SingleAssignmentDisposable.new()
group.add(md)
var expire = func():
if left_map.has(_id):
left_map.erase(_id)
subject.on_completed()
group.remove(md)
var duration = RefValue.Null()
if GDRx.try(func():
duration.v = left_duration_mapper.call(value)
) \
.catch("Error", func(e):
for left_value in left_map.values():
left_value.on_error(e)
observer.on_error(e)
) \
.end_try_catch(): return
var on_error = func(error):
for left_value in left_map.values():
left_value.on_error(error)
observer.on_error(error)
md.disposable = duration.v.pipe1(GDRx.op.take(1)).subscribe(
nothing, on_error, expire, scheduler
)
var on_error_left = func(error):
for left_value in left_map.values():
left_value.on_error(error)
observer.on_error(error)
group.add(
left.subscribe(
on_next_left,
on_error_left,
observer.on_completed,
scheduler
)
)
var send_right = func(value):
var _id
if true:
var __ = LockGuard.new(left.lock)
_id = right_id[0]
right_id[0] += 1
right_map[_id] = value
var md = SingleAssignmentDisposable.new()
group.add(md)
var expire = func():
right_map.erase(_id)
group.remove(md)
var duration = RefValue.Null()
if GDRx.try(func():
duration.v = right_duration_mapper.call(value)
) \
.catch("Error", func(e):
for left_value in left_map.values():
left_value.on_error(e)
observer.on_error(e)
) \
.end_try_catch(): return
var on_error = func(error):
var _guard = LockGuard.new(left.lock)
for left_value in left_map.values():
left_value.on_error(error)
observer.on_error(error)
md.disposable = duration.v.pipe1(GDRx.obs.take(1)).subscribe(
nothing, on_error, expire,
scheduler
)
if true:
var __ = LockGuard.new(left.lock)
for left_value in left_map.values():
left_value.on_next(value)
var on_error_right = func(error):
for left_value in left_map.values():
left_value.as_observer().on_error(error)
observer.on_error(error)
group.add(right.subscribe(
send_right, on_error_right, func():return, scheduler
))
return rcd
return Observable.new(subscribe)
return group_join