reactive-x

An introduction to RxJS

by

Diego Barahona

We have been using promises for quite a while now, and they work perfectly for what they were created to do: resolve ONE async value and handle the errors it might bring in a straightforward and compact way.
But what if you need to deal with multiple values over time? Once a promise is fulfilled, you can no longer use it to deliver new data. What do you do in those cases?
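
To see that limitation in practice, here is a minimal sketch in plain JavaScript. The `donation` name and its values are made up for illustration: `resolve` is called twice, but only the first value is ever delivered.

```javascript
// A promise settles exactly once; later calls to resolve() are ignored.
const donation = new Promise((resolve) => {
  resolve('first click');
  resolve('second click'); // no effect: the promise is already fulfilled
});

donation.then((value) => {
  console.log(value); // logs "first click"
});
```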

Well, we have Observables, a TC39 proposal for a future version of JavaScript.
But this proposal is still in its early phases, so let’s talk about a more mature option instead. RxJS is the JavaScript member of ReactiveX, a family of open-source libraries available for a variety of languages and dedicated to working with Observables. RxJS is to Observables what Lodash is to Arrays.

So what’s an Observable? Well, we can describe it as an asynchronous array.
Picture an empty array that receives elements over time while you run operations over them asynchronously. That’s an Observable.
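
To make the analogy concrete, here is a rough sketch in plain JavaScript. `TinyObservable` is a hypothetical teaching class, not RxJS and not how RxJS is implemented; it only shows that an array has its values up front while an observable pushes them to whoever subscribes. The values are pushed synchronously here to keep the example short, but the producer could just as well call `next` from a timer or an event listener.

```javascript
// TinyObservable is a hypothetical teaching class, not part of RxJS.
class TinyObservable {
  // `producer` receives a `next` callback and may call it any number of times.
  constructor(producer) {
    this.producer = producer;
  }

  // Start the producer and deliver every value to `observer`.
  subscribe(observer) {
    this.producer(observer);
  }

  // Like Array.prototype.map: returns a NEW observable and leaves this one untouched.
  map(transform) {
    return new TinyObservable((next) =>
      this.producer((value) => next(transform(value)))
    );
  }
}

// Array: all the values exist up front.
const doubledArray = [1, 2, 3].map((n) => n * 2);

// Observable: values are pushed to the subscriber one by one.
const numbers = new TinyObservable((next) => {
  next(1);
  next(2);
  next(3);
});

const received = [];
numbers.map((n) => n * 2).subscribe((value) => received.push(value));

console.log(doubledArray); // [ 2, 4, 6 ]
console.log(received); // [ 2, 4, 6 ]
```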

Imagine you have a button that, whenever you click it, makes an API call to donate $1 to NinjaDevs. So you’ll obviously want to click it many times.
You can write an event listener whose callback calls the API with another callback (or a promise), and you’re done.
But what if the scenario becomes more complex? What if you need to wait one second before calling the API again, or you want to count double clicks or N clicks and make a different donation depending on the count? That kind of logic is hard to write with a plain event listener.

Instead, you can rely on the Observable pattern. You have one observer (the function that calls the API) and one observable (the click event). Remember I said Observables are like async arrays? Well, many of the array functions you already know work the same way on Observables: map, for example.

// Assumption: RxJS 4, with Observable taken from the global Rx object.
// In RxJS 5 the equivalent of debounce(250) is debounceTime(250).
const { Observable } = Rx;
const clickStream = Observable.fromEvent(button, 'click');

clickStream
  .buffer(clickStream.debounce(250)) // close the buffer after 250ms without a click
  .map(list => list.length) // turn each burst of clicks into a count
  .map(number => `http://ninjadevs.io/donate?amount=${number}`) // convert the count to a URL
  .concatMap(url => Observable.fromPromise(fetch(url))) // call the API, keeping responses in order
  .subscribe(() => {
    // donation successful
  });

So what we did is:

  • Collect the stream of clicks
  • Create a buffer of click events; the buffer is emitted and reset once 250ms pass without a new click
  • Map the list of clicks collected in the buffer to a number
  • Map the number to an actual URL
  • Map each URL into an Observable (concatMap is the right one to use here because it delivers the responses in the same order as the clicks)
  • Finally, subscribe to each successful donation

More graphically, it might look like this:

  Click Stream ---c-------c-c-------c-------c-c-c--
 Buffer Stream --(c)------(cc)-----(c)------(ccc)--
Counter Stream ---1--------2--------1---------3----
    URL Stream --URL1-----URL2-----URL1------URL3--
  Fetch Stream -{...}----{...}----{...}------{...}-
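
The Counter Stream row can be reproduced without any library. The sketch below is plain JavaScript: `countBursts` is a hypothetical helper, not an RxJS function, and the timestamps are made up to match the shape of the diagram. Unlike the real stream, it works on a finished list of clicks, so it only illustrates the grouping rule, not the asynchronous behaviour.

```javascript
// countBursts is a hypothetical helper, not an RxJS function.
// It groups click timestamps (in ms, ascending) into bursts: a burst ends
// when the gap to the next click is at least `quietMs`.
function countBursts(timestamps, quietMs) {
  const counts = [];
  let previous = null;
  for (const time of timestamps) {
    if (previous === null || time - previous >= quietMs) {
      counts.push(1); // a new burst starts
    } else {
      counts[counts.length - 1] += 1; // same burst as the previous click
    }
    previous = time;
  }
  return counts;
}

// Made-up timestamps shaped like the diagram: c, c-c, c, c-c-c
const clicks = [0, 1000, 1100, 2000, 3000, 3100, 3200];
console.log(countBursts(clicks, 250)); // [ 1, 2, 1, 3 ]
```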

It’s worth mentioning that observables are immutable: every operator produces a new observable.

RxJS has a very rich API with lots of handy functions to do virtually anything you want. But let’s end this quick introduction here for today. Stay tuned if you want to learn more about RxJS functions and how to use them.
