RxJs simplifies working with event streams. In Angular, we get notified of almost all events and changes by subscribing to RxJs Observable(s)
Ex (ActvatedRoute#params , HttpClient#get).
We seldom create our own Observable(s)
from an actual event source. Unless, you consider emitting known values, with of and from as we usually do in our tests.
import { Observable, of } from 'rxjs';
Observable<String> one = Observable.of('1');
Observable<String> oneTwoThree = Observable.from(['1','2', '3']);
RxJs provides us handy utility function from(...) : Observable<T>
to create Observable(s)
from well known event sources, (Ex. a native dom-event).
Here is an example of**_Observable.from_**_(input[click])_
, implementing a text-input search-box, which can notify us of changing text.
import {AfterViewInit, Component, ElementRef, EventEmitter, OnDestroy, OnInit, Output, ViewChild} from '@angular/core';
import {fromEvent, Observable, Subscription} from 'rxjs';
import {debounceTime, distinctUntilChanged, map, startWith, tap} from 'rxjs/operators';
@Component({
selector: 'search-box',
template: `
<div>
<input placeholder="search" #searchInput autocomplete="off"/>
</div>
`
})
export class SearchBoxComponent implements AfterViewInit, OnDestroy {
@Output('onSearch')
onSearch = new EventEmitter<string>();
@ViewChild('searchInput')
input: ElementRef;
private subscription: Subscription;
ngAfterViewInit(): void {
const terms$ = fromEvent<any>(this.input.nativeElement, 'keyup')
.pipe(
map(event => event.target.value),
startWith(''),
debounceTime(400),
distinctUntilChanged()
);
this.subscription = term$
.subscribe(
criterion => {
this.onSearch.emit(criterion);
}
);
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}
Sometimes source of your event(s) is not well known, and likely RxJs wouldn’t have any stock functions to create Observable(s)
of it.
Sometimes source of your event(s) is not well known, and likely RxJs wouldn’t have any stock functions to create Observable(s)
of it.
Observable.create(function(observer) {
// create or listen to an event-source (ex promises)
// decide when to call observer.(next|error|complete)
})
For example, Let us try creating our own Observable
the works like Angular’s Http.get
using the browser’s native fetch
-api
import {Observable} from 'rxjs';
export function createHttp$(url:string) {
return Observable.create(observer => {
const controller = new AbortController();
const signal = controller.signal;
fetch(url, {signal})
.then(response => {
if (response.ok) {
return response.json();
}
else {
observer.error(`Failed HTTP : response.status`);
}
})
.then(body => {
observer.next(body);
observer.complete();
})
.catch(err => {
observer.error(err);
//observable which immediately errors out
});
return () => controller.abort()
// this return function? executed on unsubscribe
});
}
This is how we can use it
const http$ = createHttp$('/some/url');
http$.subscribe({
next: (value: any) => console.log(`next ${value}`),
complete: () => console.log(`complete`)};
);
Everything in the above code-snippet is straightforward besides the function returned by Observable.create(…)
which is the one **that’s called when you unsubscribe to the observable created **created by createHttp$(...)
.
Browser’s fetch(…)
API gives us way to cancel ongoing requests by sending an abort signal. Therefore we send the abort signal when the subscriber unsubscribes before the the created observable before it completes. This will cancel long running http-get-request to which no one now is subscribing.
const url = '/some/entity';
const http$ = this.activatedRoute.params.pipe(
switchMap( id => createHttp$(`${url}/${id}`))
).subscribe({
next: (value: any) => console.log(`next ${value}`),
complete: () => console.log(`complete`)};
);
In the example above, we listen to changing URL parameters emitted from Router’s
from activatedRoute.params
and issue corresponding http requests with newly emitted param value {id}
using Observables(s)
created by callingcreateHttp$(
${url}/${id}))
However since we are using switchMap(…)
to emit new create_Http$(…)_
Observales
, the previously emitted Observable
is first unsubscribed (cancelled) before a new one is created and emitted by switchMap(…)
.
Since Observable(s)
created from createHttp$(...)
implements and returns function for cancellation, if http request inside was still on going in the Observable
being unsubscribed, it will get aborted before new _createHttp$(...)_
Observable
is created and emitted by switchMap
with in which a new http request gets issued.