채널과 셀렉트

Channel

  • 2개의 코루틴 사이를 연결하는 파이프로 코루틴 사이에 데이터 stream을 공유할 수 있습니다.
  • 멀티 스레딩 환경에서 큐 형태로 데이터를 처리하고, 동기화된 방식으로 데이터를 주고받을 수 있도록 도와주는 자료구조입니다.

BlockingQueue

  • Java의 java.util.concurrent 패키지에 포함된 인터페이스로, 여러 스레드 간에 데이터를 안전하게 주고받을 수 있도록 설계된 큐입니다.
  • 간단한 동기화, 자동 블로킹 처리, 다양한 구현체를 제공합니다.

BlockingQueue는 스레드 기반 처리를 사용하며, 스레드 풀을 활용한 작업을 위해 추가적인 설정이 필요합니다.

Channel

  • Channel은 BlockingQueue와 마찬가지로 비동기 데이터 stream을 지원합니다.
  • 채널 내부의 용량이 가득 찼을 경우 데이터를 채널에 보내려고 하면, 채널은 현재 코루틴을 일시 중단시키고 나중에 재개합니다.
  • suspend 함수를 사용해 데이터를 보내거나 받으므로, 스레드를 블로킹하지 않고 비동기적으로 동작합니다.

나중에 재개가 가능하다는 점에서 BlockingQueue와 큰 차이를 보입니다.

Channel 구현

fun main() {
    runBlocking {
        val channel = Channel<Int>()
        launch { // 코루틴 스코프 1
            for (x in 1..10){
                channel.send( x * x)
            }
        }
        launch { // 코루틴 스코프 2
            repeat(10){
                println(channel.receive())
            }
        }
    }
}
  • 채널은 send()를 통해 데이터를 전달하며, receive()로 데이터를 전달받아 처리할 수 있습니다.
fun main() {
    runBlocking {
        val channel = Channel<Int>()
        launch {
            for (x in 1..10){
                channel.send( x * x)
                // 채널을 닫을 경우 ClosedSendChannelException 발생
//                if (x == 5){
//                    channel.close()
//                }
            }
        }
        launch {
            for (y in channel){
                println(y)
            }
            println("end")
        }
    }
}
  • close()를 통해 채널을 닫아 더 이상 요소를 오지 않게 할 수 있습니다.
  • 채널에 닫혔는데 send()를 시도할 경우 ClosedSendChannelException이 발생합니다.
  • 이는 채널에 close 토큰을 보내는 것과 같으며, 즉시 반복이 중지되어 close 이전에 모든 요소가 수신되는 것을 보장합니다.

channel producer - consumer

  • 비동기 프로그래밍에서 제공자-소비자 패턴은 흔하게 사용됩니다.
  • 채널은 제공자 측에서 바로 produce 할 수 있도록 consumeEach 함수를 제공합니다.
    • 이는 for loop를 대체하여 사용할 수 있습니다.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceValue() : ReceiveChannel<Int> {
    return produce { 
        for (x in 1.. 10) send( x * x)
    }
}
fun main(){
    runBlocking { 
        val produceValue = produceValue()
        produceValue.consumeEach { 
            println(it)
        }
        println("end")
    }
}

Pipeline

  • 한 채널로부터 받은 원소를 다른 채널로 전송하는 경우 활용하며, 채널과 채널을 연결하는 역할을 합니다.
  • 하나의 코루틴이 무한의 stream 값을 생성하는 패턴입니다.
    • 다른 코루틴이 이전 코루틴의 stream을 소비하여 다른 결과를 만들어 냅니다.
    • map과 비슷한 기능을 합니다.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbers() : ReceiveChannel<Int> {
    return produce {
        repeat(3){ num ->
            send(num + 1)
        }
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.square(numbers: ReceiveChannel<Int>) : ReceiveChannel<Int> {
    return produce {
        numbers.consumeEach {
            send( it * it)
        }
    }
}

suspend fun main() = coroutineScope {
    val numbers = numbers()
    val squared = square(numbers)
    squared.consumeEach {
        println(it)
    }
}

셀렉트

  • 가장 먼저 완료되는 코루틴의 결과를 기다리는 함수입니다.
  • 복수의 채널을 동시에 처리할 수 있는 기능을 제공합니다.

여러 개의 채널 중 버퍼에 남은 공간이 있는 채널을 먼저 확인하여 데이터를 보내거나, 이용 가능한 원소가 있는 채널로부터 데이터를 받을 수 있는지 여부를 확인합니다.

  • API 형태가 바뀔 가능성이 있으며, 실제 사용 경우는 드뭅니다.

특징

  • 여러 채널을 동시에 처리하고, 첫 번째로 데이터를 받는 채널에 대해 처리를 진행합니다.
  • select 블록 내부에서 복수의 채널을 기다릴 수 있고, 각 채널에 대한 처리를 독립적으로 정의할 수 있습니다.
  • 첫 번째로 완려된 작업을 처리하므로, 순서와 상관없이 실행됩니다.
fun main() = runBlocking {
    val channel1 = Channel<String>()
    val channel2 = Channel<String>()

    launch {
        delay(1000)
        channel1.send("channel1")
    }

    launch {
        delay(500)
        channel2.send("channel2")
    }

    // select 블록을 사용하여 채널에서 값을 받음
    select<Unit> {
        // 채널 1 select
        channel1.onReceive { message ->
            println("Received from channel1: $message")
        }
        // 채널 2 select
        channel2.onReceive { message ->
            println("Received from channel2: $message")
        }
    }
}