Introduction
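Most AI APIs return their answers as a stream instead of a single response. This post shows how to call such a streaming endpoint with Retrofit.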
Service definition
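Annotate the interface method with @Streaming. Without it, Retrofit reads the whole response body into memory before handing it over; with it, the ResponseBody is returned as-is and can be read incrementally while the server is still sending data.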
```kotlin
import okhttp3.ResponseBody
import retrofit2.Call
import retrofit2.http.Body
import retrofit2.http.POST
import retrofit2.http.Streaming

interface ServiceAi {
    @Streaming
    @POST("/api/que/question/ai-stream")
    fun apiQueQuestionAiStream(@Body params: Map<String, @JvmSuppressWildcards Any>): Call<ResponseBody>
}
```
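The examples below use ApiManager.aiService, but its construction is not shown. One detail matters for streaming: OkHttp's default read timeout is 10 seconds and it applies to every individual read of the response body, so a model that pauses longer than that between chunks ends the stream with a SocketTimeoutException. The following is a minimal sketch of how the service might be built. The contents of ApiManager, the base URL and the 60-second value are all hypothetical, and because a @Body Map needs a converter, it also assumes Retrofit's converter-gson artifact (GsonConverterFactory).

```kotlin
import java.util.concurrent.TimeUnit
import okhttp3.OkHttpClient
import retrofit2.Retrofit
import retrofit2.converter.gson.GsonConverterFactory

// Hypothetical ApiManager: the original code only shows that it exposes aiService
object ApiManager {
    private val client = OkHttpClient.Builder()
        // Applies to each read of the streamed body; 60 s is an illustrative value
        .readTimeout(60, TimeUnit.SECONDS)
        .build()

    val aiService: ServiceAi = Retrofit.Builder()
        .baseUrl("https://example.com/") // placeholder; Retrofit requires a trailing "/"
        .client(client)
        .addConverterFactory(GsonConverterFactory.create()) // serializes the @Body Map
        .build()
        .create(ServiceAi::class.java)
}
```

Only the read timeout is specific to streaming; everything else is ordinary Retrofit setup.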
Calling the API
Synchronous call
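call.execute() blocks the calling thread, so the body is read inside a flow that runs on Dispatchers.IO, and each line is emitted as soon as it arrives:
```kotlin
import android.util.Log
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.launch

// Inside the ViewModel
viewModelScope.launch {
    val params = mutableMapOf<String, Any>()
    params["id"] = quesId.value
    val call = ApiManager.aiService.apiQueQuestionAiStream(params)

    flow {
        // execute() blocks, so this block runs on Dispatchers.IO (see flowOn below)
        val response = call.execute()
        val body = response.body()
        if (response.isSuccessful && body != null) {
            // use {} closes the reader, and with it the response body, even if an exception is thrown
            body.byteStream().bufferedReader().use { reader ->
                while (true) {
                    val line = reader.readLine() ?: break // null means end of stream
                    if (line != "[stop]") emit(line)      // skip the "[stop]" marker
                }
            }
        }
    }
        .flowOn(Dispatchers.IO)
        .catch { e -> Log.e("AI", "stream failed", e) } // network errors would otherwise crash the scope
        .collect { Log.i("AI", "onResponse: $it") }
}
```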
Asynchronous call
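When a request is made with Retrofit's call.enqueue():
- the request itself (the network I/O) runs on a background thread automatically and does not block the main (UI) thread;
- the callbacks onResponse() and onFailure() are invoked on the main (UI) thread, which is Retrofit's default on Android.

With @Streaming, onResponse() is called as soon as the response headers arrive, while the body is still unread. Reading it directly inside the callback would therefore perform network I/O on the main thread, which is why the example switches to Dispatchers.IO before reading.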
Example
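```kotlin
import android.util.Log
import androidx.lifecycle.viewModelScope
import com.google.gson.Gson
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import okhttp3.ResponseBody
import retrofit2.Call
import retrofit2.Callback
import retrofit2.Response

private val gson = Gson()

// Returns the text of a line shaped like data:"...", or null for any other line
fun extractData(rawData: String): String? {
    if (rawData.startsWith("data:\"") && rawData.endsWith("\"")) {
        // After dropping the 5-char "data:" prefix, the rest is a JSON string literal; Gson unescapes it
        val str = gson.fromJson(rawData.substring(5), String::class.java)
        // Any "data:" left inside the text is replaced with an HTML line break
        return str.replace("data:", "<br>")
    }
    return null
}

// Inside the ViewModel
viewModelScope.launch {
    val params = mutableMapOf<String, Any>()
    params["id"] = quesId.value
    val call = ApiManager.aiService.apiQueQuestionAiStream(params)
    call.enqueue(object : Callback<ResponseBody> {
        override fun onResponse(call: Call<ResponseBody>, response: Response<ResponseBody>) {
            // Called on the main thread, but the streamed body is still unread: read it on IO
            viewModelScope.launch(Dispatchers.IO) {
                val body = response.body()
                if (!response.isSuccessful || body == null) return@launch
                // use {} closes the reader and the body even if reading fails
                body.byteStream().bufferedReader().use { reader ->
                    while (true) {
                        val line = reader.readLine() ?: break   // null means end of stream
                        val data = extractData(line) ?: continue // skip lines that are not data:"..."
                        if (data != "[stop]") {
                            _resultMdStr.value += data
                        }
                    }
                }
                delay(300)
                _resultMdStr.value += "\n"
            }
        }

        override fun onFailure(call: Call<ResponseBody>, t: Throwable) {
            Log.e("AI", "request failed", t)
        }
    })
}
```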
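extractData() is tailored to this backend, which sends each chunk as data:"..." with the text JSON-quoted. Many streaming AI endpoints use standard Server-Sent Events framing instead: each data: line carries part of an event, lines starting with ":" are comments, and a blank line ends the event. If your backend works that way, a parser along the following lines could take the place of extractData(). SseLineParser is a hypothetical, stdlib-only sketch that handles only the data field and comments; the event, id and retry fields are ignored.

```kotlin
// Hypothetical SSE line parser: accumulates "data:" lines and returns the joined
// payload when a blank line ends the event. Only the data field is interpreted.
class SseLineParser {
    private val data = StringBuilder()
    private var hasData = false

    /** Feeds one line without its terminator; returns a payload when an event completes. */
    fun feed(line: String): String? {
        if (line.isEmpty()) {
            // Blank line: dispatch the buffered event, if any
            if (!hasData) return null
            val event = data.toString()
            data.setLength(0)
            hasData = false
            return event
        }
        if (line.startsWith(":")) return null // comment line, e.g. a keep-alive ping
        val colon = line.indexOf(':')
        val field = if (colon >= 0) line.substring(0, colon) else line
        var value = if (colon >= 0) line.substring(colon + 1) else ""
        if (value.startsWith(" ")) value = value.substring(1) // drop one leading space, per the SSE format
        if (field == "data") {
            if (hasData) data.append('\n') // several data lines in one event are joined with "\n"
            data.append(value)
            hasData = true
        }
        return null
    }
}
```

Each line read in the while loop would be passed to feed(); a non-null result is one complete event payload, which can then be checked against the end marker and appended to the text.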
UI jank from high-frequency updates
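If the text changes too often, the UI re-renders on every chunk and starts to stutter. To fix this, the state is turned into a Flow and throttled, so the text the UI reads is updated at most once every 300 ms. Strictly speaking this is throttling, not debouncing: a debounce waits until updates stop, which would leave the screen unchanged for as long as the stream keeps producing chunks.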
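```kotlin
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.snapshotFlow
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

// Inside the ViewModel
// Raw text, appended to on every streamed chunk
val _resultMdStr = mutableStateOf("")
// Text the UI reads, refreshed at most once per 300 ms
val resultMdStr = mutableStateOf("")

val resultMdFlow: Flow<String> = snapshotFlow { _resultMdStr.value }.throttleFirst(300)

init {
    viewModelScope.launch {
        resultMdFlow.collect { resultMdStr.value = it }
    }
}

// Passes a value through only if at least windowDuration ms have passed since the
// last value it emitted; values arriving inside the window are dropped
fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> = flow {
    var lastEmissionTime = 0L
    collect { value ->
        val currentTime = System.currentTimeMillis()
        if (currentTime - lastEmissionTime >= windowDuration) {
            lastEmissionTime = currentTime
            emit(value)
        }
    }
}
```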
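throttleFirst() discards every value that arrives inside the window, so the final text only reaches the UI if some later change falls outside it, such as the "\n" appended after delay(300) in the asynchronous example. A variant that always delivers the most recent value can be built on conflate() from kotlinx.coroutines. throttleLatest is a hypothetical name, and this is a sketch rather than a drop-in replacement:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow

// Hypothetical helper: forwards a value, then pauses for periodMillis. Values that
// arrive during the pause are conflated, so only the newest one is emitted next,
// and the last upstream value is never dropped.
fun <T> Flow<T>.throttleLatest(periodMillis: Long): Flow<T> = flow {
    this@throttleLatest.conflate().collect { value ->
        emit(value)          // forward the newest value downstream
        delay(periodMillis)  // while paused, conflate() keeps only the latest upstream value
    }
}
```

It would be used like the original: snapshotFlow { _resultMdStr.value }.throttleLatest(300). The UI still updates at most once per 300 ms, but the last chunk is shown without needing an extra change to push it through.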