플로우 만들기
- 플로우는 어디선가 시작되어야 하며, 필요한 경우에 따라 다양하게 시작할 수 있습니다.
원시값을 가지는 플로우
- 플로우를 가지는 가장 간단한 방법은 flowOf를 활용하는 것입니다.
suspend fun main() {
flowOf(1, 2, 3, 4, 5)
.collect { print(it) }
}
컨버터
- asFlow 함수를 사용하여 Iterable, Iterator, Sequence를 Flow로 바꿀 수 있습니다.
// asFlow 함수를 이용해 즉시 사용 가능한 원소들의 플로우를 만들 수 있습니다.
suspend fun main(){
sequenceOf(1,2,3,4,5) // sequenceOf
.toSet() //setOf
.toList() // listOf
.asFlow()
.collect {
println(it)
}
}
함수를 플로우로 변경
- 플로우는 지연되는 하나의 값을 나타낼 때 사용될 수 있습니다.
- 따라서 중단 함수를 플로우를 변환할 수 있습니다.
- 이 때의 함수 결과 값은 플로우의 유일한 값이 됩니다.
// 일반 함수를 플로우로 변환할 수 있습니다.
// 함수 참조값이 필요하며, 코틀린에서는 ::를 활용합니다.
suspend fun getUserName(): String {
delay(1000)
return "UserName"
}
suspend fun main() {
// 일반 함수 플로우로 활용
::getUserName
.asFlow()
.collect { println(it) }
// 중단 함수를 플로우로 변환할 수 있습니다.
// 이때 중단 함수의 결과가 플로우의 유일한 값이 됩니다.
val function = suspend {
// 중단 함수를 람다식으로 생성합니다.
delay(1000)
"UserName" // 결과 반환
}
function
.asFlow()
.collect { println(it) }
}
플로우 빌더
플로우를 만들 때 가장 많이 사용되는 방법이며, flow 함수를 먼저 호출한 후 람다식 내부에서 emit 함수를 사용해 다음 값을 방출합니다.
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
elements.forEach {
emit(it)
}
}
- flow 빌더를 호출하면 객체를 생성하기만 합니다.
- collect를 호출함으로써 collector 인터페이스의 block 함수를 호출합니다.
- 이는 인터페이스로 정의 된 람다식입니다.
- emit을 호출하면 정의된 람다식을 생성하고, collect를 호출하면 정의된 람다식을 시작합니다.
- 이것이 플로우의 기본 동작 원리입니다.
채널 플로우
- Flow는 콜드 데이터 스트림으로 필요할 때만 값을 생성합니다.
- 사용자 목록을 로드하고 있을 때 미리 페이지를 받아오고 싶은 경우가 발생합니다.
- 네트워크 호출을 증가시키지만, 결과를 더 빠르게 얻을 수 있습니다.
- 데이터를 생성하고 소비하는 과정이 별개로 진행되어야 합니다.
- 이는 핫 데이터 스트림의 특징이며, 플로우로 값을 반환하기 위해서 채널 + 플로우 방식이 필요합니다.
data class User(val name: String)
interface UserApi {
suspend fun takePage(pageNumber: Int): List<User>
}
// Flow는 콜드 데이터 스트림으로, 필요할 때만 값을 생성합니다.
// 특정 상황 : 사용자가 첫 페이지에 있는 경우
// -> 더 많은 페이지를 요청할 필요없이 첫 값만 요청
// -> 필요할 때만 지연 요청
// -> 콜드 데이터 스트림 필요
// 반면에 원소를 처리하고 있을 때 미리 페이지를 받아오고 싶은 경우가 있습니다.
// - 데이터를 생성하고 소비하는 과정이 별개로 진행
// -> 이 경우 핫 데이터 스트림 + 콜드 데이터 스트림 필요
// 채널플로우 빌더 활용 : collect와 같은 일반 함수로 최종 연산으로 시작됨
// 한 번 시작하면, 리시버를 기다릴 필요 없이 분리된 코루틴에서 값을 생성 (채널의 특징)
// -> 페이지를 얻어오면서 동시에 사용자 확인
fun allUserFlow(api: UserApi): Flow<User> = channelFlow {
var page = 0
do {
val users = api.takePage(page++)
users.forEach { send(it) } //emit 대신 send 활용
} while (users.isNotEmpty())
}
- emit 대신에 send를 활용하며, 여러 개의 값을 독립적으로 계산해야 할 때 주로 사용합니다.
- 다른 코루틴처럼 channelFlow도 모든 자식 코루틴이 종료 상태가 될 때까지 끝나지 않습니다.
콜백 플로우
- 사용자의 클릭이나 활동 변화를 감지해야 하는 경우 사용할 수 있는 플로우입니다.
- 콜백 함수를 통해 flow 스트림을 전달할 때 사용합니다.
- 채널 플로우가 같은 역할을 수행할 수 있지만, 콜백 플로우는 콜백 함수를 래핑한다는 특징이 있습니다.
fun <T> flowFrom(api: CallbackBasedApi<T>): Flow<T> = callbackFlow {
val callback = object : Callback<T> {
override fun onNextValue(value: T) {
trySendBlocking(value)
.onFailure { throwable ->
// Downstream이 취소되었거나 실패한 경우, 여기에서 로그를 남길 수 있음
}
}
override fun onApiError(cause: Throwable) {
cancel(CancellationException("API Error", cause))
}
override fun onCompleted() {
channel.close()
}
}
api.register(callback)
// 수집자가 취소되거나 'onCompleted'/'onApiError' 호출될 때까지 대기
awaitClose { api.unregister() }
}
class CallbackBasedApi<T> {
private var callback: Callback<T>? = null
// 콜백 등록
fun register(callback: Callback<T>) {
this.callback = callback
}
// 콜백 해제
fun unregister() {
this.callback = null
}
}
interface Callback<T> {
fun onNextValue(value: T)
fun onApiError(cause: Throwable)
fun onCompleted()
}
- 이 때 반드시 awaitClose를 호출하여야 합니다.
- 그렇지 않으면 외부 취소 시 콜백 or 수신기가 누출될 수 있습니다.
- 작업이 완료되면 flow or channel을 종료하여 메모리 누수를 방지해야 합니다.
- close()는 값을 전달하지 않으면 정상 종료로 간주합니다.
- cancel()은 작업을 중지하고 exception을 전달합니다.
요약
- flow 또는 emptyFlow로 간단하게 플로우를 생성할 수 있고, asFlow, Flow 빌더를 통해 변환 플로우를 생성할 수 있습니다.
- 채널의 특징을 가지는 channelFlow, callbackFlow도 활용할 수 있습니다.
- 각 빌더는 용도가 다르기 때문에, 여러 종류의 빌더를 알아 두는 것이 좋습니다.