Reactive Programming
JavaScript--Observables Under the Hood
Jafar Husain--Async JavaScript with Reactive Extensions
Reactive programming is the combination of the GoF's Observer pattern with its Iterator pattern to make it easier to handle streams of data. It's becoming increasingly important in JavaScript for its usefulness in handling async data, both on the frontend and on the backend. There is also a proposal to add Observables natively to JS.
Iterator: allows you to pull data from a collection. An iterable (like an array) hands out an iterator, which lets you keep pulling the next piece of data until the collection is finished.
Observer: a producer of data pushes data to a consumer one item at a time (pub/sub).
Observables allow a progressive sequence of values to be pushed and processed as they arrive, in a way that makes sense to the developer.
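To make the pull/push distinction concrete, here is a minimal sketch. The first half uses the standard iterator protocol that arrays expose; the second half uses a producer function named produce, which is made up for this example and is not part of any library.

```javascript
// Pull (Iterator): the consumer asks for each value when it wants one.
const iterator = ['a', 'b', 'c'][Symbol.iterator]();
const pulled = [];
for (let step = iterator.next(); !step.done; step = iterator.next()) {
  pulled.push(step.value);
}

// Push (Observer): the producer decides when to hand each value over.
// `produce` is a hypothetical producer written only for this illustration.
function produce(observer) {
  ['a', 'b', 'c'].forEach(function (letter) {
    observer.next(letter);
  });
  observer.complete();
}

const pushed = [];
produce({
  next: function (value) { pushed.push(value); },
  complete: function () { pushed.push('done'); }
});

console.log(pulled); // [ 'a', 'b', 'c' ]
console.log(pushed); // [ 'a', 'b', 'c', 'done' ]
```

In both cases the consumer ends up with the same values; what changes is who controls the timing.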
The functions below return Observables. They essentially tell the Observable how to iterate over the incoming data. fromArray is very straightforward: it just needs to run the boring Array.prototype.forEach on the array (the same thing can be accomplished with for..of) and then call the complete method. fromEvent needs to set up a DOM event listener. fromPromise needs to call .then on its promise. Arrays and promises have a definitive end point, but events can last forever, so fromEvent needs a way to stop listening for events (and prevent memory leaks). It therefore also returns a function that can be called to do just that. In all cases, you can see that the value that is received is pushed out to observer.next, so that it can be processed using other methods.
function fromEvent(element, event) {
  return Observable(function (observer) {
    element.addEventListener(event, observer.next);
    // return function which can be called later to stop listening
    return function() {
      element.removeEventListener(event, observer.next);
    };
  });
}
function fromArray(array) {
  return Observable(function (observer) {
    // wrap next so forEach's extra arguments (index, array) are not passed along
    array.forEach(function (val) {
      observer.next(val);
    });
    observer.complete();
  });
}
function fromPromise(promise) {
  return Observable(function (observer) {
    promise.then(function (val) {
      observer.next(val);
      observer.complete();
    })
    .catch(function(e) {
      // an error ends the stream, so complete is not called after it
      observer.error(e);
    });
  });
}
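The stop-listening function is easiest to appreciate when you can watch it work. The sketch below is only an illustration under a few assumptions: Observable is reduced to a bare-bones stand-in whose subscribe simply runs the producer, and createFakeElement is a hypothetical helper that imitates addEventListener/removeEventListener so the code runs without a browser.

```javascript
// Bare-bones stand-in for the Observable factory (an assumption for this
// sketch): subscribe() runs the producer and returns whatever it returns.
function Observable(producer) {
  return {
    subscribe: function (observer) {
      return producer(observer);
    }
  };
}

function fromEvent(element, event) {
  return Observable(function (observer) {
    const handler = function (e) { observer.next(e); };
    element.addEventListener(event, handler);
    // teardown: the same handler reference must be passed to remove it
    return function () {
      element.removeEventListener(event, handler);
    };
  });
}

// Hypothetical fake element so the sketch does not need a DOM.
function createFakeElement() {
  const listeners = {};
  return {
    addEventListener: function (name, fn) {
      (listeners[name] = listeners[name] || []).push(fn);
    },
    removeEventListener: function (name, fn) {
      listeners[name] = (listeners[name] || []).filter(function (l) {
        return l !== fn;
      });
    },
    dispatch: function (name, payload) {
      (listeners[name] || []).slice().forEach(function (fn) { fn(payload); });
    }
  };
}

const button = createFakeElement();
const received = [];
const stopListening = fromEvent(button, 'click').subscribe({
  next: function (e) { received.push(e); }
});

button.dispatch('click', 'first');
stopListening();
button.dispatch('click', 'second'); // ignored: the listener was removed

console.log(received); // [ 'first' ]
```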
The subscribe function just dictates how to handle the eventual responses that come out of the Observable. If, for example, console.log is provided as an argument, then it will log out whatever is received. Ideally, you should pass in an object with keys for next, error, and complete (whose values are functions saying what to do in each case). Please note that error and complete are optional (they'll call no-op functions by default), but next is required.
function Observable(functionThatThrowsValue) {
  function subscribe(next, error, complete) {
    if (typeof next === 'function') {
      // called as subscribe(next, error, complete)
      return functionThatThrowsValue({
        next,
        error: error || (() => {}),
        complete: complete || (() => {})
      });
    } else {
      // called as subscribe({ next, error, complete }); fill in no-op defaults
      return functionThatThrowsValue(
        Object.assign({ error() {}, complete() {} }, next)
      );
    }
  }
  ...
  return {
    subscribe,
    ...
  };
}
data$.subscribe({
  next: console.log,
  error: console.error,
  complete() { console.log('complete') }
});
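To see both call styles side by side, here is a small runnable sketch. It is one possible way to normalize the arguments, not a definitive implementation; the parameter name subscriberFunction and the noop helper are choices made for this example.

```javascript
function Observable(subscriberFunction) {
  function subscribe(next, error, complete) {
    const noop = function () {};
    // Accept either subscribe(fn, fn, fn) or subscribe({ next, error, complete }).
    const observer = typeof next === 'function'
      ? { next: next, error: error, complete: complete }
      : next;
    return subscriberFunction({
      next: function (val) { observer.next(val); },
      error: function (e) { (observer.error || noop).call(observer, e); },
      complete: function () { (observer.complete || noop).call(observer); }
    });
  }
  return { subscribe: subscribe };
}

function fromArray(array) {
  return Observable(function (observer) {
    array.forEach(function (val) { observer.next(val); });
    observer.complete();
  });
}

const log = [];

// Style 1: a bare function is treated as next; error and complete default to no-ops.
fromArray([1, 2]).subscribe(function (val) { log.push('fn:' + val); });

// Style 2: an observer object; the missing error handler defaults to a no-op.
fromArray([3]).subscribe({
  next: function (val) { log.push('obj:' + val); },
  complete: function () { log.push('complete'); }
});

console.log(log); // [ 'fn:1', 'fn:2', 'obj:3', 'complete' ]
```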
The most powerful feature of Reactive Programming is the ability to elegantly manipulate the values that are received before having to do something with the eventual result. map works just as you would expect map to: every value that passes through has the projection function applied to it. filter is similar: it works just as any filter method would, passing along only the values for which the test function returns true. The ability to chain these methods together is what allows you to get out exactly what you want.
function Observable(functionThatThrowsValue) {
  ...
  function map(projectionFunction) {
    return Observable(function (observer) {
      return subscribe({
        next: function (val) {
          return observer.next(projectionFunction(val))
        },
        error: observer.error,
        complete: observer.complete
      });
    });
  }
  function filter(testFunction) {
    return Observable(function (observer) {
      return subscribe({
        next: function (val) {
          // only pass along values that satisfy the test
          if (testFunction(val)) {
            return observer.next(val);
          }
        },
        error: observer.error,
        complete: observer.complete
      });
    });
  }
  ...
  return {
    ...
    map,
    filter,
    ...
  };
}
var data$ = fromArray([1,2,3])
  .map(val => val * 2)
  .filter(val => val % 4 !== 0)
// doubles each value coming from array, and then returns values that are not divisible by four.
// 2
// 6
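For readers who want to run the chain end to end, the following is a self-contained sketch of the same idea. It assumes a simplified subscribe that only accepts an observer object with all three methods present; that simplification is for brevity here and is not something the text above requires.

```javascript
function Observable(subscriberFunction) {
  // Simplified for this sketch: expects a full { next, error, complete } object.
  function subscribe(observer) {
    return subscriberFunction(observer);
  }

  function map(projectionFunction) {
    return Observable(function (observer) {
      return subscribe({
        next: function (val) { observer.next(projectionFunction(val)); },
        error: function (e) { observer.error(e); },
        complete: function () { observer.complete(); }
      });
    });
  }

  function filter(testFunction) {
    return Observable(function (observer) {
      return subscribe({
        next: function (val) {
          if (testFunction(val)) observer.next(val);
        },
        error: function (e) { observer.error(e); },
        complete: function () { observer.complete(); }
      });
    });
  }

  return { subscribe: subscribe, map: map, filter: filter };
}

function fromArray(array) {
  return Observable(function (observer) {
    array.forEach(function (val) { observer.next(val); });
    observer.complete();
  });
}

const results = [];
fromArray([1, 2, 3])
  .map(function (val) { return val * 2; })
  .filter(function (val) { return val % 4 !== 0; })
  .subscribe({
    next: function (val) { results.push(val); },
    error: function (e) { results.push('error'); },
    complete: function () { results.push('complete'); }
  });

console.log(results); // [ 2, 6, 'complete' ]
```

Note that nothing runs until subscribe is called: map and filter only build new Observables that wrap the previous one.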
mergeMap maps each value to an Observable and then flattens all of those inner Observables into a single stream. This is useful for dealing with anything that would otherwise result in nested Observables.
function Observable(functionThatThrowsValue) {
  ...
  function mergeMap(anotherFunctionThatThrowsValue) {
    return Observable(function (observer) {
      return subscribe({
        next(val) {
          // subscribe to the inner Observable and forward its values
          anotherFunctionThatThrowsValue(val).subscribe({
            next: observer.next,
            error: observer.error,
            // simplification: every inner completion is forwarded,
            // so complete may fire more than once
            complete: observer.complete
          })
        },
        error: observer.error,
        complete: observer.complete
      });
    });
  }
  ...
  return {
    ...
    mergeMap,
    ...
  };
}
function promise1 (val) {
  return new Promise(function(resolve, reject) {
    setTimeout(() => resolve(val * 2), 3000);
  });
}
function promise2 (val) {
  return new Promise(function(resolve, reject) {
    setTimeout(() => resolve(val + 1), 4000);
  });
}
var data$ = fromArray([1,2,3])
  .mergeMap(val => fromPromise(promise1(val)))
  .map(val => val + 5)
  .mergeMap(val => fromPromise(promise2(val)));
// after 7 seconds
// 8
// 10
// 12
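One detail worth sketching is when a merged stream should complete. A common approach, shown here as an assumption rather than as the approach any particular library takes, is to count the inner Observables still running and complete only once the outer source and every inner one have finished. The example uses synchronous inner sources so the output is deterministic; the names active, outerDone and completeIfFinished are made up for this sketch.

```javascript
function Observable(subscriberFunction) {
  function subscribe(observer) {
    return subscriberFunction(observer);
  }

  function mergeMap(project) {
    return Observable(function (observer) {
      let active = 0;        // inner Observables that have not completed yet
      let outerDone = false; // has the outer source completed?

      function completeIfFinished() {
        if (outerDone && active === 0) observer.complete();
      }

      return subscribe({
        next: function (val) {
          active += 1;
          project(val).subscribe({
            next: function (inner) { observer.next(inner); },
            error: function (e) { observer.error(e); },
            complete: function () {
              active -= 1;
              completeIfFinished();
            }
          });
        },
        error: function (e) { observer.error(e); },
        complete: function () {
          outerDone = true;
          completeIfFinished();
        }
      });
    });
  }

  return { subscribe: subscribe, mergeMap: mergeMap };
}

function fromArray(array) {
  return Observable(function (observer) {
    array.forEach(function (val) { observer.next(val); });
    observer.complete();
  });
}

const events = [];
fromArray([1, 2])
  .mergeMap(function (val) { return fromArray([val, val * 10]); })
  .subscribe({
    next: function (val) { events.push(val); },
    error: function (e) { events.push('error'); },
    complete: function () { events.push('complete'); }
  });

console.log(events); // [ 1, 10, 2, 20, 'complete' ]
```

The same bookkeeping applies when the inner Observables are asynchronous, such as ones built from promises: complete fires exactly once, after the last inner value has arrived.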
There are many other useful methods that can be added to the Observable function:
- switchMap: only the latest inner stream is used. Whenever a new value arrives, the previous inner Observable is dropped. Useful in conjunction with debounce to just take the value of a field after a user has stopped typing, or if you're making multiple HTTP requests and only care about the response to the most recent one.
- concatMap: preserves the order in which sequences were requested. A second sequence won't be requested until the first one is done. mergeMap, by contrast, does not preserve the requested order; it emits values as they're received.
- zip: when multiple sequences are requested, it pairs up their values by position and completes as soon as one sequence is fully received.
- take: specifies how many values are emitted before the stream completes.
- takeUntil: specifies a condition that will trigger a complete (in RxJS, the condition is a second Observable emitting a value).
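As an example of how one of these could be added, here is a rough sketch of take on the same kind of Observable factory. It assumes count is at least 1 and, for brevity, does not unsubscribe from the source once the limit is reached; it simply ignores later values. A fuller implementation would also tear down the upstream subscription.

```javascript
function Observable(subscriberFunction) {
  function subscribe(observer) {
    return subscriberFunction(observer);
  }

  // Sketch of take: pass along the first `count` values, then complete.
  function take(count) {
    return Observable(function (observer) {
      let seen = 0;
      let done = false;
      return subscribe({
        next: function (val) {
          if (done) return; // ignore anything after the limit
          seen += 1;
          observer.next(val);
          if (seen >= count) {
            done = true;
            observer.complete();
          }
        },
        error: function (e) {
          if (!done) observer.error(e);
        },
        complete: function () {
          if (!done) {
            done = true;
            observer.complete();
          }
        }
      });
    });
  }

  return { subscribe: subscribe, take: take };
}

function fromArray(array) {
  return Observable(function (observer) {
    array.forEach(function (val) { observer.next(val); });
    observer.complete();
  });
}

const taken = [];
fromArray([1, 2, 3, 4])
  .take(2)
  .subscribe({
    next: function (val) { taken.push(val); },
    error: function (e) { taken.push('error'); },
    complete: function () { taken.push('complete'); }
  });

console.log(taken); // [ 1, 2, 'complete' ]
```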