Observer Pattern

观察者模式定义

观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。

我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:

  • 期刊出版方 - 负责期刊的出版和发行工作

  • 订阅者 - 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知

在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。

观察者模式结构

观察者模式实战

Subject 类定义

class Subject {
    
    constructor() {
        this.observerCollection = [];
    }
    
    addobserver(observer) { // 添加观察者
        this.observerCollection.push(observer);
    }
    
    deleteObserver(observer) { // 移除观察者
        let index = this.observerCollection.indexOf(observer);
        if(index >= 0) this.observerCollection.splice(index,1);
    }
    
    notifyObservers() { // 通知观察者
        this.observerCollection.forEach((observer)=>observer.notify());
    }
}

Observer 类定义

class Observer {
    constructor(name) {
        this.name = name;
    }
    
    notify() {
        console.log(`${this.name} has been notified.`);
    }
}

使用示例

let subject = new Subject(); // 创建主题对象

let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker'
let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo'

subject.addobserver(observer1); // 注册观察者A
subject.addobserver(observer2); // 注册观察者B
 
subject.notifyObservers(); // 通知观察者

subject.deleteObserver(observer1); // 移除观察者A

subject.notifyObservers(); // 验证是否成功移除

以上代码成功运行后控制台的输出结果:

semlinker has been notified.
lolo has been notified.
lolo has been notified.

Observable subscribe

在介绍 RxJS - Subject 之前,我们先来看个示例:

const interval$ = Rx.Observable.interval(1000).take(3);

interval$.subscribe({
  next: value => console.log('Observer A get value: ' + value);
});

setTimeout(() => {
  interval$.subscribe({
      next: value => console.log('Observer B get value: ' + value);
  });
},1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 0
Observer A get value: 2
Observer B get value: 1
Observer B get value: 2

通过以上示例,我们可以得出以下结论:

  • Observable 对象可以被重复订阅

  • Observable 对象每次被订阅后,都会重新执行

上面的示例,我们可以简单地认为两次调用普通的函数,具体参考以下代码:

function interval() {
  setInterval(() => console.log('..'),1000);
}

interval();

setTimeout(() => {
  interval();
},1000);

Observable 对象的默认行为,适用于大部分场景。但有些时候,我们会希望在第二次订阅的时候,不会从头开始接收 Observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,我们把这种处理方式成为组播 (multicast),那我们要怎么实现呢 ?回想一下我们刚才介绍过观察者模式,你脑海中是不是已经想到方案了。没错,我们可以通过自定义 Subject 来实现上述功能。

自定义 Subject

Subject 类定义

class Subject {   
    constructor() {
        this.observers = [];
    }
    
    addobserver(observer) { 
        this.observers.push(observer);
    }
    
    next(value) {  
        this.observers.forEach(o => o.next(value));    
    }
   
    error(error){ 
        this.observers.forEach(o => o.error(error));
    }
    
    complete() {
        this.observers.forEach(o => o.complete());
    }
}

使用示例

const interval$ = Rx.Observable.interval(1000).take(3);
let subject = new Subject();

let observerA = {
    next: value => console.log('Observer A get value: ' + value),error: error => console.log('Observer A error: ' + error),complete: () => console.log('Observer A complete!')
};

var observerB = {
    next: value => console.log('Observer B get value: ' + value),error: error => console.log('Observer B error: ' + error),complete: () => console.log('Observer B complete!')
};

subject.addobserver(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
   subject.addobserver(observerB); // 添加观察者B
},1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer B get value: 1
Observer A get value: 2
Observer B get value: 2
Observer A complete!
Observer B complete!

通过自定义 Subject,我们实现了前面提到的功能。接下来我们进入正题 - RxJS Subject。

RxJS Subject

首先我们通过 RxJS Subject 来重写一下上面的示例:

const interval$ = Rx.Observable.interval(1000).take(3);
let subject = new Rx.Subject();

let observerA = {
    next: value => console.log('Observer A get value: ' + value),complete: () => console.log('Observer B complete!')
};

subject.subscribe(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
   subject.subscribe(observerB); // 添加观察者B
},1000);

RxJS Subject 源码片段

/**
 * Suject继承于Observable 
 */
export class Subject extends Observable {
    constructor() {
        super();
        this.observers = []; // 观察者列表
        this.closed = false;
        this.isstopped = false;
        this.hasError = false;
        this.thrownError = null;
    }
  
