본문 바로가기

Back-End/RxJava

RxJava란? Observable,Observerd



좋은 어플리케이션을 만들기 위해서는 3가지 요소를 고려하여야 한다. Responsiveness, Elasticness, Resilience이다.

RxJava 설정하기

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.0</version>
</dependency>



RxJava는 어떻게 동작하는가?

Dzone에 있는 모든 문서를 가져와주는 어플리케이션을 예시로 살펴보자.

public class DzoneDBDao {
private static DzoneDBDao service = new DzoneDBDao();
public static DzoneDBDao get() {
return service;
}
DZoneDoc[] getAllDocFromDB() {
return produceDocs();
}
private DZoneDoc[] produceDocs() {
DZoneDoc[] array = {
DZoneDoc.create("Java Microservice", "Refcardz"),
DZoneDoc.create("RX Java", "Article"),
DZoneDoc.create("IOT in Action", "Refcardz"),
DZoneDoc.create("Java8 in Action", "Refcardz"),
};
return array;
}
}


위의 함수는 mock database로 사용하기 위한 것이다.


public class DZoneDoc {
private String name;
private String type;
private DZoneDoc() {}
public static DZoneDoc create(String name, String type) {
DZoneDoc doc = new DZoneDoc();
doc.setName(name);
doc.setType(type);
return doc;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String toString() {
return "DZoneDoc [name=" + name + ", type=" + type + "]";
}
}
import io.reactivex.Observable;
public class DZoneSearchService {
Observable < DZoneDoc > getAllDocs() {
return Observable.fromArray(DzoneDBDao.get().getAllDocFromDB());
}
}

위의 Observable은 RxJava에서 가장 중요한 추상클래스이다. 이제 RxJava의 구성요소들을 하나씩 살펴보자.


RxJava - Observable

Observable은 데이터를 끊임없이 가져오고 데이터를 필요로 하는 곳에 바로 보내주는 객체이다.

이는 Data를 소비하는 곳(Data를 받아서 쓰는 곳)과 data source를 디커플링해준다. 

Observable의 가장 큰 특징은 데이터를 계속 가져오고 즉시에 데이터를 보내주기 때문에, 소비자(consumer)는 여러번 통지(notify)를 받게 된다.

이 것이 모든 데이터가 가져온 후에 callback으로 알게 되는 Asynchronous 시스템과의 차이다. 


Observable은 finite / infinite 두가지 타입이 있다. finite는 우리의 예시와 같은 경우이고, infinite는 주식 가격과 관련된 경우이다.

아래 코드는 예제에서 Observable을 사용한 것이다.

 Observable < DZoneDoc > getAllDocs() {
return Observable.fromArray(DzoneDBDao.get().getAllDocFromDB());
}


RxJava - Observer

Observer는 자기가 관심있는 객체의 상태가 변한 경우 통지(notify)를 받기 원하는 객체이다.  그렇기 때문에 몇가지 액션을 취할 수 있다. 대신, Observer는 통지를 받기 위해 Observable을 구독(subscribe)하여야 한다.

Observer 3가지 액션

  • OnNext : 데이터/이벤트가 내보내질 때 호출이 됨.
  • OnError : 데이터를 가져오다가 에러가 나는 경우 호출이 됨
  • OnCompleted : Observable가 모든 데이터를 보내고 나서 호출이 됨

우리가 이해한 이 2가지 객체를 예제에 적용해보자. 

public class DZoneUI {
private DZoneSearchService dzoneService = new DZoneSearchService();
public void printAllDocs() {
dzoneService.getAllDocs().subscribe(System.out::println); //dzonedocs
}
public static void main(String[] args) {
DZoneUI UI = new DZoneUI();
UI.printAllDocs();
}
}
DZoneDoc [name=Java Microservice, type=Refcardz]
DZoneDoc [name=RX Java, type=Article]
DZoneDoc [name=IOT in Action, type=Refcardz]
DZoneDoc [name=Java8 in Action, type=Refcardz]


실제 개발에서는 search service는 REST API로 호출될 것이다. RxJava를 적용한 REST API 개발을 할 때, Spring 5 Reactive Web을 사용하면 된다. ( 이와 관련된 포스팅은 다음에! )


Map 함수 이용하기

서비스로부터 데이터를 가져왔지만, 사용하는 클라이언트에서 재가공을 해야 하는 경우가 많다. 데이터를 UI에 보여줄 때, 비즈니스 요구에 따라 보여주는 방식은 자주 변할 것이다. 이때마다, API를 뚫어주는 것은 불가능에 가깝다. 즉, 사용하는 클라이언트 쪽에서 그때그때 가공을 해줘야 한다.

밑의 예제를 살펴보면, doc의 이름만을 출력하기 위해 map()를 사용하였다. UI는 이제 Observable<String>을 구독하게 된 것이다. 이것을  Operator라 부른다.

public class DZoneUI {
private DZoneSearchService dzoneService = new DZoneSearchService();
public void printAllDocsName() {
dzoneService.getAllDocs().map(doc - > doc.getName()).subscribe(System.out::println); // dzonedocs
}
public static void main(String[] args) {
DZoneUI UI = new DZoneUI();
UI.printAllDocsName();
}
}


RxJava - Dynamic Filtering

Runtime에 데이터를 필터링해야 하는 경우가 있다. 이럴 때, filter() 함수를 사용하면 된다.

아래는 두가지 필터를 적용한 예제이다. Filter chain pattern처럼 여러개의 필터가 동작하고 있다.
import io.reactivex.observables.GroupedObservable;

public class DZoneUI {
private DZoneSearchService dzoneService = new DZoneSearchService();
public void printJavaRefCardz() {
dzoneService.getAllDocs().filter(docs - > "Refcardz".equalsIgnoreCase(docs.getType()))
.filter(doc - > doc.getName().contains("Java"))
.subscribe(System.out::println); // dzonedocs
}
public static void main(String[] args) {
DZoneUI UI = new DZoneUI();
UI.printJavaRefCardz();
}
}


RxJava를 적용하면서 주의해야 할 점

  1. RxJava는 non-blocking이다. 절대로 응답을 기다리는 toBlocking() 함수를 호출하지 말자
  2. Filter 순서를 최적화하자. 많이 거를수 있는 Filter를 먼저 사용하자.