Kotlin

[Kotlin] 16.2 Cold Flow

mygomii 2025. 6. 25. 18:58
반응형

16.2.1 flow빌더 함수를 사용해 콜드 플로우 생성

  • 새로운 콜드 플로우를 생성하는 것은 간단함
  • 컬렉션과 마찬가지로 새로운 플로우를 생성할 수 있는 빌더 함수가 있음
  • → 이 함수는 flow라 불림
  • 빌더 함수의 블록 안에서는 emit 함수를 호출해 플로우의 수집자에게 값을 제공하고, 수집자가 해당 값을 처리할 때까지 빌더 함수의 실행을 중단함
  • flow 가 받는 블록은 suspend 변경자가 붙어 있으므로 빌더 내부에서 delay 와 같은 다른 일시 중단함수를 호출할 수 있음
fun main() = flow {
    println("Flow 시작됨")
    emit("A") // <- emit 함수를 호출해 플로우의 수집자에게 값을 제공
    delay(100)
    emit("B") 
}ㅇ
  • 이 코드를 실행하면 실제로 아무런 출력도 나타나지 않는다는 점을 지적할만함
  • 이는 빌더 함수가 연속적인 값의 스트림을 표현하는 Flow<T> 타입의 객체를 반환하기 때문
  •  flow는 처음에 비활성 상태이며, 최종 연산자가 호출돼야만 빌더에서 정의된 계산이 시작됨
  • 이로부터 flow가 cold라고 불리는 이유를 알 수 있음. 기본적으로 수집되기 시작할 때까지 비활성 상태이기 때문

16.2.2 Cold flow는 수집되기 전까지 작업을 수행하지 않는다

  • Cold Flow collect 호출 전까지 아무 작업도 하지 않음
fun createValues(): Flow<String> = flow {
    println("Flow 시작됨")
    emit("A")
    delay(100)
    emit("B")
}

fun main(): Unit = runBlocking {
    println("collect 호출 전")
    createValues().collect { println("받음: $it") }
}
  • collect 는 일시 중단 함수(suspend function)
  • collect는 Flow 안에서 하나씩 방출되는 값을 비동기적으로 기다리며 수집해야 함
  • 방출이 emit()으로 일어나는데, 이것도 suspend 함수이므로
  • → 그걸 받는 collect도 자연스럽게 일시 중단 함수여야 함

16.2.3 flow 수집 취소

  • Flow의 수집(collect)은 일시 중단(suspend) 함수이므로 취소가 가능
  • 코루틴이 취소되면 collect도 중단되며, Flow 빌더 블록(flow {}) 내부의 실행도 함께 멈춤
val flow = flow {
    emit(1)
    delay(100)
    emit(2)
    delay(100)
    emit(3)
}

val job = launch {
    flow.collect { println(it) }
}
delay(150)
job.cancel() // 수집 도중 취소

16.2.4 Cold Flow의 내부 구현

  • flow {}는 코틀린 표준 라이브러리에 정의된 빌더 함수
  • 이 함수는 실제로는 Flow<T> 인터페이스의 구현체를 반환함
  • 내부적으로는 람다와 클래스 구현을 조합한 익명 객체를 만들어냄
interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}
  • flow {}는 Flow 인터페이스를 구현한 익명 클래스를 생성함
  • 내부적으로 emit(value) 호출은 collector.emit(value) 로 구현되어 있음.
  • 즉, 우리가 작성한 flow 블록은 실제로는 다음과 같은 구조로 동작
return object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        // flow 블록 안의 코드가 이 안에 들어감
        collector.emit(...)
    }
}
  • 왜 이렇게 설계됐을까? 🤔
    • 지연 실행(lazy) 을 지원하고
    • 코루틴을 활용해 비동기적이고 중단 가능한 흐름을 만들기 위함
    • 구조적 동시성과 suspend 함수와의 자연스러운 연동을 위함

16.2.5 채널 플로우(channelFlow)를 사용한 동시성 플로우

  • flow {}는 기본적으로 순차적으로 실행됨
  •  emit()을 호출하면 수집자(collect) 가 값을 처리할 때까지 다음 코드로 진행하지 않음.
  • 반면, channelFlow {} 는 생산자(emit) 와 소비자(collect) 가 동시성(concurrency) 을 갖고 실행될 수 있음.
  • → 내부적으로 코루틴과 채널을 활용해 병렬 흐름을 구현함.
  • 왜 필요할까? 🤔
    • flow {}는 순차적인 플로우에는 좋지만, 여러 코루틴에서 값을 병렬로 생성하거나 다른 스레드에서 emit 해야 할 경우에는 channelFlow가 필요함
fun concurrentFlow(): Flow<Int> = channelFlow {
    launch {
        send(1)
    }
    launch {
        send(2)
    }
}
  • 위 코드에서는 두 개의 코루틴이 병렬로 동작하면서 값을 보냄.

비교 항목flow {}channelFlow {}

실행 방식 순차적 병렬 / 동시성 가능
emit 방식 suspend 함수 send() 사용 (channel 기반)
내부 구조 단일 코루틴 내부적으로 채널 + 다중 코루틴
사용 시점 단순 스트림 처리 동시성 필요할 때 (e.g. 여러 emit 병렬 처리)
반응형