    next(value) {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        if (!this.isstopped) {
            const { observers } = this;
            const len = observers.length;
            const copy = observers.slice();
            for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
                copy[i].next(value);
            }
        }
    }
  
    error(err) {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        this.hasError = true;
        this.thrownError = err;
        this.isstopped = true;
        const { observers } = this;
        const len = observers.length;
        const copy = observers.slice();
        for (let i = 0; i < len; i++) { // 循环调用观察者error方法
            copy[i].error(err);
        }
        this.observers.length = 0;
    }
  
    complete() {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        this.isstopped = true;
        const { observers } = this;
        const len = observers.length;
        const copy = observers.slice();
        for (let i = 0; i < len; i++) { // 循环调用观察者complete方法
            copy[i].complete();
        }
        this.observers.length = 0; // 清空内部观察者列表
    }
}

通过 RxJS Subject 示例和源码片段,对于 Subject 我们可以得出以下结论:

  • Subject 既是 Observable 对象,又是 Observer 对象

  • 当有新消息时,Subject 会对内部的 observers 列表进行组播 (multicast)

Angular 2 RxJS Subject 应用

在 Angular 2 中,我们可以利用 RxJS Subject 来实现组件通信,具体示例如下:

message.service.ts

import { Injectable } from '@angular/core';
import {Observable} from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';

@Injectable()
export class MessageService {
    private subject = new Subject<any>();

    sendMessage(message: string) {
        this.subject.next({ text: message });
    }

    clearMessage() {
        this.subject.next();
    }

    getMessage(): Observable<any> {
        return this.subject.asObservable();
    }
}

home.component.ts

import { Component } from '@angular/core';

import { MessageService } from '../_services/index';

@Component({
    moduleId: module.id,templateUrl: 'home.component.html'
})

export class HomeComponent {
    constructor(private messageService: MessageService) {}
    
    sendMessage(): void { // 发送消息
        this.messageService.sendMessage('Message from Home Component to App Component!');
    }

    clearMessage(): void { // 清除消息
        this.messageService.clearMessage();
    }
}

app.component.ts

import { Component,OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs/Subscription';

import { MessageService } from './_services/index';

@Component({
    moduleId: module.id,selector: 'app',templateUrl: 'app.component.html'
})

export class AppComponent implements OnDestroy {
    message: any;
    subscription: Subscription;

    constructor(private messageService: MessageService) {
        this.subscription = this.messageService.getMessage()
              .subscribe(message => { this.message = message; });
    }

    ngOnDestroy() {
        this.subscription.unsubscribe();
    }
}

以上示例实现的功能是组件之间消息通信,即 HomeComponent 子组件,向 AppComponent 父组件发送消息。代码运行后,浏览器的显示结果如下:

Plunker 示例

Subject 存在的问题

因为 Subject 在订阅时,是把 observer 存放到观察者列表中,并在接收到新值的时候,遍历观察者列表并调用观察者上的 next 方法,具体如下:

next(value) {
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        if (!this.isstopped) {
            const { observers } = this;
            const len = observers.length;
            const copy = observers.slice();
            for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
                copy[i].next(value);
            }
        }
}

这样会有一个大问题,如果某个 observer 在执行时出现异常,却没进行异常处理,就会影响到其它的订阅者,具体示例如下:

const source = Rx.Observable.interval(1000);
const subject = new Rx.Subject();

const example = subject.map(x => {
    if (x === 1) {
        throw new Error('oops');
    }
    return x;
});
subject.subscribe(x => console.log('A',x));
example.subscribe(x => console.log('B',x));
subject.subscribe(x => console.log('C',x));

source.subscribe(subject);

以上代码运行后,控制台的输出结果:

A 0
B 0
C 0
A 1
Rx.min.js:74 Uncaught Error: oops

JSBin - Subject Problem Demo

在代码运行前,大家会认为观察者B 会在接收到 1 值时抛出异常,观察者 A 和 C 仍会正常运行。但实际上,在当前的 RxJS 版本中若观察者 B 报错,观察者 A 和 C 也会停止运行。那么应该如何解决这个问题呢?目前最简单的方式就是为所有的观察者添加异常处理,更新后的代码如下:

const source = Rx.Observable.interval(1000);
const subject = new Rx.Subject();

const example = subject.map(x => {
    if (x === 1) {
        throw new Error('oops');
    }
    return x;
});

subject.subscribe(
    x => console.log('A',x),error => console.log('A Error:' + error)
);
    
