用RxJava实现事件总线(Event Bus)

登高瞭望 8年前

事件总线可以使Android中各组件之间的通信变得简单,最重要的是可以解耦!
目前大多数开发者使用EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。

不多说,上代码

  /**  * RxBus  * Created by YoKeyword on 2015/6/17.  */  public class RxBus {      private static volatile RxBus defaultInstance;      // 主题      private final Subject bus;      // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者      public RxBus() {        bus = new SerializedSubject<>(PublishSubject.create());      }      // 单例RxBus      public static RxBus getDefault() {          RxBus rxBus = defaultInstance;          if (defaultInstance == null) {              synchronized (RxBus.class) {                  rxBus = defaultInstance;                  if (defaultInstance == null) {                      rxBus = new RxBus();                      defaultInstance = rxBus;                  }              }          }          return rxBus;      }      // 提供了一个新的事件      public void post (Object o) {          bus.onNext(o);      }      // 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者      public <T> Observable<T> toObserverable (Class<T> eventType) {          return bus.ofType(eventType);  //        这里感谢小鄧子的提醒: ofType = filter + cast  //        return bus.filter(new Func1<Object, Boolean>() {  //            @Override  //            public Boolean call(Object o) {  //                return eventType.isInstance(o);  //            }  //        }) .cast(eventType);      }  }

注:
1、上述RxBus的单例写法,想了解更多可以参考这里Java 单例真的写对了么?

2、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

3、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

4、ofType操作符只发射指定类型的数据,其内部就是filter+cast(这里非常感谢@小鄧子 的提醒)

  public final <R> Observable<R> ofType(final Class<R> klass) {      return filter(new Func1<T, Boolean>() {          @Override          public final Boolean call(T t) {              return klass.isInstance(t);          }      }).cast(klass);  }

filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。
cast操作符可以将一个Observable转换成指定类型的Observable。

分析:

RxBus工作流程图


1、首先创建一个可同时充当Observer和Observable的Subject;

2、在需要接收事件的地方,订阅该Subject(此时Subject是作为Observable),在这之后,一旦Subject接收到事件,立即发射给该订阅者;

3、在我们需要发送事件的地方,将事件post至Subject,此时Subject作为Observer接收到事件(onNext),然后会发射给所有订阅该Subject的订阅者。

对于RxBus的使用,就和普通的RxJava订阅事件很相似了。
先看发送事件的代码:

  RxBus.getDefault().post(new UserEvent (1, "yoyo"));

userEvent是要发送的事件,如果你用过EventBus, 很容易理解,UserEvent的代码:

  public class UserEvent {      long id;      String name;      public User Event(long id,String name) {          this.id= id;          this.name= name;      }      public long getId() {          return id;      }      public String getName() {          return name;      }  }

再看接收事件的代码:

  // rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内  rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)          .subscribe(new Action1<UserEvent>() {                 @Override                 public void call(UserEvent userEvent) {                     long id = userEvent.getId();                     String name = userEvent.getName();                     ...                 }             },          new Action1<Throwable>() {              @Override              public void call(Throwable throwable) {                  // TODO: 处理异常              }                  });

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

  @Override  protected void onDestroy() {      super.onDestroy();      if(!rxSubscription.isUnsubscribed()) {          rxSubscription.unsubscribe();      }  }

这样,一个简单的Event Bus就实现了!如果你的项目已经开始使用RxJava,建议可以把EventBus或Otto替换成RxBus,减小项目体积。

参考:
http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/



原文链接:YoKeyword(简书作者)