IObservablePipe
Types
interface IObservablePipe<GIn, GOut> {
  (subscribe: IObservable<GIn>): IObservable<GOut>;
}
Definition
An ObservablePipe is a function that takes an Observable and returns a new Observable. The returned Observable receives each value from the source, performs some operation on it, and may emit something else.
It is similar to an ObserverPipe, but works with Observables instead.
It acts as both a lazy push destination (the returned Observable) and a lazy push source (the received Observable).
This is equivalent to the RxJS Pipeable Operators (only the pipeable ones).
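To make the shape concrete, here is a minimal sketch of a hand-written ObservablePipe. The `IObserver`, `IObservable`, and `IUnsubscribe` types and the `of` helper below are simplified local stand-ins inferred from the signatures on this page, not the library's actual definitions, and `double$$` is a hypothetical name chosen for illustration.

```typescript
// Simplified local stand-ins (assumptions, not the library's real definitions).
type IUnsubscribe = () => void;
type IObserver<GValue> = (value: GValue) => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;
type IObservablePipe<GIn, GOut> = (subscribe: IObservable<GIn>) => IObservable<GOut>;

// Hypothetical helper: an Observable that synchronously emits the given values.
const of = <GValue>(...values: GValue[]): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    for (const value of values) {
      emit(value);
    }
    // Nothing to clean up: every value has already been emitted.
    return (): void => {};
  };
};

// Hypothetical pipe: re-emits every received number multiplied by two.
const double$$: IObservablePipe<number, number> = (
  subscribe: IObservable<number>,
): IObservable<number> => {
  return (emit: IObserver<number>): IUnsubscribe => {
    return subscribe((value: number): void => {
      emit(value * 2);
    });
  };
};

// Count how many times the source Observable is actually subscribed to.
let sourceSubscriptions = 0;
const source: IObservable<number> = (emit: IObserver<number>): IUnsubscribe => {
  sourceSubscriptions++;
  return of(1, 2, 3)(emit);
};

// Applying the pipe only builds a new Observable; the source is not touched yet.
const doubled: IObservable<number> = double$$(source);
const countBefore: number = sourceSubscriptions;

// Subscribing to the returned Observable is what subscribes to the source.
const received: number[] = [];
const unsubscribe: IUnsubscribe = doubled((value: number): void => {
  received.push(value);
});
unsubscribe();

console.log(countBefore, sourceSubscriptions, received);
```

In this sketch the source is subscribed to only once the returned Observable is itself subscribed to, which is the laziness described above.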
info
A best practice consists of suffixing your ObservablePipes with $$.
It's a convenient way to distinguish them from the other variables.
Example: const oddFilter$$ = filter$$$(x => x % 2 === 1)
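The naming convention can be illustrated with a small sketch. The `filter$$$` below is a local stand-in written for this example under the assumption that it takes a predicate and returns an ObservablePipe; the library's own `filter$$$` may differ. The types and the `of` helper are likewise simplified assumptions based on the signatures on this page.

```typescript
// Simplified local stand-ins (assumptions, not the library's real definitions).
type IUnsubscribe = () => void;
type IObserver<GValue> = (value: GValue) => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;
type IObservablePipe<GIn, GOut> = (subscribe: IObservable<GIn>) => IObservable<GOut>;

// Hypothetical helper: an Observable that synchronously emits the given values.
const of = <GValue>(...values: GValue[]): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    for (const value of values) {
      emit(value);
    }
    return (): void => {};
  };
};

// Stand-in factory: builds an ObservablePipe that keeps only the values
// for which the predicate returns true.
const filter$$$ = <GValue>(
  predicate: (value: GValue) => boolean,
): IObservablePipe<GValue, GValue> => {
  return (subscribe: IObservable<GValue>): IObservable<GValue> => {
    return (emit: IObserver<GValue>): IUnsubscribe => {
      return subscribe((value: GValue): void => {
        if (predicate(value)) {
          emit(value);
        }
      });
    };
  };
};

// The $$ suffix marks the resulting variable as an ObservablePipe.
const oddFilter$$ = filter$$$<number>((x: number): boolean => x % 2 === 1);

const received: number[] = [];
oddFilter$$(of(1, 2, 3, 4, 5))((value: number): void => {
  received.push(value);
});

console.log(received); // [ 1, 3, 5 ]
```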
Example
An ObservablePipe that re-emits a received value only when it differs from the previous one
// Re-emits a value only when it differs from the previously received one.
const distinct = <GValue>(subscribe: IObservable<GValue>): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    // Last value seen; stays undefined until the first value arrives.
    let previousValue: GValue;
    return subscribe((value: GValue): void => {
      if (value !== previousValue) {
        previousValue = value;
        emit(value);
      }
    });
  };
};

const subscribe: IObservable<number> = distinct(of(1, 2, 2, 3));
const unsubscribe: IUnsubscribe = subscribe((value: number) => {
  console.log('value:', value);
});
Output:
value: 1
value: 2
value: 3
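Because the comparison is made only against the previously received value, a value that reappears later in the stream is emitted again. The sketch below shows this with a different input; the types and the `of` helper are simplified local stand-ins inferred from the signatures on this page, not the library's actual definitions.

```typescript
// Simplified local stand-ins (assumptions, not the library's real definitions).
type IUnsubscribe = () => void;
type IObserver<GValue> = (value: GValue) => void;
type IObservable<GValue> = (emit: IObserver<GValue>) => IUnsubscribe;

// Hypothetical helper: an Observable that synchronously emits the given values.
const of = <GValue>(...values: GValue[]): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    for (const value of values) {
      emit(value);
    }
    return (): void => {};
  };
};

// Same logic as the example above: compare with the previous value only.
const distinct = <GValue>(subscribe: IObservable<GValue>): IObservable<GValue> => {
  return (emit: IObserver<GValue>): IUnsubscribe => {
    let previousValue: GValue;
    return subscribe((value: GValue): void => {
      if (value !== previousValue) {
        previousValue = value;
        emit(value);
      }
    });
  };
};

const received: number[] = [];
distinct(of(1, 2, 2, 1, 1, 3))((value: number): void => {
  received.push(value);
});

// Consecutive duplicates are dropped, but the second run of 1 is emitted again.
console.log(received); // [ 1, 2, 1, 3 ]
```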
RxJS equivalent
of(1, 2, 2, 3)
  .pipe(
    distinctUntilChanged(),
  )
  .subscribe((value) => {
    console.log('value:', value);
  });