Declarative RxJS with Custom Operators

Improving the readability of RxJS streams by using custom operators

RxJS streams tend to get messy and hard to read once many operators are chained in a pipe. One reason is that the logic inside those operators is often written imperatively. Extracting that logic into custom operators with descriptive names makes the streams more declarative, and therefore cleaner and easier to read.

Declarative vs Imperative Programming

The discussion of declarative versus imperative programming is not specific to Angular or TypeScript; it applies across all programming languages. At its core, it is about making code read more naturally, at the cost of extracting more code into separate functions. A declarative approach is generally considered cleaner because it is more readable: it hides low-level details behind extracted functions with explicit, descriptive names.
const arr = [1, 2, 3, 4, 5, 6];
const even = [];

for (const item of arr) {
  if (item % 2 === 0) {
    even.push(item);
  }
}

console.log(even); // 2, 4, 6
Imperative style
const arr = [1, 2, 3, 4, 5, 6];

const even = arr.filter(item => isEven(item));

console.log(even); // 2, 4, 6

function isEven(value: number): boolean {
  return value % 2 === 0;
}
Declarative style

Both examples are valid, executable code, but the latter is more readable because it extracts the low-level logic into a distinct function with an explicit name. But how does this concept translate to RxJS? Isn't all RxJS code declarative by nature? Well, yes, but also no...

We chain pipe operators in whatever order we need, which is declarative in itself. But the more logic we put inside the operator callbacks, the more imperative the stream becomes. To avoid this, we can create custom pipe operators and extract the logic into them.
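To make the contrast concrete, here is a minimal sketch that runs the same stream twice: once with the logic inline in the callbacks, and once with that logic moved into named operators. The operators onlyEven() and squared() are hypothetical names chosen for illustration; the shape of such functions is covered in the Custom Pipe Operators section.

```typescript
import { Observable, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical operators for illustration; the names are assumptions.
function onlyEven(): (source$: Observable<number>) => Observable<number> {
  return (source$) => source$.pipe(filter((value) => value % 2 === 0));
}

function squared(): (source$: Observable<number>) => Observable<number> {
  return (source$) => source$.pipe(map((value) => value * value));
}

const numbers$ = of(1, 2, 3, 4, 5, 6);

// Inline logic: the reader has to decode every callback.
const inlineResult: number[] = [];
numbers$
  .pipe(
    filter((value) => value % 2 === 0),
    map((value) => value * value)
  )
  .subscribe((value) => inlineResult.push(value));

// Named operators: the pipe reads like a description of the steps.
const namedResult: number[] = [];
numbers$
  .pipe(onlyEven(), squared())
  .subscribe((value) => namedResult.push(value));
```

Both pipes emit the same values; only the second one can be read without looking at the implementation.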

Custom Pipe Operators

Writing custom operators is a lot easier than one might expect. The simplest operator is just a function that takes an observable and returns an observable, so we could simply take the passed observable, pipe it through some operators and return the result. To match the way the built-in operators are used, however, we should write a higher-order function that returns such a function: one that takes an observable and returns a new observable. That is a lot of meaning packed into one sentence, but the code below makes it clearer:

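import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

// Logs every value to the console and passes it on unchanged.
export function log<T>(): (source$: Observable<T>) => Observable<T> {
  return (source$) => source$.pipe(tap(console.log));
}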

We use a generic so that the custom pipe operator does not change the type of the stream. We then return an anonymous function that receives source$, which is simply the observable the operator is applied to, and returns the piped result.
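One practical reason for the outer function is that it gives the operator a place to accept arguments. The sketch below assumes a hypothetical variant called logWithLabel() that takes a label. It also uses MonoTypeOperatorFunction<T>, a type exported by RxJS that is shorthand for the "observable of T in, observable of T out" signature written out above.

```typescript
import { MonoTypeOperatorFunction, of } from 'rxjs';
import { tap } from 'rxjs/operators';

// Hypothetical variant of log() that accepts a label (an assumption for illustration).
function logWithLabel<T>(label: string): MonoTypeOperatorFunction<T> {
  // The inner function closes over `label`, just like built-in operators
  // close over their arguments.
  return (source$) => source$.pipe(tap((value) => console.log(label, value)));
}

// Usage: the values pass through unchanged, so the element type stays `number`.
const seen: number[] = [];
of(1, 2, 3)
  .pipe(logWithLabel('value:'))
  .subscribe((value) => seen.push(value));
```

Because the operator only taps into the stream, the subscriber still receives every value in order.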

Example

This example contains a form control into which the user can enter a command for calculating numbers. Such a command could look like this:

  • add,1,2,3,4
  • subtract,1,1,1,2
  • multiply,3,4,5

Before the refactoring, the result$ stream is very long and contains all the logic in one place, so it cannot be understood without a closer look. After the refactoring, the result$ stream is much more declarative: the custom pipe operators hide the implementation details, and their names alone tell what each step does. The stream can be read directly without dealing with the low-level details.

I. Before

result$ = this.control.valueChanges.pipe(
  debounceTime(800),
  distinctUntilChanged(),
  filter((v): v is string => !!v && typeof v === 'string'),
  map((v) => v.split(',')),
  filter((v) => v.length >= 1),
  map((v) => {
    if (v[0] === 'add') {
      return v
        .slice(1)
        .map((v) => +v)
        .reduce((acc: number, current: number) => {
          return acc + current;
        }, 0);
    } else if (v[0] === 'subtract') {
      return v
        .slice(1)
        .map((v) => +v)
        .reduce((acc: number, current: number) => {
          return acc - current;
        }, 0);
    } else if (v[0] === 'multiply') {
      return v
        .slice(1)
        .map((v) => +v)
        .reduce((acc: number, current: number) => {
          return acc * current;
        }, 1);
    }
    return null;
  })
);

II. After

import {
  assertNumber,
  assertString,
  command,
  lookAhead,
  split,
} from './custom-operators';

[...]

result$ = this.control.valueChanges.pipe(
  lookAhead(),
  assertString(),
  split(),
  command(),
  assertNumber()
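);

// custom-operators.ts (file name assumed from the import path above)
import { Observable } from 'rxjs';
import { debounceTime, distinctUntilChanged, filter, map } from 'rxjs/operators';

// Waits for a pause in the input and skips values that did not change.
export function lookAhead<T>(): (source$: Observable<T>) => Observable<T> {
  return (source$) => source$.pipe(debounceTime(800), distinctUntilChanged());
}

// Lets only non-empty strings through and narrows the type to string.
export function assertString(): (
  source$: Observable<unknown>
) => Observable<string> {
  return (source$) =>
    source$.pipe(
      filter((value): value is string => !!value && typeof value === 'string')
    );
}

// Lets only real numbers through and narrows the type to number.
// Checking the type instead of truthiness keeps a valid result of 0,
// which `!!value` would drop; null and NaN are still filtered out.
export function assertNumber(): (
  source$: Observable<unknown>
) => Observable<number> {
  return (source$) =>
    source$.pipe(
      filter(
        (value): value is number =>
          typeof value === 'number' && !Number.isNaN(value)
      )
    );
}

// Splits the comma-separated command into its parts.
export function split(): (source$: Observable<string>) => Observable<string[]> {
  return (source$) =>
    source$.pipe(
      map((v) => v.split(',')),
      filter((v) => v.length >= 1)
    );
}

// Evaluates the command: the first part is the operation name,
// the remaining parts are its numeric arguments.
export function command(): (
  source$: Observable<string[]>
) => Observable<number | null> {
  return (source$) =>
    source$.pipe(
      map((v) => {
        if (v[0] === 'add') {
          return v
            .slice(1)
            .map((v) => +v)
            .reduce((acc: number, current: number) => {
              return acc + current;
            }, 0);
        } else if (v[0] === 'subtract') {
          return v
            .slice(1)
            .map((v) => +v)
            .reduce((acc: number, current: number) => {
              return acc - current;
            }, 0);
        } else if (v[0] === 'multiply') {
          return v
            .slice(1)
            .map((v) => +v)
            .reduce((acc: number, current: number) => {
              return acc * current;
            }, 1);
        }
        return null;
      })
    );
}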
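
The three branches inside command() differ only in the reducer and its starting value, so the same extraction idea can be applied one level deeper. The following is a sketch of one possible refactoring, not the article's implementation: the names Operation, operations and evaluate are assumptions. It keeps the behaviour of the branches above, including the starting value of 0 for subtract.

```typescript
// Each operation is described by a starting value and a reducer step.
type Operation = {
  seed: number;
  step: (acc: number, current: number) => number;
};

// A Map avoids accidental hits on inherited object keys such as "toString".
const operations = new Map<string, Operation>([
  ['add', { seed: 0, step: (acc, current) => acc + current }],
  ['subtract', { seed: 0, step: (acc, current) => acc - current }],
  ['multiply', { seed: 1, step: (acc, current) => acc * current }],
]);

// Hypothetical pure helper: takes the split command and returns the result,
// or null when the operation name is unknown.
function evaluate(parts: string[]): number | null {
  const [name, ...args] = parts;
  const operation = operations.get(name);
  if (!operation) {
    return null;
  }
  return args.map((arg) => +arg).reduce(operation.step, operation.seed);
}

const sum = evaluate(['add', '1', '2', '3', '4']);
const difference = evaluate(['subtract', '1', '1', '1', '2']);
const product = evaluate(['multiply', '3', '4', '5']);
const unknown = evaluate(['divide', '8', '2']);
```

With such a helper, the body of command() could shrink to map(evaluate), and the calculation could be unit tested without subscribing to a stream.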

GitHub Repository
