-
-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathcombineLatestHigherOrderObject.ts
104 lines (100 loc) · 3.19 KB
/
combineLatestHigherOrderObject.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/
/*tslint:disable:rxjs-no-nested-subscribe*/
import { Observable, Observer, OperatorFunction, Subscription } from "rxjs";
/**
 * Bookkeeping record for one inner observable taken from an object emitted
 * by the higher-order source.
 */
interface Source<T> {
// Set to true when the inner observable signals completion.
completed: boolean;
// Property name under which the observable appeared in the emitted object.
key: string;
// Set to true once the inner observable has emitted at least one value.
nexted: boolean;
// The inner observable; compared by identity (together with `key`) to decide
// whether an existing record can be reused for a later emission.
observable: Observable<T>;
// Subscription to `observable`; unset until the record has been subscribed.
subscription?: Subscription;
// Most recent value emitted by `observable`; unset until the first emission.
value?: T;
}
/**
 * Pushes a snapshot of the latest values to the observer, keyed by each
 * source's `key`, but only when every source has emitted at least once.
 *
 * @param sources Records describing the inner observables currently tracked.
 * @param observer Receiver of the combined key-to-value record.
 */
function combine<T>(
  sources: Source<T>[],
  observer: Observer<Record<string, T>>
): void {
  // Hold back until no source is still waiting for its first value. An empty
  // list has nothing waiting, so it yields an empty record.
  const stillWaiting = sources.some((source) => !source.nexted);
  if (stillWaiting) {
    return;
  }
  let snapshot: Record<string, T> = {};
  sources.forEach((source) => {
    // `value` is populated whenever `nexted` is true, which was checked above.
    snapshot = { ...snapshot, [source.key]: source.value as T };
  });
  observer.next(snapshot);
}
/**
 * Creates an operator that turns a stream of keyed observable maps into a
 * stream of keyed latest-value maps.
 *
 * For each object emitted by the higher-order source:
 * - an entry whose key and observable instance both match an entry of the
 *   previous object is reused as-is, keeping its subscription and last value;
 * - every other entry is subscribed afresh;
 * - entries of the previous object that were not reused are unsubscribed.
 *
 * A combined record is emitted (via `combine`) after each higher-order
 * emission and after each inner emission, provided every current entry has
 * emitted at least once. Errors from the higher-order source or from any
 * inner observable are forwarded. The result completes once the higher-order
 * source has completed and every current inner observable has completed.
 *
 * @returns An operator mapping `Record<string, Observable<T>>` emissions to
 * `Record<string, T>` emissions.
 */
export function combineLatestHigherOrderObject<T>(): OperatorFunction<
  Record<string, Observable<T>>,
  Record<string, T>
> {
  return (higherOrder) =>
    new Observable<Record<string, T>>((observer) => {
      // `previous` holds the records of the last processed emission; while a
      // new emission is being processed, reused records are removed from it
      // so that what remains is exactly the set to unsubscribe.
      let previous: Source<T>[] = [];
      let current: Source<T>[] = [];
      let outerCompleted = false;
      // Owns the higher-order subscription and every inner subscription.
      const teardown = new Subscription();

      const outerSubscription = higherOrder.subscribe({
        next: (observables) => {
          // Newly created records; their subscription is deferred until the
          // stale records have been unsubscribed and `current` is in place.
          const pending: Source<T>[] = [];

          current = Object.keys(observables).map((key) => {
            const observable = observables[key];
            const reuseIndex = previous.findIndex(
              (candidate) =>
                candidate.key === key && candidate.observable === observable
            );
            if (reuseIndex !== -1) {
              return previous.splice(reuseIndex, 1)[0];
            }
            const created: Source<T> = {
              completed: false,
              key,
              nexted: false,
              observable,
            };
            pending.push(created);
            return created;
          });

          // Whatever is left in `previous` was not carried over.
          previous.forEach((stale) => {
            if (stale.subscription) {
              stale.subscription.unsubscribe();
            }
          });
          previous = current;

          // Emit immediately when the new set is already fully populated.
          combine(current, observer);

          pending.forEach((source) => {
            // Stop subscribing once the whole operator has been torn down.
            if (teardown.closed) {
              return;
            }
            source.subscription = source.observable.subscribe({
              next: (value) => {
                source.nexted = true;
                source.value = value;
                combine(current, observer);
              },
              error: (err) => observer.error(err),
              complete: () => {
                source.completed = true;
                if (
                  outerCompleted &&
                  current.every(({ completed }) => completed)
                ) {
                  observer.complete();
                }
              },
            });
            teardown.add(source.subscription);
          });
        },
        error: (err) => observer.error(err),
        complete: () => {
          if (previous.every(({ completed }) => completed)) {
            observer.complete();
          }
          outerCompleted = true;
        },
      });
      teardown.add(outerSubscription);

      return teardown;
    });
}