플로우 테스트

플로우 테스트

  • Flow를 반환하는 대부분의 함수는 Flow를 반환하는 다른 함수를 호출합니다.
  • 이는 데이터 → 도메인 레이어로 Flow 반환과 뷰 모델에서 UI 모델로의 전환에서도 적용됩니다.
  • 이러한 함수들을 어떻게 테스트하는지 직접 구현해봤습니다.

비즈니스 로직

  • 테스트를 위한 로직을 구현하였습니다.
  • ObserveAppointmentsService는 비즈니스 로직을 포함하고 있습니다.
  • 레퍼지토리 저장소로부터 Flow 데이터를 반환받아, 다양한 연산 후 재반환합니다.
// Sealed Class 정의
sealed class Appointment {
    data class AppointmentsUpdate(val appointments: List<Appointment>) : Appointment()
    data object AppointmentsAdd : Appointment()
    data object AppointmentsDelete : Appointment()
}

// Repository 인터페이스
interface AppointmentRepository {
    fun observeAppointments(): Flow<Appointment>
}

class ObserveAppointmentsService(
    private val appointmentRepository: AppointmentRepository,
) {
    // 원소 필터링, 매핑, 반복 원소 제거, 특정 예외의 경우 재시도 연산 수행
    fun observeAppointments(): Flow<List<Appointment>> =
        appointmentRepository
            .observeAppointments()
            .filterIsInstance<Appointment.AppointmentsUpdate>()
            .map { it.appointments }
            .distinctUntilChanged()
            .retry {
                it is ApiException && it.statusCode == CommonStatusCodes.NETWORK_ERROR
            }
}

단위 테스트 구성하기

  • 위 로직에 아래와 같은 단위 테스트를 수행할 수 있습니다.
    • 갱신된 약속(AppointmentsUpdate)만 유지합니다.
    • 이전 원소와 동일한 원소를 제거합니다.
    • 네트워크 에러가 발생한다면, 재시도를 요청합니다.

AppointmentRepository가 인터페이스로 구현되어 있으므로, 위 테스트를 진행하기 위해서는 모킹과 fake 객체를 활용할 수 있습니다. 테스트를 위해 가짜 객체를 만들고, 데이터 소스로 상수값을 제공하는 플로우를 반환해야 합니다.

FakeRepository

  • Fake 객체를 통한 상수값을 제공할 수 있습니다.
  • flow를 인자로 받으며, 테스트 환경에서 자유롭게 활용할 수 있습니다.
// Fake 객체를 통한 테스트
class FakeAppointmentRepository(
    private val flow: Flow<Appointment>
): AppointmentRepository {
    override fun observeAppointments(): Flow<Appointment> = flow

}

flowOf 활용

  • flowOf를 활용하여 시간 의존성이 없는 테스트를 진행할 수 있습니다.
// 리스트로 변환하여 테스트하기
// 시간 의존성 테스트가 불가능하지만 직관적
@Test
fun `should keep only appointments from`() = runTest {
    //given
    val repo = FakeAppointmentRepository(
        flow = flowOf(
            anAppointment1,
            anAppointment1,
            anAppointment2,
            anAppointment2,
        )
    )
    val service = ObserveAppointmentsService(repo)

    // when
    val result = service.observeAppointments().toList()

    // then
    Assertions.assertEquals(listOf(listOf(anAppointment1)), result)
}
  • 이는 리스트로 변환하여 테스트하는 방식과 같습니다.
  • 한정된 플로우를 생성하는 방법이며, 시간 의존성이 중요하지 않은 플로우를 테스트할 수 있습니다.
  • 하지만 원소가 실제로 지연 없이 전달되는지 확인할 수 없습니다.

시간 의존성 테스트

  • 가상 환경에서 코루틴 테스트를 제공하는 runTest를 사용하고, delay를 통해 시간 의존성 테스트를 진행할 수 있습니다.
