RxJS. Переиспользуемые операторы

Существует возможность сгруппировать любые RxJS операторы для дальнейшего повторного использования.

Первый пример

import { of, pipe, range } from 'rxjs'; 
import { filter, map, reduce, take } from 'rxjs/operators';

const filterOutEvens = filter((x: number) => Boolean(x % 2));
const sum = reduce((acc, next) => acc + next, 0);
const doubleBy = x => map((value: number) => value * x);

const complicatedLogic = pipe(
  filterOutEvens,
  doubleBy(2),
  sum
);

range(0,10).pipe(complicatedLogic).subscribe(console.log); // 50

Stackblitz

Второй пример

import { pipe } from "rxjs";
import { debounceTime, distinctUntilChanged } from "rxjs/operators";

const debounceInput = pipe(
  debounceTime<string>(400),
  distinctUntilChanged()
);

valueChanges.pipe(debounceInput)

Третий пример

/**
 * Retry request when error occured
 * @param delayDuration - delay between requests
 * @param retryCount - request count
 */
export function retryRequestOnError<T>(
  retryCount: number = 5,
  delayDuration: number = 2000
): MonoTypeOperatorFunction<T> {
  return retryWhen(errors =>
    concat(
      errors.pipe(
        delay(delayDuration),
        take(retryCount)
      ),
      throwError(new Error(`Retry limit ${retryCount} exceeded`))
    )
  );
}

Дополнение от 18 мая 2019

На недавно прошедшей конференции ngConf Ben Lesh & Tracy Lee представили доклад на схожую тему "How To Build Your Own RxJS Operators

Затрагивается важный момент про читаемость кода.


Похожие записи

RxJS Pipeable Operators

Начиная с версии rxjs 5.5 операторы вместо цепочки вызовов применяются как параметры функции pipe.

RxJs Subjects

Выдержки из доклада Андрея Алексеева (Tinkoff) про RxJs (Subject, Behaviour Subject, Replay Subject, Async Subject). Применение в Angular.

Поисковый запрос с помощью RxJS

Показательная и востребованная задача. Получение набираемого запроса из поля ввода через полсекунды после того, как пользователь закончил ввод с показом лоадера.

Angular. Когда не надо отписываться в RxJS?

В async pipe за вас отпишется Angular. Во всех остальных случаях лучше отписываться самостоятельно. Допускается не отписываться в потоках, где будет гарантировано вызван complete.

RxJS. Delay from array

import { of, from } from 'rxjs'; 
import { map, concatMap, delay } from 'rxjs/operators';

from([2,4,6,8]).pipe(
  concatMap(item => of(item).pipe(delay(1000)))
).subscribe(console.log);