162 lines
5.2 KiB
GDScript3
Raw Permalink Normal View History

2025-01-11 00:25:52 +01:00
static func group_by_until_(
	key_mapper : Callable,
	duration_mapper : Callable,
	element_mapper = null,
	subject_mapper = null
) -> Callable:
	# Groups the elements of an observable sequence according to a
	# specified key mapper function. A duration mapper function is used
	# to control the lifetime of groups. When a group expires, it receives
	# an OnCompleted notification. When a new element with the same key
	# value as an expired group occurs, a new group is created for that key
	# and a new duration is requested for it.
	#
	# Examples (arguments in this function's parameter order):
	# >>> group_by_until_(func(x): return x.id, func(grp): return GDRx.obs.never())
	# >>> group_by_until_(
	#         func(x): return x.id,
	#         func(grp): return GDRx.obs.never(),
	#         func(x): return x.name
	#     )
	# >>> group_by_until_(
	#         func(x): return x.id,
	#         func(grp): return GDRx.obs.never(),
	#         func(x): return x.name,
	#         func(): return ReplaySubject.new()
	#     )
	#
	# Args:
	#     key_mapper: Called with each source element; its result is the
	#         key under which the element is grouped.
	#     duration_mapper: Called once per newly created group with a
	#         GroupedObservable for that group. It must return an
	#         observable; the group expires when that observable
	#         (restricted with take(1)) completes.
	#     element_mapper: Optional. Called with each source element; its
	#         result is what gets pushed into the group. Defaults to
	#         GDRx.basic.identity when null.
	#     subject_mapper: Optional. Called without arguments to create the
	#         subject backing a new group. Defaults to a function
	#         returning Subject.new() when null.
	#
	# Returns: a Callable taking a source Observable and returning an
	#     Observable that emits one GroupedObservable per newly created
	#     group. If a group's lifetime expires, a new group with the same
	#     key can be created once an element with that key is encountered.
	#
	# Errors raised by any of the mapper functions, by a duration
	# observable, or by the source are forwarded to every live group and
	# to the downstream observer.
	var element_mapper_ : Callable = element_mapper if element_mapper != null else GDRx.basic.identity
	var default_subject_mapper = func(): return Subject.new()
	@warning_ignore("incompatible_ternary")
	var subject_mapper_ : Callable = subject_mapper if subject_mapper != null else default_subject_mapper

	var group_by_until = func(source : Observable) -> Observable:
		var subscribe = func(
			observer : ObserverBase,
			scheduler : SchedulerBase = null
		) -> DisposableBase:
			# Maps each live key to the subject backing its group.
			var writers : Dictionary = {} # TO DO OrderedDict
			var group_disposable = CompositeDisposable.new()
			var ref_count_disposable = RefCountDisposable.new(group_disposable)

			# Single place that propagates a failure: first to every live
			# group, then to the downstream observer. Used by all error
			# paths so they cannot drift apart.
			var fail_all = func(err):
				for wrt in writers.values():
					wrt.on_error(err)
				observer.on_error(err)

			var on_next = func(x):
				var writer = RefValue.Null()
				var key = RefValue.Null()
				# Forward the caught error itself; key.v is still null here
				# because the key mapper did not return.
				if GDRx.try(func():
					key.v = key_mapper.call(x)
				) \
				.catch("Error", fail_all) \
				.end_try_catch(): return

				var fire_new_map_entry = false
				writer.v = writers.get(key.v)
				if writer.v == null:
					# No live group for this key: create its backing subject.
					if GDRx.try(func():
						writer.v = subject_mapper_.call()
					) \
					.catch("Error", fail_all) \
					.end_try_catch(): return
					writers[key.v] = writer.v
					fire_new_map_entry = true

				if fire_new_map_entry:
					# 'group' is handed downstream and is tied to the shared
					# ref-counted disposable; 'duration_group' is only given
					# to the duration mapper and is created without it.
					var group : GroupedObservable = GroupedObservable.new(
						key.v, writer.v.as_observable(), ref_count_disposable
					)
					var duration_group : GroupedObservable = GroupedObservable.new(
						key.v, writer.v.as_observable()
					)
					var duration = RefValue.Null()
					if GDRx.try(func():
						duration.v = duration_mapper.call(duration_group)
					) \
					.catch("Error", fail_all) \
					.end_try_catch(): return

					observer.on_next(group)
					var sad = SingleAssignmentDisposable.new()
					group_disposable.add(sad)

					# Ends the group's lifetime: drop it from the live map,
					# complete its subject and release the duration
					# subscription.
					var expire = func():
						if writers.get(key.v) != null:
							writers.erase(key.v)
							writer.v.on_completed()
						group_disposable.remove(sad)

					# Values of the duration observable are ignored; only its
					# completion (after take(1)) or error matters.
					var ignore_duration_value = func(__):
						pass

					sad.disposable = duration.v.pipe1(
						GDRx.op.take(1)
					).subscribe(
						ignore_duration_value, fail_all, expire, scheduler
					)

				var element = RefValue.Null()
				if GDRx.try(func():
					element.v = element_mapper_.call(x)
				) \
				.catch("Error", fail_all) \
				.end_try_catch(): return

				writer.v.on_next(element.v)

			# Source completion completes every live group, then the
			# downstream observer.
			var on_completed = func():
				for wrt in writers.values():
					wrt.on_completed()
				observer.on_completed()

			group_disposable.add(source.subscribe(
				on_next, fail_all, on_completed, scheduler
			))
			return ref_count_disposable

		return Observable.new(subscribe)

	return group_by_until