[Coroutine] ๊ณต์œ  ์ƒํƒœ๋กœ ์ธํ•œ ๋ฌธ์ œ

 ๐Ÿ’ก ์ฝ”๋ฃจํ‹ด์—์„œ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋Š” ๊ณต์œ  ์ƒํƒœ๋กœ ์ธํ•œ ๋ฌธ์ œ๋ฅผ ํ™•์ธํ•˜๊ณ , ์ด๋ฅผ ๋™๊ธฐํ™” ๋ธ”๋กœํ‚น์„ ํ†ตํ•ด ์–ด๋–ป๊ฒŒ ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ๋Š”์ง€ ํ•™์Šตํ•˜์˜€์Šต๋‹ˆ๋‹ค.

 

๊ณต์œ  ์ƒํƒœ๋กœ ์ธํ•œ ๋ฌธ์ œ

class UserDowloader(
    private val api : NetworkService
) {
    private val users = mutableListOf<User>()

    fun downloaded(): List<User> = users.toList()

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users.add(newUser)
    }
}
  • ์œ„ ์ฝ”๋“œ๋Š” ๋™์‹œ ์‚ฌ์šฉ์— ๋Œ€ํ•œ ๋Œ€๋น„๊ฐ€ ๋˜์–ด ์žˆ์ง€ ์•Š์Šต๋‹ˆ๋‹ค.
  • fetchUser ํ˜ธ์ถœ์€ users๋ฅผ ๋ณ€๊ฒฝํ•˜๋Š”๋ฐ, ์ด ๊ฒฝ์šฐ ๊ฐ™์€ ์‹œ๊ฐ„์— ํ•ด๋‹น ํ•จ์ˆ˜๊ฐ€ ํ•œ ๊ฐœ ์Šค๋ ˆ๋“œ์—์„œ ์‹œ์ž‘ํ•  ๊ฒฝ์šฐ์—๋งŒ ์ •์ƒ์ ์œผ๋กœ ์ž‘๋™ํ•ฉ๋‹ˆ๋‹ค.
  • ๋งŒ์•ฝ ๋™์‹œ์— ๋ฆฌ์ŠคํŠธ๋ฅผ ๋ณ€๊ฒฝํ•˜๋ฉด ์ถฉ๋Œ์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ์•„๋ž˜์™€ ๊ฐ™์€ ๋ฌธ์ œ๊ฐ€ ์ƒ๊ธธ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
    • ๊ฐ™์€ ๊ฐ์ฒด์— ๋Œ€ํ•˜์—ฌ 1_000_000๋ฒˆ ์‹คํ–‰ํ•  ๊ฒฝ์šฐ ์‹ค์ œ๋ก  ๊ธฐ๋Œ€ํ•˜๋Š” ์ˆ˜๋ณด๋‹ค ์ž‘์€ ํ˜ธ์ถœ์ด ๋ฐœ์ƒํ•˜๋ฉฐ, ์˜ˆ์™ธ๊ฐ€ ์ƒ๊ธธ ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.
repeat(1_000_000) {
    launch {
        downloader.fetchUser(it)
    }
}

๋™๊ธฐํ™” ๋ธ”๋กœํ‚น

  • ๊ณต์œ  ์ƒํƒœ๋กœ ์ธํ•œ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด์„œ ์ž๋ฐ”์—์„œ๋Š” synchronized ๋ธ”๋ก์ด๋‚˜ ๋™๊ธฐํ™”๋œ ์ปฌ๋ ‰์…˜์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.
