플로우 구현하기

플로우 만들기

  • 플로우는 어디선가 시작되어야 하며, 필요한 경우에 따라 다양하게 시작할 수 있습니다.

원시값을 가지는 플로우

  • 플로우를 가지는 가장 간단한 방법은 flowOf를 활용하는 것입니다.
suspend fun main() {
    flowOf(1, 2, 3, 4, 5)
        .collect { print(it) }
}

컨버터

  • asFlow 함수를 사용하여 Iterable, Iterator, Sequence를 Flow로 바꿀 수 있습니다.
// asFlow 함수를 이용해 즉시 사용 가능한 원소들의 플로우를 만들 수 있습니다.
suspend fun main(){
    sequenceOf(1,2,3,4,5) // sequenceOf
        .toSet() //setOf
        .toList() // listOf
        .asFlow()
        .collect {
            println(it)
        }
}

함수를 플로우로 변경

  • 플로우는 지연되는 하나의 값을 나타낼 때 사용될 수 있습니다.
  • 따라서 중단 함수를 플로우를 변환할 수 있습니다.
  • 이 때의 함수 결과 값은 플로우의 유일한 값이 됩니다.
// 일반 함수를 플로우로 변환할 수 있습니다.
// 함수 참조값이 필요하며, 코틀린에서는 ::를 활용합니다.
suspend fun getUserName(): String {
    delay(1000)
    return "UserName"
}

suspend fun main() {
    // 일반 함수 플로우로 활용
    ::getUserName
        .asFlow()
        .collect { println(it) }

    // 중단 함수를 플로우로 변환할 수 있습니다.
    // 이때 중단 함수의 결과가 플로우의 유일한 값이 됩니다.
    val function = suspend {
        // 중단 함수를 람다식으로 생성합니다.
        delay(1000)
        "UserName" // 결과 반환
    }

    function
        .asFlow()
        .collect { println(it) }
}

플로우 빌더

플로우를 만들 때 가장 많이 사용되는 방법이며, flow 함수를 먼저 호출한 후 람다식 내부에서 emit 함수를 사용해 다음 값을 방출합니다.

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    elements.forEach {
        emit(it)
    }
}
  • flow 빌더를 호출하면 객체를 생성하기만 합니다.
  • collect를 호출함으로써 collector 인터페이스의 block 함수를 호출합니다.
    • 이는 인터페이스로 정의 된 람다식입니다.
  • emit을 호출하면 정의된 람다식을 생성하고, collect를 호출하면 정의된 람다식을 시작합니다.
    • 이것이 플로우의 기본 동작 원리입니다.

채널 플로우

  • Flow는 콜드 데이터 스트림으로 필요할 때만 값을 생성합니다.
  • 사용자 목록을 로드하고 있을 때 미리 페이지를 받아오고 싶은 경우가 발생합니다.
    • 네트워크 호출을 증가시키지만, 결과를 더 빠르게 얻을 수 있습니다.
  • 데이터를 생성하고 소비하는 과정이 별개로 진행되어야 합니다.
    • 이는 핫 데이터 스트림의 특징이며, 플로우로 값을 반환하기 위해서 채널 + 플로우 방식이 필요합니다.
data class User(val name: String)
interface UserApi {
    suspend fun takePage(pageNumber: Int): List<User>
}
// Flow는 콜드 데이터 스트림으로, 필요할 때만 값을 생성합니다.
// 특정 상황 : 사용자가 첫 페이지에 있는 경우
// -> 더 많은 페이지를 요청할 필요없이 첫 값만 요청
// -> 필요할 때만 지연 요청
// -> 콜드 데이터 스트림 필요

// 반면에 원소를 처리하고 있을 때 미리 페이지를 받아오고 싶은 경우가 있습니다.
// - 데이터를 생성하고 소비하는 과정이 별개로 진행
// -> 이 경우 핫 데이터 스트림 + 콜드 데이터 스트림 필요
// 채널플로우 빌더 활용 : collect와 같은 일반 함수로 최종 연산으로 시작됨
// 한 번 시작하면, 리시버를 기다릴 필요 없이 분리된 코루틴에서 값을 생성 (채널의 특징)
// -> 페이지를 얻어오면서 동시에 사용자 확인
fun allUserFlow(api: UserApi): Flow<User> = channelFlow {
    var page = 0
    do {
        val users = api.takePage(page++)
        users.forEach { send(it) } //emit 대신 send 활용
    } while (users.isNotEmpty())
}
  • emit 대신에 send를 활용하며, 여러 개의 값을 독립적으로 계산해야 할 때 주로 사용합니다.
  • 다른 코루틴처럼 channelFlow도 모든 자식 코루틴이 종료 상태가 될 때까지 끝나지 않습니다.

콜백 플로우

  • 사용자의 클릭이나 활동 변화를 감지해야 하는 경우 사용할 수 있는 플로우입니다.
  • 콜백 함수를 통해 flow 스트림을 전달할 때 사용합니다.
  • 채널 플로우가 같은 역할을 수행할 수 있지만, 콜백 플로우는 콜백 함수를 래핑한다는 특징이 있습니다.
fun <T> flowFrom(api: CallbackBasedApi<T>): Flow<T> = callbackFlow {
    val callback = object : Callback<T> {
        override fun onNextValue(value: T) {
            trySendBlocking(value)
                .onFailure { throwable ->
                    // Downstream이 취소되었거나 실패한 경우, 여기에서 로그를 남길 수 있음
                }
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() {
            channel.close()
        }
    }
    api.register(callback)

    // 수집자가 취소되거나 'onCompleted'/'onApiError' 호출될 때까지 대기
    awaitClose { api.unregister() }
}

class CallbackBasedApi<T> {
    private var callback: Callback<T>? = null

    // 콜백 등록
    fun register(callback: Callback<T>) {
        this.callback = callback
    }

    // 콜백 해제
    fun unregister() {
        this.callback = null
    }
}

interface Callback<T> {
    fun onNextValue(value: T)
    fun onApiError(cause: Throwable)
    fun onCompleted()
}
  • 이 때 반드시 awaitClose를 호출하여야 합니다.
    • 그렇지 않으면 외부 취소 시 콜백 or 수신기가 누출될 수 있습니다.
  • 작업이 완료되면 flow or channel을 종료하여 메모리 누수를 방지해야 합니다.
    • close()는 값을 전달하지 않으면 정상 종료로 간주합니다.
    • cancel()은 작업을 중지하고 exception을 전달합니다.

요약

  • flow 또는 emptyFlow로 간단하게 플로우를 생성할 수 있고, asFlow, Flow 빌더를 통해 변환 플로우를 생성할 수 있습니다.
  • 채널의 특징을 가지는 channelFlow, callbackFlow도 활용할 수 있습니다.
  • 각 빌더는 용도가 다르기 때문에, 여러 종류의 빌더를 알아 두는 것이 좋습니다.