example.subscribe(
    x => console.log('B',error => console.log('B Error:' + error)
);

subject.subscribe(
    x => console.log('C',error => console.log('C Error:' + error)
);

source.subscribe(subject);

JSBin - RxJS Subject Problem Solved Demo

RxJS Subject & Observable

Subject 其实是观察者模式的实现,所以当观察者订阅 Subject 对象时,Subject 对象会把订阅者添加到观察者列表中,每当有 subject 对象接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next() 方法,把值一一送出。

Subject 之所以具有 Observable 中的所有方法,是因为 Subject 类继承了 Observable 类,在 Subject 类中有五个重要的方法:

  • next - 每当 Subject 对象接收到新值的时候,next 方法会被调用

  • error - 运行中出现异常,error 方法会被调用

  • complete - Subject 订阅的 Observable 对象结束后,complete 方法会被调用

  • subscribe - 添加观察者

  • unsubscribe - 取消订阅 (设置终止标识符、清空观察者列表)

BehaviorSubject

BehaviorSubject 定义

BehaviorSubject 源码片段

export class BehaviorSubject extends Subject {
    constructor(_value) { // 设置初始值
        super();
        this._value = _value;
    }
    get value() { // 获取当前值
        return this.getValue();
    }
    _subscribe(subscriber) {
        const subscription = super._subscribe(subscriber);
        if (subscription && !subscription.closed) {
            subscriber.next(this._value); // 为新的订阅者发送当前最新的值
        }
        return subscription;
    }
    getValue() {
        if (this.hasError) {
            throw this.thrownError;
        }
        else if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        else {
            return this._value;
        }
    }
    next(value) { // 调用父类Subject的next方法,同时更新当前值
        super.next(this._value = value);
    }
}

BehaviorSubject 应用

有些时候我们会希望 Subject 能保存当前的最新状态,而不是单纯的进行事件发送,也就是说每当新增一个观察者的时候,我们希望 Subject 能够立即发出当前最新的值,而不是没有任何响应。具体我们先看一下示例:

var subject = new Rx.Subject();

var observerA = {
    next: value => console.log('Observer A get value: ' + value),complete: () => console.log('Observer B complete!')
};

subject.subscribe(observerA);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  subject.subscribe(observerB); // 1秒后订阅
},1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3

通过输出结果,我们发现在 observerB 订阅 Subject 对象后,它再也没有收到任何值了。因为 Subject 对象没有再调用 next() 方法。但很多时候我们会希望 Subject 对象能够保存当前的状态,当新增订阅者的时候,自动把当前最新的值发送给订阅者。要实现这个功能,我们就需要使用 BehaviorSubject。

BehaviorSubject 跟 Subject 最大的不同就是 BehaviorSubject 是用来保存当前最新的值,而不是单纯的发送事件。BehaviorSubject 会记住最近一次发送的值,并把该值作为当前值保存在内部的属性中。接下来我们来使用 BehaviorSubject 重新一下上面的示例:

var subject = new Rx.BehaviorSubject(0); // 设定初始值

var observerA = {
    next: value => console.log('Observer A get value: ' + value),1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 0
Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 3

JSBin - BehaviorSubject

ReplaySubject

ReplaySubject 定义

ReplaySubject 源码片段

export class ReplaySubject extends Subject {
    constructor(bufferSize = Number.POSITIVE_INFINITY,windowTime = Number.POSITIVE_INFINITY,scheduler) {
        super();
        this.scheduler = scheduler;
        this._events = []; // ReplayEvent对象列表
        this._bufferSize = bufferSize < 1 ? 1 : bufferSize; // 设置缓冲区大小
        this._windowTime = windowTime < 1 ? 1 : windowTime;
    }
  
    next(value) {
        const Now = this._getNow();
        this._events.push(new ReplayEvent(Now,value));
        this._trimBufferThenGetEvents();
        super.next(value);
    }
  
  _subscribe(subscriber) {
        const _events = this._trimBufferThenGetEvents(); // 过滤ReplayEvent对象列表
        let subscription;
        if (this.closed) {
            throw new ObjectUnsubscribedError();
        }
        ...
        else {
            this.observers.push(subscriber);
            subscription = new SubjectSubscription(this,subscriber);
        }
          ...
        const len = _events.length;
        // 重新发送设定的最后bufferSize个值
        for (let i = 0; i < len && !subscriber.closed; i++) {
            subscriber.next(_events[i].value);
        }
        ...
        return subscription;
    }
}

class ReplayEvent {
    constructor(time,value) {
        this.time = time;
        this.value = value;
    }
}

ReplaySubject 应用

有些时候我们希望在 Subject 新增订阅者后,能向新增的订阅者重新发送最后几个值,这时我们就可以使用 ReplaySubject ,具体示例如下:

var subject = new Rx.ReplaySubject(2); // 重新发送最后2个值

var observerA = {
    next: value => console.log('Observer A get value: ' + value),1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 2
Observer B get value: 3

可能会有人认为 ReplaySubject(1) 是不是等同于 BehaviorSubject,其实它们是不一样的。在创建BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放。

JSBin - ReplaySubject

AsyncSubject

AsyncSubject 定义

AsyncSubject 源码片段

export class AsyncSubject extends Subject {
    constructor() {
        super(...arguments);
        this.value = null;
        this.hasNext = false;
        this.hasCompleted = false; // 标识是否已完成
    }
    _subscribe(subscriber) {
        if (this.hasError) {
            subscriber.error(this.thrownError);
            return Subscription.EMPTY;
        }
        else if (this.hasCompleted && this.hasNext) { // 等到完成后,才发出最后的值
            subscriber.next(this.value);
            subscriber.complete();
            return Subscription.EMPTY;
        }
        return super._subscribe(subscriber);
    }
    next(value) {
        if (!this.hasCompleted) { // 若未完成,保存当前的值
            this.value = value;
            this.hasNext = true;
        }
    }
}

AsyncSubject 应用

AsyncSubject 类似于 last 操作符,它会在 Subject 结束后发出最后一个值,具体示例如下:

var subject = new Rx.AsyncSubject();

  var observerA = {
    next: value => console.log('Observer A get value: ' + value),complete: () => console.log('Observer A complete!')
  };

  var observerB = {
    next: value => console.log('Observer B get value: ' + value),complete: () => console.log('Observer B complete!')
  };

  subject.subscribe(observerA);

  subject.next(1);
  subject.next(2);
  subject.next(3);

  subject.complete();

  setTimeout(() => {
    subject.subscribe(observerB); // 1秒后订阅
  },1000);

以上代码运行后,控制台的输出结果:

Observer A get value: 3
Observer A complete!
Observer B get value: 3
Observer B complete!

JSBin - AsyncSubject

参考资源

  • Understanding Subjects in RxJS

  • 30 天精通 RxJS (22) - 什么是 Subject

  • Communicating Between Components with Observable & Subject

RxJS - Subject的更多相关文章

  1. ios – Objective-C管理观察者的设计模式

    有一种方法来检测UI控制器是否由父控制器发布,而不使用viewWilldisappear方法?有最好的做法来解决这种情况吗?

  2. Swift-ReactiveCocoa3.0二SignalProducer 2

    lift运算符|>内部也是调用了lift方法,作用是把原producer的结果transform完返回新的类型/值,再封装成新的producer返回。只有在第一个producer销毁后才会响应第二个producer。之后,每当其中一个再sendNext,都会在next回调zipwith压缩两个信号,每当两个都sendNext一次才回在next回调一次。例子:sampleOn采样,当sampleOn的信号sendNext一次,就取一次producer1的最新一次sendNext的值进行next回调takeu

  3. 如何在Swift中调用C函数

    “,选择Yes,创建SwiftCallC-Bridging-Header.h文件给工程建立一个C语言文件。跟上述步骤3类似,只不过这里选择的是C文件,这里的文件取名为CFile.c,同时自动生成CFile.h文件开始编写代码。在SwiftCallC-Bridging-Header.h文件中声明C函数,这里是voidcallCDemo()在CFile.c中定义这个函数在main.swift中调用这个C函数编译运行

  4. Swift--UINavigationController

    代码目录AppDelegate.swiftViewController.swiftNext.swift效果图

  5. Swift设计模式之观察者模式

    转自Swift设计模式原文Design-Patterns-In-Swift

  6. Swift如何取得View所属的ViewController

    从VC取得View很容易,但有些情况下我们需要从View反向获取VC.不过在一些特殊的场合,Cocoa库帮我们想的很周到,比如在自定义view过渡动画的时候:系统在回调我们的animateTransition方法时,会传入一个context参数,从它我们可以轻松取得参与动画的toView,fromView以及它们对应的VC:但不是所有情况系统都会帮你考虑的这么周到,所以有时候还得需要自己从View

  7. 观察者模式 swift

    在MVC里,观察者模式意味着需要允许Model对象和View对象进行交流,而不能有直接的关联。Cocoa使用两种方式实现了观察者模式:Notification和Key-ValueObserving。Apple对于通知的使用很频繁,比如当键盘弹出或者收起的时候,系统会分别发送UIKeyboardWillShowNotification/UIKeyboardWillHideNotification的通知。在LibaratyAPI.swift里加上取消订阅的代码:deinit{NSNotificationCen

  8. RxSwift使用教程大全 韩俊强的博客

    记录大多数ReactiveX的概念和操作符。我们还需要使用KVO来检测变量的值改变。Rx就是为解决这些问题而生的。Observable理解RxSwift的关键是理解Observable的概念。使用variable的好处是variable将不会显式的发送Error或者Completed。

  9. KVO原理分析及使用进阶

    本篇文章对KVO的实现原理进行了详细的分析,并且简单的实现了一个KVO,来当做技术交流。概述KVO全称keyvalueObserving,是苹果提供的一套事件通知机制。由于KVO的实现机制,所以对属性才会发生作用,一般继承自NSObject的对象都默认支持KVO。实际应用KVO主要用来做键值观察操作,想要一个值发生改变后通知另一个对象,则用KVO实现最为合适。下面是KVO前后打印的关键信息,我们在下面做详细分析。

  10. swift – 全局函数序列(state:next :)和类型推断

    3,让seq2=4…

随机推荐

  1. Angular2 innerHtml删除样式

    我正在使用innerHtml并在我的cms中设置html,响应似乎没问题,如果我这样打印:{{poi.content}}它给了我正确的内容:``但是当我使用[innerHtml]=“poi.content”时,它会给我这个html:当我使用[innerHtml]时,有谁知道为什么它会剥离我的样式Angular2清理动态添加的HTML,样式,……

  2. 为Angular根组件/模块指定@Input()参数

    我有3个根组件,由根AppModule引导.你如何为其中一个组件指定@input()参数?也不由AppModalComponent获取:它是未定义的.据我所知,你不能将@input()传递给bootstraped组件.但您可以使用其他方法来做到这一点–将值作为属性传递.index.html:app.component.ts:

  3. angular-ui-bootstrap – 如何为angular ui-bootstrap tabs指令指定href参数

    我正在使用角度ui-bootstrap库,但我不知道如何为每个选项卡指定自定义href.在角度ui-bootstrap文档中,指定了一个可选参数select(),但我不知道如何使用它来自定义每个选项卡的链接另一种重新定义问题的方法是如何使用带有角度ui-bootstrap选项卡的路由我希望现在还不算太晚,但我今天遇到了同样的问题.你可以通过以下方式实现:1)在控制器中定义选项卡href:2)声明一个函数来改变控制器中的散列:3)使用以下标记:我不确定这是否是最好的方法,我很乐意听取别人的意见.

  4. 离子框架 – 标签内部的ng-click不起作用

    >为什么标签标签内的按钮不起作用?>但是标签外的按钮(登陆)工作正常,为什么?>请帮我解决这个问题.我需要在点击时做出回复按钮workingdemo解决方案就是不要为物品使用标签.而只是使用divHTML

  5. Angular 2:将值传递给路由数据解析

    我正在尝试编写一个DataResolver服务,允许Angular2路由器在初始化组件之前预加载数据.解析器需要调用不同的API端点来获取适合于正在加载的路由的数据.我正在构建一个通用解析器,而不是为我的许多组件中的每个组件设置一个解析器.因此,我想在路由定义中传递指向正确端点的自定义输入.例如,考虑以下路线:app.routes.ts在第一个实例中,解析器需要调用/path/to/resourc

  6. angularjs – 解释ngModel管道,解析器,格式化程序,viewChangeListeners和$watchers的顺序

    换句话说:如果在模型更新之前触发了“ng-change”,我可以理解,但是我很难理解在更新模型之后以及在完成填充更改之前触发函数绑定属性.如果您读到这里:祝贺并感谢您的耐心等待!

  7. 角度5模板形式检测形式有效性状态的变化

    为了拥有一个可以监听其包含的表单的有效性状态的变化的组件并执行某些组件的方法,是reactiveforms的方法吗?

  8. Angular 2 CSV文件下载

    我在springboot应用程序中有我的后端,从那里我返回一个.csv文件WheniamhittingtheURLinbrowsercsvfileisgettingdownloaded.现在我试图从我的角度2应用程序中点击此URL,代码是这样的:零件:服务:我正在下载文件,但它像ActuallyitshouldbeBook.csv请指导我缺少的东西.有一种解决方法,但您需要创建一个页面上的元

  9. angularjs – Angular UI-Grid:过滤后如何获取总项数

    提前致谢:)你应该避免使用jQuery并与API进行交互.首先需要在网格创建事件中保存对API的引用.您应该已经知道总行数.您可以使用以下命令获取可见/已过滤行数:要么您可以使用以下命令获取所选行的数量:

  10. angularjs – 迁移gulp进程以包含typescript

    或者我应该使用tsc作为我的主要构建工具,让它解决依赖关系,创建映射文件并制作捆绑包?

返回
顶部