Channel
- 2개의 코루틴 사이를 연결하는 파이프로 코루틴 사이에 데이터 stream을 공유할 수 있습니다.
- 멀티 스레딩 환경에서 큐 형태로 데이터를 처리하고, 동기화된 방식으로 데이터를 주고받을 수 있도록 도와주는 자료구조입니다.
BlockingQueue
- Java의 java.util.concurrent 패키지에 포함된 인터페이스로, 여러 스레드 간에 데이터를 안전하게 주고받을 수 있도록 설계된 큐입니다.
- 간단한 동기화, 자동 블로킹 처리, 다양한 구현체를 제공합니다.
BlockingQueue는 스레드 기반 처리를 사용하며, 스레드 풀을 활용한 작업을 위해 추가적인 설정이 필요합니다.
Channel
- Channel은 BlockingQueue와 마찬가지로 비동기 데이터 stream을 지원합니다.
- 채널 내부의 용량이 가득 찼을 경우 데이터를 채널에 보내려고 하면, 채널은 현재 코루틴을 일시 중단시키고 나중에 재개합니다.
- suspend 함수를 사용해 데이터를 보내거나 받으므로, 스레드를 블로킹하지 않고 비동기적으로 동작합니다.
나중에 재개가 가능하다는 점에서 BlockingQueue와 큰 차이를 보입니다.
Channel 구현
fun main() {
runBlocking {
val channel = Channel<Int>()
launch { // 코루틴 스코프 1
for (x in 1..10){
channel.send( x * x)
}
}
launch { // 코루틴 스코프 2
repeat(10){
println(channel.receive())
}
}
}
}
- 채널은 send()를 통해 데이터를 전달하며, receive()로 데이터를 전달받아 처리할 수 있습니다.
fun main() {
runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..10){
channel.send( x * x)
// 채널을 닫을 경우 ClosedSendChannelException 발생
// if (x == 5){
// channel.close()
// }
}
}
launch {
for (y in channel){
println(y)
}
println("end")
}
}
}
- close()를 통해 채널을 닫아 더 이상 요소를 오지 않게 할 수 있습니다.
- 채널에 닫혔는데 send()를 시도할 경우 ClosedSendChannelException이 발생합니다.
- 이는 채널에 close 토큰을 보내는 것과 같으며, 즉시 반복이 중지되어 close 이전에 모든 요소가 수신되는 것을 보장합니다.
channel producer - consumer
- 비동기 프로그래밍에서 제공자-소비자 패턴은 흔하게 사용됩니다.
- 채널은 제공자 측에서 바로 produce 할 수 있도록 consumeEach 함수를 제공합니다.
- 이는 for loop를 대체하여 사용할 수 있습니다.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceValue() : ReceiveChannel<Int> {
return produce {
for (x in 1.. 10) send( x * x)
}
}
fun main(){
runBlocking {
val produceValue = produceValue()
produceValue.consumeEach {
println(it)
}
println("end")
}
}
Pipeline
- 한 채널로부터 받은 원소를 다른 채널로 전송하는 경우 활용하며, 채널과 채널을 연결하는 역할을 합니다.
- 하나의 코루틴이 무한의 stream 값을 생성하는 패턴입니다.
- 다른 코루틴이 이전 코루틴의 stream을 소비하여 다른 결과를 만들어 냅니다.
- map과 비슷한 기능을 합니다.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbers() : ReceiveChannel<Int> {
return produce {
repeat(3){ num ->
send(num + 1)
}
}
}
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.square(numbers: ReceiveChannel<Int>) : ReceiveChannel<Int> {
return produce {
numbers.consumeEach {
send( it * it)
}
}
}
suspend fun main() = coroutineScope {
val numbers = numbers()
val squared = square(numbers)
squared.consumeEach {
println(it)
}
}
셀렉트
- 가장 먼저 완료되는 코루틴의 결과를 기다리는 함수입니다.
- 복수의 채널을 동시에 처리할 수 있는 기능을 제공합니다.
여러 개의 채널 중 버퍼에 남은 공간이 있는 채널을 먼저 확인하여 데이터를 보내거나, 이용 가능한 원소가 있는 채널로부터 데이터를 받을 수 있는지 여부를 확인합니다.
- API 형태가 바뀔 가능성이 있으며, 실제 사용 경우는 드뭅니다.
특징
- 여러 채널을 동시에 처리하고, 첫 번째로 데이터를 받는 채널에 대해 처리를 진행합니다.
- select 블록 내부에서 복수의 채널을 기다릴 수 있고, 각 채널에 대한 처리를 독립적으로 정의할 수 있습니다.
- 첫 번째로 완려된 작업을 처리하므로, 순서와 상관없이 실행됩니다.
fun main() = runBlocking {
val channel1 = Channel<String>()
val channel2 = Channel<String>()
launch {
delay(1000)
channel1.send("channel1")
}
launch {
delay(500)
channel2.send("channel2")
}
// select 블록을 사용하여 채널에서 값을 받음
select<Unit> {
// 채널 1 select
channel1.onReceive { message ->
println("Received from channel1: $message")
}
// 채널 2 select
channel2.onReceive { message ->
println("Received from channel2: $message")
}
}
}