代码示例:
Subject<Story> mStoryEmitter = PublishSubject.create();
private void getStory(int storyID) {
HNApi.Factory.getInstance().getStory(storyID).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getStoryObserver());
}
mStoryListemitter.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(disposable d) {}
@Override
public void onNext(List<Integer> value) {
if(mRecyclerView != null) {
mRecyclerView.setAdapter(null);
if(mAdapter != null) {
mAdapter.clear();
mAdapter = null;
}
}
mAdapter = new SimpleRecyclerViewAdapter();
mRecyclerView.setAdapter(mAdapter);
for(Integer storyID : value) {
getStory(storyID);
}
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
private disposableObserver<Story> getStoryObserver() {
disposableObserver<Story> observer = new disposableObserver<Story>() {
@Override
public void onNext(Story value) {
mStoryEmitter.onNext(value);
dispose();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
return observer;
}
错误:
Throwing OutOfMemoryError "Could not allocate JNI Env"
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.
at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:111)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5417)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: java.lang.OutOfMemoryError: Could not allocate JNI Env
at java.lang.Thread.nativeCreate(Native Method)
at java.lang.Thread.start(Thread.java:1063)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:921)
at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1556)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:310)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:543)
at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:642)
at io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:120)
at io.reactivex.internal.schedulers.IoScheduler$EventLoopWorker.schedule(IoScheduler.java:221)
at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:130)
at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:109)
AppData::create pipe(2) Failed: Too many open files
at io.reactivex.internal.operators.observable.ObservableSubscribeOn.subscribeActual(ObservableSubscribeOn.java:36)
at io.reactivex.Observable.subscribe(Observable.java:10514)
at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:44)
at io.reactivex.Observable.subscribe(Observable.java:10514)
at com.example.MainActivity.getStory(MainActivity.java:100)
at com.example.MainActivity.access$300(MainActivity.java:25)
at com.example.MainActivity$2.onNext(MainActivity.java:67)
at com.example.MainActivity$2.onNext(MainActivity.java:49)
at io.reactivex.subjects.PublishSubject$Publishdisposable.onNext(PublishSubject.java:263)
at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:182)
at com.example.MainActivity$5.onNext(MainActivity.java:147)
at com.example.MainActivity$5.onNext(MainActivity.java:138)
at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainnormal(ObservableObserveOn.java:198)
at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:250)
at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:109)
... 7 more
AppData::create pipe(2) Failed: Too many open files
FATAL EXCEPTION: main
Process: com.example,PID: 15857
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.
at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:111)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5417)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: java.lang.OutOfMemoryError: Could not allocate JNI Env
at java.lang.Thread.nativeCreate(Native Method)
at java.lang.Thread.start(Thread.java:1063)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:921)
at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1556)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:310)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:543)
at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:642)
at io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:120)
at io.reactivex.internal.schedulers.IoScheduler$EventLoopWorker.schedule(IoScheduler.java:221)
at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:130)
at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:109)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn.subscribeActual(ObservableSubscribeOn.java:36)
at io.reactivex.Observable.subscribe(Observable.java:10514)
at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:44)
at io.reactivex.Observable.subscribe(Observable.java:10514)
at com.example.MainActivity.getStory(MainActivity.java:100)
at com.example.MainActivity.access$300(MainActivity.java:25)
at com.example.MainActivity$2.onNext(MainActivity.java:67)
at com.example.MainActivity$2.onNext(MainActivity.java:49)
at io.reactivex.subjects.PublishSubject$Publishdisposable.onNext(PublishSubject.java:263)
at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:182)
at com.example.MainActivity$5.onNext(MainActivity.java:147)
at com.example.MainActivity$5.onNext(MainActivity.java:138)
at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainnormal(ObservableObserveOn.java:198)
at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:250)
at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:109)
... 7 more
解决方法
而JakeWharton的回复实际上是在加速
The problem is that Schedulers.io() uses a cached thread pool without a limit and thus is trying to create 1500 threads. You should consider using a Scheduler that has a fixed limit of threads,or using RxJava 2.x’s parallel() operator to parallelize the operation to a fixed number of workers.
If you’re using raw Retrofit by default it uses OkHttp’s dispatcher which limits the threads to something like 64 (with a max of 5 per host). That’s why you aren’t seeing it fail.
If you use createAsync() when creating the RxJava2CallAdapterFactory it will create fully-async Observable instances that don’t require a subscribeOn and which use OkHttp’s dispatcher just like Retrofit would otherwise. Then you only need observeOn to move back to the main thread,and you avoid all additional thread creation.