Retrofit2流式接口的调用

前言

我们在调用一些AI的接口的时候都是流式输出,这里就说一下流式接口怎样调用。

Service定义

定义接口的时候要添加@Streaming注解

1
2
3
4
5
6
7
8
9
10
11
12
import okhttp3.ResponseBody
import retrofit2.Call
import retrofit2.http.Body
import retrofit2.http.POST
import retrofit2.http.Streaming

interface ServiceAi {
// 试题AI思路点拨
@Streaming
@POST("/api/que/question/ai-stream")
fun apiQueQuestionAiStream(@Body params: Map<String, @JvmSuppressWildcards Any>): Call<ResponseBody>
}

调用

同步调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
viewModelScope.launch {
val paras = mutableMapOf<String, Any>()
paras["id"] = quesId.value
val call = ApiManager.aiService.apiQueQuestionAiStream(
paras
)
flow {
val response = call.execute()
if (response.isSuccessful && response.body() != null) {
val inputStream = response.body()?.byteStream()
val reader = BufferedReader(InputStreamReader(inputStream))
var line: String?
while (reader.readLine().also { line = it } != null) {
if (line != null && line != "[stop]") {
emit(line)
}
}
reader.close()
}
}
.flowOn(Dispatchers.IO)
.collect {
Log.i("AI", "onResponse: $it")
}
}

异步调用

在使用 Retrofit 进行网络请求时,call.enqueue() 方法的行为是:

  • 请求的执行(网络操作)会自动在后台线程中进行,不会阻塞主线程(UI 线程)
  • 回调方法(onResponse()onFailure())则会在主线程(UI 线程) 中执行

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
fun extractData(rawData: String): String? {
// 检查字符串是否以 "data:" 开头
if (rawData.startsWith("data:\"") && rawData.endsWith("\"")) {
// 提取引号之间的内容
var str = Gson().fromJson(rawData.substring(5, rawData.length), String::class.java)
str = str.replace("data:", "<br>")
return str
}
return null
}

viewModelScope.launch {
val paras = mutableMapOf<String, Any>()
paras["id"] = quesId.value
val call = ApiManager.aiService.apiQueQuestionAiStream(
paras
)
call.enqueue(object : Callback<ResponseBody> {
override fun onResponse(
call: Call<ResponseBody>,
response: Response<ResponseBody>
) {
viewModelScope.launch(Dispatchers.IO) {
if (response.isSuccessful && response.body() != null) {
val inputStream = response.body()?.byteStream()
val reader = BufferedReader(InputStreamReader(inputStream))
var line: String?
while (reader.readLine().also { line = it } != null) {
val data = extractData(line ?: "")
data?.let {
if (it != "[stop]") {
_resultMdStr.value += it
}

}
}
delay(300)
_resultMdStr.value += "\n"
reader.close()
}
}

}

override fun onFailure(call: Call<ResponseBody>, t: Throwable) {

}
})
}

高频变化渲染卡顿

如果数据变化的太快,UI渲染就会卡顿,这里把数据转成流,添加防抖机制来解决卡顿问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
val _resultMdStr = mutableStateOf("")


val resultMdStr = mutableStateOf("")

// 将State转换为Flow
val resultMdFlow: Flow<String> = snapshotFlow {
// 这里返回需要监听的状态值,当该值变化时会触发Flow发射
_resultMdStr.value
}.throttleFirst(300)

init {
viewModelScope.launch {
resultMdFlow.collect {
resultMdStr.value = it
}
}
}

// 节流操作符(需引入 coroutines-flow 相关依赖)
fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> = flow {
var lastEmissionTime = 0L
collect { value ->
val currentTime = System.currentTimeMillis()
if (currentTime - lastEmissionTime >= windowDuration) {
lastEmissionTime = currentTime
emit(value)
}
}
}