synchronized(lock) { // ์Šค๋ ˆ๋“œ๋ฅผ ๋ธ”๋กœํ‚นํ•ฉ๋‹ˆ๋‹ค
  counter++
}
  • ํ•˜์ง€๋งŒ ์œ„ ๋ฐฉ์‹์€ ์•„๋ž˜์™€ ๊ฐ™์€ ๋ฌธ์ œ์ ์ด ์žˆ์Šต๋‹ˆ๋‹ค.
    • synchronized ๋‚ด๋ถ€์—์„œ ์ค‘๋‹จ ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.
    • synchronized ๋ธ”๋ก์—์„œ ์ฝ”๋ฃจํ‹ด์ด ์ž๊ธฐ ์ฐจ๋ก€๋ฅผ ๊ธฐ๋‹ค๋ฆด ๋•Œ ์Šค๋ ˆ๋“œ๋ฅผ ๋ธ”๋กœํ‚นํ•ฉ๋‹ˆ๋‹ค.
  • ๋””์ŠคํŒจ์ฒ˜์˜ ์›๋ฆฌ๋ฅผ ์ƒ๊ฐํ•œ๋‹ค๋ฉด ์ฝ”๋ฃจํ‹ด์ด ์Šค๋ ˆ๋“œ๋ฅผ ๋ธ”๋กœํ‚นํ•˜๋Š” ๊ฒƒ์„ ์ง€์–‘ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.
  • ๋ฉ”์ธ ์Šค๋ ˆ๋“œ๋ฅผ ๋ธ”๋กœํ‚นํ•˜๊ฑฐ๋‚˜ ์ œํ•œ ๋œ ์ˆ˜์˜ ์Šค๋ ˆ๋“œ๋งŒ ๊ฐ€์ง€๊ณ  ์žˆ๋Š” ๊ฒƒ์€ ์ข‹์ง€ ์•Š์Šต๋‹ˆ๋‹ค !!
  • ์Šค๋ ˆ๋“œ๋ฅผ ๋‚ญ๋น„ํ•˜์ง€ ์•Š๊ณ , ์ฝ”๋ฃจํ‹ด์— ํŠนํ™”๋œ ๋ฐฉ๋ฒ•์„ ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

์›์ž์„ฑ

  • ์ž๋ฐ”์—์„œ๋Š” ์›์ž๊ฐ’์„ ํ™œ์šฉํ•œ ์—ฐ์‚ฐ์„ ์‚ฌ์šฉํ•˜๋ฉด (์›์ž์„ฑ ์—ฐ์‚ฐ) ๋ฝ ์—†์ด ๋กœ์šฐ ๋ ˆ๋ฒจ๋กœ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
private var counter = AtomicInteger() //์›์ž์„ฑ ์—ฐ์‚ฐ

fun main() = runBlocking {
    massiveRun {
        counter.incrementAndGet()
    }
}
  • ๋ฌผ๋ก  ํ•˜๋‚˜์˜ ์—ฐ์‚ฐ์—์„œ ์›์ž์„ฑ์„ ๊ฐ€์ง€๊ณ  ์žˆ๋‹ค๊ณ  ํ•ด์„œ ์ „์ฒด ์—ฐ์‚ฐ์—์„œ ์›์ž์„ฑ์ด ๋ณด์žฅ๋˜๋Š” ๊ฒƒ์€ ์•„๋‹™๋‹ˆ๋‹ค.
  • ์ด๋Š” ํ•˜๋‚˜์˜ ํ”„๋ฆฌ๋ฏธํ‹ฐ๋ธŒ ๋ณ€์ˆ˜ ๋˜๋Š” ํ•˜๋‚˜์˜ ๋ž˜ํผ๋Ÿฐ์Šค์˜ ์•ˆ์ „์„ ๋ณด์žฅํ•˜๊ธฐ ์œ„ํ•ด ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค.

์‹ฑ๊ธ€์Šค๋ ˆ๋“œ๋กœ ์ œํ•œ๋œ ๋””์ŠคํŒจ์ฒ˜

  • ๊ณต์œ  ์ƒํƒœ๋กœ ์ธํ•œ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด์„œ ๋ณ‘๋ ฌ์„ฑ์„ ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ๋กœ ์ œํ•œํ•˜๋Š” ๋””์ŠคํŒจ์ฒ˜๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
val dispatcher = Dispatchers.IO
    .limitedParallelism(1)
