likes
comments
collection
share

Flutter 入门与实战(五十五):和 Provider 一起玩 WebSocket

作者站长头像
站长
· 阅读数 10

前言

在实际的应用中,我们经常会遇到数据流的处理,比如监听实时位置时会产生实时的位置信息流,比如 Socket 通讯时需要监听Socket 数据流。对于数据流,在 Flutter 中称之为 Stream,而 ProviderStream专门设计了一个 StreamProvider 来监听数据流的变化,当数据流产生新的数据时就会通知监听组件刷新。本篇我们就来介绍如何利用 StreamProvider 监听 WebSocket 数据。

本篇设计的知识点如下:

  • WebSokcet 客户端封装插件 socket_io_client的使用。
  • StreamProvider 监听 Socket数据。

为了能够让程序正常运行,已经在后端代码增加了简单的 socket,使用的是和 socket_io_client 配套的服务端socket.ionpm 地址为socket.io,版本为4.1.3。注意 Flutter 的插件存在版本兼容性,配套该版本的当前处于开发中的版本:^2.0.0-beta.4-nullsafety.0(稳定版的1.0.1插件不支持服务端的4.1.3版本)。后端源码地址为:后端配套源码

socket_io_client 介绍

socket_io_client 是广受欢迎的 Socket 客户端插件,javascript 也有对应的版本。socket_io_client 对 Dart 的 Socket 进行了封装,从而简化了 socket 的使用。

创建连接的方式如下,其中$host$port对应主机名和端口,后面的是配置参数。使用websocket 时,需要指定 transports(数组)的元素包括 websocketautoConnect 为创建后是否自动连接,默认会自动连接。forceNew 为是否每次创建新的 Socket 对象,如果为 false(默认) 则会使用旧的连接(没有关闭的情况下)。这里我们使用的是每次使用新的 Socket 对象 这是因为退出页面后我们要关闭连接。

_socket = SocketIO.io('ws://$host:$port', <String, dynamic>{
  'transports': ['websocket'],
  'autoConnect': true,
  'forceNew': true
});

创建好之后就可以设置 Socket的事件响应回调方法,常用的由如下方法:

  • onConnect(EventHandler handler):连接建立成功回调;
  • onConnectTimeout(EventHandler handler):连接超时回调;
  • onConnectError(EventHandler handler):连接错误回调;
  • onError(EventHandler handler):发生错误时回调;
  • on(String event, (EventHandler handler):订阅指定事件的消息,服务端发送该事件的消息时可以在该函数接收。
  • onDisconnect(EventHandler handler):断开连接时回调;
  • connect/disconnect:主动连接/断开连接方法;
  • open/close:打开和关闭方法。

重点是创建连接后,当接收到的数据加入到一个流处理对象中。Flutter 提供了StreamController 类处理 Stream 对象。StreamController 只允许有一个订阅者,可以使用 sink 属性的 add 方法添加新的流数据,完成添加后会通知其订阅者有新的数据产生。因此我们可以使用 StreamProvider 来订阅这个流数据。

我们构建一个 Socket 管理类 StreamSocket,代码如下:

class StreamSocket {
  final _socketResponse = StreamController<String>();

  Stream<String> get getResponse => _socketResponse.stream;

  final String host;
  final int port;
  late final Socket _socket;

  StreamSocket({required this.host, required this.port}) {
    _socket = SocketIO.io('ws://$host:$port', <String, dynamic>{
      'transports': ['websocket'],
      'autoConnect': true,
      'forceNew': true
    });
  }

  void connectAndListen() {
    _socket.onConnect((_) {
      debugPrint('connected');
    });

    _socket.onConnectTimeout((data) => debugPrint('timeout'));
    _socket.onConnectError((error) => debugPrint(error.toString()));
    _socket.onError((error) => debugPrint(error.toString()));
    _socket.on('msg', (data) {
      _socketResponse.sink.add(data);
    });
    _socket.onDisconnect((_) => debugPrint('disconnect'));
  }

  void sendTextMessage(String message) {
    _socket.emit('msg', message);
  }

  void close() {
    _socketResponse.close();
    _socket.disconnect().close();
  }
}

StreamProvider 应用

StreamProvider 的应用和 FutureProvider,ChangeNotifierProvider 类似,也分为create 和 value两种形式:

// create 形式
StreamProvider({
  Key? key,
  required Create<Stream<T>?> create,
  required T initialData,
  ErrorBuilder<T>? catchError,
  UpdateShouldNotify<T>? updateShouldNotify,
  bool? lazy,
  TransitionBuilder? builder,
  Widget? child,
}) 
  
// value 形式
StreamProvider.value({
  Key? key,
  required Stream<T>? value,
  required T initialData,
  ErrorBuilder<T>? catchError,
  UpdateShouldNotify<T>? updateShouldNotify,
  bool? lazy,
  TransitionBuilder? builder,
  Widget? child,
}) 

这里我们首先使用一个 StatefulWidget 来管理 StreamSocket 的初始化以及 Socket 的关闭,并将实际业务组件用两个 Provider 包裹,一个是 StreamProvider,一个是 MessageModelStreamProvider用于自动监听 Socket 的数据流,并显示在界面上;MessageModel 目前仅用于发送消息。

class _SocketClientWrapperState extends State<SocketClientWrapper> {
  final StreamSocket streamSocket = StreamSocket(host: '127.0.0.1', port: 3001);
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('Stream Provicer'),
      ),
      body: Stack(
        alignment: Alignment.bottomCenter,
        children: [
          StreamProvider<String>(
            create: (context) => streamSocket.getResponse,
            initialData: '',
            child: StreamDemo(),
          ),
          ChangeNotifierProvider<MessageModel>(
            child: MessageReplyBar(messageSendHandler: (message) {
              streamSocket.sendTextMessage(message);
            }),
            create: (context) => MessageModel(),
          ),
        ],
      ),
    );
  }

  @override
  void initState() {
    streamSocket.connectAndListen();
    super.initState();
  }

  @override
  void dispose() {
    streamSocket.close();
    super.dispose();
  }
}

这里我们使用了 Stack将消息的输入条MessageReplyBar固定在底部,需要注意的是,如果想要输入条自动随着键盘浮动,则需要将其放入到 Scaffoldbody 里,否则会一直固定在底部而被遮挡住。

StreamDemoMessageReplyBar两个组件都很简单,业务逻辑为 MessageReplyBar点击发送按钮将消息通过Socket 发出到服务端。然后 StreamDemo 回显 StreamProviderSocket 接收到的服务端消息。

运行效果

我们配合服务端的打印日志来看运行效果:可以看到连接的建立、发送消息、接收消息和断开连接的过程都是正常的。

Flutter 入门与实战(五十五):和 Provider 一起玩 WebSocket

源码已上传至:状态管理相关代码 - null safety 版本

总结

本篇介绍了ProviderStreamProvider流状态管理,同时引入了 socket_io_client插件实现了与服务端的 WebSocket 通信,通过StreamProvider订阅流数据,并通知界面刷新,实现了界面层的无侵入Socket数据展示。这一篇比较简单,更多地是介绍两个工具的使用,下一篇我们用这两个工具来实现一个即时聊天应用。