前言:

RxJava中提供了大量的操作符,这大大提高了了我们的开发效率。其中最基本的两个变换操作符就是mapflatMap。而其他变换操作符的原理基本与map类似。

  • map和flatMap都是接受一个函数作为参数(Func1)并返回一个被观察者Observable
  • Func1的< I,O >I,O模版分别为输入和输出值的类型,实现Func1的call方法对I类型进行处理后返回O类型数据,只是flatMap中执行的方法的返回类型为Observable类型

作用

map对Observable发射的每一项数据应用一个函数,执行变换操作。对原始的Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。

flatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射

使用方法:

通过代码来看一下两者的使用用方法:

map

Observable.just(new User("白瑞德"))
                  .map(new Function<User, String>() {
                      @Override
                      public String apply(User user) throws Throwable {
                          return user.getName();
                      }
                  })
                  .subscribe(new Consumer<String>() {
                      @Override
                      public void accept(String s) throws Throwable {
                          System.out.println(s);
                      }
                  });
<<<白瑞德

这段代码接受一个User对象,最后打印出User中的name。

flatMap

假设存在一个需求,图书馆要打印每个User借走每一本书的名字: User结构如下:

class User {
    private String       name;
    private List<String> book;
}

我们来看一下map的实现方法:

Observable.fromIterable(userList)
                  .map(new Function<User, List<String>>() {
                      @Override
                      public List<String> apply(User user) throws Throwable {
                          return user.getBook();
                      }
                  })
                  .subscribe(new Consumer<List<String>>() {
                      @Override
                      public void accept(List<String> strings) throws Throwable {
                          for (String s : strings) {
                              System.out.println(s);
                          }
                      }
                  });

可以看到,map的转换总是一对一,只能单一转换。我们不得不借助循环进行打印。 下面我们来看一下flatMap的实现方式:

Observable.fromIterable(userList)
                  .flatMap(new Function<User, ObservableSource<String>>() {
                      @Override
                      public ObservableSource<String> apply(User user) throws Throwable {
                          return Observable.fromIterable(user.getBook());
                      }
                  })
                  .subscribe(new Consumer<String>() {
                      @Override
                      public void accept(String  o) throws Throwable {
                          System.out.println(o);
                      }
                  });

flatmap既可以单一转换也可以一对多/多对多转换。flatMap使用一个指定的函数对原始Observable发射的每一项数据之行相应的变换操作,这个函数返回一个本身也发射数据的Observable,因此可以再内部再次进行事件的分发。然后flatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

源码分析

下面我们就结合源码来分析一下这两个操作符。为了降低代码阅读难道,这里只保留核心代码:

map

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    //接受一个Function实例,并返回一个ObservableMap
    return new ObservableMap<T, R>(this, mapper);
}
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        //调用用父类构造方法,初始化父类中的downstream
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            v = mapper.apply(t);
            downstream.onNext(v);
        }
    }
}

这段代码是去掉map源码中一些校验和其它相关回调后的精简代码。接下来分析一下代码流程:

  • 当在调用map时,map接受一个匿名内部类Function的实例,并返回一个ObservableMap对象。
  • ObservableMap本质上是一个Observable,也是一个被观察者,其构造方法接受最外层的那个被Observable实例,和Function实例。ObservableMap重写了subscribeActual方法,在subscribeActual中使用新建了一个MapObserver实现了对原始Observable的观察。
  • 原始的Observable中的数据变会被发送到MapObserver的实例中。
  • MapObserver构造方法接收原始Observable的观察者actual,和Function的实例mapper
  • MapObserver在其onNext方法中调用mapperapply方法,获得该方法的返回值v apply方法就是map实例中: public String apply(User user) throws Throwable { return user.getName(); }
  • 调用downstream的onNext方法,并传入实参v。其中downstreamMapObserver父类中定义的变量,在MapObserver构造方法中super(actual);时初始化,其本身就是传入的actual,本质上就是最原始的Observable

整个流程可以概括如下: 存在一个原始的ObservableA和一个观察者ObserverA,当原始的被观察者ObservableA调用map,并传入一个匿名内部类实例化的’function‘,map新建并返回了一个被观察者ObservableB,通过subscribe让观察者ObserverA对其进行订阅。并重写subscribeActual方法,在其被订阅时创建一个新的观察者ObserverB其接受的,并用ObserverB对原始的ObservableA进行订阅观察。当原始的ObservableA发出事件,调用ObserverBonNext方法,subscribeActual接受的观察者便是最原始的观察者ObserverAObserverB变执行通过匿名内部类实例化的’function‘的apply方法得到数据v,紧接着调用原始的ObservableAonNext方法,并传入实参vObserverA观察到事件。 一句话概括:一个原始的被观察者和观察者,但是让原始的观察者去订阅一个新的观察者,当新的被观察者被订阅的时候,创建一个新的观察者去订阅原始的被观察者,并在监听的事件之后执行指定的操作后再通知原始观察者。所以这里面一共涉及到两对观察者和被观察者,map方法会创建一对新的观察者和被观察者作为原始观察者和被观察者通讯的纽带,并在其中做一些数据变换。

用图片显示流程如下:

蓝色框内就是map创建的观察者和被观察者。实际上我们的原始ObserverA并没有对ObservableA进行订阅。

