Simple Example for RxJS Flattening Operators

Angular&NodeEnthusiast
6 min read · Mar 21, 2020

Flattening operators come to our rescue when we have a nested subscription, i.e. subscribing to an observable inside the subscription of another observable.

This can be pretty annoying to track and debug. It's similar to the “callback hell” scenario where we have nested callbacks.

Let's understand the problem first with an example:

import { of } from "rxjs";

const source = of("Pete", "Mike");

I have created an Outer Observable named source emitting 2 string values.

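innerObsv(name: string) {
  // Note the trailing space so the output reads "My name is Pete"
  return of("My name is " + name);
}

// map is imported from "rxjs/operators"
const obsv = source.pipe(map(x => this.innerObsv(x)));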

Next I have mapped each value emitted by the source observable to an inner observable via the innerObsv(). This function returns the inner observable.

So now we have 2 observables to take care of, to extract the data.

obsv.subscribe(
  data => {
    data.subscribe(
      innerdata => { console.log(innerdata); },
      innererr => { console.log(innererr); },
      () => { console.log("inner observable completed"); }
    );
  },
  err => { console.log(err); },
  () => { console.log("outer observable completed"); }
);

Result:

My name is Pete
inner observable completed
My name is Mike
inner observable completed
outer observable completed

I have first subscribed to the outer observable. Each value it emits is an inner observable returned by innerObsv(). I then need to subscribe to this inner observable as well to fetch the results.

Thus the problem of nested subscriptions arises whenever an observable emits another observable.

The solution to this problem is the 4 flattening operators:

  1. mergeMap
  2. concatMap
  3. exhaustMap
  4. switchMap

These operators differ slightly from each other, but the basic concept is the same:

=> Map the value emitted by the outer observable to an inner observable.

=> Flatten the inner observable, i.e. subscribe to the inner observable.

So all I need to take care of is the subscription to the outer observable. The rest of the work is taken care of by these operators.
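As a minimal sketch of this idea, the nested example from earlier can be rewritten with a single subscription. It assumes RxJS 6 or later with pipeable operators; mergeMap is picked here only for illustration (with synchronous inner observables like these, any of the four operators would log the same values), and the `flattened` array is a hypothetical helper used to collect the output.

```typescript
import { Observable, of } from "rxjs";
import { mergeMap } from "rxjs/operators";

// Same idea as innerObsv() above: wrap a name in an inner observable.
function innerObsv(name: string): Observable<string> {
  return of("My name is " + name);
}

const source = of("Pete", "Mike");
const flattened: string[] = []; // hypothetical helper to collect the output

// One subscription only: mergeMap maps each name to an inner observable
// and subscribes to it for us.
source.pipe(mergeMap(name => innerObsv(name))).subscribe({
  next: data => {
    flattened.push(data);
    console.log(data);
  },
  error: err => console.log(err),
  complete: () => console.log("completed"),
});
```

The subscriber now receives the strings directly instead of inner observables, so the inner subscribe call disappears.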

Now let's see how these operators differ from each other and where each can be used.

1. Switch Map

Lets say an outer observable emits a value. This value is mapped to an inner observable via a function.

The inner observable is subscribed to by the switchMap operator.

If the outer observable emits a new value before the currently subscribed inner observable completes, then the switchMap operator unsubscribes from the current inner observable and subscribes to the inner observable mapped from the new value.

This kind of behavior is useful in search functionality.

<input type="text" [(ngModel)]="somedata" name="somedata" id="random" maxlength="1">

Above is a text box in which a user enters an id to get user data corresponding to the id from the server. Our aim is to check how switchMap improves the efficiency of this search operation.

Component:

fromEvent(document.getElementById("random"), "keyup")
  .pipe(
    debounceTime(3000),
    switchMap((x: any) => this.serv.getSpecificUser(x.target.value))
  )
  .subscribe(data => { console.log(data); });

Service:

getSpecificUser(id: number): Observable<User> {
  return this.http.get<User>("https://jsonplaceholder.typicode.com/users/" + id)
    .pipe(
      map(res => res),
      catchError(err => throwError(err))
    );
}

The fromEvent operator creates the outer observable once. Every time the user types in the text box and a keyup event fires, this outer observable emits a value.

The value emitted by this outer observable is an event object. The object's target.value property is the id entered by the user in the text box.

switchMap maps this id to an inner observable via getSpecificUser() in the service.

switchMap also subscribes to this inner observable, and so we are able to log the user data corresponding to the id entered.

The debounceTime operator delays the emission of the id values from the outer observable. In this example it waits for 3 seconds of inactivity.

A value is passed on only once 3 seconds have elapsed without another keyup event. Each new keyup within that window restarts the wait.

Any earlier id entered during that window is discarded; only the latest one is mapped to an inner observable.
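The debounce behavior can be checked without waiting for real seconds. The sketch below is only an illustration: it passes RxJS's VirtualTimeScheduler to debounceTime so that time is simulated, and the `keyups` subject, the typed values and the timestamps are all made up for the example.

```typescript
import { Subject, VirtualTimeScheduler } from "rxjs";
import { debounceTime } from "rxjs/operators";

const scheduler = new VirtualTimeScheduler(); // simulated clock, no real waiting
const keyups = new Subject<string>();         // hypothetical stand-in for the keyup stream
const emitted: string[] = [];

keyups
  .pipe(debounceTime(3000, scheduler))
  .subscribe(value => emitted.push(value));

// A burst of three keystrokes, one second apart (virtual time in ms).
scheduler.schedule(() => keyups.next("1"), 0);
scheduler.schedule(() => keyups.next("12"), 1000);
scheduler.schedule(() => keyups.next("123"), 2000);
// A single keystroke much later, after a long pause.
scheduler.schedule(() => keyups.next("7"), 9000);

scheduler.flush(); // run all scheduled work in virtual-time order

console.log(emitted); // ["123", "7"]
```

Only the last value of the burst gets through, because each keystroke restarts the 3 second wait.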

How is the switchMap helping here?

The switchMap unsubscribes from the currently subscribed incomplete inner observable when the outer observable emits a new id entered by the user.

This cancels the pending request for the previous search id, so only the result for the most recent search id reaches the subscriber.
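The switching can be seen in isolation with a small sketch. It uses Subjects so that the timing is controlled by hand; `fakeRequest`, `response1` and `response2` are hypothetical stand-ins for getSpecificUser() and its HTTP responses, not part of the example above.

```typescript
import { Subject } from "rxjs";
import { switchMap } from "rxjs/operators";

const ids = new Subject<number>();        // outer observable: ids typed by the user
const response1 = new Subject<string>();  // hypothetical pending response for id 1
const response2 = new Subject<string>();  // hypothetical pending response for id 2
const received: string[] = [];

// Hypothetical stand-in for the service call.
const fakeRequest = (id: number) => (id === 1 ? response1 : response2);

ids.pipe(switchMap(id => fakeRequest(id))).subscribe(user => received.push(user));

ids.next(1);               // request for id 1 starts
ids.next(2);               // arrives before request 1 answered: switchMap drops request 1
response1.next("user 1");  // late answer for id 1, nobody is listening any more
response2.next("user 2");  // answer for the latest id is delivered

console.log(received); // ["user 2"]
```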

2. Concat Map

Concat Map is used to perform sequential operations.

Consider an example of saving a form automatically each time it is edited. We want the saves to be done one at a time.

Unlike switchMap, concatMap won't subscribe to a new inner observable until the previous inner observable has completed.

To understand how this operator works, we first need to look at what observable concatenation is.

const source1 = of("abc", "pqr");
const source2 = of("mno", "xyz");
// Pass both sources: source1 first, then source2
const result = concat(source1, source2);
result.subscribe(data => { console.log(data); });

There are 2 observables: source1 and source2 which are passed to the concat().

concat will first subscribe to source1. The 2 values abc and pqr will be emitted.

When source1 completes, concat will subscribe to source2 and the values mno and xyz will be emitted.

When source2 completes, then the result observable also completes.

Similarly consider the form below:

<form [formGroup]="UserPosts">
  <input type="text" formControlName="userId" name="userId">
  <input type="text" formControlName="id" name="id">
  <textarea width="500px" height="500px" formControlName="title" name="title" placeholder="Enter title"></textarea>
  <textarea width="500px" height="500px" formControlName="body" name="body" placeholder="Enter body"></textarea>
</form>

Every time the data in the form changes, I want to submit the updated data to the server one at a time.

Component:

this.UserPosts.valueChanges
  .pipe(
    debounceTime(3000),
    concatMap(form => this.serv.submitPost(form))
  )
  .subscribe(data => { console.log(data); });

Service:

submitPost(post: Post): Observable<Post> {
  return this.http.post<Post>("https://jsonplaceholder.typicode.com/posts", post)
    .pipe(
      map(res => res),
      catchError(err => throwError(err))
    );
}

If any field in the form changes, the form's valueChanges observable emits. This is the outer observable, and each value it emits is an object containing the latest form field values.

The debounceTime operator lets a new form object through only after 3 seconds have passed without further changes.

The concatMap operator maps the updated form object emitted by the outer observable to an inner observable via the submitPost().

The inner observable is then subscribed to by the concatMap operator, completing the save operation.

If a 2nd form object is emitted by the outer observable before the 1st inner observable completes, then the 2nd form object won't be mapped to an inner observable immediately. This mapping will wait until the 1st inner observable completes.

We are ensuring that an HTTP save is not made before the previous ongoing save completes, i.e. an inner observable needs to complete before the next inner observable is mapped and subscribed to.
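The queuing can be sketched with hand-driven Subjects. Everything named here (`edits`, `fakeSave`, `save1`, `save2`) is a hypothetical stand-in for the form's valueChanges stream and the submitPost() call, used only to make the ordering visible.

```typescript
import { Subject } from "rxjs";
import { concatMap } from "rxjs/operators";

const edits = new Subject<string>();  // outer observable: form edits
const save1 = new Subject<string>();  // hypothetical pending save for "edit 1"
const save2 = new Subject<string>();  // hypothetical pending save for "edit 2"
const started: string[] = [];         // saves that have actually been started
const saved: string[] = [];           // results delivered to the subscriber

// Hypothetical stand-in for the service call; records when a save starts.
const fakeSave = (form: string) => {
  started.push(form);
  return form === "edit 1" ? save1 : save2;
};

edits.pipe(concatMap(form => fakeSave(form))).subscribe(res => saved.push(res));

edits.next("edit 1");  // save 1 starts immediately
edits.next("edit 2");  // queued: save 1 has not completed yet
const startedBeforeFirstCompleted = started.slice();

save1.next("saved 1");
save1.complete();      // only now is "edit 2" mapped and subscribed to
save2.next("saved 2");
save2.complete();

console.log(startedBeforeFirstCompleted); // ["edit 1"]
console.log(saved);                       // ["saved 1", "saved 2"]
```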

3. Exhaust Map

Consider the same example of form save but I need to click a button to save a form to the server.

If I click 10 times, 10 save requests will be sent, which I would like to avoid.

exhaustMap is useful in such scenarios.

When exhaustMap has subscribed to an inner observable that has not yet completed, it discards any other values emitted by the outer observable in the meantime.

<form [formGroup]="UserPosts">
  <input type="text" formControlName="userId" name="userId">
  <input type="text" formControlName="id" name="id">
  <textarea width="500px" height="500px" formControlName="title" name="title" placeholder="Enter title"></textarea>
  <textarea width="500px" height="500px" formControlName="body" name="body" placeholder="Enter body"></textarea>
  <button id="but">Save form</button>
</form>

Component:

fromEvent(document.getElementById("but"), "click")
  .pipe(
    // The click event itself is not used; the current form value is submitted
    exhaustMap(() => this.serv.submitPost(this.UserPosts.value))
  )
  .subscribe(data => { console.log(data); });

Service:

submitPost(post: Post): Observable<Post> {
  return this.http.post<Post>("https://jsonplaceholder.typicode.com/posts", post)
    .pipe(
      map(res => res),
      catchError(err => throwError(err))
    );
}

The fromEvent operator creates the outer observable, which emits every time I click the button. In this scenario we are not mapping the value emitted by this observable (the click event).

Instead we are mapping the form object to an inner observable via the submitPost() function.

If the user clicks the button a few more times before this inner observable completes, those clicks won't be considered and will be discarded.

Any save requests made after the currently subscribed inner observable completes will be considered for mapping and subscription.
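A rough sketch of the discarded clicks, again with Subjects driven by hand. `clicks`, `fakeSave` and `pendingSave` are hypothetical stand-ins for the button's click stream and the submitPost() request.

```typescript
import { Subject } from "rxjs";
import { exhaustMap } from "rxjs/operators";

const clicks = new Subject<void>();        // outer observable: button clicks
let pendingSave = new Subject<string>();   // hypothetical in-flight save request
let requestsSent = 0;
const saved: string[] = [];

// Hypothetical stand-in for the service call; counts the requests sent.
const fakeSave = () => {
  requestsSent++;
  pendingSave = new Subject<string>();
  return pendingSave;
};

clicks.pipe(exhaustMap(() => fakeSave())).subscribe(res => saved.push(res));

clicks.next();  // first click: a save request is sent
clicks.next();  // ignored, the first save is still pending
clicks.next();  // ignored as well
const requestsWhilePending = requestsSent;

pendingSave.next("saved");
pendingSave.complete();  // the first save finishes

clicks.next();  // this click is accepted and sends a second request

console.log(requestsWhilePending); // 1
console.log(requestsSent);         // 2
console.log(saved);                // ["saved"]
```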

4. Merge Map

Merge Map is used for parallel operations, where we don't wait for the previous inner observable to complete.

Each value of the outer Observable is mapped into an inner Observable.

The inner Observable is subscribed to by mergeMap.

As the inner Observables emit new values, they are immediately reflected in the output.

In the case of mergeMap we don’t have to wait for the previous inner Observable to complete before triggering/subscribing to the next inner Observable.

We can have multiple inner Observables overlapping over time, emitting values in parallel.
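The overlap can be illustrated with a short sketch. The `outer`, `innerA` and `innerB` subjects and their values are invented for the example; they only serve to show that both inner observables are live at the same time.

```typescript
import { Subject } from "rxjs";
import { mergeMap } from "rxjs/operators";

const outer = new Subject<string>();   // outer observable
const innerA = new Subject<string>();  // hypothetical inner observable for "A"
const innerB = new Subject<string>();  // hypothetical inner observable for "B"
const output: string[] = [];

outer
  .pipe(mergeMap(key => (key === "A" ? innerA : innerB)))
  .subscribe(value => output.push(value));

outer.next("A");  // innerA is subscribed to
outer.next("B");  // innerB is subscribed to right away, innerA is still active

// Values from both inner observables are interleaved in the output.
innerA.next("A1");
innerB.next("B1");
innerA.next("A2");
innerB.next("B2");

console.log(output); // ["A1", "B1", "A2", "B2"]
```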

Please let me know your comments:)
