APP下载

Java响应式程式设计RxJava简单介绍

消息来源:baojiabao.com 作者: 发布时间:2026-05-22

报价宝综合消息Java响应式程式设计RxJava简单介绍

原文:https://www.cnblogs.com/WoodJim/p/5970937.html

RxJava简介

Rx(ReactiveX,响应式程式设计)是一种事件驱动的基于异步资料流的程式设计模式,整个资料流就像一条河流,它可以被观测(监听),过滤,操控或者与其他资料流合并为一条新的资料流。而RxJava是.Net Rx在JVM上的实现。RxJava可以应用于大部分基于JVM的语言,如Scala,Groovy等。整个RxJava+RxAndroid的包大小为(1125kb+10kb)

RxJava特点

函式响应式程式设计(Functional Reactive Programming,FRP)异步事件驱动的基于观察者模式专门的出错处理,当使用RxJava出现错误时,它不会直接丢掷异常,而是会执行OnError()方法;并发,可以很容易实现多执行绪RxJava的基本概念

RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者),Observables发出一系列事件,Subscribers处理这些事件。而RxJava的Observables是扩充套件自设计模式中的观察者模式,添加了以下几个能力:

onCompleted(),当没有新的可用资料时,通知Observables;onError(),当发生错误时,通知Observables,但不会直接将错误或异常直接丢掷;四个关键概念

Observable,产生事件(事件源)Observer, 根据事件作出相应的响应Subscriber,实现了Observer的抽象类,Subjects,Observable + ObserverObservable

Observable在存活期间,生命周期包含三个可能的事件,与迭代器的生命周期很类似:

Events Iterable(pull) Observable(push) 得到资料 T next() onNext(T) 发现错误 throws Exception onError(Throwable) 完成 !hasNext() onCompleted() 与使用迭代器的区别:在使用迭代器的时候,执行绪会阻塞直到他们需要的资料到来。而使用Observable,是使用异步的方式将资料推送到Observer;

而根据推送机制的不同,Observable分为热Observable和冷Observable:

热Observable,当他建立时新开始执行它的职责,这样所有订阅了这个Observable的Observer就可以直接大中途观察了(但可能会丢失前面传送的资料(事件));冷Observable,只有等到有订阅(subscribes)了这个Observable的Observer才开始执行它的职责:传送资料;下面是一个简单的建立观察者的程式码:

Observable.create(new Observable.OnSubscribe(){

@Override

public void call(Subscriber super Object> subscriber){}

});

// example

Observable ob=

Observable.create(new Observable.OnSubscribe(){

@Override

public void call(Subscriber super Integer> subscriber){

for(int i = 0; i observer.onNext(i);

}

observer.onCompleted();

}

});

// 并不用关心有多少资料,

Subscription subscriptionPrint =

observableString.subscribe(new Observer @Override

public void onCompleted(){

System.out.println("Observable completed");

}

@Override

public void onError(Throwable e){

System.out.println("Oh no! Something wrong happened!");

}

@Override

public void onNext(Integer item)

System.out.println("Item is " + item);

})

Observable的构造方法:

create(subscribe),需要一个subscribe作为引数来构造,from(list)// 用来从一个已知的列表的产生资料,和前面的create作用类似;just(funnction),用来接收从一个方法的返回值(最多可以有9个引数),如果返回的是List,它不会去逐个遍历List的Items,而是直接输出整个List ;empty(),不输出资料,但可以正常结束;never(),不输出资料,并且不会终止;throw(),不输出资料,但在发生错误时终止;interval(),建立一个按固定间隔传送整数序列的Observable,timer(),建立一个Observable在给定的延迟后传送 一个 特殊的值;Subject

Subject = Observable + Observer 这意味着一个Subject可以同时是观察者和被观察者,事件源(Observable),也就是说Subject可以像观察者一样订阅一个事件源,并且可以像Observable一样输出它们收到的事件。RxJava提供了四种不同型别的subjects:

PublishSubject,BehaviorSubject,输出它观察到的大部分最近的Items和随后观察的Items到所有的订阅者,初始化时需要一个初始值来做为最近的ItemsReplaySubject,将它观察到的所有资料重复传送到所有订阅了的观察者;AsyncSubject, 在整个Observable完成后,将最后观察到的Items传送给每一个订阅者;RxJava的操作符

过滤

filter(),过滤掉不需要的资料,只有返回true 的资料才会被使用;take(int n),只取返回资料中的前n个,skip(int n)跳过前n个数据;takeLast(int n),只取资料的最后n个,skipLast(int n);distinct(),会帮助我们处理重复的资料,但如果资料太大的话,内存需要比较大distinctUntilChanged(),只有当新资料与先前的不同,才会输出,first(),last();firstOrDefault(),lastOrDefault,如果Observable没有输出任何资料时,我们可以给一个预设值;elementAt(int n),输出第n个位置上的资料(从0 开始)timeout(),如果在给定时间间隔内,没有输出有效资料,则会执行onError();delay() 用于事件流中,延迟一段时间再发送来自Observable的结果;对映

map(),用来对映简单的资料flatMap(),用来对映伫列等,但可能会改变资料的顺序concatMap(),解决了fmp的交错的问题flatMapIterable(),将生成的Iterable与Items进行对应起来(类似于key-value);switchMap(), 这几个方法都是将输入的资料以一种新的形式输出;Scan(),类似于一个累加的方法,后一个item是前面item的后再加上原来的item;GroupBy(),buffer(int n),将资料作为列表(每n个数据作为一个列表)输出而不是单个的Items;cast() 类似于map();合并

merge()可以将多个输入整合成一个输出(并不会合并Items);zip(),可以将多个输入整合成一个输出(会合并Items);重试

retryWhen(),当接收到onError()事件时,触发重新订阅(发生某些错误时,需要做什么工作);repeat(),当接收到onComplete()事件时,触发重新订阅执行绪的排程(Schedulers)

RxJava提供了5种类型的排程者: Schedulers

.io(), 使用执行绪池来为IO操作进行排程,但没有骑士执行绪池的大小 作限制,因此使用时需要考虑内存的使用.computation(),与IO无关的计算型排程,有很多RxJava相关的预设方法:buffer(),debounce(),delay(),interval()等预设是在该类执行绪中执行;.immediate(),在当前执行绪中快速开始某项操作,是方法:timeout(),timeInterval等的预设排程器;.newThread(), 开启新执行绪来执行某项操作.trampoline(),为一些不需要立即执行的任务进行排程,会依次执行伫列里的任务,是方法:repeat(),retry()的预设排程器;RxAndroid还提供了一个Android特有的排程器:AndroidSchedulers.mainThread()来让程式码在UI主执行绪中执行;

RxJava提供了一个每一个Observables都可以使用的subscribeOn(),ObserveOn()方法,将Scheduler与Observables建立联络,我们可以这样来使用:

.subscribeOn(Schedulers.io()) // 让事件的产生发生在IO执行绪,多次呼叫,以最后一次呼叫的结果为准

.observeOn(AndroidSchedulers.mainThread()) // 让事件的回调发生在UI主执行绪中

.subscribe(....)

RxAndroid

前面版本的RxAndroid还提供了AppObservable,ViewObservable,WidgetObservable,LifecycleObservable 等,但最新版本的RxAndroid直接删掉了这些;

2019-12-20 00:05:00

相关文章