ν”Œλ‘œμš° 직접 κ΅¬ν˜„ν•˜κΈ°

πŸ’‘ flow μΈν„°νŽ˜μ΄μŠ€μ™€ flow λΉŒλ”κ°€ μ‹€μ œλ‘œ μ–΄λ–»κ²Œ λ™μž‘ν•˜λŠ”μ§€ μ΄ν•΄ν•˜κΈ°

Flow 직접 κ΅¬ν˜„

  • ν”Œλ‘œμš°μ˜ 원리에 λŒ€ν•΄ μ΄ν•΄ν•˜κΈ° μœ„ν•΄μ„œ 직접 μ½”λ“œλ‘œ κ΅¬ν˜„ν•˜μ˜€μŠ΅λ‹ˆλ‹€.

FlowCollector

  • λžŒλ‹€μ‹μ„ ν†΅ν•΄μ„œ ν”Œλ‘œμš° λ™μž‘μ„ κ΅¬ν˜„ν•  수 μžˆμŠ΅λ‹ˆλ‹€.
  • λžŒλ‹€μ‹μ€ λ˜ν•œ ν•¨μˆ˜λ₯Ό λ‚˜νƒ€λ‚΄λŠ” νŒŒλΌλ―Έν„°λ₯Ό κ°€μ§ˆ 수 μžˆμŠ΅λ‹ˆλ‹€.
    • flow λ™μž‘μ„ μ΄ν•΄ν•˜κΈ° μœ„ν•΄μ„œ ν•΄λ‹Ή νŒŒλΌλ―Έν„°λ₯Ό emit으둜 μ •μ˜ν•˜μ˜€μŠ΅λ‹ˆλ‹€.
// μ΄μ „μ˜ μ½”λ“œλŠ” ν•¨μˆ˜λ₯Ό νŒŒλΌλ―Έν„°λ‘œ μ „λ‹¬ν•©λ‹ˆλ‹€.
// λ³΅μž‘ν•œ ν•¨μˆ˜ν˜•μ„ κ°„κ²°ν•˜κ²Œ λ§Œλ“€κΈ° μœ„ν•˜μ—¬ ν•¨μˆ˜ν˜• μΈν„°νŽ˜μ΄μŠ€λ‘œ μΆ”μƒν™”ν•©λ‹ˆλ‹€.
private suspend fun <T> before(){
    val f: suspend ((T) -> Unit) -> Unit = { emit ->
        emit("flow 방좜" as T)
    }
}
  • 이 λ•Œ emit은 쀑단 ν•¨μˆ˜λ‘œ κ΅¬ν˜„λ˜μ–΄μ•Ό ν•©λ‹ˆλ‹€.
    • flowλŠ” 비동기 슀트림으둜, 데이터λ₯Ό 순차적으둜 λ°©μΆœν•˜λ©΄μ„œλ„ μΌμ‹œ 쀑단이 κ°€λŠ₯ν•΄μ•Ό ν•©λ‹ˆλ‹€.
    • 흐름을 μ€‘λ‹¨ν•˜κ³  μž¬κ°œν•˜λŠ” 방식을 ν™œμš©ν•  수 μžˆμ–΄μ•Ό ν•©λ‹ˆλ‹€.
  • ν•¨μˆ˜ν˜• νŒŒλΌλ―Έν„°λ₯Ό ν™œμš©ν•˜λ©΄ μ½”λ“œκ°€ λ³΅μž‘ν•˜κ³  가독성이 λ–¨μ–΄μ§‘λ‹ˆλ‹€.
    • 이λ₯Ό ν•΄κ²°ν•˜κΈ° μœ„ν•΄μ„œ FlowCollector ν•¨μˆ˜ν˜• μΈν„°νŽ˜μ΄μŠ€λ₯Ό μ •μ˜ν•˜μ˜€μŠ΅λ‹ˆλ‹€.
// ν•¨μˆ˜ν˜• μΈν„°νŽ˜μ΄μŠ€λ₯Ό ν™”μš©ν•˜μ—¬ λžŒλ‹€μ‹μœΌλ‘œ μ •μ˜ν•©λ‹ˆλ‹€.
fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

