Steve Holgado

Understand observables by building a toy library

2019 · Observables / RxJS

It’s easy to start building things with an observables library such as RxJS and then to realise that you’re not really sure how these “observables” actually work under the hood.

The goal of this tutorial is to build a small toy library inspired by RxJS.

This is an intentionally simplified implementation of observables for the purpose of understanding how observables can be constructed from scratch.

We’re also going to implement pipeable operators so that we can transform the values being produced by our observables.

Assumptions

I will assume that you have some basic experience with an observables library such as RxJS.

A simple observable

First of all, we need a simple class, which we’ll call Observable of course:

class Observable {

}

We’ll accept a function as an argument to the constructor that will “produce” values. Let’s call it producer.

class Observable {
  constructor(producer) {
    this._producer = producer
  }
}

Now, we want to be able to subscribe to the produced values, so we’ll add a subscribe method.

When we subscribe, we will supply a callback function that handles the produced values. Let’s call it observer.

We’ll pass along the observer to the producer function:

class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    this._producer(observer)
  }
}

This effectively means that calling .subscribe() on our observable actually invokes the producer function:

const obs$ = new Observable(producer)

obs$.subscribe( ... ) // Invoke "producer"

So, our producer function takes a callback function, observer, that we can invoke with whatever values we want.

Let’s subscribe to a new observable to see how it works:

function producer(observer) {
  observer('HELLO')
  observer('WORLD')
}

function observer(value) {
  console.log(value)
}

const obs$ = new Observable(producer)

obs$.subscribe(observer) // Passes "observer" to "producer"

// HELLO
// WORLD

…or to make it a little cleaner:

const obs$ = new Observable(observer => {
  observer('HELLO')
  observer('WORLD')
})

obs$.subscribe(v => console.log(v))

// HELLO
// WORLD

Observables are lazy. This is a good thing as they don’t start producing any values until we subscribe. It’s only when we subscribe that we invoke the producer function.
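
To make the laziness visible, here is a minimal sketch that counts how often the producer runs. The producerCalls counter and the received array are hypothetical names added only for this demonstration; the Observable class is the one built above.

```javascript
// Same minimal Observable as built so far
class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    this._producer(observer)
  }
}

let producerCalls = 0 // Hypothetical counter, only for this demonstration

const obs$ = new Observable(observer => {
  producerCalls++
  observer('HELLO')
})

console.log(producerCalls) // 0, nothing has run yet

const received = []
obs$.subscribe(v => received.push(v))
obs$.subscribe(v => received.push(v))

console.log(producerCalls) // 2, one producer call per subscription
console.log(received)      // 'HELLO' twice, once per subscriber
```

Creating the observable runs nothing, and each call to subscribe invokes the producer again from the start.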

Observables from arrays

We can take this a step further by looping over an array of values and passing each one to the observer:

const obs$ = new Observable(observer => {
  const values = [1, 2, 3, 4, 5]
  values.forEach(v => observer(v))
})

obs$.subscribe(v => console.log(v))

// 1
// 2
// 3
// 4
// 5

We can now package this up into a handy function by taking the array of values as an argument instead, and then returning the new observable:

function fromArray(values) {
  return new Observable(observer => {
    values.forEach(v => observer(v))
  })
}

We can now pass in an array of values and subscribe to handle them like this:

const obs$ = fromArray([1, 2, 3, 4, 5])

obs$.subscribe(v => console.log(v))

// 1
// 2
// 3
// 4
// 5
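
One property worth noticing is that fromArray delivers every value synchronously, before subscribe returns. The sketch below illustrates this by recording the order of events in a hypothetical log array:

```javascript
class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    this._producer(observer)
  }
}

function fromArray(values) {
  return new Observable(observer => {
    values.forEach(v => observer(v))
  })
}

const log = [] // Hypothetical helper, records the order of events

log.push('before subscribe')
fromArray([1, 2, 3]).subscribe(v => log.push(v))
log.push('after subscribe')

console.log(log)
// 'before subscribe', 1, 2, 3, 'after subscribe'
```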

Observables from DOM events

We can also create observables from an ongoing stream of events, such as from a DOM element.

Let’s create a new function that takes a DOM element and the name of an event to listen for.

We can then assign the observer as the event handler:

function fromEvent(element, eventName) {
  return new Observable(observer => {
    element.addEventListener(eventName, observer)
  })
}

We can test this out by creating an HTML page and importing our JavaScript file.

Now let’s add a simple button to the page:

<button id="btn">CLICK ME</button>

…and then create an observable that emits when the button is clicked:

const btn = document.querySelector('#btn')

const obs$ = fromEvent(btn, 'click')

obs$.subscribe(v => console.log(v))

// [Event object]
// [Event object]
// [Event object]
// ...
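
If you would rather try this without an HTML page, the following sketch uses a plain EventTarget in place of the button. It assumes that EventTarget and Event are available as globals, which should be the case in modern browsers and recent versions of Node.js. The target and eventTypes names are hypothetical.

```javascript
class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    this._producer(observer)
  }
}

function fromEvent(element, eventName) {
  return new Observable(observer => {
    element.addEventListener(eventName, observer)
  })
}

// Assumption: EventTarget and Event exist as globals in this runtime
const target = new EventTarget()
const eventTypes = []

fromEvent(target, 'click').subscribe(e => eventTypes.push(e.type))

// Simulate two clicks; dispatchEvent calls the listeners synchronously
target.dispatchEvent(new Event('click'))
target.dispatchEvent(new Event('click'))

console.log(eventTypes) // 'click' twice
```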

Unsubscribing

Observables from events differ from observables from an array in that they can continue indefinitely, whereas the observable from an array produces all of its values immediately.

Therefore, we are going to need a way to unsubscribe from the observable.

We’ll do this by returning an object from our producer function containing an unsubscribe method that takes care of cleaning up by removing the event listener:

function fromEvent(element, eventName) {
  return new Observable(observer => {
    element.addEventListener(eventName, observer)

    return {
      unsubscribe() {
        element.removeEventListener(eventName, observer)
      }
    }
  })
}

We now need to make sure that we return this object from the subscribe method in our Observable:

class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    // Return result of "producer" containing "unsubscribe"
    return this._producer(observer)
  }
}

We can test this by unsubscribing after a timeout period. Clicking the button will log event objects to the console until the timeout has passed:

const btn = document.querySelector('#btn')

const obs$ = fromEvent(btn, 'click')

const subscription = obs$.subscribe(v => console.log(v))

setTimeout(() => {
  subscription.unsubscribe()
}, 5000)
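
The same behaviour can be checked without waiting for a timeout. This is a rough sketch that swaps the button for a plain EventTarget, again assuming EventTarget and Event are available as globals; target and seen are hypothetical names.

```javascript
class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    // Return result of "producer" containing "unsubscribe"
    return this._producer(observer)
  }
}

function fromEvent(element, eventName) {
  return new Observable(observer => {
    element.addEventListener(eventName, observer)

    return {
      unsubscribe() {
        element.removeEventListener(eventName, observer)
      }
    }
  })
}

// Assumption: EventTarget and Event exist as globals in this runtime
const target = new EventTarget()
const seen = []

const subscription = fromEvent(target, 'click').subscribe(e => seen.push(e.type))

target.dispatchEvent(new Event('click')) // Handled
subscription.unsubscribe()
target.dispatchEvent(new Event('click')) // Ignored, the listener was removed

console.log(seen.length) // 1
```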

Our fromArray function produces all of its values at once so we’ll never actually need to unsubscribe. However, to keep the interface consistent we can simply return an object whose unsubscribe method does nothing:

function fromArray(values) {
  return new Observable(observer => {
    values.forEach(v => observer(v))

    return {
      unsubscribe() {} // No-op
    }
  })
}

Pipeable operators

If you’ve used RxJS, then you will no doubt have come across the concept of operators, as they are the true power of observables.

Operators are a way to manipulate the data being produced before it is eventually handled by a subscription.

We want to be able to pass our values through a series of transformations before we eventually subscribe.

For example, a map operator passes each value through a transformation function, while a filter operator only passes a value along if it satisfies a condition:

const obs$ = new Observable( ... )
  .pipe(
    filter( ... ), // Filter out some values
    map( ... )     // Transform values
  )

obs$.subscribe(v => console.log(v))

Creating operators

Our map and filter functions need to take in a function as an argument, which will be used to transform or filter the values later.

They then need to return a new function, our “operator”.

This “operator” function will, in turn, be passed a source observable. This source observable will be either the original observable that we call pipe on, or the observable returned by the previous operator in the pipeline:

function filter(predicateFn) {
  return function operator(srcObservable) {
    // ...
  }
}

function map(transformFn) {
  return function operator(srcObservable) {
    // ...
  }
}

This should become a little clearer in the next section.

Chaining operators

Let’s now add our pipe functionality to our Observable class:

class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    return this._producer(observer)
  }

  pipe(...operators) {
    // ...
  }
}

Our pipe function gathers all of its arguments into an array, which we’ve called operators. These are the “operator” functions that we return from map and filter above.

Remember, each operator is a function that takes in a source observable.

Now back to map and filter.

The operator function should return a new observable:

function filter(predicateFn) {
  return function operator(srcObservable) {
    return new Observable(observer => {
      // ...
    })
  }
}

function map(transformFn) {
  return function operator(srcObservable) {
    return new Observable(observer => {
      // ...
    })
  }
}

Now, each new observable returned from an operator will be passed into the next operator as its source observable, srcObservable.

We want to string our operators together by passing the observable returned by each one into the next. Something like this:

operator(operator(operator(initialObservable))) // ...and so on

We can use reduce to achieve this:

class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    return this._producer(observer)
  }

  pipe(...operators) {
    return operators.reduce(
      (src, operator) => operator(src),
      this // Initial source observable
    )
  }
}

The reduce function starts with the observable that we are calling the pipe function on, this.

It then loops over the array of operators and passes each new observable into the next operator.

When there are no more operators left in the pipeline, the last observable will be returned from the pipeline and can eventually be subscribed to in order to handle the final, transformed values.
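
To see the order in which reduce applies the operators, here is a small sketch using a hypothetical trace operator. It is not part of the library: it only records its name and hands the source observable back unchanged.

```javascript
class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    return this._producer(observer)
  }

  pipe(...operators) {
    return operators.reduce(
      (src, operator) => operator(src),
      this // Initial source observable
    )
  }
}

const applied = [] // Records the order in which operators are called

// Hypothetical operator for illustration only:
// records its name and returns the source observable unchanged
function trace(name) {
  return function operator(srcObservable) {
    applied.push(name)
    return srcObservable
  }
}

const source$ = new Observable(observer => {
  observer('value')
})

const result$ = source$.pipe(trace('first'), trace('second'), trace('third'))

console.log(applied)                    // 'first', 'second', 'third' (left to right)
console.log(result$ === source$)        // true, as trace passes the source straight through
console.log(source$.pipe() === source$) // true, with no operators "this" is returned
```

Operators are applied left to right, so the first operator passed to pipe sits closest to the original observable.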

Filter

We now need each new observable in the pipeline to perform its specific operation and pass along the transformed values to the next observable in the chain.

The new observable created by our filter operator should filter out values that don’t meet a specified condition.

We can do this by subscribing to the source observable and, when it produces a value, passing that value along to the observer only if the predicate function returns a truthy value:

function filter(predicateFn) {
  return function operator(srcObservable) {
    return new Observable(observer => {

      const subscription = srcObservable.subscribe(v => {
        if (predicateFn(v)) {
          observer(v) // Pass on value unchanged
        }
      })

      return subscription
    })
  }
}

We also need to return the subscription so that we can eventually unsubscribe, if needed.

We can test this out with an example:

<input id="textbox" type="text" />

const textbox = document.querySelector('#textbox')

const obs$ = fromEvent(textbox, 'keyup')
  .pipe(
    filter(e => e.keyCode === 13) // "Enter" key
  )

obs$.subscribe(v => console.log(v))

Type in the text box and you will see an event object logged to the console only when you hit the “Enter” key.
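
The filter operator can also be tried without a browser by using fromArray as the source. This is a minimal sketch; the evens array and the even-number predicate are just illustrative choices.

```javascript
class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    return this._producer(observer)
  }

  pipe(...operators) {
    return operators.reduce((src, operator) => operator(src), this)
  }
}

function fromArray(values) {
  return new Observable(observer => {
    values.forEach(v => observer(v))

    return {
      unsubscribe() {} // No-op
    }
  })
}

function filter(predicateFn) {
  return function operator(srcObservable) {
    return new Observable(observer => {
      // Subscribe to the source and forward only the matching values
      return srcObservable.subscribe(v => {
        if (predicateFn(v)) {
          observer(v)
        }
      })
    })
  }
}

const evens = []

fromArray([1, 2, 3, 4, 5])
  .pipe(filter(n => n % 2 === 0))
  .subscribe(v => evens.push(v))

console.log(evens) // 2 and 4
```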

Map

We can now do the same for map but, this time, each source value goes through our transform function before being passed along to the observer:

function map(transformFn) {
  return function operator(srcObservable) {
    return new Observable(observer => {
      
      const subscription = srcObservable.subscribe(v => {
        observer(transformFn(v))
      })

      return subscription
    })
  }
}

We can now extend our previous example by adding map to the pipeline:

<input id="textbox" type="text" />

const textbox = document.querySelector('#textbox')

const obs$ = fromEvent(textbox, 'keyup')
  .pipe(
    filter(e => e.keyCode === 13), // "Enter" key
    map(e => e.target.value)
  )

obs$.subscribe(v => console.log(v))

Now if you type in the text box and hit the “Enter” key, instead of an event object you will see the value of the textbox logged to the console.
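
As a quick check outside the browser, the sketch below runs filter and map together over an array. The numbers and the results array are arbitrary, illustrative choices.

```javascript
class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    return this._producer(observer)
  }

  pipe(...operators) {
    return operators.reduce((src, operator) => operator(src), this)
  }
}

function fromArray(values) {
  return new Observable(observer => {
    values.forEach(v => observer(v))

    return {
      unsubscribe() {} // No-op
    }
  })
}

function filter(predicateFn) {
  return function operator(srcObservable) {
    return new Observable(observer => {
      return srcObservable.subscribe(v => {
        if (predicateFn(v)) {
          observer(v)
        }
      })
    })
  }
}

function map(transformFn) {
  return function operator(srcObservable) {
    return new Observable(observer => {
      return srcObservable.subscribe(v => {
        observer(transformFn(v))
      })
    })
  }
}

const results = []

fromArray([1, 2, 3, 4, 5])
  .pipe(
    filter(n => n % 2 === 0), // Keep 2 and 4
    map(n => n * 10)          // Transform the values that are left
  )
  .subscribe(v => results.push(v))

console.log(results) // 20 and 40
```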

Unsubscribing from all

Let’s extend our example again to unsubscribe after a timeout:

<input id="textbox" type="text" />

const textbox = document.querySelector('#textbox')

const obs$ = fromEvent(textbox, 'keyup')
  .pipe(
    filter(e => e.keyCode === 13), // "Enter" key
    map(e => e.target.value)
  )

const subscription = obs$.subscribe(v => console.log(v))

setTimeout(() => {
  subscription.unsubscribe()
}, 5000)

After the timeout, we call unsubscribe on our subscription, which is actually the subscription from the map operator, as it was the last observable in the chain.

This subscription from the map operator is actually the subscription returned from its source observable, which was returned from the filter operator.

The subscription from the filter operator is actually the subscription from its source observable, which was returned from fromEvent.

Therefore, we have actually unsubscribed from the original observable and no more values will be emitted.
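
The chain of subscriptions described above can be verified with a short sketch. It replaces the text box with a plain EventTarget and a made-up 'ping' event, and assumes that EventTarget and Event are available as globals (modern browsers and recent versions of Node.js). The target and values names are hypothetical.

```javascript
class Observable {
  constructor(producer) {
    this._producer = producer
  }

  subscribe(observer) {
    return this._producer(observer)
  }

  pipe(...operators) {
    return operators.reduce((src, operator) => operator(src), this)
  }
}

function fromEvent(element, eventName) {
  return new Observable(observer => {
    element.addEventListener(eventName, observer)

    return {
      unsubscribe() {
        element.removeEventListener(eventName, observer)
      }
    }
  })
}

function filter(predicateFn) {
  return function operator(srcObservable) {
    return new Observable(observer => {
      return srcObservable.subscribe(v => {
        if (predicateFn(v)) {
          observer(v)
        }
      })
    })
  }
}

function map(transformFn) {
  return function operator(srcObservable) {
    return new Observable(observer => {
      return srcObservable.subscribe(v => observer(transformFn(v)))
    })
  }
}

// Assumption: EventTarget and Event exist as globals in this runtime
const target = new EventTarget()
const values = []

const subscription = fromEvent(target, 'ping')
  .pipe(
    filter(e => e.type === 'ping'),
    map(e => e.type.toUpperCase())
  )
  .subscribe(v => values.push(v))

target.dispatchEvent(new Event('ping')) // Reaches the observer
subscription.unsubscribe()              // Removes the listener added by fromEvent
target.dispatchEvent(new Event('ping')) // Nothing is listening any more

console.log(values) // 'PING' once
```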

End

That’s it. Hopefully this has helped to give a better understanding of how observables work.

Feel free to build upon this library and add your own functions and operators.

