Skip to main content

RXJS

References

BehaviorSubjects

Requires an initial value and emits the current value to new subscribers.

https://www.learnrxjs.io/learn-rxjs/subjects/behaviorsubject

import { BehaviorSubject } from 'rxjs';

// A BehaviorSubject always holds a current value, so it has to be seeded
// with one when it is created (123 here).
const subject = new BehaviorSubject<number>(123);

// Subscribe to the subject to receive its value
subject.subscribe(value => console.log(value));

// Replace the current value with 250. Current subscribers are notified, and
// new subscribers will receive 250 as the current value.
subject.next(250);

// Complete the subject once it is no longer needed
subject.complete();

Clean up Subscriptions

Subscription.unsubscribe()

import { OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';

// Add `implements OnDestroy` to your component's/view's class definition

// Define the Subscription. It has no value until the assignment below runs,
// so it is declared optional; a bare `apiCallSubscription: Subscription;`
// is rejected when strictPropertyInitialization is enabled.
apiCallSubscription?: Subscription;

// Assign the Subscription returned by subscribe()
this.apiCallSubscription = this.someService.getSomeData(...).subscribe(...);

// Clean up the Subscription
ngOnDestroy(): void {
  // Guard: the subscription may never have been assigned
  if (this.apiCallSubscription) {
    this.apiCallSubscription.unsubscribe();
  }
}

distinct()

Emits only the items that are distinct from every previously emitted item. An optional key selector (such as `e => e.id` below) chooses the value used for the comparison.

https://www.learnrxjs.io/learn-rxjs/operators/filtering/distinct

import { from } from 'rxjs';
import { distinct } from 'rxjs/operators';

// Three sample objects; obj1 and obj3 share the same id (3).
const obj1 = { id: 3, name: 'name 1' };
const obj2 = { id: 4, name: 'name 2' };
const obj3 = { id: 3, name: 'name 3' };
const vals = [obj1, obj2, obj3];

// distinct() compares items by the key the selector returns (the id here),
// so obj3 is dropped because an item with id 3 was already emitted.
from(vals)
  .pipe(distinct(e => e.id))
  .subscribe(console.log);

/*
OUTPUT:
{id: 3, name: "name 1"}
{id: 4, name: "name 2"}
*/

filter()

https://rxjs.dev/api/operators/filter

https://www.learnrxjs.io/learn-rxjs/operators/filtering/filter

import { filter } from 'rxjs/operators';

// `source` is an observable that emits { name, age } objects one at a time.
// Only the people aged 30 or older are passed on by `example`.
const example = source.pipe(
  filter(({ age }) => age >= 30)
);

from()

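Creates an Observable from an Array (a Promise, an iterable, or another Observable-like object is also accepted). Each array element is emitted as a separate value.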

https://rxjs.dev/api/index/function/from

import { from } from 'rxjs';

// Each element of the array becomes one emitted value of `source`.
const source = from([
  { name: 'Joe', age: 31 },
  { name: 'Bob', age: 25 }
]);

forkJoin - Taking action after multiple observables complete

import { forkJoin } from 'rxjs';

// forkJoin waits until every source observable has completed, then emits one
// array holding the last value of each source, in the order they were passed.
// The sources are passed as an array because the variadic forkJoin(a, b)
// signature is deprecated in newer RxJS releases.
forkJoin([
  this.coreServiceWrp.getPhaListForSearch(),
  this.coreServiceWrp.getFaListForSearch()
]).subscribe(([phas, fas]) => {
  this.filterSource = { phas, fas };
  this.initFilter();

  // Set focus to the search input field
  this.searchInput.nativeElement.focus();
});

take

import { take } from 'rxjs/operators';

// take(1) lets the first emitted value through and then completes the stream
.pipe(take(1)).subscribe(...)

takeUntil

import { OnDestroy } from '@angular/core';

import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';


/**
 * Notifier used with takeUntil; it emits once from ngOnDestroy.
 * Typed as Subject<void> so that next() can be called without an argument.
 * @ignore
 */
destroy = new Subject<void>();


// Streams piped through takeUntil(this.destroy) stop once destroy emits
.pipe(takeUntil(this.destroy)).subscribe(...)


/**
 * Perform a little cleanup
 */
ngOnDestroy(): void {
  // Clean up the subscriptions: signal every takeUntil(this.destroy) pipeline
  this.destroy.next();
  // Then complete the notifier itself
  this.destroy.complete();
}