// 시간 의존성 테스트
// 변화된 플로우에서 언제 원소가 방출되었는지에 대한 정보를 저장하여, 도착 시간 의존성을 확인할 수 있다.
@Test
fun `should eliminate elements that are`() = runTest {
    // given
    val repo = FakeAppointmentRepository(
        flow = flow {
            delay(1000)
            emit(Appointment.AppointmentsUpdate(listOf(anAppointment1)))
            emit(Appointment.AppointmentsUpdate(listOf(anAppointment1)))
            delay(1000)
            emit(Appointment.AppointmentsUpdate(listOf(anAppointment2)))
            delay(1000)
            emit(Appointment.AppointmentsUpdate(listOf(anAppointment2)))
            emit(Appointment.AppointmentsUpdate(listOf(anAppointment1)))
        }
    )
    val service = ObserveAppointmentsService(repo)

    // when
    val result = service.observeAppointments()
        .map { currentTime to it } // 도착 시간 확인
        .toList()
    //then
    Assertions.assertEquals(
        listOf(
            1000L to listOf(anAppointment1),
            2000L to listOf(anAppointment2),
            3000L to listOf(anAppointment1),
        ), result
    )
}

retry(에러 재시도) 테스트

  • [네트워크 에러가 발생하면 retry를 활용한 재시도를 지원한다]에 대한 비즈니스 로직을 테스트할 수 있습니다.
  • 주의할 점은 재시도하는 플로우를 반환할 경우, 테스트하는 함수가 무한정 재시도하게 되며 끝나지 않는 플로우가 생성됩니다.
    • 이는 StateFlow, SharedFlow를 활용하는 방법과 동일합니다.
    • 플로우가 끝나지 않아서, 테스트가 실패합니다.
  • take를 활용한 원소 수 제한으로 간단하게 해결할 수 있습니다.
// 에러 코드 API 테스트하기
// 재시도하지 않는 플로우를 반화할 경우 재시도 기능을 테스트할 필요가 없다.
// 재시도하는 플로우를 반환하는 경우, 무한정 재시도 되어 플로우가 끝나지 않음
// 원소 수를 제한하는 방법을 활용
@Test
fun `should retry when API exception`() = runTest {
    // given
    val status = Status(CommonStatusCodes.NETWORK_ERROR) // 예: NETWORK_ERROR 상태 코드
    val apiException = ApiException(status)
    val repo = FakeAppointmentRepository(
        flow {
            emit(Appointment.AppointmentsUpdate(listOf(anAppointment1)))
            throw apiException
        }
    )
    val service = ObserveAppointmentsService(repo)

    // when
    val result = service.observeAppointments()
        .take(3)
        .toList()

    // then
    Assertions.assertEquals(
        listOf(
            listOf(anAppointment1),
            listOf(anAppointment1),
            listOf(anAppointment1)
        ), result
    )
}

끝나지 않는 플로우 테스트

  • retry에 대한 로직을 테스트할 때, 끝나지 않는 플로우에 대한 테스트를 진행하였습니다.
  • 원소 수 제한을 통해 쉽게 해결 하였지만, 스코프를 필요로 하는 플로우에 대한 테스트는 이를 통해 해결하기 어렵습니다.
    • 원소 수를 제한하는 take를 활용할 경우 테스트는 통과하지만, 많은 정보를 잃게 됩니다.
  • StateFlow와 SharedFlow는 코루틴 환경에서 수집되어야 하며, 스코프를 필요로 합니다.
  • runTest를 사용할 경우 backgroundScope를 사용하므로 테스트에서 스코프가 끝나는 것을 기다릴 수 없습니다.

해결 방안

  • 스코프가 취소되지 않는 한 플로우는 완료되지 않으며, backgroundScope를 활용하는 runTest에서는 스코프가 끝나는 것을 무한정 기다릴 수 없습니다.
  • 이를 해결하기 위해서 아래와 같은 해결 방안을 생각해볼 수 있습니다.
    • 짧은 시간 동안만 감지할 수 있는 toList 활용
    • 플로우가 방출하는 모든 원소를 컬렉션에 저장
    • Turbine 라이브러리 활용
  • 끝나지 않는 플로우 테스트를 위해서 flow를 감지하여 저장하는 MessageService를 생성하였습니다.
data class Message(
    val fromUserId : String,
    val message: String,
)

