
January 11, 2019

#RxJava Part 2 : Creating Observable, Observers and Subscribers

Creating Observers and Subscribers
While creating Observers and Subscribers, we need to override three methods:
  • onNext(): Receives the current value. It is called on the observer each time the attached Observable publishes a new event. This is the method where we perform some action on each event.
  • onComplete(): Triggered when the Observable has no more data to send. It indicates that we should not expect any more onNext() calls on our observer. (In RxJava 1.x this callback is named onCompleted().)
  • onError(): Triggered when an exception is thrown by the RxJava framework code or by our event handling code.
FYI, the Iterator does have equivalents of onNext() (next()) and onComplete() (hasNext() returning false). It has no equivalent of onError() for the case when an exception is thrown. This is another advantage of reactive code.
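
The Iterator comparison can be made concrete with a small plain-Java sketch. It does not use RxJava: PushObserver and IteratorBridge are hypothetical names introduced here only to show how pulling with hasNext()/next() maps onto the three push callbacks, including the error case that Iterator itself has no callback for.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an RxJava observer; not part of the RxJava API.
interface PushObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

final class IteratorBridge {
    // Pulls from the iterator and pushes each result to the observer.
    static <T> void drain(Iterator<T> source, PushObserver<T> observer) {
        try {
            while (source.hasNext()) {
                observer.onNext(source.next()); // next() maps to onNext()
            }
        } catch (RuntimeException e) {
            observer.onError(e); // Iterator has no callback for this case
            return;
        }
        observer.onComplete(); // hasNext() == false maps to onComplete()
    }

    // Records the events produced by draining the iterator, for demonstration.
    static <T> List<String> record(Iterator<T> source) {
        List<String> events = new ArrayList<>();
        drain(source, new PushObserver<T>() {
            @Override public void onNext(T item) { events.add("onNext:" + item); }
            @Override public void onError(Throwable error) { events.add("onError:" + error.getMessage()); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }
}
```

Draining an iterator over "a" and "b" records two onNext events followed by onComplete. An iterator whose next() throws records the items seen so far followed by a single onError, with no onComplete.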


We can think of reactive programming as giving us three channels: one channel delivers the data (via the onNext() method), another delivers the exception when one occurs (via the onError() method), and the third notifies us when no data is left (via the onComplete() method).

The data channel (the onNext() method) stops immediately when an error occurs (the onError() method receives the exception) or when the data flow is complete (we are notified via the onComplete() method).

What’s the difference between onNext(), onComplete() and onError()?

These are the callbacks that an observer of an Observable / Flowable receives. onNext() is called for each emission of the Observable / Flowable (zero to infinitely many times). onComplete() and onError() are mutually exclusive: at most one of them is called, and at most once. In other words, a stream cannot both complete and error out.
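
One way to picture this rule is a wrapper that forwards events until the first terminal signal and drops everything afterwards. The following is a minimal single-threaded sketch in plain Java, not RxJava's own implementation; StreamObserver and TerminalGuard are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer interface used only for this sketch.
interface StreamObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

// Wraps an observer and drops every event that arrives after a terminal one.
final class TerminalGuard<T> implements StreamObserver<T> {
    private final StreamObserver<T> downstream;
    private boolean done; // true once onError() or onComplete() was delivered

    TerminalGuard(StreamObserver<T> downstream) { this.downstream = downstream; }

    @Override public void onNext(T item) {
        if (!done) downstream.onNext(item);
    }

    @Override public void onError(Throwable error) {
        if (done) return;
        done = true;
        downstream.onError(error);
    }

    @Override public void onComplete() {
        if (done) return;
        done = true;
        downstream.onComplete();
    }

    // Feeds a deliberately misbehaving event sequence through the guard.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        StreamObserver<String> guard = new TerminalGuard<>(new StreamObserver<String>() {
            @Override public void onNext(String item) { log.add("next " + item); }
            @Override public void onError(Throwable error) { log.add("error " + error.getMessage()); }
            @Override public void onComplete() { log.add("complete"); }
        });
        guard.onNext("a");
        guard.onError(new RuntimeException("failed"));
        guard.onNext("b");   // dropped: the stream already terminated
        guard.onComplete();  // dropped: only one terminal event is delivered
        return log;
    }
}
```

Here the downstream observer sees only "next a" and "error failed". The later onNext() and onComplete() calls never reach it.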

Creating An Observable
The role of an Observable is to emit data. An Observable can emit zero or more items, followed by at most one completion or error signal. It pushes items to its observers, possibly asynchronously. This is in contrast to the Iterator interface, where the consumer pulls items synchronously.

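To create a new Observable that can emit items, we call the create() method. In RxJava 1.x, the call() method of the function passed to create() is where the items are pushed to the subscriber instance. In RxJava 2.x and later, the equivalent method is subscribe(), which receives an emitter.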

Linking Observers and Subscribers to the Observable
For an Observer or Subscriber to listen to the data stream emitted by the Observable, it needs to be subscribed using the subscribe() method, e.g.:

myObservable.subscribe(mySubscriber);
myObservable.subscribe(myObserver);
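
The relationship between create() and subscribe() can be sketched in plain Java. This is a simplified model under stated assumptions, not RxJava code: MiniObservable and ItemObserver are hypothetical types, and the sketch ignores threading and unsubscription. It shows that create() only stores the emission logic, and that each subscribe() call runs that logic again for the new observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical observer interface for this sketch (not RxJava's Observer).
interface ItemObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

// Hypothetical miniature of an observable built from an emission function.
final class MiniObservable<T> {
    private final Consumer<ItemObserver<T>> producer;

    private MiniObservable(Consumer<ItemObserver<T>> producer) { this.producer = producer; }

    // Stores the emission logic; nothing runs until subscribe() is called.
    static <T> MiniObservable<T> create(Consumer<ItemObserver<T>> producer) {
        return new MiniObservable<>(producer);
    }

    // Runs the emission logic for this observer; failures go to onError().
    void subscribe(ItemObserver<T> observer) {
        try {
            producer.accept(observer);
        } catch (RuntimeException e) {
            observer.onError(e);
        }
    }

    // Subscribes two observers to the same source and records what each sees.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> source = MiniObservable.create(o -> {
            o.onNext("Hello");
            o.onNext("World");
            o.onComplete();
        });
        for (String name : new String[] {"first", "second"}) {
            source.subscribe(new ItemObserver<String>() {
                @Override public void onNext(String item) { log.add(name + " onNext " + item); }
                @Override public void onError(Throwable error) { log.add(name + " onError"); }
                @Override public void onComplete() { log.add(name + " onComplete"); }
            });
        }
        return log;
    }
}
```

Both observers receive the full sequence from the start, which is also why, in the demo output further down, the Subscriber and the Observer each begin at "Hello World 1".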

RxJavaDemo
Let's write a simple program in which the Observable emits the string "Hello World" 10 times.

To see how the exception channel works, we throw an error based on a random number check.

GIT URL: RxJavaDemo.java

When you run this program, you will notice that once the error is thrown, the data channel is closed, i.e. after the exception it stops printing 'Hello World'.

In my case, the output of the above program is:
Subscriber onNext(): Hello World 1
Subscriber onNext(): Hello World 2
Subscriber onNext(): Hello World 3
Subscriber onNext(): Hello World 4
Error occured in Subscriber,java.lang.RuntimeException: Random error occurred..
Observer onNext(): Hello World 1
Error occured in Observer,java.lang.RuntimeException: Random error occurred..
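
The linked RxJavaDemo.java is not reproduced in this post. As a rough approximation only, the behaviour seen in the output can be modelled in plain Java without RxJava. HelloObserver and HelloDemoSketch are hypothetical names, and the failure condition is passed in as a predicate (an assumption made here so the run is repeatable, instead of a real random check).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical callback interface standing in for an RxJava Subscriber/Observer.
interface HelloObserver {
    void onNext(String item);
    void onError(Throwable error);
    void onComplete();
}

final class HelloDemoSketch {
    // Emits "Hello World 1" .. "Hello World <count>", failing when shouldFail says so.
    static void emit(int count, IntPredicate shouldFail, HelloObserver observer) {
        try {
            for (int i = 1; i <= count; i++) {
                if (shouldFail.test(i)) {
                    throw new RuntimeException("Random error occurred..");
                }
                observer.onNext("Hello World " + i);
            }
        } catch (RuntimeException e) {
            observer.onError(e); // the data channel ends here
            return;
        }
        observer.onComplete();
    }

    // Runs one subscription and returns the lines it would print.
    static List<String> run(String label, int count, IntPredicate shouldFail) {
        List<String> lines = new ArrayList<>();
        emit(count, shouldFail, new HelloObserver() {
            @Override public void onNext(String item) { lines.add(label + " onNext(): " + item); }
            @Override public void onError(Throwable error) { lines.add("Error occurred in " + label + "," + error); }
            @Override public void onComplete() { lines.add(label + " onComplete()"); }
        });
        return lines;
    }
}
```

Calling HelloDemoSketch.run("Subscriber", 10, i -> i == 5) yields four onNext lines followed by one error line and nothing after it. To mimic a random failure, the predicate could consult a java.util.Random instead, in which case the point of failure differs from run to run.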

Connectable Observables
A ConnectableObservable is like an ordinary Observable, except that it doesn't begin emitting items when it is subscribed to. It starts emitting only when its connect() method is called, so several observers can subscribe before any item is emitted.
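
The "subscribe first, emit on connect" behaviour can be illustrated with a small plain-Java model. MiniConnectable is a hypothetical class written for this sketch and is not RxJava's ConnectableObservable. In RxJava itself, a ConnectableObservable is typically obtained by calling publish() on an ordinary Observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of a connectable source; not RxJava's ConnectableObservable.
final class MiniConnectable<T> {
    private final List<T> items;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    MiniConnectable(List<T> items) { this.items = items; }

    // Registers a subscriber; nothing is emitted yet.
    void subscribe(Consumer<T> onNext) { subscribers.add(onNext); }

    // Starts emission: every item goes to every registered subscriber.
    void connect() {
        for (T item : items) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    // Shows that emission starts only after connect() is called.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniConnectable<Integer> source = new MiniConnectable<>(Arrays.asList(1, 2));
        source.subscribe(i -> log.add("A" + i));
        source.subscribe(i -> log.add("B" + i));
        log.add("subscribed"); // both subscribers are attached, nothing emitted so far
        source.connect();
        return log;
    }
}
```

In the recorded log, "subscribed" comes first, and only then do the items arrive, each one delivered to both subscribers.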

Single
Single is like an Observable, but instead of emitting a series of values, it emits either one value or an error notification. A subscriber to a Single therefore receives exactly one of two callbacks:

  • onSuccess(): called with the single value that the Single emits
  • onError(): called with the Throwable that prevented the Single from emitting a value
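
The "one value or one error" contract can be sketched in plain Java as follows. MiniSingle is a hypothetical class for illustration and is not RxJava's Single. It computes its value from a Callable and invokes exactly one of the two callbacks.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical miniature of a single-value source; not RxJava's Single.
final class MiniSingle<T> {
    private final Callable<T> source;

    private MiniSingle(Callable<T> source) { this.source = source; }

    // Wraps a computation that produces one value or throws.
    static <T> MiniSingle<T> fromCallable(Callable<T> source) {
        return new MiniSingle<>(source);
    }

    // Exactly one of the two callbacks is invoked, exactly once.
    void subscribe(Consumer<T> onSuccess, Consumer<Throwable> onError) {
        T value;
        try {
            value = source.call();
        } catch (Exception e) {
            onError.accept(e);
            return;
        }
        onSuccess.accept(value);
    }

    // Subscribes and reports which callback fired, for demonstration.
    static String describe(Callable<String> source) {
        StringBuilder out = new StringBuilder();
        MiniSingle.fromCallable(source).subscribe(
            value -> out.append("success:").append(value),
            error -> out.append("error:").append(error.getMessage()));
        return out.toString();
    }
}
```

A computation that returns "42" reaches only the success callback. One that throws reaches only the error callback, and no value is delivered.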


-K Himaanshu Shuklaa..
