Flutter 入门与实战(五十五):和 Provider 一起玩 WebSocket
前言
在实际的应用中,我们经常会遇到数据流的处理,比如监听实时位置时会产生实时的位置信息流,比如 Socket
通讯时需要监听Socket
数据流。对于数据流,在 Flutter
中称之为 Stream
,而 Provider
为 Stream
专门设计了一个 StreamProvider
来监听数据流的变化,当数据流产生新的数据时就会通知监听组件刷新。本篇我们就来介绍如何利用 StreamProvider
监听 WebSocket
数据。
本篇设计的知识点如下:
WebSokcet
客户端封装插件socket_io_client
的使用。StreamProvider
监听 Socket数据。
为了能够让程序正常运行,已经在后端代码增加了简单的 socket
,使用的是和 socket_io_client
配套的服务端socket.io
,npm
地址为socket.io,版本为4.1.3。注意 Flutter 的插件存在版本兼容性,配套该版本的当前处于开发中的版本:^2.0.0-beta.4-nullsafety.0
(稳定版的1.0.1插件不支持服务端的4.1.3版本)。后端源码地址为:后端配套源码。
socket_io_client 介绍
socket_io_client 是广受欢迎的 Socket 客户端插件,javascript 也有对应的版本。socket_io_client 对 Dart 的 Socket 进行了封装,从而简化了 socket 的使用。
创建连接的方式如下,其中$host
和$port
对应主机名和端口,后面的是配置参数。使用websocket
时,需要指定 transports
(数组)的元素包括 websocket
,autoConnect
为创建后是否自动连接,默认会自动连接。forceNew
为是否每次创建新的 Socket
对象,如果为 false
(默认) 则会使用旧的连接(没有关闭的情况下)。这里我们使用的是每次使用新的 Socket
对象 这是因为退出页面后我们要关闭连接。
_socket = SocketIO.io('ws://$host:$port', <String, dynamic>{
'transports': ['websocket'],
'autoConnect': true,
'forceNew': true
});
创建好之后就可以设置 Socket
的事件响应回调方法,常用的由如下方法:
onConnect(EventHandler handler)
:连接建立成功回调;onConnectTimeout(EventHandler handler)
:连接超时回调;onConnectError(EventHandler handler)
:连接错误回调;onError(EventHandler handler)
:发生错误时回调;on(String event, (EventHandler handler)
:订阅指定事件的消息,服务端发送该事件的消息时可以在该函数接收。onDisconnect(EventHandler handler)
:断开连接时回调;connect/disconnect
:主动连接/断开连接方法;open/close
:打开和关闭方法。
重点是创建连接后,当接收到的数据加入到一个流处理对象中。Flutter
提供了StreamController
类处理 Stream
对象。StreamController
只允许有一个订阅者,可以使用 sink
属性的 add
方法添加新的流数据,完成添加后会通知其订阅者有新的数据产生。因此我们可以使用 StreamProvider
来订阅这个流数据。
我们构建一个 Socket 管理类 StreamSocket,代码如下:
class StreamSocket {
final _socketResponse = StreamController<String>();
Stream<String> get getResponse => _socketResponse.stream;
final String host;
final int port;
late final Socket _socket;
StreamSocket({required this.host, required this.port}) {
_socket = SocketIO.io('ws://$host:$port', <String, dynamic>{
'transports': ['websocket'],
'autoConnect': true,
'forceNew': true
});
}
void connectAndListen() {
_socket.onConnect((_) {
debugPrint('connected');
});
_socket.onConnectTimeout((data) => debugPrint('timeout'));
_socket.onConnectError((error) => debugPrint(error.toString()));
_socket.onError((error) => debugPrint(error.toString()));
_socket.on('msg', (data) {
_socketResponse.sink.add(data);
});
_socket.onDisconnect((_) => debugPrint('disconnect'));
}
void sendTextMessage(String message) {
_socket.emit('msg', message);
}
void close() {
_socketResponse.close();
_socket.disconnect().close();
}
}
StreamProvider 应用
StreamProvider 的应用和 FutureProvider,ChangeNotifierProvider 类似,也分为create 和 value两种形式:
// create 形式
StreamProvider({
Key? key,
required Create<Stream<T>?> create,
required T initialData,
ErrorBuilder<T>? catchError,
UpdateShouldNotify<T>? updateShouldNotify,
bool? lazy,
TransitionBuilder? builder,
Widget? child,
})
// value 形式
StreamProvider.value({
Key? key,
required Stream<T>? value,
required T initialData,
ErrorBuilder<T>? catchError,
UpdateShouldNotify<T>? updateShouldNotify,
bool? lazy,
TransitionBuilder? builder,
Widget? child,
})
这里我们首先使用一个 StatefulWidget
来管理 StreamSocket
的初始化以及 Socket
的关闭,并将实际业务组件用两个 Provider
包裹,一个是 StreamProvider
,一个是 MessageModel
。StreamProvider
用于自动监听 Socket
的数据流,并显示在界面上;MessageModel
目前仅用于发送消息。
class _SocketClientWrapperState extends State<SocketClientWrapper> {
final StreamSocket streamSocket = StreamSocket(host: '127.0.0.1', port: 3001);
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Stream Provicer'),
),
body: Stack(
alignment: Alignment.bottomCenter,
children: [
StreamProvider<String>(
create: (context) => streamSocket.getResponse,
initialData: '',
child: StreamDemo(),
),
ChangeNotifierProvider<MessageModel>(
child: MessageReplyBar(messageSendHandler: (message) {
streamSocket.sendTextMessage(message);
}),
create: (context) => MessageModel(),
),
],
),
);
}
@override
void initState() {
streamSocket.connectAndListen();
super.initState();
}
@override
void dispose() {
streamSocket.close();
super.dispose();
}
}
这里我们使用了 Stack将消息的输入条MessageReplyBar
固定在底部,需要注意的是,如果想要输入条自动随着键盘浮动,则需要将其放入到 Scaffold
的body
里,否则会一直固定在底部而被遮挡住。
StreamDemo
和MessageReplyBar
两个组件都很简单,业务逻辑为 MessageReplyBar
点击发送按钮将消息通过Socket
发出到服务端。然后 StreamDemo
回显 StreamProvider
从 Socket
接收到的服务端消息。
运行效果
我们配合服务端的打印日志来看运行效果:可以看到连接的建立、发送消息、接收消息和断开连接的过程都是正常的。
源码已上传至:状态管理相关代码 - null safety 版本。
总结
本篇介绍了Provider
的 StreamProvider
流状态管理,同时引入了 socket_io_client
插件实现了与服务端的 WebSocket
通信,通过StreamProvider
订阅流数据,并通知界面刷新,实现了界面层的无侵入Socket
数据展示。这一篇比较简单,更多地是介绍两个工具的使用,下一篇我们用这两个工具来实现一个即时聊天应用。
转载自:https://juejin.cn/post/6997538437897125895