// 끝나지 않는 플로우 테스트
// 상태플로우, 공유 플로우를 테스트하는 방법을 학습합니다.
class MessageService(
    messageSource : Flow<Message>,
    scope: CoroutineScope,
) {
    private val source = messageSource
        .shareIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed()
        )

    fun observeMessage(fromUserId: String) = source
        .filter { it.fromUserId == fromUserId }
}

  • 이 로직은 scope가 취소되지 않는 한 끝나지 않는 플로우를 반환합니다.

문제 확인하기

  • 플로우가 방출하는 원소를 저장하기 위해서 toList를 활용할 수 있습니다.
  • toList를 사용할 경우, 이 테스트는 영원히 중단됩니다.
  • 이를 해결하기 위해서 take를 활용하여 원소 수를 제한하지만, 많은 정보를 손실합니다.
// toList 활용 -> 테스트가 끝나지 않음
// take 활용 -> 정확한 테스트 불가능
@Test
fun `should emit messages from user`() = runTest {
    // given
    val source = flowOf(
        Message(fromUserId = "0", message = "A"),
        Message(fromUserId = "1", message = "B"),
        Message(fromUserId = "0", message = "C"),
    )
    val service = MessageService(
        messageSource = source,
        scope = backgroundScope
    )

    // when
    val result = service.observeMessage("0")
        // take를 호출하면 테스트가 통과하지만, 많은 정보를 잃습니다.
        .take(2)
        .toList() // 영원히 기다리게 됨 (공유 플로우 특징)

    // then
    Assertions.assertEquals(
        listOf(
            Message(fromUserId = "0", message = "A"),
            Message(fromUserId = "0", message = "C"),
        ), result
    )
}
  • 이런 상황을 단위 테스트로 인지하기 어려우며, 영원히 실행될 가능성이 존재합니다.

플로우가 방출하는 모든 원소를 컬렉션에 저장

  • backgroundScope에서 플로우를 시작하고, 모든 원소를 컬렉션에 저장하는 방법입니다.
  • 이 방식은 실패하는 경우에 무엇인지와 무엇이 되어야 하는지를 명확하게 보여주며, 테스트 시간을 유연하게 설정할 수 있습니다.
// backgroundScope에서 플로우를 시작하고, 플로우가 방출하는 모든 원소를 컬렉션에 저장
// 실패하는 경우 무엇인지, 무엇이 되어야 하는지에 대해 명확해짐
// 테스트 시간을 유연하게 설정 가능
@Test
fun `should emit message from user`() = runTest {
    //given
    val source = flow {
        emit(Message(fromUserId = "0", message = "A"))
        delay(1000)
        emit(Message(fromUserId = "1", message = "B"))
        emit(Message(fromUserId = "0", message = "C"))
    }
    val service = MessageService(
        messageSource = source,
        scope = backgroundScope
    )

    // when
    val emittedMessages = mutableListOf<Message>()
    service.observeMessage("0")
        .onEach { emittedMessages.add(it) }
        .launchIn(backgroundScope) // scope 종속
    delay(1)

    // then
    Assertions.assertEquals(
        listOf(
            Message(fromUserId = "0", message = "A")
        ), emittedMessages
    )
}

짧은 시간 동안만 감지할 수 있는 toList 활용

  • 유연성이 떨어지지만, 간단하고 직관적인 방법으로 테스트를 진행할 수 있습니다.
  • toListDuring이라는 확장 함수를 생성해, 짧은 시간만 감지하는 테스트 환경을 구축합니다.
// 짧은 시간 동안만 유지되는 List를 통해 끝나지않는 flow 테스트
suspend fun <T> Flow<T>.toListDuring(
    duration: Duration
): List<T> = coroutineScope {
    val result = mutableListOf<T>()
    val job = launch {
        this@toListDuring.collect(result::add)
    }
    delay(duration)
    job.cancel()
    return@coroutineScope result
}
  • 이 로직은 duration 동안만 감지를 실행하며, 끝나지 않은 테스트를 의도적으로 조작할 수 있습니다.
