RxJs Subjects

Для изучению RxJs Subjects рекомендую доклад Андрея Алексеева из Tinkoff. Я сам, как преподаватель, могу сказать, что Алексей рассказвает весьма профессионально: просто и интересно, на основе грамотно подготовленного материала. Он ведёт курс по фронтенду в Tinkoff Fintech School.

В докладе вся теория по Subjects и практика их применения в Angular-разработке.

Примеры из видео

Несколько презентаций от автора:

// Про Observable

  • Холодные Observable создают независимые потоки под каждую подписку. Подписчики получают разные значения.
  • Горячие Observable разделяют поток друг с другом. Подписчики получают одинаковые значения.

Subject

Объединение Observable и Observer. Передает значение всем подписчикам.

Behaviour Subject (value)

Один из основных Subject'ов

Всегда хранит текущее значение. При инициализации требует начальное значение. При подписке наблюдатель незамедлительно получает текущее значение.

Replay Subject (buffer)

Cохраняет буфер произошедших событий. При подписке наблюдатель незамедлительно получит текущий буфер. Нет начального значения, как в Behaviour Subject.

Async Subject

Передает только последнее значение и только после завершения. Похож на Promise.

Приминение 1. Инкапсуляция Subject

Рассмотрим простой сервис:

import {Injectable} from "@angular/core";

export enum AuthState {
  LOGGED,
  NONE
}

@Injectable()
export class StateService {
  /**
   * Сервис авторизации с внутренним состоянием
   */
  private authState: AuthState = AuthState.NONE;

  /**
   * Получить текущее состояние
   */
  getCurrentState(): AuthState {
    return this.authState;
  }

  /**
   * Установить состояние
   */
  setAuthState(state: AuthState) {
    this.authState = state;
  }
}

При использовании такого сервиса необходимо перед каждой операцией вручную проверять состояние через метод getCurrentState(). Этот сервис можно переписать в реактивном стиле:

import {Injectable} from "@angular/core";
import {BehaviorSubject} from "rxjs/BehaviorSubject";
import {Observable} from "rxjs/Observable";

export enum AuthState {
  LOGGED,
  NONE
}

@Injectable()
export class StateService {
  /**
   * Создаем сервис авторизации с внутренним состоянием через BehaviorSubject
   * Изначально пользователь авторизован.
   *
   * Поскольку authSubject имеет статус private, он недоступен для внешнего мира —
   * изменить его можно только методами сервиса. Таким образом никто извне не сможет завершить authSubject или
   * записать в authSubject другой объект.
   */
  private authSubject = new BehaviorSubject<AuthState>(AuthState.NONE);

  /**
   * Получить Observable состояний, чтобы реагировать на изменения.
   */
  getStateChange(): Observable<AuthState> {
    return this.authSubject.asObservable();
  }

  /**
   * Получить текущее состояние
   */
  getCurrentState(): AuthState {
    return this.authSubject.getValue();
  }

  /**
   * Установить состояние
   */
  setAuthState(state: AuthState) {
    this.authSubject.next(state);
  }
}

При использовании сервиса с Subject достаточно 1 раз подписаться на getStateChange() и всегда иметь актуальное состояние авторизации.

Приминение 2. Unsubsribe через Subject

import { Component, OnInit } from "@angular/core";
import { Observable } from "rxjs/Observable";
import { getRandomColor } from "../../shared/utils";
import { Subject } from "rxjs/Subject";

@Component({
  selector: "app-fixed-subject",
  templateUrl: "./fixed-subject.component.html",
  styleUrls: ["./fixed-subject.component.css"]
})
export class FixedSubjectComponent implements OnInit {
  private color = getRandomColor();
  /**
   * Создаем специальный сабджект для отписки
   */
  private destroyStream = new Subject<void>();

  constructor() { }

  ngOnInit() {
    /**
     * Оператор takeUntil автоматически завершит поток, когда в его аргумент прийдет значение
     */
    Observable.interval(200)
      .take(20)
      .takeUntil(this.destroyStream)
      .subscribe((value) => {
        console.log("%c " + value, `color: ${this.color}`);
      });
  }

  /**
   * передаем значение в destroyStream
   */
  ngOnDestroy() {
    this.destroyStream.next();
  }

}

Похожие записи

Angular. Manually retry http request

На память. Некоторое время назад я решил достаточно необычную задачу, но в последствии на backend`е переделали логику и код был удалён из проекта.

RxJS. Delay from array

import { of, from } from 'rxjs'; 
import { map, concatMap, delay } from 'rxjs/operators';

from([2,4,6,8]).pipe(
  concatMap(item => of(item).pipe(delay(1000)))
).subscribe(console.log);