var counter = 0
fun main() = runBlocking {
    massiveRun {
        withContext(dispatcher){
            counter++
        }
    }
}
  • ์‹ฑ๊ธ€ ์Šค๋ ˆ๋“œ๋ฅผ ๋””์ŠคํŒจ์ฒ˜๋กœ ์ œํ•œํ•˜๋Š” ๋ฐฉ๋ฒ•์—๋Š” ๋‘๊ฐ€์ง€ ๋ฐฉ๋ฒ•์ด ์žˆ์Šต๋‹ˆ๋‹ค.
    • ์ฒซ๋ฒˆ์งธ๋Š” ์œ„์ฒ˜๋Ÿผ withContect๋กœ ์ „์ฒด ํ•จ์ˆ˜๋ฅผ ๋ž˜ํ•‘ํ•˜๋Š” ์ฝ”์Šค ๊ทธ๋ ˆ์ธ๋“œ ์Šค๋ ˆ๋“œ ํ•œ์ •์ž…๋‹ˆ๋‹ค.
      • ์‚ฌ์šฉํ•˜๊ธฐ ์‰ฌ์šฐ๋ฉฐ ์ถฉ๋Œ์„ ๋ฐฉ์ง€ํ•  ์ˆ˜ ์žˆ์ง€๋งŒ, ํ•จ์ˆ˜ ์ „์ฒด์—์„œ ๋ฉ€ํ‹ฐ์Šค๋ ˆ๋”ฉ์˜ ์ด์ ์„ ๋ˆ„๋ฆฌ์ง€ ๋ชปํ•˜๋Š” ๋ฌธ์ œ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.
    • ๋‘๋ฒˆ์งธ ๋ฐฉ๋ฒ•์€ ํŒŒ์ธ ๊ทธ๋ ˆ์ธ๋“œ ์Šค๋ ˆ๋“œ ํ•œ์ •์ž…๋‹ˆ๋‹ค.
      • ์ƒํƒœ๋ฅผ ๋ณ€๊ฒฝํ•˜๋Š” ๊ตฌ๋ฌธ๋“ค๋งŒ ๋ž˜ํ•‘ํ•˜๋ฉฐ, ๋ฒˆ๊ฑฐ๋กญ์ง€๋งŒ CPU ์ง‘์•ฝ์ ์ธ ๊ฒฝ์šฐ์— ๋” ๋‚˜์€ ์„ฑ๋Šฅ์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.
val dispatcher = Dispatchers.IO
    .limitedParallelism(1)

suspend fun downloaded(): List<User> =
    withContext(dispatcher){
        users.toList()
    }

๋ฎคํ…์Šค (Mutex)

  • ๊ฐ€์žฅ ๋งŽ์ด ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ์‹์ด๋ฉฐ, ๋‹จ ํ•˜๋‚˜์˜ ์—ด์‡ ๊ฐ€ ์žˆ๋Š” ๋ฐฉ์ด๋ผ๊ณ  ํ‘œํ˜„ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
  • ์ฒซ ๋ฒˆ์งธ ์ฝ”๋ฃจํ‹ด์ด lock์„ ํ˜ธ์ถœํ•˜๋ฉด ์—ด์‡ ๋ฅผ ๊ฐ€์ง€๊ณ  ์ค‘๋‹จ ์—†์ด ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.
  • ๋˜๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด์ด lock์„ ํ˜ธ์ถœํ•˜๋ฉด ์ฒซ ๋ฒˆ์งธ ์ฝ”๋ฃจํ‹ด์ด unlock์„ ํ˜ธ์ถœํ•  ๋•Œ๊นŒ์ง€ ์ค‘๋‹จ๋ฉ๋‹ˆ๋‹ค.
    • ํ™”์žฅ์‹ค์˜ ์—ด์‡ ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์ž‘์—…๊ณผ ์œ ์‚ฌ 
  • ์ด ์ƒํ™ฉ์—์„œ ๋˜ ๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด์ด lock์„ ํ˜ธ์ถœํ•  ๊ฒฝ์šฐ ์ž‘์—…์„ ์ค‘๋‹จํ•œ ๋’ค์— ๋‘ ๋ฒˆ์งธ ์ฝ”๋ฃจํ‹ด ๋‹ค์Œ ์ˆœ์„œ๋กœ ํ์— ๋“ค์–ด๊ฐ€๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.
  • ๋‹จ ํ•˜๋‚˜์˜ ์ฝ”๋ฃจํ‹ด๋งŒ์ด lock๊ณผ unlock ์‚ฌ์ด์— ์กด์žฌํ•˜๋Š” ๋ฐฉ๋ฒ•์ž…๋‹ˆ๋‹ค.
