Lau de Bugs


When working with observable streams, one often pipes RxJS operators into the stream (i.e. uses pipeable operators). Some of these operators take an observer as an argument. An observer is an object that consumes values delivered by an observable and implements one or more of the next, error and complete functions.
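To make the shape of an observer concrete, here is a minimal plain-TypeScript sketch. The `SimpleObserver` interface and the `deliver` helper are hypothetical names used only for illustration; they stand in for the observer type and the observable that RxJS itself provides.

```typescript
// Hypothetical, simplified observer shape (RxJS ships its own observer type).
interface SimpleObserver<T> {
  next?: (value: T) => void
  error?: (err: unknown) => void
  complete?: () => void
}

// Hypothetical stand-in for an observable: pushes each value, then completes.
// This sketch never fails, so `error` is never called here.
function deliver<T>(values: T[], observer: SimpleObserver<T>): void {
  for (const value of values) {
    observer.next?.(value)
  }
  observer.complete?.()
}

const log: string[] = []

// An observer only needs to implement the callbacks it cares about.
deliver([1, 2, 3], {
  next: (value) => log.push(`next ${value}`),
  complete: () => log.push('complete'),
})

console.log(log) // ['next 1', 'next 2', 'next 3', 'complete']
```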

The RxJS Subject is a type of observable that is also an observer. A common pattern I find myself implementing in several projects is defining a Subject, often a BehaviorSubject, which then holds different pieces of data to be consumed in different parts of an application. To pass data to the Subject, which is also an observer, we call its .next method with the data the Subject should hold. A simple example would be using the tap operator to do this as a side effect in our observable stream.

A common pitfall is passing Subject.next directly as the argument to a pipeable operator, for instance calling tap(new Subject().next). We will see how this can cause unexpected errors that may be hard to debug, and how to avoid them.


Suppose you have an RxJS subject that keeps track of the value in an observable stream (say, one called stream$).

import { Subject, timer, take, tap } from 'rxjs'
const stream$ = timer(0, 1000).pipe(take(5))

One way to pass the current value to the subject is using the tap operator that accepts an observer, i.e. an object containing the next, error and complete functions.

If we only pass a callback function that logs out the current value in the observable stream, we would have something that looks like:

stream$.pipe(
tap(console.log)
).subscribe()
/*
// Result would look like:
0
1
2
3
4
*/

If we have a subject called _count where we would like to keep track of the current value in the stream, the first instinct would be to replace console.log with _count.next:

const _count = new Subject<number>()
stream$.pipe(
tap(_count.next)
).subscribe()

However, you'll notice that the above implementation does not work, resulting in the error: TypeError: _this._throwIfClosed is not a function.

The reason is that RxJS's Subject is a class whose next implementation relies on the this keyword, which is expected to refer to the _count instance. You can view the source code here. Passing just the _count.next function detaches it from _count: this is then determined by whoever calls the function, so it no longer refers to the _count instance and the internal _throwIfClosed method cannot be found on it.
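The loss of this can be reproduced without RxJS. The sketch below uses a hypothetical `MiniSubject` class whose `next` calls an internal helper through `this`, and a hypothetical `holder` object that plays the role of whatever ends up calling the detached function. It assumes the caller stores the callback on an object of its own and invokes it from there, which would be consistent with the "is not a function" error above.

```typescript
// Hypothetical stand-in for a Subject: `next` depends on `this`.
class MiniSubject {
  values: number[] = []

  private ensureOpen(): void {
    // A real Subject would throw here if it had already been closed.
  }

  next(value: number): void {
    this.ensureOpen()
    this.values.push(value)
  }
}

const subject = new MiniSubject()

// Called as a method: `this` is `subject`, so everything works.
subject.next(1)

// Detached and stored on another object: `this` is now `holder`,
// which has no `ensureOpen` method.
const holder = { next: subject.next }

let caught: unknown = undefined
try {
  holder.next(2)
} catch (err) {
  caught = err
}

console.log(subject.values) // [1]
console.log(caught instanceof TypeError) // true
```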

We can see this in action by implementing our own observer that references this:

const observerStore = {
store: new Array<number>(),
next(value: number) {
this.store.push(value)
},
}
/* Replacing the _count with our observer would then look like below👇🏻 */
stream$.pipe(
tap(observerStore.next)
).subscribe()

The implementation above would also give us an error: TypeError: Cannot read properties of undefined (reading 'push'). The reason is the same: this no longer refers to the observerStore object, so this.store is undefined.

It's worth noting that if the observer's next implementation does not reference the this keyword, then passing in the .next function would work as expected.

For instance, if our observerStore's next function just logged out the value, then passing in the observerStore.next to tap would work as expected since the next function does not reference this:

const observerStore = {
store: new Array<number>(),
next(value: number) {
console.log(value)
},
}
/* Works! */
stream$.pipe(tap<number>(observerStore.next)).subscribe()

Solutions

  1. Pass in the observer object

    The straightforward solution is simply to pass the observer object itself into the tap operator:

    /* This works */
    stream$.pipe(
    tap(observerStore)
    ).subscribe()
  2. Bind the observer to this

    One could use Function.prototype.bind to bind this to the observer object, so that this refers to the observer no matter how the function is later called:

    /* the _count Subject */
    stream$.pipe(
    tap(_count.next.bind(_count))
    ).subscribe()
    /* the observerStore */
    stream$.pipe(
    tap(observerStore.next.bind(observerStore))
    ).subscribe()

    As the MDN docs state, “The bind() method creates a new function that, when called, has its this keyword set to the provided value, with a given sequence of arguments preceding any provided when the new function is called.”
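The effect of bind can be checked in isolation with a small plain-TypeScript sketch. The `store` and `holder` objects are hypothetical and exist only for illustration; `holder` mimics a caller that keeps the callback on its own object.

```typescript
// Hypothetical observer-like object whose `next` relies on `this`.
const store = {
  items: [] as number[],
  next(value: number): void {
    this.items.push(value)
  },
}

// `bind` returns a new function whose `this` is fixed to `store`.
const boundNext = store.next.bind(store)

// Even when the bound function is called through another object,
// or as a bare function, `this` still refers to `store`.
const holder = { next: boundNext }
holder.next(1)
boundNext(2)

console.log(store.items) // [1, 2]
```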

Although both solutions work, passing the observer object makes it much clearer to other readers what is going on, whereas the .bind version is likely to make someone who didn't write the code stop and ask why .bind is being called in the first place.
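A further option, not covered above, is to wrap the call in an arrow function so that next is always invoked as a method on the object; with tap this would look like tap((value) => _count.next(value)). The sketch below illustrates the idea in plain TypeScript, using a hypothetical `MiniSubject` class and Array.prototype.forEach as a stand-in for an operator that calls the callback for each value.

```typescript
// Hypothetical stand-in for a Subject: `next` depends on `this`.
class MiniSubject {
  values: number[] = []

  next(value: number): void {
    this.values.push(value)
  }
}

const count = new MiniSubject()
const source = [0, 1, 2]

// The arrow function calls `next` as a method of `count`,
// so `this` inside `next` is always the `count` instance.
source.forEach((value) => count.next(value))

console.log(count.values) // [0, 1, 2]
```

The wrapper is slightly more verbose than passing the observer object, but it makes the call site explicit about which object receives the value.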

Happy hacking!


Last modified on: Mon Feb 06 2023