Fastapi框架-冷饭再炒-基础知识补充篇(10)-玩玩websocket案例并扩展实现倒计时服务端主动关闭的实现
websocket简单概念理解
websocket其实可以理解:web+socket= websocket,它是基于socket之上工作于应用层的一种在单个 TCP 连接上进行全双工通讯的协议。
websocket它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,也可以理解为websocket属于服务器推送技术的一种。
当然它还有其他特点等等:
- 不受同源策略的影响,客户端可以与任意服务器通信
- 内心消息可以发送文本,也可以发送二进制数据
- 与 HTTP 协议有着良好的兼容性
刚好因为我的后台,需要进行一些消息的通知推送到前端,所有需要进行相关的websocket通信。至于具体的一些websocket的知识点,我这里不会展开太多,这里主要是实践记录为主。之前对于flask有websocket需求也是基于gevent-websocket来实现的。刚好fastapi也自带了支持websocket,这个可以很好实现我的简单需求。
1 官网示例简单介绍
1.1官网示例1:websocket填坑
首先,如果你按照官网的示例进行复制黏贴上来跑的话,你会发现你的你跑起来的,当尝试连接你的websocket的时候,它就一直是连不上滴!
官网的最简单的示例代码为:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
if __name__ == '__main__':
import uvicorn
uvicorn.run(app='main:app', host="127.0.0.1", port=8000, reload=True, debug=True)
uvicorn.run('masin:app', port=8011, )
但是,跑起来的时候呀!浏览器会一直报错提示,开始以为是浏览器问题还有不同的端口号的问题啊之类的~但是各自尝试后还是一样的问题。
failed: Error during WebSocket handshake: xxxxxx: 400
百思不得其解!于是乎决定重新的安装一下我们uvicorn:
pip install uvicorn[standard]
必须使用上面的那种方式,就算你单独的安装standard有是有也不行!或者可能是我的没有重启!
又或者重新安装一下websockets:
pip install websockets
相关的对应的版本如下:
后来可以愉快的进行连接上我们的WebSocket了!
当我们的服务跑起来后,前端可以直接的通过连接我们的@app.websocket("/ws")进行通讯 1:打开浏览器后查看我们的链接状态
2:随意输入,就可以看到我们的自己输入信息再浏览器了!
不过上面的示例是存在缺陷的,没有进行退出的异常捕获处理!当我们的浏览器刷新之后,因为没有进行链接的释放和关闭和引发了异常!
1.2 官网示例2-新增依赖注入
增加token依赖的校验!示例完整代码为:
from typing import Optional
from fastapi import Cookie, Depends, FastAPI, Query, WebSocket, status
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
<label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
<button onclick="connect(event)">Connect</button>
<hr>
<label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = null;
function connect(event) {
var itemId = document.getElementById("itemId")
var token = document.getElementById("token")
ws = new WebSocket("ws://localhost:8000/items/" + itemId.value + "/ws?token=" + token.value);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
event.preventDefault()
}
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
async def get_cookie_or_token(
websocket: WebSocket,
session: Optional[str] = Cookie(None),
token: Optional[str] = Query(None),
):
if session is None and token is None:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return session or token
@app.websocket("/items/{item_id}/ws")
async def websocket_endpoint(
websocket: WebSocket,
item_id: str,
q: Optional[int] = None,
cookie_or_token: str = Depends(get_cookie_or_token), # 依赖于get_cookie_or_token的信息
):
# 等待连接
await websocket.accept()
# 处理链接
while True:
# 接收发送过来的数据信息
data = await websocket.receive_text()
# 把接收过来的数据再一次的发送回去
await websocket.send_text( f"Session cookie or query token value is: {cookie_or_token}" )
# 如果存在参数信息
if q is not None:
# 也会吧参数信息再一次的发送回去
await websocket.send_text(f"Query parameter q is: {q}")
# 发送其他的信息
await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
if __name__ == '__main__':
import uvicorn
uvicorn.run(app='main:app', host="127.0.0.1", port=8000, reload=True, debug=True)
uvicorn.run('masin:app', port=8011, )
示例步骤的说明:
首先就是浏览器打开页面,会进行websocket的连接:
1:连接上来说,当我们的第一次点击发送send,它因为需要依赖于我们的get_cookie_or_token,所以会跑里面执行后直接的返回了!查询token由依赖项进行处理。
2:第二次的时候,才正式的进入到我们的里面进行数据发送和接收。
1.3 官网示例3-多人文字聊天
前两个的示例中一直没有处理关于,当我们的客户端断开连接之后,引发的服务端的WebSocketDisconnect的异常问题,主要是需要及时进行异常捕获和关闭的处理即可!
如官网给出的完整的具体示例如下:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : masin
文件功能描述 : 功能描述
创建人 : 小钟同学
"""
from typing import List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>你的用户ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var client_id = Date.now()
document.querySelector("#ws-id").textContent = client_id;
var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
class ConnectionManager:
def __init__(self):
# 保存当前所有的链接的websocket对象
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
# 太那几到我们激活的链接队列李敏
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
# 从队列里面删除我们的已经断开链接的websocket对象
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
#发现消息
await websocket.send_text(message)
async def broadcast(self, message: str):
# 循环变量给所有在线激活的链接发送消息-全局广播
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
# 前端默认的
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"你输入了:: {data}", websocket)
await manager.broadcast(f"Client #{client_id} 说了: {data}")
# 对应客户端断开链接--关闭浏览器之后的处理,链接的关闭处理
except WebSocketDisconnect:
manager.disconnect(websocket)
# 进行全局的广播所有的在线链接的所有用户消息
await manager.broadcast(f"Client #{client_id} 离开了聊天室")
if __name__ == '__main__':
import uvicorn
uvicorn.run(app='main:app', host="127.0.0.1", port=8000, reload=True, debug=True)
uvicorn.run('masin:app', port=8011, )
然后我们可以使用在线测试工具进行连接测试,或直接的打开浏览器测试:
使用测试工具开一个客户端:
请求地址为:ws://localhost:8000/ws/xxxxxxid
使用浏览器开一个客户端并发送信息:
查看结果:
2 扩展-客服系统倒计时消息恢复处理
假如你现在使用websocket做一个客服的系统,此时需要处理用户消息处理,但是假如一个用户连接上来了之后,一直没有发送任何消息的情况下,它也不关闭我们的浏览器的情况下,我们服务端该如何处理这种一直不说话又占坑位的用户咨询的呐?
一般肯定是对我们的接收的消息机制那进行超时判断的处理,判断多久没有收到相关的消息了,但是如何处理好这个超时的呐? 简单的来说可以是:
---服务端主动询问是否还有什么问题需要咨询,如果没有就则开始倒计时 ---然后每隔多久询问一次,直到累计一定次数后,还是一直没有收到这个用户ID发来的消息的话,那么我们就主动的断开这个用户的链接。
协程的超时机制其实有两种的实现方式:
- 一种是基于:async_timeout的timeout来实现,timeout使用的是上下文管理器的形式
- 一种是基于asyncio自带的 asyncio.wait_for
基于官网提供的示例3来实现:
2.1 使用async_timeout的timeout来实现的方式:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : masin
文件功能描述 : 功能描述
创建人 : 小钟同学
"""
from typing import List
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>你的用户ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var client_id = Date.now()
document.querySelector("#ws-id").textContent = client_id;
var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
class ConnectionManager:
def __init__(self):
# 保存当前所有的链接的websocket对象
# self.active_connections: List[WebSocket] = []
self.active_connections = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
# 太那几到我们激活的链接队列李敏
self.active_connections.append(websocket)
async def disconnect(self, websocket: WebSocket):
# 从队列里面删除我们的已经断开链接的websocket对象
self.active_connections.remove(websocket)
# await websocket.close()
async def send_personal_message(self, message: str, websocket: WebSocket):
#发现消息
await websocket.send_text(message)
async def broadcast(self, message: str):
# 循环变量给所有在线激活的链接发送消息-全局广播
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
# 前端默认的
await manager.connect(websocket)
timeout_count = 0
try:
while True:
try:
async with async_timeout.timeout(1) as cm:
data =await websocket.receive_text()
timeout_count = 0
await manager.send_personal_message(f"你输入了:: {data}", websocket)
await manager.broadcast(f"Client #{client_id} 说了: {data}")
print(cm.expired)
except asyncio.TimeoutError:
timeout_count = timeout_count+1
await manager.send_personal_message(f"您很久没有输入消息了,请问您还有什么需要咨询的吗?::({timeout_count}) ", websocket)
# 注意的地方!!!!!!!!!!!!!!!
# 注意的地方!!!!!!!!!!!!!!!
# 注意的地方!!!!!!!!!!!!!!!
# 注意的地方!!!!!!!!!!!!!!!
if timeout_count >5:
# 只能在这里再主动的抛出一次短链,彻底的进行删除
raise WebSocketDisconnect()
# 不可以直接的进行删除,不然会有问题!---比较奇葩
# await manager.disconnect(websocket)
# 对应客户端断开链接--关闭浏览器之后的处理,链接的关闭处理
except WebSocketDisconnect:
await manager.disconnect(websocket)
# 进行全局的广播所有的在线链接的所有用户消息
await manager.broadcast(f"Client #{client_id} 离开了聊天室")
if __name__ == '__main__':
import uvicorn
#
uvicorn.run(app='main:app', host="127.0.0.1", port=8000, reload=True, debug=True)
2.2 使用asyncio.wait_for来实现的方式:
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
# 前端默认的
await manager.connect(websocket)
timeout_count = 0
try:
while True:
try:
# 间隔1秒检测是否有收到信息,如果没有则就抛出超时,通知前端
# 间隔1秒检测是否有收到信息,如果没有则就抛出超时,通知前端
data = await asyncio.wait_for(websocket.receive_text(), 1)
await manager.send_personal_message(f"你输入了:: {data}", websocket)
# 通知全部的激活的在线链接的对象
await manager.broadcast(f"Client #{client_id} 说了: {data}")
timeout_count = 0
# data = await asyncio.wait_for(websocket.receive_text(), 1)
# await manager.send_personal_message(f"你输入了:: {data}", websocket)
# 通知全部的激活的在线链接的对象
# await manager.broadcast(f"Client #{client_id} 说了: {data}")
except asyncio.TimeoutError:
timeout_count = timeout_count + 1
await manager.send_personal_message(f"您很久没有输入消息了,请问您还有什么需要咨询的吗?::({timeout_count}) ", websocket)
# 注意的地方!!!!!!!!!!!!!!!
# 注意的地方!!!!!!!!!!!!!!!
# 注意的地方!!!!!!!!!!!!!!!
# 注意的地方!!!!!!!!!!!!!!!
if timeout_count > 5:
# 只能在这里再主动的抛出一次短链,彻底的进行删除
raise WebSocketDisconnect()
# 不可以直接的进行删除,不然会有问题!---比较奇葩
#await manager.disconnect(websocket)
# 对应客户端断开链接--关闭浏览器之后的处理,链接的关闭处理
except WebSocketDisconnect:
await manager.disconnect(websocket)
# 进行全局的广播所有的在线链接的所有用户消息
await manager.broadcast(f"Client #{client_id} 离开了聊天室")
测试结果:
上面需要注意地方有:再超时处理的时候,不可以直接的await manager.disconnect(websocket),不然会说找不到你的加入的websocket的对象!
ValueError: list.remove(x): x not in list
这个地方比较诡异!暂时不知道为啥会出现这样的情况!
3 新增使用类方式定义websocket路由
前面的这几个示例都是直接的使用我们的app.websocket来定义路由
@app.websocket("/ws/{client_id}")
但是其实我们的可以也可以继承于WebSocketEndpoint来实现一个新的类的方式来实现路由的定义,然后使用
@app.websocket_route
来装饰类实现路由的websocket路由 或者使用添加的的方式:
app.add_websocket_route("/ws/{user_id}",EchoSever,name="ws")
具体的实现过程如下:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.endpoints import WebSocketEndpoint
from fastapi.responses import HTMLResponse
from enum import Enum
from typing import Any, Dict, List, Optional
import asyncio
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>你的用户ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var client_id = Date.now()
document.querySelector("#ws-id").textContent = client_id;
var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
class ConnectionManager:
def __init__(self):
# 保存当前所有的链接的websocket对象
# self.active_connections: List[WebSocket] = []
self.active_connections = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
# 添加到当前已链接成功的队列中进行管理
self.active_connections.append(websocket)
async def close(self, websocket: WebSocket):
# 主动的断开的客户端的链接,不是抛出异常的方式断开
await websocket.close()
self.active_connections.remove(websocket)
async def disconnect(self, websocket: WebSocket):
# 从队列里面删除我们的已经断开链接的websocket对象
self.active_connections.remove(websocket)
# await websocket.close()
async def send_personal_message(self, message: str, websocket: WebSocket):
# 发现消息
await websocket.send_text(message)
async def broadcast(self, message: str):
# 循环变量给所有在线激活的链接发送消息-全局广播
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.get("/")
async def get():
#用于渲染一个前端的页面用于测试链接,其实可以直接使用工具来测试即可!
return HTMLResponse(html)
@app.websocket_route("/ws/{user_id}", name="ws")
class EchoSever(WebSocketEndpoint):
encoding: str = "text"
session_name: str = ""
count: int = 0
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 从args中提取对应的输入的参数信息
print(args[0]['endpoint'])
print(args[0]['path_params'])
self.user_id: Optional[str] = args[0]['path_params'].get('user_id')
# async def daojishi(self, websocket):
#
# setattr(websocket,'timeout_count',0)
#
# try:
# while True:
# try:
# await asyncio.wait_for(websocket.receive_text(), 1)
# except asyncio.TimeoutError:
# timeout_count = getattr(websocket,'timeout_count')
# timeout_count = timeout_count + 1
# setattr(websocket, 'timeout_count', timeout_count)
# await manager.send_personal_message(f"您很久没有输入消息了,请问您还有什么需要咨询的吗?::({timeout_count}) ", websocket)
# if timeout_count > 5:
# # 只能在这里再主动的抛出一次短链,彻底的进行删除
# raise WebSocketDisconnect()
# # 不可以直接的进行删除,不然会有问题!---比较奇葩
# # await manager.disconnect(websocket)
# # 对应客户端断开链接--关闭浏览器之后的处理,链接的关闭处理
# except WebSocketDisconnect:
# pass
# print('主动断开了')
# await manager.disconnect(websocket)
# # 进行全局的广播所有的在线链接的所有用户消息
# await manager.broadcast(f"游客: {self.user_id} 离开了聊天室")
# 开始有链接上来的时候对应的处理
async def on_connect(self, websocket):
await manager.connect(websocket)
await manager.broadcast(f"游客: {self.user_id}进入了房间!")
# await self.daojishi(websocket)
# 客户端开始有数据发送过来的时候的处理
async def on_receive(self, websocket, data):
# timeout_count = getattr(websocket, 'timeout_count')
# setattr(websocket, 'timeout_count', 0)
await manager.broadcast(f"游客:{self.user_id} 说》{data}")
# 客户端断开链接的时候
async def on_disconnect(self, websocket, close_code):
# 进行全局的广播所有的在线链接的所有用户消息
try:
await manager.disconnect(websocket)
# 广播给其他所有在线的websocket
await manager.broadcast(f"游客: {self.user_id} 离开了聊天室")
except ValueError:
# 倒计时自动结束的之后,客户端再点击一次断开的时候异常处理!
pass
if __name__ == '__main__':
import uvicorn
uvicorn.run(app='main_ro:app', host="127.0.0.1", port=8000, reload=True, debug=True)
上面的示例也可以完成一个多人的聊天室的对话。
ps 上面的示例中在ConnectionManager新增了一个关闭处理,这个关闭的方法:
async def close(self, websocket: WebSocket): # 主动的断开的客户端的链接,不是抛出异常的方式断开 await websocket.close() self.active_connections.remove(websocket)
会正常通知我们的前端链接已经断开,如果是直接raise WebSocketDisconnect(),前端无法正常的获取已断开的提示。
总结:
以上是官网提供的一些简单案例,如果真正需要应用到生产环境上的,涉及的问题还是比较多,比如
- 用户鉴权等结合用户登入相关机制可以实现
- 还有用户消息处理解包和封包
- 心跳机制检测
- 客户端重连机制
- 离线的检测规则
结尾
简单小笔记!仅供参考!
转载自:https://juejin.cn/post/6974673318829883406