Declarative RxJS with Custom Operators
Improving the readability of RxJS streams by using custom operators
RxJS streams tend to get messy and hard to read when they need a lot of pipe operators. One reason for this is the imperative logic that accumulates inside these streams. Making them declarative with custom operators therefore makes your streams much cleaner and more readable.
Declarative vs Imperative Programming
Not specific to Angular or TypeScript, the debate over declarative versus imperative programming spans all programming languages. At its core, it is about making code read more naturally, at the cost of extracting more code into separate functions. In general, the declarative approach is considered cleaner because it is the more readable one: it hides low-level details behind extracted functions with explicit, descriptive names.
// Imperative: the loop spells out how the even numbers are collected
const arr = [1, 2, 3, 4, 5, 6];
const even = [];
for (const item of arr) {
  if (item % 2 === 0) {
    even.push(item);
  }
}
console.log(even); // 2, 4, 6
// Declarative: the named predicate states what is being selected
const arr = [1, 2, 3, 4, 5, 6];
const even = arr.filter(item => isEven(item));
console.log(even); // 2, 4, 6
function isEven(value: number): boolean {
  return value % 2 === 0;
}
Both examples are valid, executable code, but the latter is more readable because it extracts the low-level check into a distinct function with an explicit name. But how does this concept translate to RxJS? Isn't all RxJS code declarative by nature? Well yes, but also no...
We chain pipe operators in whatever order we need, which in itself is declarative. But the more logic we put inside the operators, the more imperative the stream becomes. To avoid this, we can create custom pipe operators and extract the logic into them.
Custom Pipe Operators
Writing custom operators is a lot easier than one might expect. The simplest operator is just a function that takes an observable and returns an observable, so we could take the passed observable, pipe it, and add some operators there. More precisely, we write a higher-order function that returns a function, which in turn takes an observable and returns a new observable. That is a lot of meaning packed into one sentence, but don't worry, because the code below explains it much better:
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';
export function log<T>(): (source$: Observable<T>) => Observable<T> {
  return (source$) => source$.pipe(tap(console.log));
}
We use a generic so that our custom pipe operator does not change the type of the stream. Then we return an anonymous function that receives source$, which is simply the outer observable, and returns the piped version of it.
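Because the outer function is an ordinary factory, it can also take arguments that configure the returned operator. As a minimal sketch (the `logWithLabel` name and its `label` parameter are hypothetical and not part of the article's code), a labelled variant of `log` might look like this:

```typescript
import { Observable, of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

// Hypothetical variant of log(): the factory argument configures the operator.
export function logWithLabel<T>(
  label: string
): (source$: Observable<T>) => Observable<T> {
  // The returned function is what pipe() calls with the source observable.
  return (source$) =>
    source$.pipe(tap((value) => console.log(`[${label}]`, value)));
}

// Usage: the custom operator sits in a pipe like any built-in operator.
const doubled: number[] = [];
of(1, 2, 3)
  .pipe(
    logWithLabel('before'),
    map((n) => n * 2),
    logWithLabel('after')
  )
  .subscribe((n) => doubled.push(n));
// of() emits synchronously, so doubled now holds 2, 4, 6
```

The generic still keeps the element type intact, so `map` after `logWithLabel('before')` sees numbers.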
Example
This example contains a form control in which the user can enter a command for calculating numbers. Such a command could look like one of these:
- add,1,2,3,4
- subtract,1,1,1,2
- multiply,3,4,5
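Each command is a comma-separated string whose first segment names the operation and whose remaining segments are the operands. As a rough sketch of that structure (the `ParsedCommand` type and the `parseCommand` function are hypothetical helpers, not part of the article's code), parsing could be written as:

```typescript
// Hypothetical shape for a parsed command; not part of the article's code.
interface ParsedCommand {
  name: string;
  operands: number[];
}

// Splits "add,1,2,3,4" into the operation name and its numeric operands.
function parseCommand(input: string): ParsedCommand {
  const [name, ...rest] = input.split(',');
  // The unary plus converts each segment to a number (NaN if it is not numeric).
  return { name, operands: rest.map((segment) => +segment) };
}

const parsed = parseCommand('add,1,2,3,4');
// parsed.name is 'add', parsed.operands is [1, 2, 3, 4]
```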
I. Before
result$ = this.control.valueChanges.pipe(
  debounceTime(800),
  distinctUntilChanged(),
  filter((v): v is string => !!v && typeof v === 'string'),
  map((v) => v.split(',')),
  filter((v) => v.length >= 1),
  map((v) => {
    if (v[0] === 'add') {
      return v
        .slice(1)
        .map((v) => +v)
        .reduce((acc: number, current: number) => {
          return acc + current;
        }, 0);
    } else if (v[0] === 'subtract') {
      return v
        .slice(1)
        .map((v) => +v)
        .reduce((acc: number, current: number) => {
          return acc - current;
        }, 0);
    } else if (v[0] === 'multiply') {
      return v
        .slice(1)
        .map((v) => +v)
        .reduce((acc: number, current: number) => {
          return acc * current;
        }, 1);
    }
    return null;
  })
);
II. After
import {
  assertNumber,
  assertString,
  command,
  lookAhead,
  split,
} from './custom-operators';
[...]
result$ = this.control.valueChanges.pipe(
  lookAhead(),
  assertString(),
  split(),
  command(),
  assertNumber()
);
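// custom-operators.ts
import { Observable } from 'rxjs';
import { debounceTime, distinctUntilChanged, filter, map } from 'rxjs/operators';
export function lookAhead<T>(): (source$: Observable<T>) => Observable<T> {
  return (source$) => source$.pipe(debounceTime(800), distinctUntilChanged());
}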
export function assertString(): (
  source$: Observable<unknown>
) => Observable<string> {
  return (source$) =>
    source$.pipe(
      filter((value): value is string => !!value && typeof value === 'string')
    );
}
export function assertNumber(): (
  source$: Observable<unknown>
) => Observable<number> {
  return (source$) =>
    source$.pipe(
      // No truthiness check here: 0 is a valid result (e.g. "multiply,0,5")
      filter((value): value is number => typeof value === 'number')
    );
}
export function split(): (source$: Observable<string>) => Observable<string[]> {
  return (source$) =>
    source$.pipe(
      map((v) => v.split(',')),
      filter((v) => v.length >= 1)
    );
}
export function command(): (
  source$: Observable<string[]>
) => Observable<number | null> {
  return (source$) =>
    source$.pipe(
      map((v) => {
        if (v[0] === 'add') {
          return v
            .slice(1)
            .map((v) => +v)
            .reduce((acc: number, current: number) => {
              return acc + current;
            }, 0);
        } else if (v[0] === 'subtract') {
          return v
            .slice(1)
            .map((v) => +v)
            .reduce((acc: number, current: number) => {
              return acc - current;
            }, 0);
        } else if (v[0] === 'multiply') {
          return v
            .slice(1)
            .map((v) => +v)
            .reduce((acc: number, current: number) => {
              return acc * current;
            }, 1);
        }
        return null;
      })
    );
}
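The three branches inside command differ only in the reducer and its starting value, so one possible next step is to move those into a lookup table. The sketch below is an assumption about how that could look, not the article's implementation; the `operations` map and the `calculate` function are hypothetical names. Since `calculate` is a plain function, it could be passed to `map` inside `command`.

```typescript
// Hypothetical lookup table: one reducer and one initial value per command name.
type Operation = {
  reduce: (acc: number, current: number) => number;
  initial: number;
};

const operations = new Map<string, Operation>([
  ['add', { reduce: (acc, current) => acc + current, initial: 0 }],
  ['subtract', { reduce: (acc, current) => acc - current, initial: 0 }],
  ['multiply', { reduce: (acc, current) => acc * current, initial: 1 }],
]);

// Mirrors the if/else chain: returns null for an unknown command name.
function calculate(parts: string[]): number | null {
  const operation = operations.get(parts[0]);
  if (!operation) {
    return null;
  }
  return parts
    .slice(1)
    .map((part) => +part)
    .reduce(operation.reduce, operation.initial);
}
```

With the sample commands from the example, `calculate(['add', '1', '2', '3', '4'])` yields 10 and `calculate(['multiply', '3', '4', '5'])` yields 60. As in the original reducer, subtract starts from 0, so `subtract,1,1,1,2` yields -5.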