subscribeOn和observeOn负责线程切换,同时某些操作符也默认指定了线程.

我们这里不分析在线程中怎么执行的.只看如何切换到某个指定线程.

subscribeOn

Observable.subscribeOn()在方法内部生成了一个ObservableSubscribeOn对象.

主要看一下ObservableSubscribeOn的subscribeActual方法.

 @Override
  public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    //调用下游的Observer的onSubscribe方法
    observer.onSubscribe(parent);
    //通过SubscribeTask执行了上游Observable的subscribeActual方法
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
  }

scheduler.scheduleDirect(Runnable)用于执行SubscribeTask这个任务.SubscribeTask本身是Runnable的实现类.看一下其run方法.

    @Override
    public void run() {
      //上游的Observable.subscribe方法被切换到了新的线程
      source.subscribe(parent);
    }

首先可以得出结论:subscribeOn将上游的Observable的subscribe方法切换到了新的线程.

如果多次调用subscribeOn切换线程,会有什么效果?

由下往上,每次调用subscribeOn,都会导致上游的Observable的subscribeActual切换到指定的线程.那么最后一次调用的切换最上游的创建型操作符的subscribeActual的执行线程.如果操作符有默认执行线程怎么办?

操作符默认线程

如果是创建型操作符,处于最上游,那么subscribeOn的线程切换对它不起作用.天高皇帝远,县官不如现管.就是这个道理.
如果是其它操作符,会是怎样的?

以操作符timeout为例:它对应ObservableTimeoutTimed和TimeoutObserver

 @Override
    public void onNext(T t) {
      downstream.onNext(t);
      //超时计时
      startTimeout(idx   1);
    }

    void startTimeout(long nextIndex) {
      //交给操作符默认的线程执行
      task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
    }

    @Override
    public void onError(Throwable t) {
        downstream.onError(t); 
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
      }
    }

    @Override
    public void onTimeout(long idx) {
        downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
    }

//TimeoutTask.java
static final class TimeoutTask implements Runnable {

    @Override
    public void run() {
      parent.onTimeout(idx);
    }
  }

可以看到操作符默认的执行线程只用来做超时计时任务,如果超时了,会在操作符的默认线程执行onError方法..操作符默认线程对下游的observer造成什么影响要做具体对待.

observeOn

observeOn对应ObservableObserveOnObserveOnObserver.

 //ObservableObserveOn.java
 @Override
  protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
      source.subscribe(observer);
    } else {
      Scheduler.Worker w = scheduler.createWorker();
      source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
  }
 //ObserveOnObserver.java 
  @Override
    public void onSubscribe(Disposable d) {
      if (DisposableHelper.validate(this.upstream, d)) {
        if (d instanceof QueueDisposable) {
          if (m == QueueDisposable.SYNC) {
          //执行下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
            schedule();
            return;
          }
          if (m == QueueDisposable.ASYNC) {
           //执行下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
            return;
          }
        }
         //执行下游Observer的onSubscribe方法
        downstream.onSubscribe(this);
      }
    }
    @Override
    public void onNext(T t) {
     //省略
      schedule();
    }
    @Override
    public void onError(Throwable t) {
     //省略
      schedule();
    }
     void schedule() {
      if (getAndIncrement() == 0) {
      /*
      ObserveOnObserver是Runnable的实现类.交给线程池执行
      */
        worker.schedule(this);
      }
    }
    
    
    void drainNormal() {
      final Observer<? super T> a = downstream;
      for (;;) {
        for (;;) {
          T v;
          try {
            v = q.poll();
          } catch (Throwable ex) {
            a.onError(ex);
            return;
          }
          //执行下游Observer的onNext方法
          a.onNext(v);
        }
      }
    }

    void drainFused() {
      for (;;) {
        if (!delayError && d && ex != null) {
          //执行下游Observer的onError方法
          downstream.onError(error);
          return;
        }
        downstream.onNext(null);
        if (d) {
          ex = error;
          if (ex != null) {
            //执行下游Observer的onError方法
            downstream.onError(ex);
          } else {
            //执行下游Observer的onComplete方法
            downstream.onComplete();
          }
          return;
        }
      }
    }
    //执行线程任务
    @Override
    public void run() {
      if (outputFused) {
        drainFused();
      } else {
        drainNormal();
      }
    }

从上面可以看出ObservableObserveOn在其subscribeActual方法中并没有切换上游Observable的subscribe方法的执行线程.但是ObserveOnObserver在其onNext,onError和onComplete中通过schedule()方法将下游Observer的各个方法切换到了新的线程.

得出结论: observeOn负责切换的是下游Observer的各个方法的执行线程

