Asynchronous JavaScript: Using RxJS Observables with REST APIs in Node.js

January 27, 2020
Written by
Maciej Treder
Contributor
Opinions expressed by Twilio contributors are their own


ReactiveX is an electrifying programming concept. It’s widely adopted in popular programming languages, including JavaScript. ReactiveX programs can react to data as it is emitted from a source, rather than get the data from it. This is a convenient way of handling data from sources like web APIs or WebSockets.

In this post you’ll get hands-on experience doing ReactiveX programming with RxJS: ReactiveX for JavaScript. You’ll learn how to perform REST API calls to retrieve data asynchronously, manipulate it as it arrives, and perform subsequent calls based on the emitted data. You’ll also see how to perform other actions whenever data is emitted by an Observable.

This post focuses on how to utilize RxJS Observables with REST API calls. The previous post in this series on Asynchronous JavaScript explains RxJS fundamentals:

Understanding the case study project

In this post you will see how to perform a sequence of REST API calls using the Rx-HTTP library. The task which you are going to accomplish is to answer the question “What’s the best movie by Quentin Tarantino?” based on data available in a mock-up REST API.

You will use the following RxJS operators:

  • map – to manipulate responses from the API
  • flatMap – to create a new Observable based on the data emitted by another Observable
  • combineLatest – to combine multiple Observables into one Observable
  • tap – to perform side actions whenever an Observable emits new data

Prerequisites

To accomplish the tasks in this post you will need the following:

You should also have a working knowledge of the core elements of JavaScript and asynchronous JavaScript mechanics.

There is a companion repository for this post available on GitHub.

Understanding the case study project

The case study project for this post determines the “best” movie by Quentin Tarantino based on review scores. To accomplish this, the code will retrieve data from REST API mockups on GitHub.io. It achieves the same goal with RxJS objects as the case study used in previous posts in this series did with JavaScript Promises and callbacks. You don’t need to have read the prior posts to understand the case study, but if you’d like to learn more about working with Promises or callbacks these posts will be helpful:

You can simulate the case study process manually by performing GET calls with your browser. Give the following steps a try to familiarize yourself with the APIs and the data:

  1. Go to https://maciejtreder.github.io/asynchronous-javascript/directors/  to see a JSON object containing a list of directors. Quentin Tarantino has id 2.
  2. Go to https://maciejtreder.github.io/asynchronous-javascript/directors/2/movies/ to see the list of Tarantino’s movies.
  3. See the reviews for the first Tarantino movie by going to https://maciejtreder.github.io/asynchronous-javascript/movies/5/reviews and calculate the average score. (Because GitHub Pages are static, it isn’t possible to pass the movie ID as a query parameter, so it is passed as part of the URI, and the API isn’t fully RESTful in this respect.)
  4. Repeat step 3 for each of Quentin Tarantino’s movies.
  5. Determine which movie is the best by finding the highest average score.
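
The manual steps above can be sketched in plain JavaScript before any HTTP or RxJS code is involved. This is only an illustration: the movie IDs, titles, and ratings below are hypothetical sample data, not values from the API, and the lookup objects moviesByDirector and reviewsByMovie are stand-ins for the endpoints.

```javascript
// Hypothetical in-memory stand-ins for the three API responses.
const directors = [
  { id: 1, name: 'James Cameron' },
  { id: 2, name: 'Quentin Tarantino' },
];
const moviesByDirector = {
  2: [{ id: 101, title: 'Sample Movie A' }, { id: 102, title: 'Sample Movie B' }],
};
const reviewsByMovie = {
  101: [{ rating: 6 }, { rating: 8 }],
  102: [{ rating: 9 }, { rating: 10 }],
};

// Step 1: find the director's ID.
const directorId = directors.find(d => d.name === 'Quentin Tarantino').id;

// Steps 2-4: compute the average rating for each of the director's movies.
const ratings = moviesByDirector[directorId].map(movie => {
  const reviews = reviewsByMovie[movie.id];
  const total = reviews.reduce((sum, review) => sum + review.rating, 0);
  return { title: movie.title, averageScore: total / reviews.length };
});

// Step 5: pick the movie with the highest average score.
const best = ratings.reduce((a, b) => (b.averageScore > a.averageScore ? b : a));

console.log(best); // { title: 'Sample Movie B', averageScore: 9.5 }
```

The rest of the post performs the same calculation, except that each lookup is an asynchronous API call wrapped in an Observable.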

Setting up the project

If you have completed the project from the previous post in this series, Asynchronous JavaScript: Introducing ReactiveX and RxJS Observables, you can continue with that code.

If you are familiar with the basics of Observables, or want to start fresh, you can get the code from GitHub. Clone the project by executing the following command-line instructions in the directory where you would like to create the project root directory:

git clone https://github.com/maciejtreder/asynchronous-javascript.git
cd asynchronous-javascript
git checkout step12
npm install
cd rxjs

Retrieving REST API data using rx-http-request and Observables

To perform REST API calls, you are going to use the @akanass/rx-http-request library available in the npm repository. It is an RxJS-compliant implementation of the popular request HTTP client. The @akanass/rx-http-request API performs calls and returns Observables, the basic RxJS objects, which emit the HTTP response.

The API performs all the common HTTP methods: GET, POST, PUT, PATCH, DELETE, HEAD, and OPTIONS. The library also provides a mechanism to work with HTTP cookies.

In your project, install the @akanass/rx-http-request by running the following command:

npm install @akanass/rx-http-request

In the rxjs directory, create a file called rx-http-request.js and insert the following JavaScript code:

import { RxHR } from '@akanass/rx-http-request';
import { map } from 'rxjs/operators';

const BASE_PATH = `https://maciejtreder.github.io/asynchronous-javascript`;

const directors$ = RxHR.get(`${BASE_PATH}/directors/`, {json: true});

directors$.subscribe(console.log);

Run the program:

node -r esm rx-http-request.js

You should see the following command line output. Ellipsis (“...”) in code represents a section redacted for brevity.

{ 
   response:
   ...
     httpVersion: '1.1',
     complete: true,
     headers:
      { server: 'GitHub.com',
        'content-type': 'application/json; charset=utf-8',
        'last-modified': 'Tue, 25 Jun 2019 10:44:43 GMT',
        etag: '"5d11fb1b-ff"',
        'access-control-allow-origin': '*',
        expires: 'Sun, 05 Jan 2020 13:46:34 GMT',
        'cache-control': 'max-age=600',
        'x-proxy-cache': 'MISS',
        'x-github-request-id': '75DE:01F6:81A222:AAC1F8:5E11E662',
        'content-length': '255',
        'accept-ranges': 'bytes',
        date: 'Sun, 05 Jan 2020 13:38:10 GMT',
        via: '1.1 varnish',
        age: '96',
        connection: 'close',
        'x-served-by': 'cache-hhn4074-HHN',
        'x-cache': 'HIT',
        'x-cache-hits': '1',
        'x-timer': 'S1578231490.317462,VS0,VE1',
        vary: 'Accept-Encoding',
        'x-fastly-request-id': '8276f84d66b4508e5b9754c48980095213a45479' },
...
     statusCode: 200,
     statusMessage: 'OK',
...
     request:
      Request {
        port: 443,
        host: 'maciejtreder.github.io',
        path: '/asynchronous-javascript/directors/' },
     body: [ [Object], [Object], [Object], [Object] ] },
  body:
   [ { id: 1, name: 'James Cameron' },
     { id: 2, name: 'Quentin Tarantino' },
     { id: 3, name: 'Wes Anderson' },
     { id: 4, name: 'Stanley Kubrick' } ] }

You have just performed your first HTTP call with the rx-http-request library and Observables!

As you can see, there is a lot of information in the JSON object that represents the HTTP response: the header data, HTTP status code, and the response body. For the purpose of this tutorial you just need the response body, which you can easily retrieve from the Observable by using the pipe method together with the map operator.

The pipe method enables you to specify a list of operators that will act on the data emitted by the Observable. You need to use the pipe method even if you only apply one operator. The map operator is used to modify the object emitted by an Observable before passing that object to subscribers.

Replace the const directors$ declaration in the rx-http-request.js:

const directors$ = RxHR.get(`${BASE_PATH}/directors/`, {json: true}).pipe(map(response => response.body));

Run the program. You should see the following command-line output:

 { id: 1

You’ve just completed the first step from the case study to-do list.

Creating a new Observable from the data emitted by an Observable

When you’re working with Observables it’s important to keep in mind that Observables are asynchronous pipelines for data: they emit data as it becomes available rather than containing data. A subscriber to an Observable receives any new data as it becomes available. The subscriber does not receive any data from the Observable emitted before the subscription, except in the case of a couple of implementations of the Observable interface that broadcast historical data to new subscribers.

Using the list of directors emitted by the directors$ observable, you can determine the ID for Quentin Tarantino and retrieve the list of his movies by performing a request to the /directors/[id]/movies endpoint. Use the JavaScript find method to select the appropriate element from the array emitted by the directors$ observable, and read the id property of that record inside the map operator. The pipe method returns a new Observable that emits the synchronous result of this lookup:

const directorId$ = directors$.pipe(
    map(directors => directors.find(director => director.name === "Quentin Tarantino").id)
);
directorId$.subscribe(console.log);

The above code will produce an Observable emitting the appropriate director ID when the API call returns values to the directors$ observable.
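
One thing to keep in mind about the lookup: the JavaScript find method returns undefined when nothing matches, so reading .id directly would throw for an unknown name. The sketch below shows that behavior on the director list from the API output; the idOf helper is hypothetical and not part of the case study code.

```javascript
const directors = [
  { id: 1, name: 'James Cameron' },
  { id: 2, name: 'Quentin Tarantino' },
  { id: 3, name: 'Wes Anderson' },
  { id: 4, name: 'Stanley Kubrick' },
];

// Same lookup as in the map operator above.
const tarantinoId = directors.find(d => d.name === 'Quentin Tarantino').id;
console.log(tarantinoId); // 2

// find returns undefined when no element matches.
const missing = directors.find(d => d.name === 'No Such Director');
console.log(missing); // undefined

// Hypothetical guarded variant: returns null instead of throwing.
const idOf = name => {
  const match = directors.find(d => d.name === name);
  return match ? match.id : null;
};
console.log(idOf('No Such Director')); // null
```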

Once you have the ID you are ready to perform a GET request to https://maciejtreder.github.io/asynchronous-javascript/directors/2/movies/ and retrieve a list of Quentin Tarantino’s movies. Because you’ll want to continue processing the sequence of API calls asynchronously, you’ll want to produce an Observable with the results of the API call. You will have a new Observable, rather than modified output from the source Observable.

The directorId$ constant declaration applies a synchronous function, the JavaScript find method, to the values emitted by the directors$ observable, so the map operator is sufficient. Retrieving the list of movies is different: the function applied to each value emitted by the source Observable, directorId$, itself returns an Observable. The flatMap operator subscribes to that inner Observable and emits its values, which produces a new Observable, directorMovies$.

Modify the rxjs/operators import statement in the rx-http-request.js file by adding flatMap to it:

import { map, flatMap } from 'rxjs/operators';

Add the following constant declaration to the bottom of the rx-http-request.js file:

const directorMovies$ = directorId$.pipe(
   flatMap(id => {
        return RxHR.get(`${BASE_PATH}/directors/${id}/movies`, {json: true}) 
   }),
   map(resp => resp.body)
);
directorMovies$.subscribe(console.log);

Run the program. In the output you should see a list of movies directed by Quentin Tarantino:

 { id: 4

The program flow is depicted in the following diagram:

Flowchart of program execution from the source API to the map operator and then to the flatMap operator

You’ve done the second step in the case study!

If you haven’t been following along with the coding and want to catch up to this step using the code from the GitHub repository, execute the following commands in the directory where you’d like to create the project directory:

git clone https://github.com/maciejtreder/asynchronous-javascript.git
cd asynchronous-javascript
git checkout step13
npm install
cd rxjs

Wait for multiple Observables to emit values

Now comes the tricky part. You’re going to take the asynchronous results of multiple Observables, perform an action on them, and combine them.

To do this, you’ll need two pieces of code:

1) a function, getAverageScore, that returns the average review score for a movie passed as an argument, which it does by:

  1. using RxHR to call the appropriate API endpoint and return an observable,
  2. using the JavaScript reduce method within the map operator to sum the review scores for the movie and compute the average score,
  3. using the map operator a second time to return the title and average score, and
  4. using the pipe method to pass the results of the first map to the second one.

2) a constant declaration, moviesRatings$, that:

  1. calls the getAverageScore function for each movie in the list emitted by the directorMovies$ observable,
  2. adds each returned observable to an array of observables (observables$), and
  3. uses the combineLatest function to return a single observable, moviesRatings$, that emits the title and average score for each movie.

Add the following import statement at the top of rx-http-request.js file:

import { combineLatest } from 'rxjs';

And add the following code to the bottom of rx-http-request.js:

function getAverageScore(movie) {
    const reducer = (accumulator, currentValue) => accumulator + currentValue.rating;
    return RxHR.get(`${BASE_PATH}/movies/${movie.id}/reviews`, {json: true}).pipe(
        map(response => response.body.reduce(reducer, 0) / response.body.length),
        map(response => { return {title: movie.title, averageScore: response}; })
    );
} 

const moviesRatings$ = directorMovies$.pipe(
    flatMap(movies => {
        const observables$ = [];
        movies.forEach(movie => observables$.push(getAverageScore(movie)));
        return combineLatest(observables$);
    })
);

moviesRatings$.subscribe(console.log);

The above code iterates through Quentin Tarantino’s movie list. For each entry it invokes the getAverageScore function, which calls the reviews endpoint for that movie and calculates the average score from the response. The getAverageScore function returns an Observable; this is why you are using the flatMap operator rather than the map operator.
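
The averaging step inside getAverageScore can be tried on its own. In this sketch the reducer is the same as in the function above, while the review list is hypothetical sample data; only the rating property name comes from the case study code.

```javascript
// Same reducer as in getAverageScore: add each review's rating to the running total.
const reducer = (accumulator, currentValue) => accumulator + currentValue.rating;

// Hypothetical review list; real responses come from /movies/[id]/reviews.
const reviews = [{ rating: 7 }, { rating: 9 }, { rating: 8 }];

// The second argument (0) is the starting total.
const average = reviews.reduce(reducer, 0) / reviews.length;
console.log(average); // 8

// With an empty list the division is 0 / 0, which is NaN.
const emptyAverage = [].reduce(reducer, 0) / [].length;
console.log(Number.isNaN(emptyAverage)); // true
```

The empty-list case does not arise if every movie has at least one review, but it is worth guarding against when working with an API you don’t control.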

You’re ready to determine the best movie. In this case study the static APIs you’re using are fast and reliable, but in production systems you’ll have to account for APIs that may be slow to respond. The code above makes multiple API calls to compute average review scores, so you’ll need a mechanism to accumulate these responses as they become available. This is where the combineLatest function is useful.

The combineLatest function is a “container” for Observables. The function accepts an array of Observables as a parameter and returns one Observable that represents all of them. Once every one of the passed Observables has emitted at least one value, the Observable returned by combineLatest emits its first array of values. Subsequently, whenever one of the passed Observables emits new data, the combineLatest observable updates the array and emits it again. The Observable returned by combineLatest completes once all of the inner Observables have completed.

Notice that you did not subscribe to the inner Observables anywhere in your code. The combineLatest function subscribes to them automatically once there is at least one subscriber to the combineLatest observable.

The logic implemented by this part of code is depicted in the following diagram:

Depiction of data from multiple APIs being combined into an array by the combineLatest operator

Run the program. In the console window you should see the following output:

 { title: 'Django Unchained'

If you haven’t been following along with the coding and you want to catch up to this step using the code from the GitHub repository, execute the following commands in the directory where you’d like to create the project directory:

git clone https://github.com/maciejtreder/asynchronous-javascript.git
cd asynchronous-javascript
git checkout step14
npm install
cd rxjs

Determine the verdict

Now you are ready to take the last step (5) and determine which movie is the best. To do this you’ll sort the array of movies produced by the combineLatest observable by the average score field from highest to lowest and display the first element in the array:

const best$ = moviesRatings$.pipe(map(movies => movies.sort((m1, m2) => m2.averageScore - m1.averageScore)[0].title));

best$.subscribe(result => console.log(`The best movie by Quentin Tarantino is... ${result}!`));
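
The sorting step can be checked without the API. The ratings below are hypothetical sample values, not real review data. The sketch also copies the array with slice before sorting, because the JavaScript sort method reorders the array in place; the case study code sorts the emitted array directly, which is fine there because nothing else reads it.

```javascript
// Hypothetical ratings; the real values come from the moviesRatings$ Observable.
const ratings = [
  { title: 'Sample Movie A', averageScore: 7.2 },
  { title: 'Sample Movie B', averageScore: 8.9 },
  { title: 'Sample Movie C', averageScore: 8.1 },
];

// m2 - m1 orders from highest to lowest; slice() makes a copy first
// so the original array keeps its order.
const sorted = ratings.slice().sort((m1, m2) => m2.averageScore - m1.averageScore);

console.log(sorted[0].title);  // Sample Movie B
console.log(ratings[0].title); // Sample Movie A
```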

If you’d like to verify that you’ve done all the coding correctly up to this point, you can check your code against the corresponding file in the companion repository at this URL:

https://github.com/maciejtreder/asynchronous-javascript/blob/step15/rxjs/rx-http-request.js 

Run the program:

node -r esm rx-http-request.js

You should see the following output in your terminal window:

The best movie by Quentin Tarantino is... Inglourious Basterds!

If you haven’t been following along with the coding and you want to catch up to this step using the code from the GitHub repository, execute the following commands in the directory where you’d like to create the project directory:

git clone https://github.com/maciejtreder/asynchronous-javascript.git
cd asynchronous-javascript
git checkout step15
npm install
cd rxjs

Performing an ancillary Observable action: adding an interactive element to the command line output

The program you’ve created thus far is a functional solution. Unfortunately, APIs and internet connections can be slow. When that happens, users waiting for results may get the feeling that the program is hung up. To avoid this, it’s worth adding a user interface element, such as a spinner, to show users that a background process is running. For Node.js command line programs you can use the ora library for that purpose.

If you have been following this series of posts on Asynchronous JavaScript and coding this application as you go, you’ll already have the ora library dependency installed. If not, install it with the following command:

npm install ora

At the top of the rx-http-request.js file, add statements to import the ora library and initialize the spinner:

import ora from 'ora';

const spinner = ora().start(`The best movie by Quentin Tarantino is...`);

When the program is started the spinner will be displayed on the command line.

The last step is to implement a mechanism which will stop the spinner once the results are ready. Modify the import statement from rxjs/operators and add the tap operator to it:

import { map, flatMap, tap } from 'rxjs/operators';

The tap operator is used to perform an action which does not affect the Observable output whenever the Observable emits a value. Use it at the end of the file to stop the spinner by replacing the current best$ subscription with the following code:

best$
.pipe(tap(_ => spinner.stop())) //this has been added
.subscribe(result => console.log(`The best movie by Quentin Tarantino is... ${result}!`));

Run the program.

The output should show the spinner, followed by “The best movie by Quentin Tarantino is...”. When the data retrieval from the API endpoints is complete, the spinner will disappear and the name of the best movie will appear, as shown in the illustration below:

Animated GIF depicting the use of the ora library spinner on a CLI interface.

 

That’s it! You’ve used RxJS Observables to asynchronously get data from a sequence of APIs to determine the best movie by Quentin Tarantino, based on the review data available.

If you haven’t been following along with the coding and you want to get the completed project code from the GitHub repository, execute the following commands in the directory where you’d like to create the project directory:

git clone https://github.com/maciejtreder/asynchronous-javascript.git
cd asynchronous-javascript
git checkout step16
npm install
cd rxjs

Summary

This post introduced you to using ReactiveX programming with REST APIs using the @akanass/rx-http-request library. You saw how to manipulate retrieved data. You also learned how to perform other actions when data is emitted by Observables.

Additional Resources

ReactiveX introductions and tutorials – The ReactiveX website has an extensive list of resources for learning more about Reactive Programming and the various language implementations. They’re 3rd-party productions, so watch and read at your own discretion.

RxJS – The RxJS website is the place for canonical information about the library for JavaScript.

Which Operator do I use? – A helpful tool for choosing the best Observables operator for a desired action. 

If you want to learn more about JavaScript Promises, which are a better asynchronous tool for some programming situations, check out the following posts here on the Twilio Blog:

Maciej Treder is a Senior Software Development Engineer at Akamai Technologies. He is also an international conference speaker and the author of @ng-toolkit, an open-source toolkit for building Angular progressive web apps (PWAs), serverless apps, and Angular Universal apps. Check out the repo to learn more about the toolkit, contribute, and support the project. You can learn more about the author at https://www.maciejtreder.com. You can also contact him at: contact@maciejtreder.com or @maciejtreder on GitHub, Twitter, StackOverflow, and LinkedIn.

 

Updated code 2020-01-27