💻 프로그래밍

[RxJava] #3 Operators(1) 생성연산자

콩드로이드 2022. 8. 14. 16:44

2022.08.01 - [💻 프로그래밍] - [RxJava] #1

 

[RxJava] #1

안녕하세요 오늘은 비동기 프로그래밍 중 하나인 RxJava에 대해 알아보겠습니다 API와 통신할 때만 사용했기에, 더 많은 기능을 알고자 정리해보겠습니당 우선, 명령형 프로그래밍과 반응형 프로

kong-droid.com

 위 글에서 Operator는 공식문서 참조로 끝났지만, Operator를 실습해야 RxJava를 편하게 쓸 수 있을 거 같아 

포스팅을 쓰고자 합니다 🧐

 


생성 연산자 

Observable 생성 시 자주 쓰이는 연산자들에 대해 알아보겠습니다 

 

Just

• 인자 그대로 발행(어떤 변화 없이 그대로 발행),  인자 여러 개 선언 시 타입은 같아야 함 (최대 10개)

onNext, onComplete, onError는 자동 호출

• RxJava는 기본적으로 null을 허용하지 않기 때문에, null을 발행하면 에러 발생 ▶️  🔑 Observable.empty() 사용해 해결

Just

 

Observable.just("ㄱ","ㄴ","ㄷ","ㄹ","ㅁ","ㅂ")
          .subscribe {
             Log.d("Just", it)
          }


 

Create

• 개발자가 직접 Emitter(onNext, onComplete, onError)를 호출

onComplete와 onError는 같이 사용될 수 없고, 각각 호출된 후엔 데이터를 발행할 수 없음

Create

 

Observable.create<String> { subscriber ->
      try {
             subscriber.onNext("It's")
             subscriber.onNext("Successful")
             subscriber.onComplete()
          } 
      catch (e: Exception) {
             subscriber.onError(e)
          }
}.subscribe { Log.d("create", it) }

 


From

• 다른 타입의 데이터를 Observable로 변경

Future,  Iterable,  Array 등을 Observable로 변경

from

 

1) fromArray() : 배열을 observable로 변경, 아이템 순차적으로 발행

val items = arrayOf(0, 1, 2, 3, 4, 5)
val fromArray = Observable.fromArray(items)

fromArray.subscribe { 
	it.forEach {
    	   Log.d("fromArray", "$it") 
        } 
}

 

2) fromIterable() : ArrayList, HashSet .. 등을 observable화

3) fromFuture() : 비동기 작업의 결과를 구할 때 사용

4) fromPublisher() : Publisher(구독시점 이후에 아이템을 발행하는 생산자)를 observable화

5) fromCallable() : 비동기적인 실행결과를 반환


Defer

• 구독하기 전까지 Observable을 생성하지 않음

• 각 Observer에 매번 새로운 Observable을 생성

Defer

 

val observable = Observable.defer {
     Observable.just(LocalTime.now())
}
        
observable.subscribe { Log.d("defer", "$it") }
Thread.sleep(2000L)
observable.subscribe{ Log.d("defer", "$it") }

 

 


Empty, never

• empty : 아이템을 발행하진 않지만, 정상적으로 스트림을 종료 

• never : 아이템을 발행하지 않고 , 스트림을 종료시키지도 않음

Observable.empty<Any>()
	.doOnTerminate{ println("empty onComplete") }
    .subscribe()
    
Observable.never<Any>()
     .doOnTerminate { println("never onComplete") }
     .subscribe()

* doOnTerminate Observable의 onComplete()가 호출될 때 사용됨

empty onComplete만 출력됩니다 

 


interval

• 주어진 시간 간격의 순서대로 정수를 발행하는 Observable 생성

구독 중지 전까지 계속 배출되므로 구독 해제 필요

val dispose = Observable.interval(1, TimeUnit.MINUTES).subscribe()
Thread.sleep(5000)
dispose.dispose()

 


 

range

• 특정 범위의 정수를 순서대로 발행하는 Observable 생성

데이터 발행이 끝나면 스트림이 자동으로 종료 


궁금하신 점이나 의견이 있으시면 댓글 부탁드립니다 감사합니다 😊