Kotlin
[Kotlin] 16.2 Cold Flow
mygomii
2025. 6. 25. 18:58
반응형
16.2.1 flow빌더 함수를 사용해 콜드 플로우 생성
- 새로운 콜드 플로우를 생성하는 것은 간단함
- 컬렉션과 마찬가지로 새로운 플로우를 생성할 수 있는 빌더 함수가 있음
- → 이 함수는 flow라 불림
- 빌더 함수의 블록 안에서는 emit 함수를 호출해 플로우의 수집자에게 값을 제공하고, 수집자가 해당 값을 처리할 때까지 빌더 함수의 실행을 중단함
- flow 가 받는 블록은 suspend 변경자가 붙어 있으므로 빌더 내부에서 delay 와 같은 다른 일시 중단함수를 호출할 수 있음
fun main() = flow {
println("Flow 시작됨")
emit("A") // <- emit 함수를 호출해 플로우의 수집자에게 값을 제공
delay(100)
emit("B")
}ㅇ
- 이 코드를 실행하면 실제로 아무런 출력도 나타나지 않는다는 점을 지적할만함
- 이는 빌더 함수가 연속적인 값의 스트림을 표현하는 Flow<T> 타입의 객체를 반환하기 때문
- 이 flow는 처음에 비활성 상태이며, 최종 연산자가 호출돼야만 빌더에서 정의된 계산이 시작됨
- 이로부터 flow가 cold라고 불리는 이유를 알 수 있음. 기본적으로 수집되기 시작할 때까지 비활성 상태이기 때문
16.2.2 Cold flow는 수집되기 전까지 작업을 수행하지 않는다
- Cold Flow는 collect 호출 전까지 아무 작업도 하지 않음
fun createValues(): Flow<String> = flow {
println("Flow 시작됨")
emit("A")
delay(100)
emit("B")
}
fun main(): Unit = runBlocking {
println("collect 호출 전")
createValues().collect { println("받음: $it") }
}
- collect 는 일시 중단 함수(suspend function)
- collect는 Flow 안에서 하나씩 방출되는 값을 비동기적으로 기다리며 수집해야 함
- 방출이 emit()으로 일어나는데, 이것도 suspend 함수이므로
- → 그걸 받는 collect도 자연스럽게 일시 중단 함수여야 함
16.2.3 flow 수집 취소
- Flow의 수집(collect)은 일시 중단(suspend) 함수이므로 취소가 가능
- 코루틴이 취소되면 collect도 중단되며, Flow 빌더 블록(flow {}) 내부의 실행도 함께 멈춤
val flow = flow {
emit(1)
delay(100)
emit(2)
delay(100)
emit(3)
}
val job = launch {
flow.collect { println(it) }
}
delay(150)
job.cancel() // 수집 도중 취소
16.2.4 Cold Flow의 내부 구현
- flow {}는 코틀린 표준 라이브러리에 정의된 빌더 함수
- 이 함수는 실제로는 Flow<T> 인터페이스의 구현체를 반환함
- 내부적으로는 람다와 클래스 구현을 조합한 익명 객체를 만들어냄
interface Flow<out T> {
suspend fun collect(collector: FlowCollector<T>)
}
- flow {}는 Flow 인터페이스를 구현한 익명 클래스를 생성함
- 내부적으로 emit(value) 호출은 collector.emit(value) 로 구현되어 있음.
- 즉, 우리가 작성한 flow 블록은 실제로는 다음과 같은 구조로 동작
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
// flow 블록 안의 코드가 이 안에 들어감
collector.emit(...)
}
}
- 왜 이렇게 설계됐을까? 🤔
- 지연 실행(lazy) 을 지원하고
- 코루틴을 활용해 비동기적이고 중단 가능한 흐름을 만들기 위함
- 구조적 동시성과 suspend 함수와의 자연스러운 연동을 위함
16.2.5 채널 플로우(channelFlow)를 사용한 동시성 플로우
- flow {}는 기본적으로 순차적으로 실행됨
- → emit()을 호출하면 수집자(collect) 가 값을 처리할 때까지 다음 코드로 진행하지 않음.
- 반면, channelFlow {} 는 생산자(emit) 와 소비자(collect) 가 동시성(concurrency) 을 갖고 실행될 수 있음.
- → 내부적으로 코루틴과 채널을 활용해 병렬 흐름을 구현함.
- 왜 필요할까? 🤔
- flow {}는 순차적인 플로우에는 좋지만, 여러 코루틴에서 값을 병렬로 생성하거나 다른 스레드에서 emit 해야 할 경우에는 channelFlow가 필요함
fun concurrentFlow(): Flow<Int> = channelFlow {
launch {
send(1)
}
launch {
send(2)
}
}
- 위 코드에서는 두 개의 코루틴이 병렬로 동작하면서 값을 보냄.
비교 항목flow {}channelFlow {}
실행 방식 | 순차적 | 병렬 / 동시성 가능 |
emit 방식 | suspend 함수 | send() 사용 (channel 기반) |
내부 구조 | 단일 코루틴 | 내부적으로 채널 + 다중 코루틴 |
사용 시점 | 단순 스트림 처리 | 동시성 필요할 때 (e.g. 여러 emit 병렬 처리) |
반응형