RxJs Pitfalls: Passing an Observer's next function as a callback to pipeable operators
When working with observable streams, one often uses RxJs operators to pipe into the stream (i.e. pipeable operators). Some of these operators take an observer as an argument. An observer is an object that consumes values delivered by an observable and implements one or more of the next, error and complete functions.
The RxJs Subject is a type of observable that is also an observer. A common pattern I find myself implementing in several projects is defining a Subject, often a BehaviorSubject, which then holds different pieces of data to be consumed in different parts of an application. To pass data to the Subject, which is also an observer, we call .next with the data the Subject should hold. A simple example is doing so inside the tap operator, which performs side effects in our observable stream.
A common pitfall is then passing Subject.next directly as the argument to a pipeable operator, for instance calling tap(new Subject().next).
We will see how this can cause unexpected errors that may be hard to debug, and how to avoid them.
Suppose you have an RxJs subject that keeps track of the value in an observable stream (say, one called stream$).
import { Subject, timer, take, tap } from 'rxjs'
const stream$ = timer(0, 1000).pipe(take(5))
One way to pass the current value to the subject is the tap operator, which accepts either a callback or an observer, i.e. an object containing the next, error and complete functions.
If we only pass a callback function that logs out the current value in the observable stream, we would have something that looks like:
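A minimal sketch of that logging version, assuming the same stream$ as above; it should print 0 through 4, one value per second:

```typescript
import { timer, take, tap } from 'rxjs'

// Emits 0, 1, 2, 3, 4 (one value per second), then completes
const stream$ = timer(0, 1000).pipe(take(5))

// tap calls console.log with each value and passes the value through unchanged
stream$.pipe(
  tap(console.log)
).subscribe()
```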
If we have a subject called _count where we would like to keep track of the current value in the stream, the first instinct would be to replace console.log with _count.next:
import { Subject, timer, take, tap } from 'rxjs'
const _count = new Subject<number>()
const stream$ = timer(0, 1000).pipe(take(5))
stream$.pipe(
  tap(_count.next)
).subscribe()
However, you'll notice that the above implementation does not work, resulting in the error: TypeError: _this._throwIfClosed is not a function.
The reason is that RxJs's Subject is a class whose next implementation relies on the this keyword, which is expected to refer to the _count instance. You can view the source code here. Passing just the _count.next function detaches it from _count: the value of this is decided by how a function is called, not by where it was defined, so when tap later calls it, this no longer refers to the _count instance.
We can see this in action by implementing our own observer that references this:
const observerStore = {
  store: new Array<number>(),
  next(value: number) {
    this.store.push(value)
  },
}
/* Replacing the _count with our observer would then look like below👇🏻 */
stream$.pipe(
  tap(observerStore.next)
).subscribe()
The implementation above would also give us an error: TypeError: Cannot read properties of undefined (reading 'push'). The reason is the same: this no longer refers to the observerStore object, so this.store is undefined.
By contrast, if our observerStore's next function just logged out the value, then passing observerStore.next to tap would work as expected, since the next function does not reference this:
const observerStore = {
  store: new Array<number>(),
  next(value: number) {
    console.log(value)
  },
}
/* Works! */
stream$.pipe(tap<number>(observerStore.next)).subscribe()
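The underlying behaviour is plain JavaScript and not specific to RxJs. The sketch below reproduces it without any library, using a hypothetical ValueStore class as a stand-in for an observer. When the detached function is called bare, as here, this is undefined because class bodies run in strict mode; an operator that calls it as a property of some other object would supply that object instead. Either way, it is no longer the original instance.

```typescript
// Hypothetical stand-in for an observer whose next() depends on `this`
class ValueStore {
  readonly store: number[] = []

  next(value: number): void {
    this.store.push(value)
  }
}

const valueStore = new ValueStore()

// Called as a method: `this` is valueStore, so the value is stored
valueStore.next(1)

// Detached: only the function is copied, its link to valueStore is lost
const detachedNext = valueStore.next

let detachedError: unknown
try {
  // `this` is undefined here, so reading this.store throws
  detachedNext(2)
} catch (err) {
  detachedError = err
}

console.log(valueStore.store)                   // [ 1 ]
console.log(detachedError instanceof TypeError) // true
```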
Solutions
- Pass in the observer object
  The straightforward solution is simply to pass the observer object into the tap operator:
  /* This works */
  stream$.pipe(
    tap(observerStore)
  ).subscribe()
- Bind the observer to this
  One could use Function.prototype.bind to bind this to the observer object, so that when this is referenced inside next, it refers to the observer rather than to whatever the caller supplies:
  /* the _count Subject */
  stream$.pipe(
    tap(_count.next.bind(_count))
  ).subscribe()
  /* the observerStore */
  stream$.pipe(
    tap(observerStore.next.bind(observerStore))
  ).subscribe()
  As the MDN docs state, “The bind() method creates a new function that, when called, has its this keyword set to the provided value, with a given sequence of arguments preceding any provided when the new function is called.”
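The two solutions can be compared side by side in a small sketch. All names here (viaObserver, viaBind and the tracking variables) are hypothetical, and of(1, 2, 3) stands in for stream$ only because it emits synchronously. One difference worth knowing when the observer is a Subject: handing tap the whole observer also forwards complete (and error), so the subject completes when the stream does, whereas a bound next forwards values only.

```typescript
import { Subject, of, tap } from 'rxjs'

// Hypothetical subjects, one per solution
const viaObserver = new Subject<number>()
const viaBind = new Subject<number>()

const observerValues: number[] = []
const bindValues: number[] = []
let observerCompleted = false
let bindCompleted = false

// Record what each subject receives
viaObserver.subscribe({
  next: value => observerValues.push(value),
  complete: () => { observerCompleted = true },
})
viaBind.subscribe({
  next: value => bindValues.push(value),
  complete: () => { bindCompleted = true },
})

// Solution 1: hand tap the whole observer (next, error and complete)
of(1, 2, 3).pipe(tap(viaObserver)).subscribe()

// Solution 2: hand tap only next, bound to its subject
of(1, 2, 3).pipe(tap(viaBind.next.bind(viaBind))).subscribe()

console.log(observerValues, observerCompleted) // [ 1, 2, 3 ] true
console.log(bindValues, bindCompleted)         // [ 1, 2, 3 ] false
```

If the subject is meant to outlive the stream, as a long-lived BehaviorSubject often is, that difference may matter more than readability when choosing between the two.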
Although both solutions work, passing the observer object makes it clearer to another reader of the code what is going on, whereas the latter would cause someone who didn't write the code to stop and ask why we are calling .bind in the first place.
Happy hacking!