// ListDuringForFlowTest
@Test
fun `should emit message from user with Extension`() = runTest {
    //given
    val source = flow {
        emit(Message(fromUserId = "0", message = "A"))
        emit(Message(fromUserId = "1", message = "B"))
        emit(Message(fromUserId = "0", message = "C"))
    }
    val service = MessageService(
        messageSource =  source,
        scope = backgroundScope,
    )

    // when
    val emittedMessages = service.observeMessage("0")
        .toListDuring(1.milliseconds)

    // then
    Assertions.assertEquals(
        listOf(
            Message(fromUserId = "0", message = "A"),
            Message(fromUserId = "0", message = "C")
        ), emittedMessages
    )
}

Turbine 라이브러리 활용

  • Turbine 라이브러리는 객체별로 데이터를 모을 수 있게 해 주며, 원소를 기다리는 데 사용할 수 있는 도구를 제공합니다.
  • 이를 활용하면 플로우 테스트를 간단하게 수행할 수 있습니다.
  • 쉽고 직관적으로 해결할 수 있지만, 서드파티 라이브러리를 추가해야 하기 때문에 협업 상황에서는 논의가 필요합니다.

뷰 모델 테스트

  • 플로우 빌더는 원소가 소스로부터 어떻게 방출되어야 하는지 저장하는 간단하고 강력한 메서드입니다.
  • SharedFlow를 소스로 사용하고 테스트에서 원소를 방출하는 방법이 있는데, 뷰 모델 테스트에 유용하게 활용할 수 있습니다.

뷰 모델 로직 구현

  • 테스트 환경을 위해서 비즈니스 로직을 해결하는 뷰 모델을 구현하였습니다.
// 플로우 테스트를 뷰 모델에서 진행
// SharedFlow를 소스로 사용하고, 테스트에서 원소를 방출하는 방법
class ChatViewModel(
    private val scope : CoroutineScope,
    private val messageService: MessageService,
): ViewModel() {
    private val _lastMessage =
        MutableStateFlow<String?>(null)
    val lastMessage:StateFlow<String?> = _lastMessage

    private val _message =
        MutableStateFlow(emptyList<String>())
    val message:StateFlow<List<String>> = _message

    fun start(fromUserId: String){
        messageService.observeMessage(fromUserId)
            .onEach {
                val text = it.message
                _lastMessage.value = text
                _message.value += text
            }
            .launchIn(scope)
    }
}
  • 가상 시간에 의존하지 않고, 함수의 동작을 정확하게 테스트할 수 있습니다.
  • 단위 테스트를 더 쉽게 읽을 수 있으며, 시간 의존성이 있는 ViewModel에 대한 테스트가 가능합니다.
@ExperimentalCoroutinesApi
@ExtendWith(
    CoroutinesTestExtension::class
)
class ChatViewModelTest {
    @Test
    fun `should expose message from user`() = runTest {
        //given
        val source = MutableSharedFlow<Message>(replay = 1)

        // when
        val viewModel = ChatViewModel(
            messageService = MessageService(
                messageSource = source,
                scope = backgroundScope,
            ),
            scope = backgroundScope
        )
        viewModel.start("0")
        // then
        Assertions.assertEquals(
            null,
            viewModel.lastMessage.value
        )

        // when
        source.emit(Message(fromUserId = "0", message = "ABC"))
        advanceTimeBy(1)
        // then
        Assertions.assertEquals(
            "ABC",
            viewModel.lastMessage.value
        )
        Assertions.assertEquals(
            listOf("ABC"),
            viewModel.message.value
        )

        // when
        source.emit(Message(fromUserId = "0", message = "DEF"))
        source.emit(Message(fromUserId = "1", message = "GHI"))
        advanceTimeBy(1)
        // then
        Assertions.assertEquals(
            "DEF",
            viewModel.lastMessage.value
        )
        Assertions.assertEquals(
            listOf("ABC", "DEF"),
            viewModel.message.value
        )
    }

}

요약

  • 플로우에 대한 테스트 원리는 중단 함수와 코루틴과 비슷합니다.
  • 하지만 플로우는 시간 지연 테스트에 대한 로직을 구현해야 하는 특징이 있습니다.
  • 다양한 환경에 맞는 테스트 환경을 구축하여 플로우 테스트를 진행할 수 있습니다.