如果下游多次通过observeOn切换线程,会有什么效果?

每次切换都会对其下游造成影响,直到遇到下一个observeOn为止.

Observer(onSubscribe,onNext,onError,onComplete)

onNext,onError,onComplete与上游最近的observeOn所切换的线程保持一致.onSubscribe则不同.
遇到线程切换的时候,会首先在对应的Observable的subscribeActual方法内,先调用observer.onSubscribe方法.而observer.onSubscribe会逐级向上传递直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法内调用,这是在主线程执行的.所以onSubscribe方法无论如何都是在主线程执行.

doOnSubscribe

.doOnSubscribe(new Consumer<Disposable>() {
          @Override
          public void accept(Disposable disposable) throws Exception {
           
          }
        })

我们要看的是方法accept的执行线程.

通过源码找到对应的DisposableLambdaObserver.

 @Override
  public void onSubscribe(Disposable d) {
  //在这里调用了accept方法.
      onSubscribe.accept(d);
  }

这就要看上游在哪个线程执行了Observer.onSubscribe(disposable)方法.

在创建型操作符的subscribeActual方法和subscribeOn对应的Observable的subscribeActual方法内调用了Observer.onSubscribe(disposable)方法.那么这两处的执行线程就决定了onSubscribe.accept(d);的执行线程.

doFinally

对应ObservableDoFinally和DoFinallyObserver

 //DoFinallyObserver.java
 @Override
    public void onError(Throwable t) {
      runFinally();
    }

    @Override
    public void onComplete() {
      runFinally();
    }

    @Override
    public void dispose() {
      runFinally();
    }
    
     void runFinally() {
       onFinally.run();
    }

可以看到与它所对应的DoFinallyObserver的onError,onComplete,dispose方法的执行线程有关,这三个方法的执行线程又受到上游的observeOn的影响.如果没有observeOn,则会受到最上游的observable.subscribeActual方法影响.

doOnError

对应ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java
 @Override
    public void onError(Throwable t) {
        onError.accept(t);
    }

和自身对应的observer.onError所在线程保持一致.

doOnNext

对应ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java
 @Override
    public void onNext(T t) {
        onNext.accept(t);
    }

和自身对应的observer.onNext所在线程保持一致.

操作符对应方法参数的执行线程

包io.reactivex.functions下的接口类一般用于处理上游数据然后往下传递.这些接口类的方法一般在对应的observer.onNext中调用.所以他们的线程保持一致.

总结:

subscribeOn由下往上逐级切换Observable.subscribe的执行线程,不受observeOn影响,也不受具有默认指定线程的非创建型操作符影响,但是会被更上游的subscribeOn夺取线程切换的权利,直到最上游.如果最上游的创建型操作符也有默认执行线程,那么任何一个subscribeOn的线程切换不起作用.subscribeOn由下向上到达最上游后,然后由上往下影响下游的observer的执行线程.遇到observeOn会被夺取线程切换的权利.observeOn影响的是下游的observer的执行线程,由上往下,遇到另一个observeOn会移交线程控制权力,遇到指定默认线程非创建型的操作符,要视具体情况对待.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持Devmax。