// λžŒλ‹€μ‹ 내뢀에 FlowColletor νƒ€μž…μ˜ λ¦¬μ‹œλ²„λ₯Ό μƒμ„±ν–ˆμŠ΅λ‹ˆλ‹€.
// 이λ₯Ό ν™œμš©ν•˜λ©΄ it.emit() ν˜•μ‹μ΄ μ•„λ‹Œ emit() ν˜•μ‹μœΌλ‘œ 호좜이 κ°€λŠ₯ν•©λ‹ˆλ‹€.
private suspend fun <T> after(){
    val f: suspend FlowCollector<T>.() -> Unit = {
        emit("flow 방좜" as T)
    }
}

Flow μΈν„°νŽ˜μ΄μŠ€

  • λžŒλ‹€μ‹μ„ μ „λ‹¬ν•˜λŠ” λŒ€μ‹ μ—, μΈν„°νŽ˜μ΄μŠ€λ₯Ό κ΅¬ν˜„ν•˜λŠ” 객체λ₯Ό λ§Œλ“œλŠ” 것이 가독성과 μœ μ§€λ³΄μˆ˜μ— μ’‹μŠ΅λ‹ˆλ‹€.
// 기쑴의 FlowCollectorλ₯Ό λ‹€μŒκ³Ό 같이 μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.
// λžŒλ‹€μ‹ 전달은 가독성이 λ–¨μ–΄μ§€λ―€λ‘œ, μΈν„°νŽ˜μ΄μŠ€λ₯Ό κ΅¬ν˜„ν•œ 객체λ₯Ό μƒμ„±ν•©λ‹ˆλ‹€.
private suspend fun <T> before() {
    val f: suspend FlowCollector<T>.() -> Unit = {
        emit("flow 방좜" as T)
    }
    f {
        println(it) // flow 방좜
    }
}

  • μΈν„°νŽ˜μ΄μŠ€ 이름을 Flow라 ν•˜κ³ , ν•΄λ‹Ή μΈν„°νŽ˜μ΄μŠ€μ˜ μ •μ˜λ₯Ό 객체 ν‘œν˜„μ‹μœΌλ‘œ λž˜ν•‘ν•©λ‹ˆλ‹€.
interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

private suspend fun <T> after() {
    val flow: Flow<T> = object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.emit("flow 방좜" as T)
        }
    }

    flow.collect {
        println(it)
    }
}

FlowBuilder

  • ν”Œλ‘œμš° 생성 λ‘œμ§μ„ 맀번 κ΅¬ν˜„ν•˜κΈ°μ—λŠ” λ³΅μž‘ν•©λ‹ˆλ‹€.
  • 이λ₯Ό ν•΄κ²°ν•˜κΈ° μœ„ν•΄μ„œ ν”Œλ‘œμš°λ₯Ό μƒμ„±ν•˜λŠ” flow λΉŒλ”λ₯Ό μ œμž‘ν•˜μ˜€μŠ΅λ‹ˆλ‹€.
// λ³΅μž‘ν•œ ν”Œλ‘œμš° 생성 λ‘œμ§μ„ λ‚΄λΆ€λ‘œ μΊ‘μŠν™”ν•©λ‹ˆλ‹€.
fun <T> flowBuilder(
    builder: suspend FlowCollector<T>.() -> Unit
) = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.builder()
    }
}

Custom Flow

  • Flow, FlowCollector, flowBuilderλ₯Ό ν†΅ν•΄μ„œ μ‹€μ œ Flow와 λ™μΌν•œ λ°©μ‹μ˜ μ½”λ“œλ₯Ό μž‘μ„±ν•˜μ˜€μŠ΅λ‹ˆλ‹€.
// ν”Œλ‘œμš°μ™€ λ˜‘κ°™μ΄ λ™μž‘ν•©λ‹ˆλ‹€.
suspend fun main() {
    val f: Flow<String> = flowBuilder {
        emit("flow 방좜 1")
        emit("flow 방좜 2")
    }

    f.collect {
        println("μˆ˜μ§‘1 : $it")
    }
    f.collect {
        println("μˆ˜μ§‘2 : $it")
    }
}
  • collectλ₯Ό ν˜ΈμΆœν•˜λ©΄, flow λΉŒλ”λ₯Ό ν˜ΈμΆœν•  λ•Œ 넣은 λžŒλ‹€μ‹μ΄ ν˜ΈμΆœλ©λ‹ˆλ‹€.
    • μ΄λŠ” ν”Œλ‘œμš° μž‘λ™ 원리와 κ°™μŠ΅λ‹ˆλ‹€.

