H5与小程序的SSE实践
站长
· 阅读数 4
场景
- AI对话,uni-app
逻辑
- 具体逻辑:对话模式,用户输入后,监听流。
- 实现一个类,内部包含所有逻辑操作
- 流的实际内容是
markdown
格式的内容(解码后) - 使用 ua-markdown 进行展示
h5
- EventSource
小程序
- 流请求时需要携带 enableChunked 参数为true
- 获取到流信息之后得到的是 uint8Array,需要进行解码,这里依赖
iconv-lite
插件 - 插件会返回正确的内容,但是内容是断断续续的,需要手动拼接
实现(bug部分已更正,感兴趣的话详情查看bug标签):
// dialogContainer.ts
// #ifdef H5
// h5实现插件
import { EventSourcePolyfill } from 'event-source-polyfill';
// #endif
// #ifdef MP-WEIXIN
// 小程序实现插件
import * as TextEncoding from 'text-encoding-shim';
// #endif
import * as api from '../../../api/freeDialoging'
import utils from '../../../utils'
import storage from '../../../utils/storage';
// 小程序内容组合
import { combindStrV2 } from './wxMpStreamGenerateStr';
// 待处理的对话组
export interface IPreDialogItem {
type : '0' | '1'
content : string // 内容
desc ?: string // 二级文字描述
answer ?: (ls : IDialogItem[]) => void // 回话cb
load ?: boolean // 是否加载中
}
// 对话组
export interface IDialogItem {
time : string
type : '0' | '1' // 0 --- 机器人 1---人
content : string
desc ?: string
id ?: string
load ?: boolean
}
class DialogControler {
// 对话组列表
list : IDialogItem[] = []
// h5流
sseH5Source : EventSource
// 小程序流
sseWxMpStream : UniApp.RequestTask
// ai回复回调 返回最新对话列表
sseAnswerCb : (ls : IDialogItem[]) => void
// 创建新的对话
createNewDialog({ answer: cb, ...preItem } : IPreDialogItem) : string {
const time = utils.formatNowTime(new Date().valueOf())
const id = (Math.random() * 100000000).toFixed().toString()
const item : IDialogItem = {
...preItem,
time,
id
}
this.addToList(item)
if (preItem.type === '1') {
// 开始回话
this.sseAnswerCb = cb
this.getAnswer(preItem.content)
}
return id
}
private addToList(item : IDialogItem) {
this.list.push(item)
}
getList() : IDialogItem[] {
return this.list
}
// 回话生成
private async getAnswer(str : string) {
const res = await api.getSseId()
const id = res.data.sseId
// 从这里要根据环境决定如何处理
// #ifdef MP-WEIXIN
this.createWxMpDialog(id, str)
// #endif
// #ifdef H5
this.createH5SSE(id, str)
// #endif
}
// 创建 小程序 对话
private async createWxMpDialog(id : string, str : string) {
// 在请求流的时候,一定要带上 enableChunked 参数 为 true 来确认是流传输,api具体内容和下面的h5请求差不多
const reqTask = api.getWxMpStreamMessage(id)
this.sseWxMpStream = reqTask
const currentId = this.createNewDialog({
content: '',
type: '0',
load: true
})
this.wxMpDialogListener(currentId)
await api.sendMessage(id, str)
}
// 小程序流监听
private wxMpDialogListener = (currentId) => {
let str = ''
let isDone = false
// 流传输监听
this.sseWxMpStream.onChunkReceived(e => {
if (isDone) {
return
}
try {
// 小程序内流传输得到解析后的str,但是这时的str是断断续续的,需要进行拼接
// const strData = iconvLite.decode(e.data, 'utf-8')
const strData = new TextEncoding.TextDecoder('utf-8').decode(new Int8Array(e.data));
/*
{
paragraphIsEnd: 该段落是否结束
isDone: 回话结束
content: 段落内容
}
*/
// 拼接函数(在下面)
const tObj = combindStrV2(strData)
if (tObj) {
isDone = tObj.isDone
this.streamPush(currentId, tObj.content.replace('/\n/g', '<br/>'), tObj.isDone)
}
// v1版本
// const tObj = combindStr(strData, str)
// if (tObj.isDone) {
// isDone = true
// this.streamPush(currentId, tObj.content.replace('/\n/g', '<br/>'), true)
// } else if (tObj.paragraphIsEnd) {
// this.streamPush(currentId, tObj.content.replace('/\n/g', '<br/>'), false)
// str = ''
// } else {
// // 未结束,先记录内容,下次带入
// str += tObj.content
// }
} catch {
console.error(e);
this.streamPush(currentId, '内容生成错误,请重试!', true)
}
})
}
// 创建h5 sse
private async createH5SSE(id : string, str : string) {
if (!('EventSource' in window)) {
throw Error('该浏览器不支持SSE!!')
}
const baseUrl = utils.returnEnvConstants('baseUrl')
const that = this
this.sseH5Source = new EventSourcePolyfill(`${baseUrl}sse/create/${id}`,
{
headers: {
'Authorization': storage.get('token', ''),
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
},
}
)
await api.sendMessage(id, str)
// 创建对应对话
const currentId = this.createNewDialog({
content: '',
type: '0',
load: true
})
this.sseH5Source.onopen = function () {
//当连接正式建立时触发
// console.log('连接打开.')
}
this.sseH5Source.onmessage = function (msg) {
//当连接正式建立时触发
// console.log('onmessage cb!!', msg, msg.data);
const isDone = msg.lastEventId === '[DONE]'
const txt = JSON.parse(msg.data).data
that.streamPush(currentId, isDone ? '' : txt, isDone)
}
this.sseH5Source.onerror = function (e) {
//当连接发生error时触发
}
}
// 持续推送内容
streamPush(currentId : string, str : string, isDone = false) {
const findIdx = this.list.findIndex(({ id }) => id === currentId)
const cloneList = utils.deepClone(this.list)
cloneList[findIdx].content += str
cloneList[findIdx].load = !isDone
this.list = cloneList
this.sseAnswerCb(cloneList)
if (isDone) {
this.closeSSE()
}
}
closeSSE() {
// #ifdef MP-WEIXIN
this.sseWxMpStream.abort()
this.sseWxMpStream = null
// #endif
// #ifdef H5
this.sseH5Source.close()
this.sseH5Source = null
// #endif
}
}
export default DialogControler
// ./wxMpStreamGenerateStr.ts
export interface IStreamGenerateStrReturn {
paragraphIsEnd ?: boolean
isDone ?: boolean
content ?: string
}
// 解析str return:[是否传输结束]
/*
插件解析的内容是断断续续的,所以需要拼接组装。
*/
// 模型V1
export function combindStr(strData : string, latestStr : string) : IStreamGenerateStrReturn {
const strs = strData.split('\n')
const idItem = strs.find((item : string) => item.startsWith('id:'))
if (idItem) {
// 这个块存在id
const [, id] = idItem.split('id:')
if (id === '[DONE]') {
return {
isDone: true
}
}
}
const strArs = strData.match(/(?<=data\:).+(?=\nretry\:)/g);
// 完整信息
if (strArs) {
const parseStrs = strArs.map(it => JSON.parse(it).data)
return {
paragraphIsEnd: true,
content: parseStrs.join('')
}
}
// 需要拼接:
// 拼接分三种情况,有头没尾,没头有尾,没头没尾
// content
const [, strStart] = strData.split('data:')
// 有开头
if (strStart) {
const [strContnet, isEnd] = strStart.split('retry:')
// 有内容
if (strContnet) {
const str = tilterEmptyStr(strContnet)
return {
// 有 retry: 说明结尾了
paragraphIsEnd: !!isEnd,
content: latestStr + str
}
}
} else if (strData.endsWith('data:')) {
const str = tilterEmptyStr(latestStr)
// 在结尾,但是没有实际数据
return {
// 还没有结束,下次继续拼接
paragraphIsEnd: false,
content: str
}
} else {
// 接着上次接受的信息进行拼接
const [strContnet, isEnd] = strData.split('retry:')
const str = tilterEmptyStr(strContnet)
return {
// 还没有结束,下次继续拼接
paragraphIsEnd: !!isEnd,
content: latestStr + str
}
}
}
// 模型V2 (正在使用)
// 相对第一个,少了str 没有断掉的信息了,但是会有多个信息的str需要分隔。
export function combindStrV2(strData : string) : IStreamGenerateStrReturn {
const strs = strData.split('\n')
let contents = strs.filter(str => str.startsWith('data:{"data":') && str !== 'data:{"data":"[DONE]"}')
contents = contents.map(str => {
const s = str.split('data:')[1]
if (s) {
return JSON.parse(s).data
} else return ''
})
const endStr = strs.filter(str => str.startsWith('id:[DONE]'))
if (contents.length > 0) {
return {
content: contents.join(''),
isDone: false
}
} else if (endStr.length > 0) {
return {
content: '',
isDone: true
}
}
}
// 排除空白内容
const tilterEmptyStr = (str : string) => {
const s = str.trim()
if (s) {
const jsonS = JSON.parse(s) || {}
return jsonS.data || ''
}
return ''
}
使用
使用这个组件的时候,它的自动导入会将components导入到根目录,我是将它的位置重新放到了src/compontns下,否则小程序内打包后无法展示。
小结
h5的话还行,主要是小程序经历了一番波折,一直纠结如何解析 uint8Array,结果最终在ai上找到答案了hh,绝了。
bug进行时(24-2-21
windows系统,iconv-lite在小程序的npm构建环节中报错
报错内容截图:
测试的时候终究不出意外的出意外了。。tm的mac上面模拟器显示是完全没问题的,别人clone代码之后在windows上运行模拟器就会爆错误,并且会将这个错误延续到打包内容中
- 经过排查,这个问题应该是 微信开放社区:npm构建iconv-lite失败?
找了半天没有结果,最终的解决方案就是换一个编译的插件,最终选择了 text-encoding-shim
直接将对应的编译代码改成:
即可。
new Int8Array
这块是一个大坑(下文)!!
小程序 request enableChunked 流接收
这块是一个天坑。。。
他这里描述返回类型是 ArrayBuffer
是OK的,但是在模拟器中,显示返回是 Unit8Array
。。 就很离谱,随后我很多的解析查找也都是基于Uint8Array去找的,结果昨天真机模拟过程中,发现类型变了,又成了 ArrayBuffer,随后找文档确认才更正的。。(这里是解释为什么上面的那个bug中,函数入参为什么要 new Unit8Array)
所以说,任何方法的入参及返回都要以文档为准!!!(我就是不爱看,kao!
h5中 部分页面进入显示 无法连接服务器
这个经过排查,是因为在这个页面中针对小程序和h5有不同的处理,但是插件都是直接导入的,将对应的插件用 #ifdef xxx #endif
隔离即可。