Jetpack Compose-基于flow实现EventBus(组件间消息传递)

前言

Jetpack Compose并没有提供组件之间的事件交互,这里我们可以使用flow自定义一个EventBus来实现。

工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.ConcurrentHashMap

/**
* 基于 Flow 的自定义事件总线
* 支持:生命周期感知、粘性事件、线程切换、多事件类型
*/
object ZFlowEventBus {
// 存储事件流(事件类型 -> SharedFlow)
private val eventFlows = ConcurrentHashMap<Class<*>, MutableSharedFlow<Any>>()

// 互斥锁,保证并发安全
private val mutex = Mutex()

/**
* 发送事件(挂起函数,需在协程中调用)
* @param event 事件对象
*/
suspend fun <T : Any> post(event: T) {
val flow = eventFlows.getOrPut(event::class.java) {
MutableSharedFlow(replay = 0) // 非粘性事件,默认不重播
}
flow.emit(event)
}

/**
* 发送粘性事件(新订阅者可获取最新事件)
* @param event 事件对象
*/
suspend fun <T : Any> postSticky(event: T) {
val flow = eventFlows.getOrPut(event::class.java) {
MutableSharedFlow(replay = 1) // 粘性事件,重播1条
}
flow.emit(event)
}

/**
* 订阅事件(生命周期感知)
* @param lifecycleOwner 生命周期所有者(如 Activity/Fragment)
* @param eventType 事件类型(Class)
* @param block 事件处理逻辑
* @param dispatcher 指定收集线程(默认 Main)
*/
fun <T : Any> subscribe(
lifecycleOwner: LifecycleOwner,
eventType: Class<T>,
dispatcher: CoroutineDispatcher = Dispatchers.Main,
block: suspend (T) -> Unit,
) {
lifecycleOwner.lifecycleScope.launch {
// 仅在 CREATED 状态及以上收集事件
lifecycleOwner.repeatOnLifecycle(Lifecycle.State.CREATED) {
// 获取或创建事件流
val flow = eventFlows.getOrPut(eventType) {
MutableSharedFlow(replay = if (eventType.isAnnotationPresent(Sticky::class.java)) 1 else 0)
} as MutableSharedFlow<T>

// 收集事件(自动切换线程)
flow.collect { event ->
withContext(dispatcher) {
block(event)
}
}
}
}
}

suspend fun <T : Any> removeEvent(event: T) {
mutex.withLock {
eventFlows.remove(event::class.java)
}
}

/**
* 移除指定事件的粘性事件
* @param eventType 事件类型
*/
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun <T : Any> removeStickyEvent(eventType: Class<T>) {
mutex.withLock {
eventFlows[eventType]?.let { flow ->
// 清空 SharedFlow 的 replay 缓存(模拟移除粘性事件)
flow.resetReplayCache()
}
}
}
}

/**
* 粘性事件注解(标记需要粘性的事件类型)
*/
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
annotation class Sticky

定义事件

1
2
3
4
5
6
7
8
9
// 普通事件
class ZEventSelectSubject()

// 普通事件
data class ZEventSelectSubjectCb(val subjectId: String)

// 粘性事件(需添加 @Sticky 注解)
@Sticky
data class ZEventThemeChanged(val theme: String)

订阅事件

1
2
3
4
5
6
lifecycleScope.launch {
ZFlowEventBus.subscribe(lifecycleOwner, ZEventSelectSubjectCb::class.java) { event ->
// 更新 UI(自动在 Main 线程执行)

}
}

Compose组件内使用

1
2
3
4
5
6
7
8
9
val lifecycleOwner = LocalLifecycleOwner.current
LaunchedEffect(Unit) {
ZFlowEventBus.subscribe(
lifecycleOwner, ZEventSelectSubject::class.java,
block = {
vm.showSubjectDialog.value = true
}
)
}

带参数的

1
2
3
4
5
6
7
8
9
10
11
12
13
val lifecycleOwner = LocalLifecycleOwner.current
LaunchedEffect(Unit) {
ZFlowEventBus.subscribe(
lifecycleOwner, ZEventSelectSubjectCb::class.java
) {
ZFlowEventBus.removeEvent(it)
navController.navigate(
MyScreen.ScreenMistakeQuesGaopin.createRoute(
it.subjectId
)
)
}
}

接收到事件后记得移除事件

1
ZFlowEventBus.removeEvent(it)

发送事件

1
2
3
4
5
6
7
8
9
// 在 ViewModel 或协程中发送普通事件
viewModelScope.launch {
ZFlowEventBus.post(ZEventSelectSubjectCb("123"))
}

// 发送粘性事件(新订阅者可获取最新主题)
viewModelScope.launch {
ZFlowEventBus.postSticky(ZEventThemeChanged("Dark"))
}

Composable 组件中

1
2
3
4
val coroutineScope = rememberCoroutineScope()                             
coroutineScope.launch {
ZFlowEventBus.post(ZEventSelectSubject())
}

移除事件

1
2
3
4
// 在需要时移除粘性事件(如退出登录)
lifecycleScope.launch {
ZFlowEventBus.removeStickyEvent(ZEventThemeChanged::class.java)
}