Два слова об RxJS

Два слова об RxJS

RxJS - это библиотека для работы с асинхронными и основанными на событиях программами с использованием наблюдаемых последовательностей.

Библиотека предоставляет основной тип Observable, несколько вспомогательных типов (Observer, Schedulers, Subjects) и операторы работы с событиями как с коллекциями (map, filter, reduce, every и подобные из JavaScript Array).

RxJS это Lodash для событий.

Чтобы просто попробовать rxjs достаточно подключить его с cdn или создать RxJS проект в https://stackblitz.com/

<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>

Библиотека будет доступна через глобальный объект rxjs

Несколько сравнительных примеров

Чистый JS

var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

RxJS с Observable

var button = document.querySelector('button');
rxjs.fromEvent(button, 'click')
    .subscribe(() => console.log('Clicked!'));

Чистые функции

RxJS даёт возможность писать код, используя чистые функции. В этом случае код менее подвержен ошибкам.

Vanilla JS + нечистая функция

var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked ${++count} times`));

RxJS + чистая фукнкция = изолированное состояние

var button = document.querySelector('button');
rxjs.fromEvent(button, 'click').pipe(
    rxjs.operators.scan(count => count + 1, 0)
).subscribe(count => console.log(`Clicked ${count} times`));

Оператор scan работает так же, как reduce у массивов.

Observable

Observable - наблюдаемый объект. Нечто близкое к Iterable - итерируемый объект. Observable - объект, за которым можно наблюдать и как-то реагировать, если он меняется. Observable - это как множественный Promise.

# SingleMultiple
Pull FunctionIterator
Push PromiseObservable

В примерах выше наблюдаемый объект создавался из элемента HTML документа и события click. Но его можно создать, например, просто из массива, тогда вместо кликов будут обрабатываться элементы массива.

Примеры создания Observable

// Из одного или нескольких значений
rxjs.of('foo', 'bar');

// Из массива
rxjs.from([1,2,3]);

// Из события
rxjs.fromEvent(document.querySelector('button'), 'click');

// Из обещания
rxjs.fromPromise(fetch('/users'));

// Из колбека (колбек должен быть последним аргументом)
// fs.exists = (path, cb(exists))
var exists = rxjs.bindCallback(fs.exists);
exists('file.txt').subscribe(exists => console.log('Does file exist?', exists));

// Из колбека (колбек должен быть последним аргументом)
// fs.rename = (pathA, pathB, cb(err, result))
var rename = rxjs.bindNodeCallback(fs.rename);
rename('file.txt', 'else.txt').subscribe(() => console.log('Renamed!'));

Создания Observable вручную

Наблюдаемый объект получает значения 1, 2, 3 сразу (синхронно) при подписке, и значение 4 через одну секунду.

var observable = rxjs.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

Чтобы увидеть эти значения, необходимо вызвать Observable и подписаться на него.

var observable$ = rxjs.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable$.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

Результат в консоле

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

Чтобы положить значение в Observable снаружи, то есть не внутри функции create надо использовать вспомогательный тип Subject и его метод next().

Subject

Subject это особый тип Observable, который позволяет не только посылать что-то из своего потока, но и принимать к себе.

  • На Subject можно подписываться как на Observable
  • Subject сам может подписываться на другие Observable.

Есть важная особенность. Когда Subject что-то посылает, все подписчики получают одни и те же данные.

Лучше посмотреть на примерах.

Отличие RxJS Subjects от Observable на примерах

Чистый Observable.

var observable = rxjs.Observable.create(function(source) {
  source.next(Math.random());
});

observable.subscribe(v => console.log('consumer A: ' + v));
observable.subscribe(v => console.log('consumer B: ' + v));

/* Prints DIFFERENT values for both consumers */
// consumer A: 0.25707833297857885
// consumer B: 0.8304769607422662

Observable обернутый в Subject.

var observable = rxjs.Observable.create(function(source) {
  source.next(Math.random());
});

var subject = new rxjs.Subject();
 new rxjs.Subject();

subject.subscribe(v => console.log('consumer A: ' + v));
subject.subscribe(v => console.log('consumer B: ' + v));

observable.subscribe(subject);

/* Prints SAME values for both consumers */
// consumer A: 0.8495447073368834
// consumer B: 0.8495447073368834

P.S. Зачем колбеки?

Когда я первый раз столкнулся с RxJS, я не мог понять зачем обязательно использовать колбеки. Неужели нельзя передать переменную, в которую вернётся результат. Что-то вроде такого.

let observable$ = rxjs.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
});
let value = observable$.get(); // и пришло бы 1

Это бессмысленно. Observable работает по-другому. Если бы там и реализовали нечто подобное (а его реализовали и называется оно BehaviorSubject), то оно бы вернуло 3, а не 1. А вот 1 и 2 как раз бы потерялись. А если добавить 4 через секунду, то и 3 бы потерялось, поэтому переменные и бессмылсенны, а коллбеки позволяют обработать весь поток.

Для хорошего понимания RxJS рекомендую посмотреть видео от Андрея Алексеева


Интерактивное обучение RxJs на фруктах

Несколько хороших ссылок по RxJS (Всё на англицком)

Не RxJS, но заслуживает внимания Анимированные карточки для изучения реактивного программирования

Похожие записи

RxJS Pipeable Operators

Начиная с версии rxjs 5.5 операторы вместо цепочки вызовов применяются как параметры функции pipe.

+function ($) { "use strict"; }(window.jQuery);

  • объявляется IIFE (немедленно выполняемая функция-выражение)
  • в функцию передаётся объект библиотеки jQuery, которая внутри будет доступна через переменную с именем $
  • включается «строгий режим»

RxJS. Delay from array

import { of, from } from 'rxjs'; 
import { map, concatMap, delay } from 'rxjs/operators';

from([2,4,6,8]).pipe(
  concatMap(item => of(item).pipe(delay(1000)))
).subscribe(console.log);