Kotlin CoroutinesのChannelをAndroidでいい感じに使うにはどうしたらいいかなー、という話。
class FooViewModel {
private var _state: State
val state: State
get() = _state
private val _states = BroadcastChannel<State>(1)
val states: ReceiveChannel<State>
get() = _states.openSubscription().also { _states.offer(state) }
// reducerとか他にいろいろある
}
class FooViewModel {
private var _state: State
val state: State
get() = _state
private val _states = BroadcastChannel<State>(1)
val states: ReceiveChannel<State>
get() = _states.openSubscription().also { _states.offer(state) }
// reducerとか他にいろいろある
}
ちょーざっくりこんな感じのViewModelがあるとする。
MVI的なアーキテクチャになっていて、 FooViewModel.states
がStateの更新を通知してくれる。
このViewModelはKotlin Multiplatformなモジュールに存在し、Android/iOSで使い回すために LiveData
を使うことはできない。
適当なObservableを実装することもできるけど、まあせっかくなのでKotlin CoroutinesのChannelを使ってみようと思いこんな感じになっている。
問題は、この FooViewModel.states
をどうやってActivity/Fragmentのライフサイクルにあわせて使うか、ということだ。
Channelは現状pause/resumeができないので、なんとかする必要がある。
パッと思いついたのは LiveData
を実装する方法と LifecycleObserver
を実装する方法だ。
class ChannelLifecycleObserver(
private val owner: LifecycleOwner
) : LifecycleObserver, CoroutineScope by MainScope() {
private val channels = mutableMapOf<ChannelHandler<*>, Job?>()
private val shouldBeActive
get() = owner.lifecycle.currentState.isAtLeast(Lifecycle.State.STARTED)
private var isActive: Boolean = false
fun <T> add(handle: ChannelHandler<T>) {
val job = if (shouldBeActive) {
handle.start(this)
} else null
channels[handle] = job
}
@Suppress("unused")
@OnLifecycleEvent(Lifecycle.Event.ON_ANY)
fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (owner.lifecycle.currentState == Lifecycle.State.DESTROYED) {
cancel()
return
}
val newActive = shouldBeActive
// skip subsequent process if the active state is not changed
// ex: STARTED -> RESUMED
if (isActive == newActive) {
return
}
isActive = newActive
if (isActive) {
println("should be active, register channels")
channels.entries.forEach { (handle, job) ->
if (job?.isActive == true) {
job.cancel()
}
channels[handle] = handle.start(this)
}
} else {
println("should be inactive, deregister channels")
channels.entries.forEach { (handle, job) ->
job?.cancel()
channels[handle] = null
}
}
}
data class ChannelHandler<T>(val channelFactory: () -> ReceiveChannel<T>, val action: (T) -> Unit) {
fun start(coroutineScope: CoroutineScope): Job {
return coroutineScope.launch {
channelFactory().consumeEach { value ->
action(value)
}
}
}
}
}
/**
* Start/Cancel subscribing [ReceiveChannel] depending on a provided [lifecycleObserver].
*/
fun <E> (() -> ReceiveChannel<E>).consumeEach(lifecycleObserver: ChannelLifecycleObserver, action: (E) -> Unit) {
lifecycleObserver.add(
ChannelLifecycleObserver.ChannelHandler(
channelFactory = this,
action = action
)
)
}
class ChannelLifecycleObserver(
private val owner: LifecycleOwner
) : LifecycleObserver, CoroutineScope by MainScope() {
private val channels = mutableMapOf<ChannelHandler<*>, Job?>()
private val shouldBeActive
get() = owner.lifecycle.currentState.isAtLeast(Lifecycle.State.STARTED)
private var isActive: Boolean = false
fun <T> add(handle: ChannelHandler<T>) {
val job = if (shouldBeActive) {
handle.start(this)
} else null
channels[handle] = job
}
@Suppress("unused")
@OnLifecycleEvent(Lifecycle.Event.ON_ANY)
fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (owner.lifecycle.currentState == Lifecycle.State.DESTROYED) {
cancel()
return
}
val newActive = shouldBeActive
// skip subsequent process if the active state is not changed
// ex: STARTED -> RESUMED
if (isActive == newActive) {
return
}
isActive = newActive
if (isActive) {
println("should be active, register channels")
channels.entries.forEach { (handle, job) ->
if (job?.isActive == true) {
job.cancel()
}
channels[handle] = handle.start(this)
}
} else {
println("should be inactive, deregister channels")
channels.entries.forEach { (handle, job) ->
job?.cancel()
channels[handle] = null
}
}
}
data class ChannelHandler<T>(val channelFactory: () -> ReceiveChannel<T>, val action: (T) -> Unit) {
fun start(coroutineScope: CoroutineScope): Job {
return coroutineScope.launch {
channelFactory().consumeEach { value ->
action(value)
}
}
}
}
}
/**
* Start/Cancel subscribing [ReceiveChannel] depending on a provided [lifecycleObserver].
*/
fun <E> (() -> ReceiveChannel<E>).consumeEach(lifecycleObserver: ChannelLifecycleObserver, action: (E) -> Unit) {
lifecycleObserver.add(
ChannelLifecycleObserver.ChannelHandler(
channelFactory = this,
action = action
)
)
}
実装はシンプルで、LiveData
の処理を参考にしつつViewを更新できる状態になったらChannelを購読開始、Viewに触れない状態になったらキャッシュしておいたJobをキャンセル、としている。 Activity/Fragmentでこんな感じに使う。
class FooActivity: AppCompatActivity() {
val channelLifecycle by lazy { ChannelLifecycleObserver(this) }
val viewModel: FooViewModel by lazy { /* ViewModelを取得 */ }
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_foo)
// add to lifecycle
lifecyle.addObserver(channelLifecycle)
// use method reference!
viewModel::states.consumeEach(channelLifecycle) { state ->
Log.d("FooActivity", "State Updated: $state")
}
}
}
class FooActivity: AppCompatActivity() {
val channelLifecycle by lazy { ChannelLifecycleObserver(this) }
val viewModel: FooViewModel by lazy { /* ViewModelを取得 */ }
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_foo)
// add to lifecycle
lifecyle.addObserver(channelLifecycle)
// use method reference!
viewModel::states.consumeEach(channelLifecycle) { state ->
Log.d("FooActivity", "State Updated: $state")
}
}
}
メソッド参照の拡張関数になっている点が若干気持ち悪いが、これは ReceiveChannel
がcancelされたあとに再利用できないからだ。
かと言って BroadcastChannel
のまま公開するわけにもいかないので、まあ仕方ない。
ViewModel側で最新の値はキャッシュしている前提で、 ReceiveChannel
を毎回作り直すことにした。
LiveData
のパターンもだいたい同じ感じでできるはずだけど、 LiveData
内でのキャッシュとか考え始めたらめんどくさくなって深く考えてない。
class ChannelLiveData<T>(
private val channelFactory: () -> ReceiveChannel<T>
) : LiveData<T>(), CoroutineScope by MainScope() {
private var job: Job? = null
override fun onActive() {
super.onActive()
job = launch {
channelFactory().consumeEach { value ->
setValue(value)
}
}
}
override fun onInactive() {
super.onInactive()
job?.cancel()
}
}
fun <T> (() -> ReceiveChannel<T>).toLiveData(): LiveData<T> = ChannelLiveData(this)
class ChannelLiveData<T>(
private val channelFactory: () -> ReceiveChannel<T>
) : LiveData<T>(), CoroutineScope by MainScope() {
private var job: Job? = null
override fun onActive() {
super.onActive()
job = launch {
channelFactory().consumeEach { value ->
setValue(value)
}
}
}
override fun onInactive() {
super.onInactive()
job?.cancel()
}
}
fun <T> (() -> ReceiveChannel<T>).toLiveData(): LiveData<T> = ChannelLiveData(this)
どっちの実装もそうだけど、onResume
とか画面のライフサイクル内で複数回呼ばれる場所に書いてしまうと実行されるたびにChannelの購読数が増え、処理が重複してしまうので注意が必要。
単純に LiveData
の onActive/onInactive
でフラグをトグルして setValue
呼ぶかどうか判定する感じにしたらもしかしたら一番楽かもしれない。でもなんか購読しっぱなしが気持ち悪くてなかったことにした。