suspend fun main() = coroutineScope {
    repeat(5){
        launch {
            delayAndPrint()
        }
    }
}

val mutex = Mutex()

suspend fun delayAndPrint() {
    mutex.lock()
    delay(1000)
    mutex.unlock()
}
  • lock๊ณผ unlock์„ ์ง์ ‘ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์€ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๊ฑฐ๋‚˜ ๋ฐ˜ํ™˜์ด ๋น ๋ฅด๊ฒŒ ์ด๋ค„์งˆ ๊ฒฝ์šฐ ์œ„ํ—˜ํ•œ ์ƒํ™ฉ์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
    • ๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด์ด lock์„ ํ†ต๊ณผํ•  ์ˆ˜ ์—†๋Š” ๊ฒฝ์šฐ (์—ด์‡ ๋ฅผ ๋Œ๋ ค๋ฐ›์ง€ ๋ชปํ•จ) ๋ฐ๋“œ๋ฝ ์ƒํ™ฉ์ด ๋ฐœ์ƒํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.
    • [๋ฐ๋“œ๋ฝ] : ์‹œ์Šคํ…œ ์ž์›์— ๋Œ€ํ•œ ์š”๊ตฌ๊ฐ€ ๋’ค์—‰ํ‚จ ์ƒํƒœ์ด๋ฉฐ, ์ ์œ ํ•˜๊ณ  ์žˆ๋Š” ์ž์›์„ ์„œ๋กœ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์ƒํ™ฉ
  • ์ด๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด์„œ lock์œผ๋กœ ์‹œ์ž‘ํ•ด finally ๋ธ”๋ก์—์„œ unlock์„ ํ˜ธ์ถœํ•˜๋Š” withLock ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋ธ”๋ก ๋‚ด์—์„œ ์–ด๋–ค ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๋”๋ผ๋„ ์„ฑ๊ณต์ ์œผ๋กœ ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
val mutex = Mutex()

var counter = 0

fun main() = runBlocking {
    messiveRun {
        mutex.withLock {
            counter++
        }
    }
}
  • synchronized ๋ธ”๋ก๊ณผ์˜ ์ฐจ์ด์ ์€ ์Šค๋ ˆ๋“œ๋ฅผ ๋ธ”๋กœํ‚นํ•˜๋Š” ๋Œ€์‹  ์ฝ”๋ฃจํ‹ด์„ ์ค‘๋‹จ์‹œํ‚จ๋‹ค๋Š” ์ ์ž…๋‹ˆ๋‹ค.
  • ์„ฑ๋Šฅ์ด ์ข‹์ง€๋งŒ ์‚ฌ์šฉํ•˜๊ธฐ ์–ด๋ ค์šฐ๋ฉฐ, ์•„๋ž˜์™€ ๊ฐ™์€ ๋ฌธ์ œ์ ์ด ์žˆ์Šต๋‹ˆ๋‹ค.
    • ์ฝ”๋ฃจํ‹ด์ด ๋ฝ์„ ๋‘ ๋ฒˆ ํ†ต๊ณผํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.
    • ์ฝ”๋ฃจํ‹ด์ด ์ค‘๋‹จ๋˜์—ˆ์„ ๋•Œ ๋ฎคํ…์Šค๋ฅผ ํ’€ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.
suspend fun main() {
    val mutex = Mutex()
    mutex.withLock {
        mutex.withLock {
            // in mutex
        }
    }		// ๋ฝ์„ ๋‘๋ฒˆ ์‚ฌ์šฉํ•˜์—ฌ ์˜์›ํžˆ ์‹คํ–‰

    coroutineScope {
        launch {
            delay(10000)
            mutex.withLock {
                // in mutex
            }
        }
    } // delay๊ฐ€ ์†Œ๋ชจ๋  ๋•Œ๊นŒ์ง€ mutex๋ฅผ ํ’€ ์ˆ˜ ์—†์Œ 
}

