How should we handle Kotlin Coroutines’ channel in Android’s components, especially in Activity/Fragment?

This is the English translation of the previous post.

class FooViewModel {
    private var _state: State
    val state: State
        get() = _state

    private val _states = BroadcastChannel<State>(1)
    val states: ReceiveChannel<State>
        get() = _states.openSubscription().also { _states.offer(state) }

    // reducer and other stuff here
}

Say we have a ViewModel like the one above. It’s based on an MVI-like architecture. FooViewModel.states will notify us whenever the state is updated.

This FooViewModel lives in a Kotlin Multiplatform Project module that targets Android and iOS, so we can’t use LiveData. Of course we could create some kind of Observable ourselves, but since we have Kotlin Coroutines, I’d like to use Channel.

The problem is how to handle a channel inside an Activity/Fragment according to its lifecycle. A channel cannot be paused or resumed, so we need to come up with something.
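Since a channel subscription has no pause or resume, one way to approximate them is to cancel the consuming Job and later launch a new one against a freshly created channel. Below is a minimal sketch of that idea, assuming kotlinx.coroutines is on the classpath; `newNumbers` and `pauseResumeDemo` are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach

// Hypothetical factory: every call returns a fresh channel holding 1, 2, 3.
fun newNumbers(): ReceiveChannel<Int> {
    val channel = Channel<Int>(Channel.UNLIMITED)
    (1..3).forEach { channel.trySend(it) }
    return channel
}

// "Pause" is cancelling the consuming Job; "resume" is launching a new one.
fun pauseResumeDemo(): List<String> = runBlocking {
    val received = mutableListOf<String>()

    var job: Job = launch { newNumbers().consumeEach { received += "first:$it" } }
    yield()               // let the consumer drain the buffered values
    job.cancelAndJoin()   // "pause": consumeEach also cancels its channel on exit

    job = launch { newNumbers().consumeEach { received += "second:$it" } }
    yield()
    job.cancelAndJoin()

    received
}

fun main() {
    println(pauseResumeDemo())
}
```

The second subscription starts from a new channel rather than continuing the first one, which is why the rest of this post passes around a channel factory instead of a channel.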

Two ideas popped into my head: LifecycleObserver and LiveData.

class ChannelLifecycleObserver(
    private val owner: LifecycleOwner
) : LifecycleObserver, CoroutineScope by MainScope() {

    private val channels = mutableMapOf<ChannelHandler<*>, Job?>()

    private val shouldBeActive
        get() = owner.lifecycle.currentState.isAtLeast(Lifecycle.State.STARTED)

    private var isActive: Boolean = false

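    fun <T> add(handle: ChannelHandler<T>) {
        // Start right away if the owner is already STARTED; otherwise wait for a lifecycle event.
        val job = if (shouldBeActive) {
            handle.start(this)
        } else null

        channels[handle] = job
    }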

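    // Assumption: the listing appears to have lost this annotation; without it the method is never invoked.
    @OnLifecycleEvent(Lifecycle.Event.ON_ANY)
    fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
        if (owner.lifecycle.currentState == Lifecycle.State.DESTROYED) {
            // Assumed cleanup: cancel the scope (and every running job) and forget the handlers.
            cancel()
            channels.clear()
            return
        }

        val newActive = shouldBeActive
        // skip subsequent process if the active state is not changed
        // ex: STARTED -> RESUMED
        if (isActive == newActive) {
            return
        }
        isActive = newActive

        if (isActive) {
            println("should be active, register channels")

            channels.entries.forEach { (handle, job) ->
                if (job?.isActive == true) {
                    // already subscribed, e.g. started by add()
                    return@forEach
                }
                channels[handle] = handle.start(this)
            }
        } else {
            println("should be inactive, deregister channels")
            channels.entries.forEach { (handle, job) ->
                job?.cancel()
                channels[handle] = null
            }
        }
    }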

    data class ChannelHandler<T>(val channelFactory: () -> ReceiveChannel<T>, val action: (T) -> Unit) {
        fun start(coroutineScope: CoroutineScope): Job {
            return coroutineScope.launch {
                // Create a fresh channel on every start and pass each element to the action.
                channelFactory().consumeEach { value ->
                    action(value)
                }
            }
        }
    }
}

/**
 * Start/Cancel subscribing [ReceiveChannel] depending on a provided [lifecycleObserver].
 */
fun <E> (() -> ReceiveChannel<E>).consumeEach(lifecycleObserver: ChannelLifecycleObserver, action: (E) -> Unit) {
    lifecycleObserver.add(
        ChannelLifecycleObserver.ChannelHandler(
            channelFactory = this,
            action = action
        )
    )
}
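The heart of onStateChanged is a small gate: work out whether the owner is at least STARTED, and act only when that answer changes. The sketch below isolates that logic in plain Kotlin so it can run without Android. The `State` enum and the `ActiveGate` class are hypothetical stand-ins for illustration, not part of androidx.lifecycle.

```kotlin
// Hypothetical stand-in for Lifecycle.State, declared in the same order.
enum class State { DESTROYED, INITIALIZED, CREATED, STARTED, RESUMED }

// Hypothetical helper mirroring the isActive / shouldBeActive check in the observer.
class ActiveGate {
    private var isActive = false
    val log = mutableListOf<String>()

    fun onStateChanged(current: State) {
        val newActive = current >= State.STARTED
        if (isActive == newActive) return // e.g. STARTED -> RESUMED: nothing to do
        isActive = newActive
        log += if (isActive) "subscribe" else "cancel"
    }
}

fun main() {
    val gate = ActiveGate()
    // A typical foreground/background round trip.
    listOf(State.CREATED, State.STARTED, State.RESUMED, State.STARTED, State.CREATED)
        .forEach(gate::onStateChanged)
    println(gate.log)
}
```

Five state changes produce only two actions, one subscribe and one cancel, because the moves between STARTED and RESUMED do not change the answer.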

The implementation is fairly simple. Inspired by LiveData, it starts subscribing to a channel when the Activity/Fragment enters onStart/onResume, and cancels the cached Job when the Activity/Fragment enters onPause/onStop. Actual usage is below.

class FooActivity: AppCompatActivity() {
    val channelLifecycle by lazy { ChannelLifecycleObserver(this) }
    val viewModel: FooViewModel by lazy { /* Obtain a ViewModel */ }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // add to lifecycle
        lifecycle.addObserver(channelLifecycle)

        // use method reference!
        viewModel::states.consumeEach(channelLifecycle) { state ->
            Log.d("FooActivity", "State Updated: $state")
        }
    }
}

It’s implemented as an extension function on a method reference (a function type), which looks a bit weird to me, but it is necessary: a ReceiveChannel cannot be reused after it has been cancelled, and exposing the BroadcastChannel itself is not acceptable. So we need to re-create the ReceiveChannel each time. We assume that the latest data is cached inside the ViewModel.
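To make the "cannot be reused" point concrete, here is a minimal sketch assuming kotlinx.coroutines is available. `newStates` is a hypothetical factory standing in for `viewModel::states`, and a conflated Channel stands in for the subscription.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Hypothetical factory: each call returns a new channel that already holds the latest value.
fun newStates(): ReceiveChannel<String> =
    Channel<String>(Channel.CONFLATED).also { it.trySend("latest") }

// A cancelled channel is closed for good; its buffered value is dropped.
fun cancelledIsClosed(): Boolean {
    val channel = newStates()
    channel.cancel()
    return channel.tryReceive().isClosed
}

// A channel obtained from the factory again is usable and carries the value.
fun freshValue(): String? = newStates().tryReceive().getOrNull()

fun main() {
    println(cancelledIsClosed())
    println(freshValue())
}
```

Holding on to the factory rather than the channel is what lets the observer subscribe again after every cancellation.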

An implementation using LiveData would be pretty much the same, but I didn’t dig into it much, as I don’t want to have to think about the value that LiveData caches on its own.

class ChannelLiveData<T>(
    private val channelFactory: () -> ReceiveChannel<T>
) : LiveData<T>(), CoroutineScope by MainScope() {

    private var job: Job? = null

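    override fun onActive() {
        super.onActive()

        // Subscribe to a fresh channel whenever an active observer appears.
        job = launch {
            channelFactory().consumeEach { value ->
                // Assumed: each element is forwarded as the LiveData value.
                setValue(value)
            }
        }
    }

    override fun onInactive() {
        super.onInactive()

        // Assumed: stop consuming once no active observer remains.
        job?.cancel()
        job = null
    }
}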

fun <T> (() -> ReceiveChannel<T>).toLiveData(): LiveData<T> = ChannelLiveData(this)

Please note that you should not call these from lifecycle callbacks that run many times over the component’s lifetime, such as onStart or onResume. Each call to fun <E> (() -> ReceiveChannel<E>).consumeEach(lifecycleObserver, action) is cached regardless of any previous call, so you would eventually end up with duplicate subscriptions.
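The duplication can be sketched in plain Kotlin. `Registry` and `deliveriesAfter` are hypothetical and deliberately simplified: the registry keeps handlers in a list where the observer uses a map, but like the observer it never checks for an earlier registration, and each call passes a new lambda.

```kotlin
// Hypothetical, simplified registry: keeps every handler it is given.
class Registry {
    private val handlers = mutableListOf<(String) -> Unit>()

    fun add(handler: (String) -> Unit) {
        handlers += handler
    }

    fun dispatch(state: String) = handlers.forEach { it(state) }
}

// Simulates registering from a callback that runs `registrations` times (e.g. onStart),
// then counts how often a single state update is delivered.
fun deliveriesAfter(registrations: Int): Int {
    val registry = Registry()
    val seen = mutableListOf<String>()
    repeat(registrations) { registry.add { state -> seen += state } }
    registry.dispatch("Loaded")
    return seen.size
}

fun main() {
    println(deliveriesAfter(1))
    println(deliveriesAfter(3))
}
```

Registering once, for example in onCreate, keeps one delivery per update. Registering on every onStart multiplies the deliveries by the number of times the callback has run.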