flatMap

faltMapmap的基本原理类似,其代码如下:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
}
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;
    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
        MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
            ...   
            this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
        }
        @Override
        public void onSubscribe(Disposable d) {
            downstream.onSubscribe(this);
        }
        @Override
        public void onNext(T t) {
            ObservableSource<? extends U> p;
            p = mapper.apply(t);    
            subscribeInner(p);
        }
        @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId  );
                p.subscribe(inner);
        }
        void drain() {    
            drainLoop();
        }
        void drainLoop() {
            final Observer<? super U> child = this.downstream;
            child.onNext(o);
        }
    }
    static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {
        private static final long serialVersionUID = -4606175640614850599L;
        final long id;
        final MergeObserver<T, U> parent;
        volatile boolean done;
        volatile SimpleQueue<U> queue;
        int fusionMode;
        InnerObserver(MergeObserver<T, U> parent, long id) {
            this.id = id;
            this.parent = parent;
        }
        @Override
        public void onNext(U t) {
            parent.drain();
        }
    }
}

上述代码即是faltMap精简后的源码,其中大部分代码的运作流程和前文中的map源码一致,我们继续延续上文中讲解中的观察者和被观察者。重点关注其不同的地方: faltMap返回一个新的被观察者ObservableB,重写ObservableBsubscribeActual方法在原始的观察者ObserverA对其进行订阅时新建一个观察者ObserverB对原始的ObservableA进行订阅。新的观察者ObserverB持有原始的ObserverAfaltMap接收的匿名对象实例function。当ObserverB监听到原始的被观察者ObservableA的事件时,ObserverB调用functionapply方法获得新新的被观察者ObservableC,再创建一个新的观察者ObserverCObservableC进行订阅,ObserverC持有原始的观察者ObserverA,在ObserverC观察到被观察者ObservableC的时间时,调用原始的观察者ObserverA的方法。

概括就是:faltMap方法要求调用者提供一个Observable,最原始的Observable在调用faltMap后,faltMap会创建一个新的Observable,并对原始的进行订阅。当拿到订阅后,会通过flatMap接收的函数拿到调用者传入的Observable,并用最原始的观察者对它进行订阅。这期间涉及三对观察者和被观察者,flatMap会创建一对,同时也接收一对用户创建的。flatMap创建的和Map中的作用一样,不过flatMap连接的是原始的和用户通过flatMap提供的两对观察者和被观察者。而原始的观察者最终是对用户通过flatMap提供的那个观察者进行订阅。

用图片显示流程如下:

和Map的流程很相似,只不过是需要用户再提供一对观察者和被观察者。最终实现对用户提供的被观察者进行订阅。

结语

至此,map和flatMap已基本分析完毕,其中map的代码比较简单易懂,flatMap中还涉及到大量辅助操作,文中并未涉及到其中的合并等操作,阅读起来有些困难。如果仅仅是为了了解二者的原理,可以阅读Single<T>中的代码。其中的代码量远远少于Observable中的代码量。如果对RxJava基本的模式还不了解,可以阅读 手写极简版的Rxjava

以上就是RxJava中map和flatMap的用法区别源码解析的详细内容,更多关于RxJava map flatMap区别的资料请关注Devmax其它相关文章!

RxJava中map和flatMap的用法区别源码解析的更多相关文章

  1. js中值类型和引用类型的区别介绍

    这篇文章介绍了js中值类型和引用类型的区别,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  2. Swift 进阶 —— map 和 flatMap的使用

    这篇文章主要介绍了Swift map和flatMap的相关资料,帮助大家更好的理解和使用Swift,感兴趣的朋友可以了解下

  3. PHP中$GLOBALS与global的区别详解

    今天小编就为大家分享一篇关于PHP中$GLOBALS与global的区别详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧

  4. jQuery和AngularJS的区别浅析

    这篇文章主要介绍了jQuery和AngularJS的区别浅析,本文着重讲解一个熟悉jQuery开的程序员如何应对AngularJS中的一些编程思想的转变,需要的朋友可以参考下

  5. react中使用forEach或map两种方式遍历数组

    这篇文章主要介绍了react中使用forEach或map两种方式遍历数组,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

  6. Vue中构造数组数据之map和forEach方法实现

    数组操作是前端最重要的数据操作,构造数组数据,又是数组操作中很常见的,本文将梳理下map和forEach方法在Vue项目中的使用,感兴趣的朋友跟随小编一起看看吧

  7. Rxjava+Retrofit+Okhttp进行网络访问及数据解析

    这篇文章主要介绍了Rxjava+Retrofit+Okhttp进行网络访问及数据解析,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下

  8. Vue.js和Vue.runtime.js区别浅析

    这篇文章主要介绍了Vue.js和Vue.runtime.js区别浅析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  9. PHP7引入的"??"和"?:"的区别讲解

    今天小编就为大家分享一篇关于PHP7引入的"??"和"?:"的区别讲解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧

  10. 一文让你彻底弄懂js中undefined和null的区别

    JavaScript是一门动态类型语言,元素除了表示存在的空值外,还有可能根本就不存在,这就是undefined存在的原因,这篇文章主要给大家介绍了关于undefined和null区别的相关资料,需要的朋友可以参考下

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部