์„ธ๋งˆํฌ์–ด

  • ๋ฎคํ…์Šค์™€ ๋น„์Šทํ•˜๊ฒŒ ์ž‘๋™ํ•˜์ง€๋งŒ, ๋‘˜ ์ด์ƒ์ด ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ํŠน์ง•์ด ์žˆ์Šต๋‹ˆ๋‹ค.
  • ์„ธ๋งˆํฌ์–ด๋Š” ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์ ‘๊ทผ์„ ํ—ˆ์šฉํ•˜๋ฉฐ, acquire, release, withPermit ํ•จ์ˆ˜๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
suspend fun main() = coroutineScope {
    val semaphore = Semaphore(2)

    launch {
        semaphore.withPermit {
        }
    }
}
  • ์„ธ๋งˆํฌ์–ด๋Š” ๊ณต์œ  ์ƒํƒœ๋กœ ์ธํ•œ ๋ฌธ์ œ๋ฅผ ์ง์ ‘์ ์œผ๋กœ ํ•ด๊ฒฐํ•  ์ˆ˜ ์—†์ง€๋งŒ, ๋™์‹œ ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•˜๋Š” ์ˆ˜๋ฅผ ์ œํ•œํ•  ๋•Œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
  • ์ด๋Š” ์ฒ˜๋ฆฌ์œจ ์ œํ•œ ์žฅ์น˜๋ฅผ ๊ตฌํ˜„ํ•  ๋•Œ ๋„์›€์ด ๋ฉ๋‹ˆ๋‹ค.
    • rate limiter : ํด๋ผ์ด์–ธํŠธ ๋˜๋Š” ์„œ๋น„์Šค๊ฐ€ ๋ณด๋‚ด๋Š” ํŠธ๋ž˜ํ”ฝ์˜ ์ฒ˜๋ฆฌ์œจ์„ ์ œ์–ดํ•˜๊ธฐ ์œ„ํ•œ ์žฅ์น˜
class LimitedNetworkUserRepository(
    private val api: UserApi
) {
    private val semaphore = Semaphore(10)

    suspend fun requestUser(userId: String) =
        semaphore.withPermit {
            api.requstUser(userId)
        }
}

์š”์•ฝ

  • ๊ณต์œ  ์ƒํƒœ๋ฅผ ๋ณ€๊ฒฝํ•  ๋•Œ ์ถฉ๋Œ์„ ํ”ผํ•˜๊ธฐ ์œ„ํ•ด ์‹ฑ๊ธ€์Šค๋ ˆ๋“œ๋กœ ์ œํ•œ๋œ ๋””์ŠคํŒจ์ฒ˜๋ฅผ ์‚ฌ์šฉํ•ด ๊ณต์œ  ์ƒํƒœ๋ฅผ ๋ณ€๊ฒฝํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์ฃผ๋กœ ํ™œ์šฉํ•ฉ๋‹ˆ๋‹ค.
  • ํŒŒ์ธ ๊ทธ๋ ˆ์ธ๋“œ ์Šค๋ ˆ๋“œ ํ•œ์ • : ๋™๊ธฐํ™”๊ฐ€ ํ•„์š”ํ•œ ํŠน์ • ์žฅ์†Œ๋งŒ ๋ž˜ํ•‘
  • ์ฝ”์Šค ๊ทธ๋ ˆ์ธ๋“œ ์Šค๋ ˆ๋“œ ํ•œ์ • : ์ „์ฒด ํ•จ์ˆ˜๋ฅผ ๋ž˜ํ•‘
  • ์ด ์™ธ์— ์›์ž์„ฑ, ๋ฎคํ…์Šค, ์„ธ๋งˆํฌ์–ด๋ฅผ ํ™œ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์ด ์žˆ์Šต๋‹ˆ๋‹ค.