Understand observables by building a toy library
2019
Observables / RxJS
It’s easy to start building things with an observales library such as RxJS and then to realise that you’re not really sure how these “observables” actually work under the hood.
The goal of this tutorial is to build a small toy library inspired by RxJS.
This is an intentionally simplified implementation of observables for the purpose of understanding how observables can be constructed from scratch.
We’re also going to implement pipeable operators so that we can transform the values being produced by our observables.
Table of contents
Assumptions
I will assume that you have some basic experience with an observables library such as RxJS.
A simple observable
First of all, we need a simple class, which we’ll call Observable
of course:
class Observable {
}
We’ll accept a function as an argument to the constructor that will “produce” values. Let’s call it producer
.
class Observable {
constructor(producer) {
this._producer = producer
}
}
Now, we want to be able to subscribe to the produced values, so we’ll add a subscribe
method.
When we subscribe, we will supply a callback function that handles the produced values. Let’s call it observer
.
We’ll pass along the observer
to the producer
function:
class Observable {
constructor(producer) {
this._producer = producer
}
subscribe(observer) {
this._producer(observer)
}
}
This effectively means that we can call .subscribe()
on our observable and actually be invoking the producer
function:
const obs$ = new Observable(producer)
obs$.subscribe( ... ) // Invoke "producer"
So, our producer
function takes a callback function, observer
, that we can invoke with whatever values we want.
Let’s subscribe to a new observable to see how it works:
function producer(observer) {
observer('HELLO')
observer('WORLD')
}
function observer(value) {
console.log(value)
}
const obs$ = new Observable(producer)
obs$.subscribe(observer) // Passes "observer" to "producer"
// HELLO
// WORLD
…or to make it a little cleaner:
const obs$ = new Observable(observer => {
observer('HELLO')
observer('WORLD')
})
obs$.subscribe(v => console.log(v))
// HELLO
// WORLD
Observables are lazy. This is a good thing as they don’t start producing any values until we subscribe. It’s only when we subscribe that we invoke the producer
function.
Observables from arrays
We can take this a step further by looping over an array of values and passing each one to the observer
:
const obs$ = new Observable(observer => {
const values = [1, 2, 3, 4, 5]
values.forEach(v => observer(v))
})
obs$.subscribe(v => console.log(v))
// 1
// 2
// 3
// 4
// 5
We can now package this up into a handy function by taking the array of values as an argument instead, and then returning the new observable:
function fromArray(values) {
return new Observable(observer => {
values.forEach(v => observer(v))
})
}
We can now pass in an array of values and subscribe to handle them like this:
const obs$ = fromArray([1, 2, 3, 4, 5])
obs$.subscribe(v => console.log(v))
// 1
// 2
// 3
// 4
// 5
Observables from DOM events
We can also create observables from an ongoing stream of events, such as from a DOM element.
Let’s create a new function that takes a DOM element and the name of an event to listen for.
We can then assign the observer
as the event handler:
function fromEvent(element, eventName) {
return new Observable(observer => {
element.addEventListener(eventName, observer)
})
}
We can test this out creating an HTML page and importing our JavaScript file.
Now let’s add a simple button to the page:
<button id="btn">CLICK ME</button>
…and then create an observable that emits when the button is clicked:
const btn = document.querySelector('#btn')
const obs$ = fromEvent(btn, 'click')
obs$.subscribe(v => console.log(v))
// [Event object]
// [Event object]
// [Event object]
// ...
Unsubscribing
Observables from events are different from observables from an array in that they can continue indefinitely, where as the observable from an array produces all of its values immediately.
Therefore, we are going to need a way to unsubscribe from the observable.
We’ll do this by returning an object from our producer
function containing an unsubscribe
method that takes care of cleaning up by removing the event listener:
function fromEvent(element, eventName) {
return new Observable(observer => {
element.addEventListener(eventName, observer)
return {
unsubscribe() {
element.removeEventListener(eventName, observer)
}
}
})
}
We’ll need to now make sure that we return this from our subscribe
method in our Observable
:
class Observable {
constructor(producer) {
this._producer = producer
}
subscribe(observer) {
// Return result of "producer" containing "unsubscribe"
return this._producer(observer)
}
}
We can test this by unsubscribing after a timeout period. Clicking the button will log event objects to the console until the timeout has passed:
const btn = document.querySelector('#btn')
const obs$ = fromEvent(btn, 'click')
const subscription = obs$.subscribe(v => console.log(v))
setTimeout(() => {
subscription.unsubscribe()
}, 5000)
Our fromArray
function produces all of its values at once so we’ll never actually need to unsubscibe. However, to keep the interface consistent we can simply return a function that does nothing:
function fromArray(values) {
return new Observable(observer => {
values.forEach(v => observer(v))
return {
unsubscribe() {} // No-op
}
})
}
Pipeable operators
If you’ve used RxJS, then you no doubt will have come across the concept of operators as they are the true power of observables
Operators are a way to manipulate the data being produced before eventually being handled with a subscription.
We want to be able to pass our values through a series of transformations before we eventually subscribe.
For example, a map
operator will take each value and pass it through a transformation function and a filter
operator will take each value and only pass it along if the condition passes:
const obs$ = new Observable( ... )
.pipe(
filter( ... ), // Filter out some values
map( ... ) // Transform values
)
obs$.subscribe(v => console.log(v))
Creating operators
Our map
and filter
functions need to take in a function as an argument, which will be used to transform or filter the values later.
They then need to return a new function, our “operator”.
This “operator” function will, in turn, be passed a source observable. This source observable will be either the original observable that we call pipe
on, or the previous operator in the pipeline:
function filter(predicateFn) {
return function operator(srcObservable) {
// ...
}
}
function map(transformFn) {
return function operator(srcObservable) {
// ...
}
}
This should become a little clearer in the next section.
Chaining operators
Let’s now add our pipe
functionality to our Observable
class:
class Observable {
constructor(producer) {
this._producer = producer
}
subscribe(observer) {
return this._producer(observer)
}
pipe(...operators) {
// ...
}
}
Our pipe
function gathers all of its arguments into an array, which we’ve called operators
. These are the “operator” functions that we return from map
and filter
above.
Remember, each operator is a function that takes in a source observable.
Now back to map
and filter
.
The operator function should return a new observable:
function filter(predicateFn) {
return function operator(srcObservable) {
return new Observable(observer => {
// ...
})
}
}
function map(transformFn) {
return function operator(srcObservable) {
return new Observable(observer => {
// ...
})
}
}
Now, each new observable returned from an operator will be passed into the next operator as its source observable, srcObservable
.
We want to string our operators together by passing each one into the next. Something like this:
operator(operator(operator(initialObservable))) // ...and so on
We can use reduce
to achieve this:
class Observable {
constructor(producer) {
this._producer = producer
}
subscribe(observer) {
return this._producer(observer)
}
pipe(...operators) {
return operators.reduce(
(src, operator) => operator(src),
this // Initial source observable
)
}
}
The reduce function starts with the observable that we are calling the pipe
function on, this
.
It then loops over the array of operators and passes each new observable into the next operator.
When there are no more operators left in the pipeline, the last observable will be returned from the pipeline and can eventually be subscribed to in order to handle the final, transformed values.
Filter
We now need each new observable in the pipeline to perform its specific operation and pass along the transformed values to the next observable in the chain.
The new observable created by our filter
operator should filter out values that don’t meet a specified condition.
We can do this by subscribing to the source observable and, when it produces a value, pass that value along to the observer
only if the predicate function returns true:
function filter(predicateFn) {
return function operator(srcObservable) {
return new Observable(observer => {
const subscription = srcObservable.subscribe(v => {
if (predicateFn(v)) {
observer(v) // Pass on value unchanged
}
})
return subscription
})
}
}
We also need to return the subscription so that we can eventually unsubscribe, if needed.
We can test this out with an example:
<input id="textbox" type="text" />
const textbox = document.querySelector('#textbox')
const obs$ = fromEvent(textbox, 'keyup')
.pipe(
filter(e => e.keyCode === 13) // "Enter" key
)
obs$.subscribe(v => console.log(v))
Type in the text box and you will see an event object logged to the console, only when you hit the “Enter” key.
Map
We can now do the same as above for map
but, this time, we will pass along transformed values by passing each source value through our transform function before passing it along to the observer
:
function map(transformFn) {
return function operator(srcObservable) {
return new Observable(observer => {
const subscription = srcObservable.subscribe(v => {
observer(transformFn(v))
})
return subscription
})
}
}
We can now extend our previous example by adding map
to the pipeline:
<input id="textbox" type="text" />
const textbox = document.querySelector('#textbox')
const obs$ = fromEvent(textbox, 'keyup')
.pipe(
filter(e => e.keyCode === 13), // "Enter" key
map(e => e.target.value)
)
obs$.subscribe(v => console.log(v))
Now if you type in the text box and hit the “Enter” key, instead of an event object you will see the value of the textbox logged to the console.
Unsubscribing from all
Let’s extend our example again to unsubscribe after a timeout:
<input id="textbox" type="text" />
const textbox = document.querySelector('#textbox')
const obs$ = fromEvent(textbox, 'keyup')
.pipe(
filter(e => e.keyCode === 13), // "Enter" key
map(e => e.target.value)
)
const subscription = obs$.subscribe(v => console.log(v))
setTimeout(() => {
subscription.unsubscribe()
}, 5000)
After the timeout, we call unsubscribe
on our subscription, which is actually the subscription from the map
operator, as it was the last observable in the chain.
This subscription from the map
operator is actually the subscription returned from its source observable, which was returned from the filter
operator.
The subscription from the filter
operator is actually the subscription from its source observable, which was returned from fromEvent
.
Therefore, we have actually unsubscribed from the original observable and no more values will be emitted.
End
That’s it. Hopefully this has helped to give a better understanding of how observables work.
Feel free to build upon this library and add your own functions and operators.