π‘ flow μΈν°νμ΄μ€μ flow λΉλκ° μ€μ λ‘ μ΄λ»κ² λμνλμ§ μ΄ν΄νκΈ°
Flow μ§μ ꡬν
- νλ‘μ°μ μ리μ λν΄ μ΄ν΄νκΈ° μν΄μ μ§μ μ½λλ‘ κ΅¬ννμμ΅λλ€.
FlowCollector
- λλ€μμ ν΅ν΄μ νλ‘μ° λμμ ꡬνν μ μμ΅λλ€.
- λλ€μμ λν ν¨μλ₯Ό λνλ΄λ νλΌλ―Έν°λ₯Ό κ°μ§ μ μμ΅λλ€.
- flow λμμ μ΄ν΄νκΈ° μν΄μ ν΄λΉ νλΌλ―Έν°λ₯Ό emitμΌλ‘ μ μνμμ΅λλ€.
// μ΄μ μ μ½λλ ν¨μλ₯Ό νλΌλ―Έν°λ‘ μ λ¬ν©λλ€.
// 볡μ‘ν ν¨μνμ κ°κ²°νκ² λ§λ€κΈ° μνμ¬ ν¨μν μΈν°νμ΄μ€λ‘ μΆμνν©λλ€.
private suspend fun <T> before(){
val f: suspend ((T) -> Unit) -> Unit = { emit ->
emit("flow λ°©μΆ" as T)
}
}
- μ΄ λ emitμ μ€λ¨ ν¨μλ‘ κ΅¬νλμ΄μΌ ν©λλ€.
- flowλ λΉλκΈ° μ€νΈλ¦ΌμΌλ‘, λ°μ΄ν°λ₯Ό μμ°¨μ μΌλ‘ λ°©μΆνλ©΄μλ μΌμ μ€λ¨μ΄ κ°λ₯ν΄μΌ ν©λλ€.
- νλ¦μ μ€λ¨νκ³ μ¬κ°νλ λ°©μμ νμ©ν μ μμ΄μΌ ν©λλ€.
- ν¨μν νλΌλ―Έν°λ₯Ό νμ©νλ©΄ μ½λκ° λ³΅μ‘νκ³ κ°λ
μ±μ΄ λ¨μ΄μ§λλ€.
- μ΄λ₯Ό ν΄κ²°νκΈ° μν΄μ FlowCollector ν¨μν μΈν°νμ΄μ€λ₯Ό μ μνμμ΅λλ€.
// ν¨μν μΈν°νμ΄μ€λ₯Ό νμ©νμ¬ λλ€μμΌλ‘ μ μν©λλ€.
fun interface FlowCollector<T> {
suspend fun emit(value: T)
}
// λλ€μ λ΄λΆμ FlowColletor νμ
μ 리μλ²λ₯Ό μμ±νμ΅λλ€.
// μ΄λ₯Ό νμ©νλ©΄ it.emit() νμμ΄ μλ emit() νμμΌλ‘ νΈμΆμ΄ κ°λ₯ν©λλ€.
private suspend fun <T> after(){
val f: suspend FlowCollector<T>.() -> Unit = {
emit("flow λ°©μΆ" as T)
}
}
Flow μΈν°νμ΄μ€
- λλ€μμ μ λ¬νλ λμ μ, μΈν°νμ΄μ€λ₯Ό ꡬννλ κ°μ²΄λ₯Ό λ§λλ κ²μ΄ κ°λ μ±κ³Ό μ μ§λ³΄μμ μ’μ΅λλ€.
// κΈ°μ‘΄μ FlowCollectorλ₯Ό λ€μκ³Ό κ°μ΄ μ¬μ©ν μ μμ΅λλ€.
// λλ€μ μ λ¬μ κ°λ
μ±μ΄ λ¨μ΄μ§λ―λ‘, μΈν°νμ΄μ€λ₯Ό ꡬνν κ°μ²΄λ₯Ό μμ±ν©λλ€.
private suspend fun <T> before() {
val f: suspend FlowCollector<T>.() -> Unit = {
emit("flow λ°©μΆ" as T)
}
f {
println(it) // flow λ°©μΆ
}
}
- μΈν°νμ΄μ€ μ΄λ¦μ FlowλΌ νκ³ , ν΄λΉ μΈν°νμ΄μ€μ μ μλ₯Ό κ°μ²΄ ννμμΌλ‘ λνν©λλ€.
interface Flow<T> {
suspend fun collect(collector: FlowCollector<T>)
}
private suspend fun <T> after() {
val flow: Flow<T> = object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.emit("flow λ°©μΆ" as T)
}
}
flow.collect {
println(it)
}
}
FlowBuilder
- νλ‘μ° μμ± λ‘μ§μ λ§€λ² κ΅¬ννκΈ°μλ 볡μ‘ν©λλ€.
- μ΄λ₯Ό ν΄κ²°νκΈ° μν΄μ νλ‘μ°λ₯Ό μμ±νλ flow λΉλλ₯Ό μ μνμμ΅λλ€.
// 볡μ‘ν νλ‘μ° μμ± λ‘μ§μ λ΄λΆλ‘ μΊ‘μνν©λλ€.
fun <T> flowBuilder(
builder: suspend FlowCollector<T>.() -> Unit
) = object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.builder()
}
}
Custom Flow
- Flow, FlowCollector, flowBuilderλ₯Ό ν΅ν΄μ μ€μ Flowμ λμΌν λ°©μμ μ½λλ₯Ό μμ±νμμ΅λλ€.
// νλ‘μ°μ λκ°μ΄ λμν©λλ€.
suspend fun main() {
val f: Flow<String> = flowBuilder {
emit("flow λ°©μΆ 1")
emit("flow λ°©μΆ 2")
}
f.collect {
println("μμ§1 : $it")
}
f.collect {
println("μμ§2 : $it")
}
}
- collectλ₯Ό νΈμΆνλ©΄, flow λΉλλ₯Ό νΈμΆν λ λ£μ λλ€μμ΄ νΈμΆλ©λλ€.
- μ΄λ νλ‘μ° μλ μ리μ κ°μ΅λλ€.
Flow μ²λ¦¬ λ°©μ
- νλ‘μ°μ κ° μμλ₯Ό λ°ννλ map ν¨μλ μ½κ² μ μν μ μμ΅λλ€.
- flowλ₯Ό μλ‘ μμ±νλ λ‘μ§μ΄κΈ° λλ¬Έμ νλ‘μ° λΉλλ₯Ό νμ©ν©λλ€.
- λννκ³ μλ νλ‘μ°λ₯Ό μμνκ² λλ―λ‘, λΉλ λ΄λΆμμ collect λ©μλλ₯Ό νΈμΆν©λλ€.
- μ΄ μΈμ onEach, onStart, filterλ μ½κ² μ μ κ°λ₯ν©λλ€.
flowBuilder - map
// μμλ₯Ό λ³ννμ¬ μλ‘μ΄ νλ‘μ° λ°©μΆ
fun <T, R> Flow<T>.map(
trasformation: suspend (T) -> R
): Flow<R> = flowBuilder {
collect {
emit(trasformation(it))
}
}
flowBuilder - filter
fun <T> Flow<T>.filter(
predicate: suspend (T) -> Boolean
): Flow<T> = flowBuilder {
collect {
if (predicate(it)) {
emit(it)
}
}
}
flowBuilder - onEach
fun <T> Flow<T>.onEach(
action: suspend (T) -> Unit
): Flow<T> = flowBuilder {
collect {
action(it) // μΆκ° λμ μν
emit(it)
}
}
flowBuilder - onStart
fun <T> Flow<T>.onStart(
action: suspend () -> Unit
): Flow<T> = flowBuilder {
action() // μΆκ° λμμ ν λ²λ§ μν
collect{
emit(it)
}
}
flowBuilder - flowOf
// νλ‘μ° μμ± ν¨μλ‘ listOfμ λΉμ·νκ² λμ
fun <T> flowOf(vararg elements: T): Flow<T> = flowBuilder {
elements.forEach {
emit(it)
}
}
Ex
suspend fun main() {
flowOf("A", "B", "C")
.filter {
it != "A"
}
.onEach {
println("B or C")
}
.onStart {
println("μμλμμ΅λλ€")
}
.map {
delay(1000)
it.plus(it)
}
.collect {
println(it)
}
}
λκΈ°λ‘ μλνλ Flow
- νλ‘μ° λν μ€λ¨ ν¨μμ²λΌ λκΈ°λ‘ μλνκΈ° λλ¬Έμ, νλ‘μ°κ° μλ£λ λκΉμ§ collect νΈμΆμ΄ μ€λ¨λ©λλ€.
νλ‘μ°λ μλ‘μ΄ μ½λ£¨ν΄μ μμνμ§ μμ΅λλ€. κ°κ°μ μ²λ¦¬ λ¨κ³λ λκΈ°λ‘ μ€νλκΈ° λλ¬Έμ, onEach λ΄λΆμ delayκ° μμΌλ©΄ λͺ¨λ μμκ° μ²λ¦¬λκΈ° μ μ΄ μλ κ° μμ μ¬μ΄μ μ§μ°μ΄ μκΉλλ€.
suspend fun main() {
flowOf("A","B","C")
.onEach { delay(1000) }
.collect{ println(it) }
}