Flutter 入门与实战(五十五):和 Provider 一起玩 WebSocket
前言
在实际的应用中,我们经常会遇到数据流的处理,比如监听实时位置时会产生实时的位置信息流,比如 Socket 通讯时需要监听Socket 数据流。对于数据流,在 Flutter 中称之为 Stream,而 Provider 为 Stream专门设计了一个 StreamProvider 来监听数据流的变化,当数据流产生新的数据时就会通知监听组件刷新。本篇我们就来介绍如何利用 StreamProvider 监听 WebSocket 数据。
本篇设计的知识点如下:
WebSokcet客户端封装插件socket_io_client的使用。StreamProvider监听 Socket数据。
为了能够让程序正常运行,已经在后端代码增加了简单的 socket,使用的是和 socket_io_client 配套的服务端socket.io,npm 地址为socket.io,版本为4.1.3。注意 Flutter 的插件存在版本兼容性,配套该版本的当前处于开发中的版本:^2.0.0-beta.4-nullsafety.0(稳定版的1.0.1插件不支持服务端的4.1.3版本)。后端源码地址为:后端配套源码。
socket_io_client 介绍
socket_io_client 是广受欢迎的 Socket 客户端插件,javascript 也有对应的版本。socket_io_client 对 Dart 的 Socket 进行了封装,从而简化了 socket 的使用。
创建连接的方式如下,其中$host和$port对应主机名和端口,后面的是配置参数。使用websocket 时,需要指定 transports(数组)的元素包括 websocket,autoConnect 为创建后是否自动连接,默认会自动连接。forceNew 为是否每次创建新的 Socket 对象,如果为 false(默认) 则会使用旧的连接(没有关闭的情况下)。这里我们使用的是每次使用新的 Socket 对象 这是因为退出页面后我们要关闭连接。
_socket = SocketIO.io('ws://$host:$port', <String, dynamic>{
'transports': ['websocket'],
'autoConnect': true,
'forceNew': true
});
创建好之后就可以设置 Socket的事件响应回调方法,常用的由如下方法:
onConnect(EventHandler handler):连接建立成功回调;onConnectTimeout(EventHandler handler):连接超时回调;onConnectError(EventHandler handler):连接错误回调;onError(EventHandler handler):发生错误时回调;on(String event, (EventHandler handler):订阅指定事件的消息,服务端发送该事件的消息时可以在该函数接收。onDisconnect(EventHandler handler):断开连接时回调;connect/disconnect:主动连接/断开连接方法;open/close:打开和关闭方法。
重点是创建连接后,当接收到的数据加入到一个流处理对象中。Flutter 提供了StreamController 类处理 Stream 对象。StreamController 只允许有一个订阅者,可以使用 sink 属性的 add 方法添加新的流数据,完成添加后会通知其订阅者有新的数据产生。因此我们可以使用 StreamProvider 来订阅这个流数据。
我们构建一个 Socket 管理类 StreamSocket,代码如下:
class StreamSocket {
final _socketResponse = StreamController<String>();
Stream<String> get getResponse => _socketResponse.stream;
final String host;
final int port;
late final Socket _socket;
StreamSocket({required this.host, required this.port}) {
_socket = SocketIO.io('ws://$host:$port', <String, dynamic>{
'transports': ['websocket'],
'autoConnect': true,
'forceNew': true
});
}
void connectAndListen() {
_socket.onConnect((_) {
debugPrint('connected');
});
_socket.onConnectTimeout((data) => debugPrint('timeout'));
_socket.onConnectError((error) => debugPrint(error.toString()));
_socket.onError((error) => debugPrint(error.toString()));
_socket.on('msg', (data) {
_socketResponse.sink.add(data);
});
_socket.onDisconnect((_) => debugPrint('disconnect'));
}
void sendTextMessage(String message) {
_socket.emit('msg', message);
}
void close() {
_socketResponse.close();
_socket.disconnect().close();
}
}
StreamProvider 应用
StreamProvider 的应用和 FutureProvider,ChangeNotifierProvider 类似,也分为create 和 value两种形式:
// create 形式
StreamProvider({
Key? key,
required Create<Stream<T>?> create,
required T initialData,
ErrorBuilder<T>? catchError,
UpdateShouldNotify<T>? updateShouldNotify,
bool? lazy,
TransitionBuilder? builder,
Widget? child,
})
// value 形式
StreamProvider.value({
Key? key,
required Stream<T>? value,
required T initialData,
ErrorBuilder<T>? catchError,
UpdateShouldNotify<T>? updateShouldNotify,
bool? lazy,
TransitionBuilder? builder,
Widget? child,
})
这里我们首先使用一个 StatefulWidget 来管理 StreamSocket 的初始化以及 Socket 的关闭,并将实际业务组件用两个 Provider 包裹,一个是 StreamProvider,一个是 MessageModel。StreamProvider用于自动监听 Socket 的数据流,并显示在界面上;MessageModel 目前仅用于发送消息。
class _SocketClientWrapperState extends State<SocketClientWrapper> {
final StreamSocket streamSocket = StreamSocket(host: '127.0.0.1', port: 3001);
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Stream Provicer'),
),
body: Stack(
alignment: Alignment.bottomCenter,
children: [
StreamProvider<String>(
create: (context) => streamSocket.getResponse,
initialData: '',
child: StreamDemo(),
),
ChangeNotifierProvider<MessageModel>(
child: MessageReplyBar(messageSendHandler: (message) {
streamSocket.sendTextMessage(message);
}),
create: (context) => MessageModel(),
),
],
),
);
}
@override
void initState() {
streamSocket.connectAndListen();
super.initState();
}
@override
void dispose() {
streamSocket.close();
super.dispose();
}
}
这里我们使用了 Stack将消息的输入条MessageReplyBar固定在底部,需要注意的是,如果想要输入条自动随着键盘浮动,则需要将其放入到 Scaffold 的body 里,否则会一直固定在底部而被遮挡住。
StreamDemo和MessageReplyBar两个组件都很简单,业务逻辑为 MessageReplyBar点击发送按钮将消息通过Socket 发出到服务端。然后 StreamDemo 回显 StreamProvider从 Socket 接收到的服务端消息。
运行效果
我们配合服务端的打印日志来看运行效果:可以看到连接的建立、发送消息、接收消息和断开连接的过程都是正常的。

源码已上传至:状态管理相关代码 - null safety 版本。
总结
本篇介绍了Provider 的 StreamProvider流状态管理,同时引入了 socket_io_client插件实现了与服务端的 WebSocket 通信,通过StreamProvider订阅流数据,并通知界面刷新,实现了界面层的无侵入Socket数据展示。这一篇比较简单,更多地是介绍两个工具的使用,下一篇我们用这两个工具来实现一个即时聊天应用。
转载自:https://juejin.cn/post/6997538437897125895