Два слова об RxJS

Два слова об RxJS

Это ликбез, а не руководство по библиотке.

RxJS - это библиотека для работы с асинхронными и основанными на событиях программами с использованием наблюдаемых последовательностей.

Библиотека предоставляет основной тип Observable, несколько вспомогательных типов (Observer, Schedulers, Subjects) и операторы работы с событиями как с коллекциями (map, filter, reduce, every и подобные из JavaScript Array).

RxJS это Lodash для событий.

Несколько сравнительных примеров

Чистый JS

var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

RxJS с Observable

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .subscribe(() => console.log('Clicked!'));

Чистота

RxJS даёт возможность писать код, используя чистые функции. В этом случае код менее подвержен ошибкам.

Vanilla JS + нечистая функция

var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked ${++count} times`));

RxJS + чистая фукнкция = изолированное состояние

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .scan(count => count + 1, 0)
  .subscribe(count => console.log(`Clicked ${count} times`));

Оператор scan работает так же, как reduce у массивов.

Observable

Observable - наблюдаемый объект. Нечто близкое к Iterable - итерируемый объект. Observable - объект, за которым можно наблюдать и как-то реагировать, если он меняется. Observable - это как множественный Promise.

# SingleMultiple
Pull FunctionIterator
Push PromiseObservable

В примерах выше наблюдаемый объект создавался из элемента HTML документа и события click. Но его можно создать, например, просто из массива, тогда вместо кликов будут обрабатываться элементы массива.

Примеры создания Observable

// Из одного или нескольких значений
Rx.Observable.of('foo', 'bar');

// Из массива
Rx.Observable.from([1,2,3]);

// Из события
Rx.Observable.fromEvent(document.querySelector('button'), 'click');

// Из обещания
Rx.Observable.fromPromise(fetch('/users'));

// Из колбека (колбек должен быть последним аргументом)
// fs.exists = (path, cb(exists))
var exists = Rx.Observable.bindCallback(fs.exists);
exists('file.txt').subscribe(exists => console.log('Does file exist?', exists));

// Из колбека (колбек должен быть последним аргументом)
// fs.rename = (pathA, pathB, cb(err, result))
var rename = Rx.Observable.bindNodeCallback(fs.rename);
rename('file.txt', 'else.txt').subscribe(() => console.log('Renamed!'));

Создания Observable вручную

Наблюдаемый объект получает значения 1, 2, 3 сразу (синхронно) при подписке, и значение 4 через одну секунду.

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

Чтобы увидеть эти значения, необходимо вызвать Observable и подписаться на него.

var observable$ = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable$.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

Результат в консоле

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

Чтобы положить значение в Observable снаружи, то есть не внутри функции create надо использовать вспомогательный тип Subject и его метод onNext().

Subject

Subject это особый тип Observable, который позволяет не только посылать что-то из своего потока, но и принимать к себе.

  • На Subject можно подписываться как на Observable
  • Subject сам может подписываться на другие Observable.

Есть важная особенность. Когда Subject что-то посылает, все подписчики получают одни и те же данные.

Лучше посмотреть на примерах.

Отличие RxJS Subjects от Observable на примерах

Чистый Observable.

var observable = Rx.Observable.create(function(source) {
  source.next(Math.random());
});

observable.subscribe(v => console.log('consumer A: ' + v));
observable.subscribe(v => console.log('consumer B: ' + v));

/* Prints DIFFERENT values for both consumers */
// consumer A: 0.25707833297857885
// consumer B: 0.8304769607422662

Observable обернутый в Subject.

var observable = Rx.Observable.create(function(source) {
  source.next(Math.random());
});

var subject = new Rx.Subject();

subject.subscribe(v => console.log('consumer A: ' + v));
subject.subscribe(v => console.log('consumer B: ' + v));

observable.subscribe(subject);

/* Prints SAME values for both consumers */
// consumer A: 0.8495447073368834
// consumer B: 0.8495447073368834

P.S. Зачем колбеки?

Когда я первый раз столкнулся с RxJS, я не мог понять зачем обязательно использовать колбеки. Неужели нельзя передать переменную, в которую вернётся результат. Что-то вроде такого.

let observable$ = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
});
let value = observable$.get(); // и пришло бы 1

Это бессмысленно. Observable работает по-другому. Если бы там и реализовали нечто подобное, то оно бы вернуло 3, а не 1. А вот 1 и 2 как раз бы потерялись. А если добавить 4 через секунду, то и 3 бы потерялось, поэтому переменные и бессмылсенны, а коллбеки позволяют обработать весь поток.


Несколько хороших ссылок по RxJS (Всё на англицком)

Не RxJS, но заслуживает внимания Анимированные карточки для изучения реактивного программирования

Похожие записи

Axios или fetch

Сравнение на примере GET/POST запросов, обработке ошибок и возможности задавать базовую конфигурацию.