Notification
As you may have noticed so far, the Observables don't have any state like complete
or error
.
They are just raw streams of data.
However, some streams have a final state, and we need a way to represent them with Observables.
For example, an HTTP request finishes with a success state (and an associated Response's Body) or an error state (with an associated error) if it failed.
@lirx/core
uses Notifications as a state replacement for Observables.
Definition of a Notification
interface INotification<GName extends string, GValue> {
readonly name: GName;
readonly value: GValue;
}
A Notification is simply a tuple composed of a name
and a value
.
You may create one using a plain object or prefer the function createNotification.
The three essentials Notifications are:
- INextNotification: to emit a value.
- ICompleteNotification: to notify of a 'complete' state.
- IErrorNotification: to notify of an 'error' state.
These notifications will be passed as values for our Observables.
An example, is worth a thousand words.
Example
Let's create an Observable based on a Promise:
- If the Promise fulfills, we will send a
next
Notification containing the Promise's result, followed by acomplete
one. - If the Promise rejects, we will send an
error
Notification containing the Promise's error.
We will start by defining the type of value that our Observable will emit:
type IObservableFromPromiseNotifications<GValue> =
| INextNotification<GValue>
| ICompleteNotification
| IErrorNotification
;
It's just a union of different kind of notifications.
Then, we will handle the Promise's result and emit the corresponding Notifications:
promise.then(
(value: GValue) => {
emit(createNextNotification<GValue>(value));
emit(createCompleteNotification());
},
(error: any) => {
emit(createErrorNotification<any>(error));
},
);
Let's wrap everything in a function:
function fromPromise<GValue>(
promise: Promise<GValue>,
): IObservable<IObservableFromPromiseNotifications<GValue>> {
type GNotificationsUnion = IObservableFromPromiseNotifications<GValue>;
return (emit: IObserver<GNotificationsUnion>): IUnsubscribe => {
let running: boolean = true;
promise.then(
(value: GValue) => {
if (running) {
emit(createNextNotification<GValue>(value));
}
if (running) {
emit(createCompleteNotification());
}
},
(error: any) => {
if (running) {
emit(createErrorNotification<any>(error));
}
},
);
return (): void => {
running = false;
};
};
}
And voilà ! We've created an Observable from a Promise 🎊.
You may import and use immediately fromPromise and fromPromiseFactory
from @lirx/core
, instead of re-implementing it.
Now we have to consume our Observable:
const subscribe = fromPromise(Promise.resolve(5));
subscribe((notification: IObservableFromPromiseNotifications<number>) => {
switch (notification.name) {
case 'next':
console.log('next', notification.value);
break;
case 'complete':
console.log('resolved');
break;
case 'error':
console.log('rejected', notification.value);
break;
}
});
We simply switch on the incoming Notification's name.
We may also use the function defaultNotificationObserver if we prefer a shorter syntax:
subscribe(
defaultNotificationObserver(
/* next */ (value: number) => {
console.log('next', value);
},
/* complete */ () => {
console.log('resolved');
},
/* error */ (error: any) => {
console.log('rejected', error);
},
),
);
Observables based on Notifications
There are plenty of Observables and ObservablePipes based on Notifications on the reference page.
For example, we could create an Observable from an HTTP request, and then display the result:
const request$ = pipe$$(fromFetch(`https://www.w3.org/TR/PNG/iso_8859-1.txt`), [
fulfilled$$$((response: Response): IObservable<IFromPromiseObservableNotifications<string>> => {
if (response.ok) {
return fromPromise(response.text());
} else {
return throwError(createNetworkError());
}
}),
]);
toPromiseLast(request$)
.then((text: string) => {
console.log(text);
});
😰 Yes, in this case, using Promises is easier and shorter.
However, with Observables we may create far more powerful data flow - like streaming the response body:
const request$ = pipe$$(fromFetch(`https://www.w3.org/TR/PNG/iso_8859-1.txt`), [
fulfilled$$$((response: Response): IObservable<IFromPromiseObservableNotifications<string>> => {
if (response.ok) {
return fromReadableStream<Uint8Array>(response.body);
} else {
return throwError(createNetworkErrorFromResponse(response));
}
}),
]);
request$(
defaultNotificationObserver(
/* next */ (chunk: Uint8Array) => {
console.log('chunk', chunk);
},
/* complete */ () => {
console.log('done !');
},
/* error */ (error: any) => {
console.log('something wrong append', error);
},
),
);
It's something that can't be done with Promises, but can be done very easily with Observables.
Observables are extremely powerful. When you'll master them, we guaranty you'll do some magic.
Comparison with RxJS
Why @lirx/core's Observables are not based on 'next', 'complete' and 'error' events likes RxJS ?
This choice is due to a simple observation: most Observables don't have or need a final state.
They just send raw values, without ever finishing until unsubscribed (ex: fromEventTarget
, interval
, etc.).
So complete
or error
states are unnecessary in the majority of use cases.
However, the RxJS's Observables are build exclusively on these three "events". It results in tortuous interactions of the data in the pipes, elaborated state management, and complex algorithms. All of these impacts negatively RxJS: it means more lines of code in the library, resulting in a bigger bundle size and decreased performances.
Finally, the Observables should not be restricted to these 3 "events": we may easily imagine an XHR
Observable emitting upload-progress
and download-progress
events. Something impossible with RxJS.
These use cases are perfectly handled with @lirx/core
, with even faster performances.
Thanks to the Notifications' approach.