RxJava2 线程调度的方法的更多相关文章

  1. iOS:核心图像和多线程应用程序

    我试图以最有效的方式运行一些核心图像过滤器.试图避免内存警告和崩溃,这是我在渲染大图像时得到的.我正在看Apple的核心图像编程指南.关于多线程,它说:“每个线程必须创建自己的CIFilter对象.否则,你的应用程序可能会出现意外行为.”这是什么意思?我实际上是试图在后台线程上运行我的过滤器,所以我可以在主线程上运行HUD(见下文).这在coreImage的上下文中是否有意义?

  2. ios – 多个NSPersistentStoreCoordinator实例可以连接到同一个底层SQLite持久性存储吗?

    我读过的关于在多个线程上使用CoreData的所有内容都讨论了使用共享单个NSPersistentStoreCoordinator的多个NSManagedobjectContext实例.这是理解的,我已经使它在一个应用程序中工作,该应用程序在主线程上使用CoreData来支持UI,并且具有可能需要一段时间才能运行的后台获取操作.问题是NSPersistentStoreCoordinator会对基础

  3. ios – XCode断点应该只挂起当前线程

    我需要调试多线程错误.因此,为了获得生成崩溃的条件,我需要在代码中的特定点停止一个线程,并等待另一个线程到达第二个断点.我现在遇到的问题是,如果一个线程遇到断点,则所有其他线程都被挂起.有没有办法只停止一个线程,让其他线程运行,直到它们到达第二个断点?)其他更有趣的选择:当你点击第一个断点时,你可以进入控制台并写入这应该在该断点处暂停当前上下文中的线程一小时.然后在Xcode中恢复执行.

  4. ios – 在后台线程中写入Realm后,主线程看不到更新的数据

    >清除数据库.>进行API调用以获取新数据.>将从API检索到的数据写入后台线程中的数据库中.>从主线程上的数据库中读取数据并渲染UI.在步骤4中,数据应该是最新数据,但我们没有看到任何数据.解决方法具有runloops的线程上的Realm实例,例如主线程,updatetothelatestversionofthedataintheRealmfile,因为通知被发布到其线程的runloop.在后台

  5. ios – NSURLConnectionLoader线程中的奇怪崩溃

    我们开始看到我们的应用启动时发生的崩溃.我无法重现它,它只发生在少数用户身上.例外情况是:异常类型:EXC_BAD_ACCESS代码:KERN_INVALID_ADDRESS位于0x3250974659崩溃发生在名为com.apple.NSURLConnectionLoader的线程中在调用时–[NSBlockOperationmain]这是该线程的堆栈跟踪:非常感谢任何帮助,以了解可能导致这种崩

  6. ios – 合并子上下文时的NSObjectInaccessbileExceptions

    我尝试手动重现,但失败了.是否有其他可能发生这种情况的情况,是否有处理此类问题的提示?解决方法在创建子上下文时,您可以尝试使用以下行:

  7. ios – 从后台线程调用UIKit时发出警告

    你如何处理项目中的这个问题?

  8. ios – 在SpriteKit中,touchesBegan在与SKScene更新方法相同的线程中运行吗?

    在这里的Apple文档AdvancedSceneProcessing中,它描述了更新方法以及场景的呈现方式,但没有提到何时处理输入.目前尚不清楚它是否与渲染循环位于同一个线程中,或者它是否与它并发.如果我有一个对象,我从SKScene更新方法和touchesBegan方法(在这种情况下是SKSpriteNode)更新,我是否要担心同步对我的对象的两次访问?解决方法所以几天后没有回答我设置了一些实验

  9. ios – 在后台获取中加载UIWebView

    )那么,有一种方法可以在后台加载UIWebView吗?解决方法如果要从用户界面更新元素,则必须在应用程序的主队列(或线程)中访问它们.我建议您在后台继续获取所需的数据,但是当需要更新UIWebView时,请在主线程中进行.你可以这样做:或者您可以创建一个方法来更新UIWebView上的数据,并使用以下方法从后台线程调用它:这将确保您从正确的线程访问UIWebView.希望这可以帮助.

  10. ios – 何时使用Semaphore而不是Dispatch Group?

    我会假设我知道如何使用DispatchGroup,为了解问题,我尝试过:结果–预期–是:为了使用信号量,我实现了:并在viewDidLoad方法中调用它.结果是:从概念上讲,dispachGroup和Semaphore都有同样的目的.老实说,我不熟悉:什么时候使用信号量,尤其是在与dispachGroup合作时–可能–处理问题.我错过了什么部分?

随机推荐

  1. Flutter 网络请求框架封装详解

    这篇文章主要介绍了Flutter 网络请求框架封装详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  2. Android单选按钮RadioButton的使用详解

    今天小编就为大家分享一篇关于Android单选按钮RadioButton的使用详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧

  3. 解决android studio 打包发现generate signed apk 消失不见问题

    这篇文章主要介绍了解决android studio 打包发现generate signed apk 消失不见问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

  4. Android 实现自定义圆形listview功能的实例代码

    这篇文章主要介绍了Android 实现自定义圆形listview功能的实例代码,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  5. 详解Android studio 动态fragment的用法

    这篇文章主要介绍了Android studio 动态fragment的用法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  6. Android用RecyclerView实现图标拖拽排序以及增删管理

    这篇文章主要介绍了Android用RecyclerView实现图标拖拽排序以及增删管理的方法,帮助大家更好的理解和学习使用Android,感兴趣的朋友可以了解下

  7. Android notifyDataSetChanged() 动态更新ListView案例详解

    这篇文章主要介绍了Android notifyDataSetChanged() 动态更新ListView案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下

  8. Android自定义View实现弹幕效果

    这篇文章主要为大家详细介绍了Android自定义View实现弹幕效果,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  9. Android自定义View实现跟随手指移动

    这篇文章主要为大家详细介绍了Android自定义View实现跟随手指移动,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  10. Android实现多点触摸操作

    这篇文章主要介绍了Android实现多点触摸操作,实现图片的放大、缩小和旋转等处理,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部