Flow
- Kotlin Coroutines의 일부분으로 비동기적 데이터 스트림을 처리하는 API입니다.
- 떠다니는 원소들을 모으는 역할을 하며, 플로우의 끝에 도달할 때까지 각 값을 처리하는 걸 의미합니다.
- collect는 컬렉션의 forEach와 비슷하게 동작합니다.
특징
- 소비자가 구독을 시작해야만 데이터 생산이 시작됩니다.
- 메모리 효율적입니다.
- 데이터를 비동기적으로 가져와 UI를 처단하지 않고 효율적으로 업데이트합니다.
시간에 따라 발생하는 데이터 변화를 처리하는데 유용하며, Android에서는 StateFlow, SharedFlow의 형태로 구현하여 상태 관리와 이벤트 처리를 합니다.
vs 컬렉션
- List, Set과 같은 값은 플로우 처럼 여러개의 값을 반환합니다.
- 하지만 모든 원소의 계산이 완료될 때까지 기다려야 하기 때문에 시간이 소모됩니다.
- 즉각적으로 원소가 필요할 경우, 원소가 나오자마자 바로 얻는 것이 좋습니다.
vs Sequence
- CPU 집약적인 연산 또는 블로킹 연산일 때 필요할 때마다 값을 계산하여 나타낼 수 있습니다.
- 하지만 중단 함수가 아니기 때문에, 시퀸스 빌더 내부에 중단점이 있다면 스레드가 블로킹됩니다.
- yield, yieldAll 등의 중단 함수를 활용할 수 있지만, 생각지 못한 스레드 블로킹이 발생할 수 있습니다.
with Coroutines
- 플로우는 코루틴을 사용해야 하는 데이터 스트림으로 사용되어야 합니다.
- first(), toList(), find() 등으로 원하는 값을 원하는 크기만큼 효율적으로 가져올 수 있습니다.
- 플로우의 최종 연산은 스레드를 블로킹하는 대신 코루틴을 중단시킵니다.
- 따라서 플로우는 취소가 가능하며, 구조화된 동시성을 기본으로 가집니다.
// 플로우 빌더는 중단 함수가 아니기 때문에 CoroutineScope가 필요하지 않습니다.
fun userFlow(): Flow<String> = flow {
repeat(3) {
delay(1000)
val ctx = currentCoroutineContext()
val name = ctx[CoroutineName]?.name
emit("User$it in $name")
}
}
suspend fun main() {
val user = userFlow()
val coroutineName = CoroutineName("Name")
withContext(coroutineName) {
val job = launch {
// collect는 중단 함수로 스코프 내에서 실행되어야 합니다.
user.collect {
println(it)
}
}
// 충분히 기다린 뒤 실행
delay(2100)
println("I got enough")
job.cancel()
}
}
- 코루틴 취소 시 플로우는 적절하게 취소됩니다.
with 동시성 처리
- 동시성 처리를 판매자 목록을 통해 상품을 가져오는 로직에 적용할 수 있습니다.
- 컬렉션 내부에서 async를 사용하여 동시성 처리를 진행합니다.
interface SampleApi {
suspend fun requestOffers(name: String): List<String>
}
// Offer의 크기가 다를 때,
// 많은 요청을 처리할 경우 서버와 서비스에 좋지 않은 영향을 끼칩니다.
suspend fun getOffers(
api : SampleApi,
sellers: List<String>
): List<String> = coroutineScope {
sellers
.map { seller ->
async { api.requestOffers(seller) }
}
.flatMap { it.await() }
}
- 위 방법은 요청 크기가 클 경우 비효율적일 수 있습니다.
- 이를 위해 플로우 처리를 활용할 수 있습니다.
// 동시성 속성이 적용 된 플로우 처리를 통해 이를 해결할 수 있습니다.
suspend fun getOffersWithFlow(
api: SampleApi,
sellers: List<String>,
): Flow<List<String>> = sellers
.asFlow()
.flatMapMerge(concurrency = 20) { seller ->
suspend { api.requestOffers(seller) }.asFlow()
}
// 동시 처리, 컨텍스트, 예외를 상황에 따라 조절할 수 있습니다.
요약
- 중단 함수 대신 플로우를 사용하는 경우, 단 하나의 값만 반환하는 경우에도 플로우가 유용할 수 있습니다.
- 중단 함수를 쓰는 것이 더 효율적인 경우도 있습니다.
- 플로우는 시퀸스와 달리 코루틴을 지원하며 비동기적으로 계산되는 값을 나타냅니다.