Если вы новичок в общении с RxJava или пытались разобраться в этом, но не довели дело до конца, то ниже вы найдете для себя кое-что новое.
Оригинал статьи написан 29 ноября 2017. Перевод вольный.
Нам в GO-JEK требуется выполнять большое количество асинхронных операций в приложениях и мы не можем позволить себе идти на компромиссы в ущерб скорости работы и плавности пользовательского интерфейса.
Написание сложных многопоточных Android приложений может быть достаточно трудоемким процессом, который время от времени будет вас сильно перегружать из-за необходимости заботиться о большом количестве связанных друг с другом вещей. Это и многие другие причины убедили нас использовать RxJava в разрабатываемых Android приложениях.
В этой статье мы поговорим о том как мы использовали реальные возможности работы с многопоточностью в RxJava для того, чтобы сделать процесс разработки приложения максимально простым, легким и веселым. Во всех примерах кода ниже будет использоваться RxJava 2, но описанные концепции можно будет применять и в других реактивных расширениях.
Observable.just(1, 2, 3, 4, 5)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
println("Emitting item on: " + currentThread().getName());
}
})
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
println("Processing item on: " + currentThread().getName());
return integer * 2;
}
})
.subscribeWith(new DisposableObserver<Integer>() {
@Override
public void onNext(@NonNull Integer integer) {
println("Consuming item on: " + currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
doOnNext()
? Это не что иное, как side-effect оператор. Он помогает внедряться в цепочку объектов observable
и выполнять грязные (impure) операции. Например, внедрять дополнительный код в цепочке вызовов для отладки. Прочитать больше можно здесь.Book
сетевым запросом и показать его в основном потоке приложения. Довольно общий и понятный пример для начала.getBooks().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<Book>() {
@Override
public void onNext(@NonNull Book book) {
// Вы можете получить доступ к объектам Book здесь
}
@Override
public void onError(@NonNull Throwable e) {
// Отлавливаем ошибки здесь
}
@Override
public void onComplete() {
// Все объекты Book получены. Готово!
}
});
getBooks()
, который осуществляет сетевой вызов и собирает список книг для нас. Сетевой вызов занимает время (несколько миллисекунд или секунд), поэтому мы используем subscribeOn()
и указываем планировщик Schedulers.io()
для выполнения операции в потоке ввода-вывода.observeOn()
вместе с планировщиком AndroidSchedulers.mainThread()
для того, чтобы обрабатывать результат в основном потоке и показать список книг в пользовательском интерфейсе приложения.Executor
. Может возникнуть ситуация, в которой необходимо будет выполнять определенные задачи в планировщике на основании собственной логики распределения потоков.Scheduler.from(Executors.newFixedThreadPool(n))
) и использовать его во всех местах, связанных с сетевыми вызовами.Looper
, связанный с основным потоком, но есть возможность переопределения: AndroidSchedulers.from(Looper looper)
.Schedulers.io()
. Всегда есть риск бесконечного роста количества потоков.subscribeOn()
, то все события происходят в том потоке, в котором произошел вызов кода (в нашем случае — main
поток).subscribeOn()
и планировщика Schedulers.computation()
. Когда вы запустите нижеследующий пример кода, то увидите, что события происходят в одном из вычислительных потоков, доступных в пуле — RxComputThreadPool-1
.DisposableSubscriber
, так как нам не нужно переопределять onError()
и onComplete()
. Воспользуемся doOnNext()
и лямбдами.Observable.just(1, 2, 3, 4, 5, 6)
.subscribeOn(Schedulers.computation())
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
subscribeOn()
. Он работает только с наблюдаемым источником (source observable), и контролирует в какой поток наблюдаемый источник передает события.map()
и filter()
), а оператор subscribeOn()
помещен в конце цепочки вызовов. Но как только вы запустите этот код, то заметите, что все события будут возникать в потоке, указанном в subscribeOn()
. Это станет более понятным при добавлении observeOn()
в цепь вызовов. И даже если мы разместим subscribeOn()
ниже observeOn()
, то логика работы не изменится. subscribeOn()
работает только с наблюдаемым источником (source observable).Observable.just(1, 2, 3, 4, 5, 6)
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.map(integer -> integer * 3)
.filter(integer -> integer % 2 == 0)
.subscribeOn(Schedulers.computation())
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
subscribeOn()
несколько раз в одной цепочке вызовов. Можно, конечно, написать ещё раз, но никаких изменений это не повлечет. В примере ниже мы последовательно вызываем три различных планировщика, можете ли вы догадаться, какой планировщик сработает при запуске?Observable.just(1, 2, 3, 4, 5, 6)
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
Schedulers.io()
, то вы правы! Даже если делать вызов многократно — сработает только первый subscribeOn()
, вызванный после observable-источника. Schedulers.io()
? Обычно все думают, что сработает Schedulers.newThread()
, так как этот вызов находится в конце цепочки.Observable
. Код ниже поможет нам разобраться в этом. Это ранее рассмотренный пример, но расписанный подробнее.Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> o2 = o1.filter(integer -> integer % 2 == 0);
Observable<Integer> o3 = o2.map(integer -> integer * 10);
o3.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
subscribe()
у observable объекта o3
, который затем делает неявный вызов subscribe()
у своего родительского observable объекта o2
. Реализация наблюдателя (observer), предоставляемая объектом o3
, умножает переданные числа на 10.o2
неявно вызывает subscribe()
у объекта o1
, передавая реализацию наблюдателя, которая позволяет обрабатывать только четные числа. Теперь мы достигли корневого элемента (o1
), у которого нет родителя для последующего вызова subscribe()
. На этом этапе завершается цепочка наблюдаемых (observable) элементов, после чего observable-источник начинает передавать (emit) элементы.subscribeOn()
указывает observable-источнику передавать элементы в определенный поток и этот поток будет отвечать за продвижение элементов вплоть до подписчика (Subscriber). Поэтому, по умолчанию, подписчик получает обработанные элементы в этом же потоке.Observable
, который осуществляет сетевой вызов в потоке ввода-вывода и передает результат подписчику. Если вы используете только subscribeOn(Schedulers.io())
, то целевой подписчик будет обрабатывать результат в том же потоке ввода-вывода. И нам не повезло, так как работать с пользовательским интерфейсом в Android можно только в основном потоке.observeOn()
. Когда observeOn()
встречается в цепочке вызовов, то передаваемые observable-источником элементы незамедлительно перебрасываются в поток, указанный в observeOn()
.getIntegersFromRemoteSource()
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
observeOn()
будет вызываться несколько раз для переключения потоков в процессе обработки данных.getIntegersFromRemoteSource()
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(integer -> {
println("Mapping item " + integer + " on: " + currentThread().getName());
return integer * integer;
})
.observeOn(Schedulers.newThread())
.filter(integer -> {
println("Filtering item " + integer + " on: " + currentThread().getName());
return integer % 2 == 0;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
subscribeOn()
вместе с Schedulers.io()
. Далее мы хотим преобразовать каждый элемент, используя оператор map()
, но сделать это нужно в вычислительном потоке. Для этого используем observeOn()
вместе с Schedulers.computation()
перед вызовом map()
для переключения потока и передачи элементов в целевой вычислительный поток.observeOn()
, но уже в паре с Schedulers.newThread()
перед вызовом оператора filter()
для передачи каждого элемента в новый поток.observeOn()
вместе с планировщиком AndroidSchedulers.mainThread()
.observeOn()
несколько раз последовательно? В примере ниже в каком потоке подписчик получит результат? getIntegersFromRemoteSource()
.doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.single())
.observeOn(Schedulers.computation())
.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
RxComputationThreadPool-1
. Это значит, что сработал последний вызванный observeOn()
. Интересно почему?Obsevable
, но с передачей событий (emissions) всё происходит наоборот, то есть в обычном порядке, как записан код. Вызов происходит от observable-источника и далее вниз по цепочке вызова вплоть до подписчика.observeOn()
всегда работает в прямом порядке, поэтому последовательно происходит переключение потоков и последним происходит переключение на вычислительный поток (observeOn(Schedulers.computation())
). Итак, когда нужно переключить поток для обработки данных в новом потоке, то просто сначала вызовите observeOn()
, а далее обрабатывайте элементы. Синхронизация, исключение состояния гонки, всё это и многие другие сложности многопоточности RxJava обрабатывает за вас.К сожалению, не доступен сервер mySQL