1. 마블다이어그램
2. flatMap
- Observable을 반환 합니다.
- map() 함수는 단일 데이터를 발행하지만, flatMap() 함수는 여러개의 데이터를 발행 합니다.
- 데이터 발행의 순서를 보장하지 않습니다.
3. concatMap
concatMap() 함수는 flatMap() 함수와 매우 비슷합니다. flatMap()은 먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면 나중에 들어온 데이터의 처리 결과가 먼저 출력 될 수도 있습니다. 이를 interleaving(끼어들기) 라고 합니다. 하지만, concatMap() 함수는 먼저 들어온 데이터 순서대로 처리해서 결과를 낼수 있도록 보장해 줍니다.
fun concatMapTest() {
val startTime = System.currentTimeMillis()
val balls = arrayOf("1", "3", "5")
Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { it.toInt() }
.map { balls[it] }
.take( balls.size.toLong() )
.concatMap { ball ->
Observable.interval(200L, TimeUnit.MILLISECONDS)
.map { ball + "🔶" }
.take(2)
}.subscribe {
println("time : ${System.currentTimeMillis() - startTime}, item : ${it}")
}
Thread.sleep(2000)
}
출력 결과
concatMap()함수의 마블 다이어그램을 소스코드로 구현 하였습니다.
- 100ms 간격으로 interval() 함수는 ["1", "3", "5"] 데이터를 발행 합니다.
- balls[] 사이즈 만큼 데이터를 발행 합니다.
- 200ms 간격으로 interval() 함수는 ["1"] -> ["1🔶", "1🔶"]... 데이터를 발행 합니다.
- 따라서, [첫번째 "3🔶"] 출력이 [두번째 "1🔶"] 보다 일찍 출력이 되어야 하지만,
- concatMap() 함수는 입력된 데이터의 순서를 보장 합니다.
- 따라서, ["1🔶" -> "1🔶" -> "3🔶" -> "3🔶" -> "5🔶" -> "5🔶"]
- 총 걸린 시간 : 1405ms
4. flatMap
fun flatMapTest() {
val startTime = System.currentTimeMillis()
val balls = arrayOf("1", "3", "5")
Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { it.toInt() }
.map { balls[it] }
.take( balls.size.toLong() )
.flatMap { ball ->
Observable.interval(200L, TimeUnit.MILLISECONDS)
.map { ball + "🔶" }
.take(2)
}.subscribe {
println("time : ${System.currentTimeMillis() - startTime}, item : ${it}")
}
Thread.sleep(2000)
}
출력 결과
concatMap() 소스 코드에서 concatMap() -> flatMap()으로만 변경해 주었습니다.
- concatMap() 처럼 입력 데이터의 순서를 보장해주지 않으니, 순서가 위에서 예상한대로 출력 되었습니다.
- ["1🔶" -> "3🔶" -> "5🔶"] 는 100ms 간격으로 출력 되었습니다.
- [첫번째 "1🔶"] -> [두번째 "1🔶"]... 는 200ms 간격으로 출력 되었습니다.
- 총 걸린 시간 : 812ms
- concatMap()은 1402ms 걸린것에 비해, 시간이 단축 되었습니다.
- concatMap()은 데이터의 순서를 보장하기 위해(interleaving), 시간이 더 걸립니다.
fun flatMapTest2() {
val startTime = System.currentTimeMillis()
val balls = arrayOf("1", "3", "5")
Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { it.toInt() }
.map { balls[it] }
.take( balls.size.toLong() )
.doOnNext {
println("time : ${System.currentTimeMillis() - startTime}, item : ${it}")
}.flatMap { ball ->
Observable.interval(200L, TimeUnit.MILLISECONDS)
.map { ball + "🔶" }
.take(2)
}.subscribe {
println("time : ${System.currentTimeMillis() - startTime}, item : ${it}")
}
Thread.sleep(2000)
}
출력 결과
switchMap()함수를 보기전에, flatMap()이 호출 되기 전에, doOnNext()함수로 발행되는 데이터를 출력 시켜보겠습니다.
- "1🔶"가 출력 되기 전에, "5"가 발행 되는 것을 알수 있습니다.
5. switchMap
fun switchMapTest() {
val startTime = System.currentTimeMillis()
val balls = arrayOf("1", "3", "5")
Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { it.toInt() }
.map { balls[it] }
.take( balls.size.toLong() )
.switchMap { ball ->
Observable.interval(200L, TimeUnit.MILLISECONDS)
.map { ball + "🔶" }
.take(2)
}.subscribe {
println("time : ${System.currentTimeMillis() - startTime}, item : ${it}")
}
Thread.sleep(2000)
}
출력 결과
- 위 flatMap()의 doOnNext()를 추가한 결과를 보면,
- "1🔶"가 출력 되기 전에, "5"가 발행 되는 것을 알수 있습니다.
- 따라서, switchMap() 함수는 데이터 발행이 완료되지 않은 상태에서, 다른 데이터가 발행 된다면, 완료되지 않은 데이터는 발행되지 않고, 마지막 데이터의 발행만 보장 합니다.
'Reactive Programing' 카테고리의 다른 글
FlatMap 파헤치기 (0) | 2021.02.02 |
---|---|
Cold Observable vs Hot Observable (PublishSubject 클래스) (0) | 2020.11.22 |
리액티브 프로그래밍 (Reactive Programming) RxJava (0) | 2020.11.01 |
Java API를 RxJava와 통합 하는 방법 (0) | 2020.10.31 |
Flowable 배압(BackPressure) 대응 (0) | 2020.10.30 |