ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 13-3. 동시성 : 동시성 통신
    Study(종료)/Kotlin 22.09.13 ~ 12.18 2022. 12. 11. 15:48

    13장 - 동시성

     

    13-1. 코루틴

    13-2.  코루틴 흐름 제어와 잡 생명 주기

    13-3. 동시성 통신

    13-4.  자바 동시성 사용하기


    13-3. 동시성 통신

     

    어제인가 친구랑 얘기를 하다가 코루틴 관련 내용이 나왔는데,

    코루틴 얘기를 하다가 같은 변수에 동시에 접근하면 어떻게 될까? 라는 얘기를 했다.

    그냥 내 개인적인 생각으로는 작업별로 우선순위를 정한 뒤 우선순위 큐에 넣으면 해결되지 않을까?

    라는 생각을 했는데, 사실 굉장히 얕게 생각했던 해결 방법이다.

    이번 단원에서 이런 내용에 대해 코틀린에서 어떻게 해결하는지 알려주는 것 같다.

     

    이번 단원은 코루틴과 액터 사이에서 동기화나 락을 사용하지 않고도 변경 가능한 상태를

    안전하게 공유할 수 있는 데이터 스트림을 제공하는 매커니즘인 채널(channel)에 대한 내용이다.

     

    1) 채널

    채널은 임의의 데이터 스트림을 코루틴 사이에 공유할 수 있는 편리한 방법이다.

    Channel 인터페이스가 제공하는 채널에 대한 기본 연산은 send(), receive() 메서드다.

     

    이 메서드들이 자신의 작업을 완료할 수 없을 때

    채널은 현재 코루틴을 일시 중단시키고 나중에 처리가 가능할 때 재개한다.

    채널 내부 버퍼가 꽉 찼는데, 데이터를 채널에 보내려고 하는 경우를 예로 들 수 있다.

    자바의 동시성 API에서 블러킹 큐는 스레드를 블럭시키는데,

    채널은 일시 중단/재개 과정이 있으므로 가장 큰 차이로 볼 수 있다.

     

    제네릭 Channel() 함수를 사용해 채널을 만들 수 있으며 채널의 용량을 지정한 정수를 받는다.

    fun main() {
        runBlocking {
            val streamSize = 5
            val channel = Channel<Int>(3) // 채널 용량 = 3
    
            launch {
                for (n in 1..streamSize) {
                    delay(Random.nextLong(100))
                    val square = n * n
                    println("Sending: $square")
                    channel.send(square)
                }
            }
    
            launch {
                for (i in 1..streamSize) {
                    delay(Random.nextLong(100))
                    val n = channel.receive()
                    println("Receiving: $n")
                }
            }
        }
    }
    //Sending: 1
    //Receiving: 1
    //Sending: 4
    //Sending: 9
    //Receiving: 4
    //Receiving: 9
    //Sending: 16
    //Receiving: 16
    //Sending: 25
    //Receiving: 25

    채널은 모든 값이 송신된 순서 그대로 수신되도록 보장한다.

     

    Channel() 함수는 채널의 동작을 바꿀 수 있는 여러 특별한 값을 받을 수 있다.

    이러한 값은 Channel 인터페이스의 동반 객체에 상수로 정의돼 있다.

    public interface Channel<E> : kotlinx.coroutines.channels.SendChannel<E>, 
    	kotlinx.coroutines.channels.ReceiveChannel<E> {
            public companion object Factory {
                public const final val BUFFERED: kotlin.Int /* compiled code */
    
                internal final val CHANNEL_DEFAULT_CAPACITY: kotlin.Int /* compiled code */
    
                public const final val CONFLATED: kotlin.Int /* compiled code */
    
                public const final val DEFAULT_BUFFER_PROPERTY_NAME: kotlin.String /* compiled code */
    
                internal const final val OPTIONAL_CHANNEL: kotlin.Int /* compiled code */
    
                public const final val RENDEZVOUS: kotlin.Int /* compiled code */
    
                public const final val UNLIMITED: kotlin.Int /* compiled code */
            }
    }

     ● Channel.UNLIMITED ( = Int.MAX_VALUE)

      채널 용량 제한이 없고, 내부 버퍼는 필요에 따라 증가, send() 시에 결코 일시 중단되지 않음.

      receive()를 하는 경우 버퍼가 비어있을 경우 일시 중단될 수 있음.

     

     ● Channel.RENDEZVOUS( = 0)

      아무 내부 버퍼가 없는 랑데부 채널. 채널 생성 시 용량을 지정하지 않으면 이 방식의 채널 생성.

      send() 호출은 다른 코루틴이 receive()를 호출할 때 까지

      receive() 호출은 다른 코루틴이 send()를 호출할 때 까지 일시 중단됨.

     

     ● Channel.CONFLATED ( = -1)

      송신된 값이 합쳐지는 채널(conflated channel)

      즉 send()로 보낸 원소를 최대 하나만 버퍼에 저장하고,

      이 값이 다른 누군가에 의해 수신되기 전에 send() 요청이 올 경우 기존 값을 덮어 씀.

      (수신되지 못한 값은 소실)

      send() 메서드는 결코 일시 중단되지 않음.

      

     ● Channel.UNLIMITED 보다 작은 임의의 양수

      버퍼 크기가 일정하게 제한된 채널 생성

     

    RENDEZVOUS 채널은 생산자와 소비자 코루틴이 교대로 활성화되도록 보장한다.

    앞에 예제의 채널 크기를 0으로 바꾸면 시간과 상관없이 안정적으로 동작한다.

     

    CONFLATED 채널은 스트림에 기록한 모든 원소가 도착할 필요가 없고,

    소비자 코루틴이 뒤쳐지면 생산자가 만들어낸 값 중 일부를 버려도 되는 경우 쓰인다.

    위 예제 코드의 receive 함수의 지연시간을 늘리고, CONFLATED 채널로 생성하면

    아래 코드와 같은 결과를 얻을 수 있다.

    fun main() {
        runBlocking {
            val streamSize = 5
            val channel = Channel<Int>(Channel.CONFLATED)
    
            launch {
                for (n in 1..streamSize) {
                    delay(Random.nextLong(100))
                    val square = n * n
                    println("Sending: $square")
                    channel.send(square)
                }
            }
    
            launch {
                for (i in 1..streamSize) {
                    delay(Random.nextLong(500))
                    val n = channel.receive()
                    println("Receiving: $n")
                }
            }
        }
    }
    //Sending: 1
    //Sending: 4
    //Receiving: 4
    //Sending: 9
    //Sending: 16
    //Sending: 25
    //Receiving: 25

    위 코드를 실행하면 마지막 줄을 출력한 다음에도 프로그램이 끝나지 않는다.

    수신자 코루틴이 기대하는 원소 개수가 다섯 개이기 때문이다.

     

    이 때 필요한 것은 채널이 닫혀서 더이상 데이터를 보내지 않는다는 사실을 알려주는 신호이다.

    Channel API는 생산자 쪽에서 close() 메서드를 사용해 이러한 신호를 보낼 수 있다.

    또한 소비자 쪽에서는 이터레이션 횟수를 고정하는 대신

    채널에서 들어오는 데이터에 대해 이터레이션을 할 수 있다.

    fun main() {
        runBlocking {
            val streamSize = 5
            val channel = Channel<Int>(Channel.CONFLATED)
    
            launch {
                for (n in 1..streamSize) {
                    delay(Random.nextLong(100))
                    val square = n * n
                    println("Sending: $square")
                    channel.send(square)
                }
                channel.close()
            }
    
            launch {
                for (n in channel) { 
                // 범위 대신 channel 이터레이션 사용
                    delay(Random.nextLong(200))
                    println("Receiving: $n")
                }
            }
        }
    }
    //Sending: 1
    //Sending: 4
    //Sending: 9
    //Sending: 16
    //Receiving: 1
    //Sending: 25
    //Receiving: 16
    //Receiving: 25

    위 코드를 실행하면 데이터 교환이 완료된 후 프로그램이 종료된다.

    소비자쪽에서 명시적인 이터레이션 대신 consumeEach() 함수를 사용할 수 있다.

    for (n in channel) { 
        println("Receiving: $n")
    }
    
    channel.consumeEach {
        println("Receiving: $it")
    }
    // 동일한 코드

    채널이 닫힌 뒤 send()를 호출하면 예외가 발생하며 실패한다.

    채널이 닫힌 후 receive()를 호출하면 버퍼가 원소가 소진될 때 까지 정상적으로 원소를 반환하지만,

    이후에 마찬가지로 동일한 예외(ClosedSendChannelException)가 발생한다


    채널 통신에 참여하는 생산자가 여럿일 수도 있다.

    한 채널을 여러 코루틴이 동시에 읽을 수도 있으며 이 경우를 fan out이라고 한다.

    반대로 여러 생산자 코루틴이 한 채널에 써넣은 데이터를 한 소비자 코루틴이 읽는 fan in도 있다.

    fun main() {
        runBlocking {
            val streamSize = 5
            val channel = Channel<Int>(2)
    
            launch {
                for (n in 1..streamSize) {
                    val square = n * n
                    println("Sending: $square")
                    channel.send(square)
                }
                channel.close()
            }
            for (i in 1..3) {
                launch {
                    for (n in channel) {
                        println("Receiving by consumer #$i: $n")
                        delay(100)
                    }
                }
            }
        }
    }
    //Sending: 1
    //Sending: 4
    //Sending: 9
    //Receiving by consumer #1: 1
    //Receiving by consumer #2: 4
    //Receiving by consumer #3: 9
    //Sending: 16
    //Sending: 25
    //Receiving by consumer #1: 16
    //Receiving by consumer #2: 25

    위 코드는 생산자 코루틴이 생성한 데이터 스트림을 세 소비자 코루틴이 나눠 받는다.


    2) 생산자

    컬렉션 API를 설명할때 얘기했던 sequence() 함수가 있다.

    더보기

    시퀸스(Sequence)

    이터러블과 비슷하게 시퀸스도 iterator() 메서드를 제공한다. 이 메서드를 통해 시퀸스 내용을 순해할 수 있다.

    하지만 시퀸스는 지연 계산을 가정하기 때문에 iterator()의 의도가 이터러블과 다르다.

    대부분의 시퀸스 구현은 객체 초기화 시 원소를 초기화하지 않고 요청에 따라 원소를 계산한다.

    대부분의 시퀸스 구현은 상태가 없다.

    이 말은 지연 계산한 컬렉션 원소 중에 정해진 개수의 원소만 저장한다는 뜻 이다.

    반면 이터러블은 원소 개수에 비례해 메모리를 사용한다.

    코루틴은 이 sequence() 함수와 비슷하게

    동시성 데이터 스트림을 생성할 수 있는 produce()라는 코루틴 빌더가 있다.

    이 빌더는 채널과 비슷한 send() 메서드를 제공하는 ProducerScope가 있다.

    fun main() {
        runBlocking {
            val channel = produce{
                for (n in 1..5) {
                    val square =  n * n
                    println("Sending: $square")
                    send(square)
                }
            }
    
            launch {
                channel.consumeEach { println("Receiving: $it") }
            }
        }
    }
    //Sending: 1
    //Receiving: 1
    //Sending: 4
    //Sending: 9
    //Receiving: 4
    //Receiving: 9
    //Sending: 16
    //Sending: 25
    //Receiving: 16
    //Receiving: 25

    produce() 빌더의 경우 채널을 명시적으로 닫을 필요가 없고,

    코루틴이 종료되면 자동으로 빌더가 채널을 닫아준다.

    예외 처리는 produce()는 async() / await() 의 정책인

    예외가 발생하면 예외를 저장했다가 해당 채널에 receive()를 가장 처음 호출한

    코루틴 쪽에 예외를 다시 던지는 방식을 사용한다.

     

    내가 이해한 대로 정리하자면,

    코루틴을 채널을 만들고 채널에 값을 대입하고 하는 방식 대신

    produce()를 이용하면, 좀 더 편하게 값을 보내고 받을 수 있다는 것 같다.

    이 점에서 컬렉션에서 사용했던 sequnce()와 유사하다고 표현한 것 같다.

    sequence 역시 컬렉션을 사용할때 코드를 좀 더 줄여줬던 것으로 기억한다.

    결국 채널을 사용하면서 코드를 좀 더 줄일수 있는

    produce() 이용하는 방법에 대한 내용이라고 나는 이해했다.


    3) 티커

    티커 채널은 어떤 채널로부터 마지막 소비가 일어난 이후

    주어진 지연 시간이 지난 뒤에 Unit을 반환하는 랑데부 채널이다.

    이 채널은 한 원소와 다음 원소의 발생 시간이 주어진 지연 시간만큼 떨어져 있는 스트림을 만든다.

    만들기 위해서는 ticker() 함수를 사용해야 하며 변수로 아래 내용을 지정할 수 있다.

     

     ● delayMillis : 티커 원소의 발생 시간 간격, ms 단위로 지정

     ● initialDelayMillis : 티커 생성 시점과 원소 최초 발생 시점 사이 간격, 디폴트는 delayMills와 동일

     ● context : 티커를 실행할 코루틴 문맥, 티폴트는 빈 문맥

     ● mode : 티커의 행동을 결정, 디폴트는 FIXED_PERIOD

       ○ TickerMode.FIXED_PERIOD

        원소간의 시간 간격을 최대한 맞추기 위해 지연 시간을 조정

       ○ TickerMode.FIXED_DELAY

        실제 흘러간 시간과 관계없이 delayMills로 지정한 지연 시간만큼만 지연시키고, 다음 원소를 송신

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    
    @OptIn(ObsoleteCoroutinesApi::class)
    fun main() = runBlocking<Unit> {
        val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
        var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
        println("Initial element is available immediately: $nextElement") // return kotlin.Unit
        // no initial delay
    
        nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
        println("Next element is not ready in 50 ms: $nextElement") // return null
    
        nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // return kotlin.Unit
        println("Next element is ready in 100 ms: $nextElement")
    
        // Emulate large consumption delays
        println("Consumer pauses for 300ms")
        delay(300)
    
        nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } // return kotlin.Unit
        // Next element is available immediately
        println("Next element is available immediately after large consumer delay: $nextElement")
    
    
        nextElement = withTimeoutOrNull(95) { tickerChannel.receive() } // return kotlin.Unit
        // 300ms delay 이후 요소 간 delay 간격을 맞추기 위해 지정한 100ms 보다 빨리 요소를 받음
        println("Next element is ready in 95ms after consumer pause in 300ms: $nextElement")
    
        nextElement = withTimeoutOrNull(95) { tickerChannel.receive() } // return kotlin.Unit
        // 300ms delay 이후 요소 간 delay 간격을 맞추기 위해 지정한 100ms 보다 빨리 요소를 받음
        println("Next element is ready in 95ms after consumer pause in 300ms: $nextElement")
    
        tickerChannel.cancel() // indicate that no more elements are needed
    }
    //Initial element is available immediately: kotlin.Unit
    //Next element is not ready in 50 ms: null
    //Next element is ready in 100 ms: kotlin.Unit
    //Consumer pauses for 300ms
    //Next element is available immediately after large consumer delay: kotlin.Unit
    //Next element is ready in 95ms after consumer pause in 300ms: kotlin.Unit
    //Next element is ready in 95ms after consumer pause in 300ms: kotlin.Unit

    코틀린 DOCS 예제 코드에 테스트를 위해 코드를 좀 더 추가했다.

    https://kotlinlang.org/docs/channels.html#ticker-channels

     

    mode만 변경해서도 실행해봤다.

    @OptIn(ObsoleteCoroutinesApi::class)
    fun main() = runBlocking<Unit> {
        val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0, mode = TickerMode.FIXED_DELAY) // create ticker channel
        var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
        println("Initial element is available immediately: $nextElement") // return kotlin.Unit
        // no initial delay
    
        nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
        println("Next element is not ready in 50 ms: $nextElement") // return null
    
        nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // return kotlin.Unit
        println("Next element is ready in 100 ms: $nextElement")
    
        // Emulate large consumption delays
        println("Consumer pauses for 300ms")
        delay(300)
    
        nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } // return kotlin.Unit
        // Next element is available immediately
        println("Next element is available immediately after large consumer delay: $nextElement")
    
    
        nextElement = withTimeoutOrNull(95) { tickerChannel.receive() } // return null
        println("Next element is not ready in 95ms: $nextElement")
    
        nextElement = withTimeoutOrNull(95) { tickerChannel.receive() } // return kotlin.Unit
        println("Next element is ready in 190ms: $nextElement")
    
        nextElement = withTimeoutOrNull(95) { tickerChannel.receive() } // return null
        println("Next element is not ready in 285ms: $nextElement")
    
        tickerChannel.cancel() // indicate that no more elements are needed
    }
    //Initial element is available immediately: kotlin.Unit
    //Next element is not ready in 50 ms: null
    //Next element is ready in 100 ms: kotlin.Unit
    //Consumer pauses for 300ms
    //Next element is available immediately after large consumer delay: kotlin.Unit
    //Next element is not ready in 95ms: null
    //Next element is ready in 190ms: kotlin.Unit
    //Next element is not ready in 285ms: null

    4) 액터

    가변 스레드 상태를 안전하게 공유하는 방법을 구현하는 방법으로 액터(actor) 모델이 있다.

    액터는 내부 상태와 다른 액터에게 메시지를 보내 동시성 통신을 진행할 수 있는 수단을 제공하는 객체이다.

    액터는 자신에게 들어노는 메시지를 리슨(listen)하고, 자신의 상태를 바꾸면서 메시지에 응답할 수 있으며

    다른 메시지를 자기 자신이나 다른 액터에게 보낼 수 있고, 새로운 액터를 시작할 수도 있다.

     

    액터의 상태는 액터 내부에 감춰져 있으므로 다른 액터가 직접 이 상태에 접근할 수는 없고,

    메시지를 보내고 응답을 받아 상태를 아는 방법 뿐이다.

    그래서 액터 모델은 락 기반 동기화 관련 여러 문제로부터 자유룝다.

     

    actor() 코루틴 빌더를 사용해 액터를 만들 수 있다.

    액터는 ActorScope 영역을 만들고,

    이 영역은 기본 코루틴 영역에서 자신에게 들어오는 메시지에 접근할 수 있는 수신자 채널이 된다.

     

    actor() 빌더는 결과값을 생성해내는게 아닌 잡을 시작한다는 점과

    CoroutineExceptionHandler에 의존한다는 점에서 launch()과 유사하고,

    produce() 빌더와 actor() 빌더는 모두 통신에 채널을 사용하지만,

    액터는 데이터를 받기 위해, producer는 소비자에게 데이터를 보내기 위해

    채널을 생성한다는 점에서 서로 대비된다고 볼 수 있다.

     

    은행 계좌 잔고를 유지하고 어떤 금액을 저축 / 인출할 수 있는 코드를 작성해보자

    sealed class AccountMessage
    class GetBalance(
            val amount: CompletableDeferred<Long>
            // GetBalance 메시지를 보낸 코루틴에게 현재 잔고를 반환
    ) : AccountMessage()
    class Deposit(val amount: Long) : AccountMessage() // 예금
    class Withdraw(
            val amount: Long,
            val isPermitted: CompletableDeferred<Boolean>
            // 성공, 실패 여부를 반환
    ) : AccountMessage()
    
    fun CoroutineScope.accountManager( // 액터
            initialBalance: Long
    ) = actor<AccountMessage> {
        var balance = initialBalance
    
        for (message in channel) {
            when (message) {
                is GetBalance -> message.amount.complete(balance)
    
                is Deposit -> {
                    balance += message.amount
                    println("Deposited ${message.amount}")
                }
    
                is Withdraw -> {
                    val canWithdraw = balance >= message.amount
                    if (canWithdraw) {
                        balance -= message.amount
                        println("Withdrawn ${message.amount}")
                    }
                    message.isPermitted.complete(canWithdraw)
                    // 액터 클라이언트에게 요청 결과를 돌려줄 때 complete() 사용
                }
            }
        }
    }
    
    private suspend fun SendChannel<AccountMessage>.deposit(
            name: String,
            amount: Long
    ) {
        send(Deposit(amount))
        println("$name: deposit $amount")
    }
    
    private suspend fun SendChannel<AccountMessage>.tryWithdraw(
            name: String,
            amount: Long
    ) {
        val status = CompletableDeferred<Boolean>().let {
            send(Withdraw(amount, it))
            if (it.await()) "OK" else "DENIED"
        }
        println("$name: withdraw $amount ($status)")
    }
    
    private suspend fun SendChannel<AccountMessage>.printBalance(
            name: String
    ) {
        val balance = CompletableDeferred<Long>().let {
            send(GetBalance(it))
            it.await()
        }
        println("$name: balance is $balance")
    }
    
    fun main() {
        runBlocking {
            val manager = accountManager(100) // 초기값: 100원
            withContext(Dispatchers.Default) {
                launch {
                    manager.printBalance("Client #1")
                    manager.deposit("Client #1", 50) // +50원
                    manager.printBalance("Client #1")
                }
    
                launch {
                    manager.printBalance("Client #2")
                    manager.tryWithdraw("Client #2", 100) // -100원
                    manager.printBalance("Client #2")
                }
            }
    
            manager.tryWithdraw("Client #0", 1000) // -1000원, 인출실패
            manager.printBalance("Client #0")
            manager.close()
        }
    }
    //Client #1: balance is 100
    //Client #1: deposit 50
    //Deposited 50
    //Client #2: balance is 100
    //Client #1: balance is 150
    //Withdrawn 100
    //Client #2: withdraw 100 (OK)
    //Client #2: balance is 50
    //Client #0: withdraw 1000 (DENIED)
    //Client #0: balance is 50

    다음글

    13-4.  자바 동시성 사용하기

    반응형

    댓글

Designed by Tistory.