Pages

January 11, 2019

#RxJava Part 4: Operators in RxJava

An operator is a function that takes one Observable as a source as its first argument and returns another Observable as the destination. For every item that the source observable emits, it will apply a function to that item, and then emit the result on the destination Observable.

When we want to create complex data flows that filter event based on certain criteria, we can chain the operators one after another. i.e we can apply multiple operators to the same observable.

Operators are methods created for solving transformations and handling API calls problems. For example, let's say we want to append "Hello" String with "World", we can do it in several ways.

1). We can do it in Observable, but Observable is supposed to emit items, not to change them.
Observable justObservable = Observable.just("Hello" + "World");

2). We make append "World" in Subscriber, but Subscriber wasn’t supposed to change items.
Subscriber intSubscriber = new Subscriber() {
   public void onCompleted() {
      System.out.println("onCompleted()");
   }
   public void onError(Throwable e) {
      System.out.println("onErro()");
   }

   public void onNext(String s) {
      System.out.println(s+ "World");
   }
};

3). Since we cannot use the above two solutions, we need to add one more step, by adding Operator. Operator is one additional step between Observable and Subscriber, where object can be transformed.

Observable o = Observable.create
                       (new Observable.OnSubscribe() {
                       Observable createObserver = 
                       Observable.just("Hello")
                       .map(new Func1() {
                          public String call(String s) {
                            return s + " World";
                          }
                       });
In the above example Observable emits just one unchanged item (i.e Hello), we will use Map operator can transform our object in a way we like and return it to Subscriber.

Observable Transformations and Conditional Operators

map operator transforms items emitted by an Observable by applying a function to each item. e.g: below code will transform employeeNames into upper case.

Observable.from(employeeNames)
  .map(String::toUpperCase)
  .subscribe(employeeName -> result += employeeName);

flatMap can be used to flatten Observables whenever we end up with nested Observables. e.g, lets say we have a method (getEmployeeNames) that returns an Observable from a list of strings. Now we'll print each string (employee names) from a new Observable the list of names based on what Subscriber sees.

Observable getEmployeeNames() {
    return Observable.from(employeeNameList);
}

Observable.just("Himaanshu Shuklaa", "Ravi Thapliyal")
  .flatMap(s -> getEmployeeNames())
  .subscribe(l -> result += l);

scan operator applies a function to each item emitted by an Observable sequentially and emits each successive value. It allows us to carry forward state from event to event. e.g:

String[] employeeNames= {"Himaanshu Shuklaa", "Ravi Thapliyal"};
Observable.from(employeeNames)
  .scan(new StringBuilder(), StringBuilder::append)
  .subscribe(total -> result += total.toString());

groupBy operator allows us to classify the events in the input Observable into output categories. Below example takes a list of numbers and apply group by operator to divide the numbers into even and odd categories.

Observable.from(numbers)
  .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
  .subscribe(group ->
    group.subscribe((number) -> {
        if (group.getKey().toString().equals("EVEN")) {
            EVEN[0] += number;
        } else {
            ODD[0] += number;
        }
    })
  );

filter emits only those items from an observable that pass a predicate test. Below code will filter the integer array for the even numbers:
Observable.from(numbers)
  .filter(i -> (i % 2 == 0))
  .subscribe(i -> result += i);

defaultIfEmpty emits item from the source Observable, or a default item if the source Observable is empty.
Observable.empty()
  .defaultIfEmpty("Empty Observable")
  .subscribe(s -> result += s);

takeWhile operator discards items emitted by an Observable after a specified condition becomes false.
Observable.from(numbers)
  .takeWhile(i -> i < 10)
  .subscribe(n -> sum[0] += n);

-K Himaanshu Shuklaa..

No comments:

Post a Comment