Flow, Channel, ChannelFlow, CallbackFlow
DoDoBest
·2024. 8. 15. 21:39
Flow와 Channel의 목적
Flow : 데이터를 생성하고 처리하는데 사용할 수 있습니다.
Channel : 코루틴 간에 데이터를 주고 받는데 사용할 수 있습니다.
Flow & Channel 비교
Flow는 비동기적으로 생성된 데이터를 전달하는 스트림으로, 관찰자가 없으면 데이터를 생성하지 않는 Cold source 입니다.
기본적으로 Flow는 생성된 데이터가 collect 함수에 의해 소비될 때까지 다음 데이터를 생성하지 않습니다. 단, buffer 함수를 통해 버퍼를 설정하면, 버퍼 크기 + 1 만큼의 데이터를 미리 생성할 수 있습니다. 버퍼가 있는 Flow는 Hot source 입니다. onBufferOverflow 파라미터를 이용해 버퍼가 꽉찼을 때 동작을 정의할 수 있으며, 다음의 3가지 종류가 있습니다.
- SUSPEND : 버퍼가 비워질 때까지 중단됩니다.
- DROP_OLDEST : 버퍼에서 가장 오래된 데이터를 drop하고, 새로운 데이터를 버퍼에 입력합니다. 중단 되지 않습니다.
- DROP_LATEST : 최신 데이터를 drop하기 때문에 buffer의 데이터는 바뀌지 않습니다. 중단 되지 않습니다.
Flow는 Coroutine Context 내부에서 비동기적으로 작동하여 Main Thread를 blocking 하지 않고, 다른 코루틴이 실행될 수 있도록 스레드 대기열로 진입합니다.
동기적으로 데이터를 생성하고 전달하는 Cold 스트림으로는 Sequence가 있으며, 동기적으로 데이터를 생성하기 때문에 main thread를 blocking 할 수 있습니다.
Channel은 코루틴 간에 데이터를 전달하는 스트림으로, 관찰자가 없어도 데이터를 생성할 수 있는 Hot source 입니다. 단, 버퍼가 없을 경우 send 함수는 consumer가 준비될 때까지 중단됩니다.
Channel에서 데이터를 생성하고 전달하는 코루틴을 producer, 데이터를 전달받아 소비하는 코루틴을 consumer라고 합니다. 한 개의 channel에 여러 개의 producer와 여러 개의 consumer가 있을 수 있습니다. 여러 consumer가 있더라도 하나의 데이터는 단일 consumer에게만 전달됩니다. 전달 되는 순서는 channel에 등록한 순서입니다.
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>()
launch {
repeat(5) {
delay(1000)
println("send : $it")
channel.send(it)
}
}
launch {
repeat(3) {
val received = channel.receive()
println("[First] receive : $received")
}
}
launch {
repeat(2) {
val received = channel.receive()
println("[Second] receive : $received")
}
}
}
/**
*
* send : 0
* [First] receive : 0
* send : 1
* [Second] receive : 1
* send : 2
* [First] receive : 2
* send : 3
* [Second] receive : 3
* send : 4
* [First] receive : 4
*/
반면, Flow는 collect 함수가 호출될 때마다 flow 빌더 로직에 따라 매번 새롭게 데이터를 생성합니다.
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
val flow = flow {
repeat(5) {
delay(1000)
println("emit : $it")
emit(it)
}
}
launch {
flow.collect {
println("[First] collect : $it")
}
}
launch {
flow.collect {
println("[Second] collect : $it")
}
}
}
/**
* emit : 0
* emit : 0
* [Second] collect : 0
* [First] collect : 0
* emit : 1
* emit : 1
* [Second] collect : 1
* [First] collect : 1
* emit : 2
* emit : 2
* [First] collect : 2
* [Second] collect : 2
* emit : 3
* emit : 3
* [First] collect : 3
* [Second] collect : 3
* emit : 4
* emit : 4
* [Second] collect : 4
* [First] collect : 4
*/
Flow는 flow 빌더 내부에서 emit 함수를 통해 데이터를 송신하고 collect 함수를 통해 데이터를 수신하기 때문에, 데이터는 단방향으로 흐릅니다.
Channel은 SendChannel, ReceiveChannel 인터페이스와 이 둘을 상속하는 Channel 인터페이스로 구성됩니다. Producer는 SendChannel을 통해 데이터를 전송하고, Consumer는 ReceiveChannel을 통해 데이터를 수신합니다. Channel 인터페이스를 통해 전송과 수신이 모두 가능하기 때문에 데이터는 양방향으로 흐를 수 있습니다.
Channel은 기본적으로 버퍼가 없는 랑데뷰 채널(rendezvous channel)로 동작하며, 이 경우 producer는 consumer가 없으면 데이터를 생성하지 않고 suspend 상태가 됩니다. 버퍼 종류에 따른 채널을 정리하면 다음과 같습니다.
- 무제한(Unlimited) : 제한이 없는 용량 버퍼를 Channel.UNLIMITED로 설정된 채널로, send가 중단되지 않습니다.
- 버퍼(Buffered) : 특정 용량 크기 또는 Channel.BUFFERED(기본값은 64)로 설정된 채널입니다.
- 랑데뷰(Rendezvous) : 용량이 0이거나 Channel.RENDEZVOUS(이것도 용량이 0)인 채널로, 송신자와 수신자가 만날 때만 원소를 교환합니다.
- 융합(conflated) : 버퍼 크기가 1이거나 Channel.CONFLATED인 채널로, 새로운 원소가 이전 원소를 대체한다.
ChannelFlow
ChannelFlow는 CoroutineScope와 SendChannel을 상속하는 ProducerScope을 이용하는 Flow로써, 버퍼를 가지는 Flow라고 할 수 있습니다. 기본적인 버퍼 크기는 64이며, buffer 함수를 통해 버퍼 크기를 설정할 수 있고, 버퍼가 꽉찼을 때의 동작을 정의할 수 있습니다.
ChannelFlow는 Flow를 merge할 때 사용할 수 있습니다. Flow 빌더 block의 수신 객체는 FlowCollector이기 때문에 CoroutineScope을 필요로 하는 launch 블록을 사용할 수 없습니다. 또한, FlowCollector는 thread-safe 하지 않고, 동시에 emit 하는 것을 금지하기 때문에 CoroutineScope을 지정하여 launch 블록을 사용할 수도 없습니다.
flow의 람다 블록은 SAM 인터페이스인 FlowCollector의 emit 함수를 구현할 뿐, CoroutineScope을 제공하지 않습니다.
반면 ChannelFlow는 CoroutineScope을 상속하는 ProducerScope을 이용하기 때문에 launch 블록을 사용할 수 있습니다.
아래 예제는 CoroutineScope에 따른 Flow와의 차이를 보여줍니다.
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
val abcFlow = flow {
('A'..'E').forEach {
delay(50)
emit(it)
}
}
val numFlow = flow {
(1..5).forEach {
delay(50)
emit(it)
}
}
val normalFlow = flow {
abcFlow.collect {
emit(it)
}
numFlow.collect {
emit(it)
}
}
normalFlow.collect {
print("$it ") // A B C D E 1 2 3 4 5
}
val channelFlow = channelFlow {
launch {
abcFlow.collect {
send(it)
}
}
numFlow.collect {
send(it)
}
}
println()
channelFlow.collect {
print("$it ") // 1 A B 2 3 C D 4 5 E ( 동시에 emit 되기 때문에 결과는 실행할 때마다 달라질 수 있다. )
}
}
ChannelFlow의 또 다른 장점은 non-suspend 함수인 trySend를 이용할 수 있다는 점입니다. trySend는 버퍼가 꽉차서 emit에 실패하더라도 기다리지 않고 데이터를 버립니다.
trySendBlocking을 이용하면 emit에 실패할 경우 기다린 후 데이터를 다시 emit 할 수 있습니다. trySendBlocking은 non-suspend 함수이나, emit에 실패할 경우 runBlocking 빌더를 실행하고, 빌더 내부에서 send 함수를 호출해 중단시킵니다.
CallbackFlow
CallbackFlow는 외부에서 정해지지 않은 시점에 데이터를 send 할 수 있도록 해주는 Flow 빌더입니다. awaitClose 함수를 이용해 Channel이 cancel 되거나 close 될 때까지 중단 시킵니다. awaitClose는 반드시 CallbackFlow 람다 블록의 마지막에 선언해줘야 합니다. 마지막에 선언하지 않으면 Exception이 발생하지는 않으나, awaitClose 하단에 있는 코드는 실행되지 않기 때문에 마지막에 선언하는 것이 적절합니다. awaitClose 자체를 선언하지 않으면 런타임에 IllegalStateException이 발생합니다.
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
var sendData: (data: Int) -> Unit = { }
var closeChannel: () -> Unit = { }
launch {
callbackFlow {
for (i in 1..5) trySend(i)
sendData = { data -> trySend(data) }
closeChannel = { close() }
awaitClose {
sendData = {}
closeChannel = {}
}
}.collect { println(it) }
}
delay(1000) // delay를 걸지 않으면 sendData 함수가 launch의 코루틴 블록보다 먼저 실행된다.
println("Sending 6")
sendData(6)
delay(1000)
println("Sending 7")
sendData(7)
delay(1000)
println("Sending 8 after closeChannel")
closeChannel()
sendData(7)
}
/**
* 1
* 2
* 3
* 4
* 5
* Sending 6
* 6
* Sending 7
* 7
* Sending 8 after closeChannel
*/
StateFlow, SharedFlow와 CallbackFlow 비교
SharedFlow는 broadcast 방식으로 모든 관찰자에게 동일한 데이터를 전달하는 Hot Flow입니다. 기본 값으로 버퍼 크기가 0으로 설정되기 때문에, 새로운 관찰자는 이전에 방출된 최신 데이터를 전달 받을 수 없습니다. Shared Flow는 절대 Complete 상태가 되지 않기 때문에 Shared Flow에 대한 collect 함수는 일반적으로 끝나지 않습니다.
StateFlow는 관찰자가 없어도 state를 유지하는 Hot Flow이기 때문에, 생성 시점에 초기값을 설정해줘야 합니다. 새로운 관찰자는 이전에 생성된 최신 데이터를 전달 받습니다. StateFlow는 SharedFlow를 상속하는 인터페이스입니다.
CallbackFlow는 관찰자가 없으면 inactive 상태인 Cold Flow입니다.
같은 값을 emit할 때, StateFlow의 관찰자는 state가 변경되지 않아 값을 전달받지 못합니다. SharedFlow와 CallbackFlow의 관찰자는 전달받을 수 있습니다.
StateFlow와 SharedFlow는 여러 명의 관찰자가 데이터를 모두 전달받을 수 있지만, CallbackFlow는 가장 최근에 관찰을 등록한 관찰자만이 데이터를 전달받을 수 있습니다.
참고자료
'학습' 카테고리의 다른 글
deprecated된 firebase dynamic link 없이 Firebase email link authentication 구현하기 - 1 (2) | 2024.09.11 |
---|---|
Safe Navigation Action (0) | 2024.08.22 |
RecyclerView Itemdecoration 올바르게 사용하기 (0) | 2024.07.24 |
왜 repeatOnLifecycle 앞에 viewLifecycleOwner를 붙여야할까? (0) | 2024.06.26 |
Iterable, Iterator, Collection (0) | 2024.05.21 |