Это ликбез, а не руководство по библиотке.

RxJS - это библиотека для работы с асинхронными и основанными на событиях программами с использованием наблюдаемых последовательностей.

Библиотека предоставляет основной тип Observable, несколько вспомогательных типов (Observer, Schedulers, Subjects) и операторы работы с событиями как с коллекциями (map, filter, reduce, every и подобные из JavaScript Array).

RxJS это Lodash для событий.

Несколько сравнительных примеров

Чистый JS

var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

RxJS с Observable

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .subscribe(() => console.log('Clicked!'));

Чистота

Что делает RxJS действительно крутым - это возможность писать код, используя чистые функции. В этом случае код менее подвержен ошибкам.

Чистый JS + нечистая функция

var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked ${++count} times`));

RxJS + чистая фукнкция = изолированное состояние

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .scan(count => count + 1, 0)
  .subscribe(count => console.log(`Clicked ${count} times`));

Оператор scan работает так же, как reduce у массивов.

Observable

Observable - наблюдаемый объект. Нечто близкое к Iterable - итерируемый объект. Observable - объект, за которым можно наблюдать и как-то реагировать, если он меняется. Observable - это как множественный Promise.

# Single Multiple
Pull Function Iterator
Push Promise Observable

В примерах выше наблюдаемый объект создавался из элемента HTML документа и события click. Но его можно создать, например, просто из массива, тогда вместо кликов будут обрабатываться элементы массива.

Примеры создания Observable

// Из одного или нескольких значений
Rx.Observable.of('foo', 'bar');

// Из массива
Rx.Observable.from([1,2,3]);

// Из события
Rx.Observable.fromEvent(document.querySelector('button'), 'click');

// Из обещания
Rx.Observable.fromPromise(fetch('/users'));

// Из колбека (колбек должен быть последним аргументом)
// fs.exists = (path, cb(exists))
var exists = Rx.Observable.bindCallback(fs.exists);
exists('file.txt').subscribe(exists => console.log('Does file exist?', exists));

// Из колбека (колбек должен быть последним аргументом)
// fs.rename = (pathA, pathB, cb(err, result))
var rename = Rx.Observable.bindNodeCallback(fs.rename);
rename('file.txt', 'else.txt').subscribe(() => console.log('Renamed!'));

Создания Observable вручную

Наблюдаемый объект получает значения 1, 2, 3 сразу (синхронно) при подписке, и значение 4 через одну секунду.

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

Чтобы увидеть эти значения, необходимо вызвать Observable и подписаться на него.

var observable$ = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable$.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

Результат в консоле

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

Чтобы положить значение в Observable снаружи, то есть не внутри функции create надо использовать вспомогательный тип Subject и его метод onNext().

Subject

Subject это особый тип Observable, который позволяет не только посылать что-то из своего потока, но и принимать к себе.

  • На Subject можно подписываться как на Observable
  • Subject сам может подписываться на другие Observable.

Есть важная особенность. Когда Subject что-то посылает, все подписчики получают одни и те же данные.

Лучше посмотреть на примерах.

Отличие RxJS Subjects от Observable на примерах

Чистый Observable.

var observable = Rx.Observable.create(function(source) {
  source.next(Math.random());
});

observable.subscribe(v => console.log('consumer A: ' + v));
observable.subscribe(v => console.log('consumer B: ' + v));

/* Prints DIFFERENT values for both consumers */
// consumer A: 0.25707833297857885
// consumer B: 0.8304769607422662

Observable обернутый в Subject.

var observable = Rx.Observable.create(function(source) {
  source.next(Math.random());
});

var subject = new Rx.Subject();

subject.subscribe(v => console.log('consumer A: ' + v));
subject.subscribe(v => console.log('consumer B: ' + v));

observable.subscribe(subject);

/* Prints SAME values for both consumers */
// consumer A: 0.8495447073368834
// consumer B: 0.8495447073368834

P.S. Зачем колбеки?

Когда я первый раз столкнулся с RxJS, я не мог понять зачем обязательно использовать колбеки. Неужели нельзя передать переменную, в которую вернётся результат. Что-то вроде такого.

let observable$ = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
});
let value = observable$.get(); // и пришло бы 1

Это бессмысленно. Observable работает по-другому. Если бы там и реализовали нечто подобное, то оно бы вернуло 3, а не 1. А вот 1 и 2 как раз бы потерялись. А если добавить 4 через секунду, то и 3 бы потерялось, поэтому переменные и бессмылсенны, а коллбеки позволяют обработать весь поток.


Несколько хороших ссылок по RxJS (Всё на англицком)

Не RxJS, но заслуживает внимания Анимированные карточки для изучения реактивного программирования