Flow 처리 방식

  • ν”Œλ‘œμš°μ˜ 각 μ›μ†Œλ₯Ό λ°˜ν™˜ν•˜λŠ” map ν•¨μˆ˜λ„ μ‰½κ²Œ μ œμž‘ν•  수 μžˆμŠ΅λ‹ˆλ‹€.
  • flowλ₯Ό μƒˆλ‘œ μƒμ„±ν•˜λŠ” 둜직이기 λ•Œλ¬Έμ— ν”Œλ‘œμš° λΉŒλ”λ₯Ό ν™œμš©ν•©λ‹ˆλ‹€.
  • λž˜ν•‘ν•˜κ³  μžˆλŠ” ν”Œλ‘œμš°λ₯Ό μ‹œμž‘ν•˜κ²Œ λ˜λ―€λ‘œ, λΉŒλ” λ‚΄λΆ€μ—μ„œ collect λ©”μ„œλ“œλ₯Ό ν˜ΈμΆœν•©λ‹ˆλ‹€.
  • 이 외에 onEach, onStart, filter도 μ‰½κ²Œ μ œμž‘ κ°€λŠ₯ν•©λ‹ˆλ‹€.

flowBuilder - map

// μ›μ†Œλ₯Ό λ³€ν™˜ν•˜μ—¬ μƒˆλ‘œμš΄ ν”Œλ‘œμš° 방좜
fun <T, R> Flow<T>.map(
    trasformation: suspend (T) -> R
): Flow<R> = flowBuilder {
    collect {
        emit(trasformation(it))
    }
}

flowBuilder - filter

fun <T> Flow<T>.filter(
    predicate: suspend (T) -> Boolean
): Flow<T> = flowBuilder {
    collect {
        if (predicate(it)) {
            emit(it)
        }
    }
}

flowBuilder - onEach

fun <T> Flow<T>.onEach(
    action: suspend (T) -> Unit
): Flow<T> = flowBuilder {
    collect {
        action(it) // μΆ”κ°€ λ™μž‘ μˆ˜ν–‰
        emit(it)
    }
}

flowBuilder - onStart

fun <T> Flow<T>.onStart(
    action: suspend () -> Unit
): Flow<T> = flowBuilder {
    action() // μΆ”κ°€ λ™μž‘μ„ ν•œ 번만 μˆ˜ν–‰
    collect{
        emit(it)
    }
}

flowBuilder - flowOf

// ν”Œλ‘œμš° 생성 ν•¨μˆ˜λ‘œ listOf와 λΉ„μŠ·ν•˜κ²Œ λ™μž‘
fun <T> flowOf(vararg elements: T): Flow<T> = flowBuilder {
    elements.forEach {
        emit(it)
    }
}

Ex

suspend fun main() {
    flowOf("A", "B", "C")
        .filter {
            it != "A"
        }
        .onEach {
            println("B or C")
        }
        .onStart {
            println("μ‹œμž‘λ˜μ—ˆμŠ΅λ‹ˆλ‹€")
        }
        .map {
            delay(1000)
            it.plus(it)
        }
        .collect {
            println(it)
        }
}

λ™κΈ°λ‘œ μž‘λ™ν•˜λŠ” Flow

  • ν”Œλ‘œμš° λ˜ν•œ 쀑단 ν•¨μˆ˜μ²˜λŸΌ λ™κΈ°λ‘œ μž‘λ™ν•˜κΈ° λ•Œλ¬Έμ—, ν”Œλ‘œμš°κ°€ μ™„λ£Œλ  λ•ŒκΉŒμ§€ collect 호좜이 μ€‘λ‹¨λ©λ‹ˆλ‹€.

ν”Œλ‘œμš°λŠ” μƒˆλ‘œμš΄ 코루틴을 μ‹œμž‘ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. 각각의 처리 λ‹¨κ³„λŠ” λ™κΈ°λ‘œ μ‹€ν–‰λ˜κΈ° λ•Œλ¬Έμ—, onEach 내뢀에 delayκ°€ 있으면 λͺ¨λ“  μ›μ†Œκ°€ 처리되기 전이 μ•„λ‹Œ 각 μ›μ†Œ 사이에 지연이 μƒκΉλ‹ˆλ‹€.

suspend fun main() {
    flowOf("A","B","C")
        .onEach { delay(1000) }
        .collect{ println(it) }
}