likes
comments
collection
share

JavaScript 世界中的量子纠缠(从 Comlink 谈谈 IPC 抽象封装)

作者站长头像
站长
· 阅读数 71

楔子

这两年的工作经常需要处理 IPC(进程间通信)问题,例如:

  • 内嵌 iframe 页面通信
  • Web Workers 服务调用
  • Figma 插件 UI 与沙箱进程通信
  • Chrome 扩展页面间通信
  • Electron 主进程与渲染进程
  • ……

有问题自然会有解决手段,初见 Comlink 着实惊艳,它同量子纠缠般的鬼魅身法以一种很巧妙的方式抹平了业务双发的通信细节,通信双方只需要关注各自提供支持的业务实现。这里主要分享一下 comlink 的核心实现,以及如何将这一套机制一般化扩展到其他应用环境中

这里也分享下我最近造的轮子:comlink-adapters 一个涵盖各个应用环境的 comlink 适配器,有需求的同学不妨试一下,也欢迎一起来完善它。

古早味

先看看 Web 中比较常见的进程间通信处理方式,即在消息的接收方定义一大堆不同的策略方法来处理调用方请求,实现也比较简单,以 iframe 的父子窗口通信为例:

// iframe.ts
const handlers = {
    add: (a: number, b: number) => a + b,
    subtract: (a: number, b: number) => a - b,
};

window.addEventListener('message', function (event) {
    const { method, args, id } = event.data;
    const result = handlers[method](...args);
    window.top!.postMessage({
        result,
        id,
    });
});
// parent.ts
const iframeWindow = document.querySelector('iframe')!.contentWindow!;

const invoke = (method: 'add' | 'subtract', args: any[]) => {
    return new Promise((resolve) => {
        const uuid = Math.random().toString(36).substring(2);

        const handler = (event: MessageEvent) => {
            const { result, id } = event.data;
            if (id && id === uuid) {
                window.removeEventListener('message', handler);
                resolve(result);
            }
        };
        window.addEventListener('message', handler);

        iframeWindow.postMessage(
            {
                method,
                args,
                id: uuid,
            },
            '*'
        );
    });
};

const res0 = await invoke('add', [1, 2]); // output: 3
const res1 = await invoke('subtract', [2, 1]); // output: 1

这种基于策略模式的处理方法可以解决简单的消息调用,但在应对复杂业务场景时并不是很优雅:

  • 消息接收双方都需要维护一份策略列表,当业务场景复杂时所维护策略列表也随之复杂。
  • 消息调用方与接收方需要都需要维护一份类型文件,无法直接共享类型文件。
  • 消息参数只能是可序列化的数据(无法使用诸如函数一类的参数)。

相较之下 comlink 的处理方式就优雅多了。

Comlink

同样的例子,使用 comlink 处理:

handlers.ts

export const handlers = {
    add: (a: number, b: number) => a + b,
    subtract: (a: number, b: number) => a - b,
};

export type HandlersType = typeof handlers;

iframe.ts

import { expose, windowEndpoint } from 'comlink';
import { handlers } from './handlers';
// 接收方暴露支持的业务对象
expose(handlers, windowEndpoint(window.top!));

parent.ts

import { wrap, windowEndpoint } from 'comlink';
import type { HandlersType } from './handlers';

(async function () {
    const iframeWindow = document.querySelector('iframe')!.contentWindow!;
    // 调用方包装远程的业务对象
    const remoteHandlers = wrap<HandlersType>(windowEndpoint(iframeWindow));
    const addRes = await remoteHandlers.add(1, 2); // output: 3
    const subtractRes = await remoteHandlers.subtract(2, 1); // output: 1
})();

使用时与直接使用 handlers 几乎无异,只是原本同步方法都变成了异步调用,而且可以直接享受到 handlers 类型支持。

JavaScript 世界中的量子纠缠(从 Comlink 谈谈 IPC 抽象封装)

而 Comlink 还支持函数类型的参数,看个官方的示例:

main.js

import * as Comlink from 'https://unpkg.com/comlink/dist/esm/comlink.mjs';

function callback(value) {
    alert(`Result: ${value}`);
}

async function init() {
    const remoteFunction = Comlink.wrap(new Worker('worker.js'));
    await remoteFunction(Comlink.proxy(callback));
}

init();

worker.js

importScripts('https://unpkg.com/comlink/dist/umd/comlink.js');

async function remoteFunction(cb) {
    await cb('A string from a worker');
}

Comlink.expose(remoteFunction);

worker.js 中 remoteFunction 的参数 cb 是一个函数,而我们可以在 main.js 中通过 Comlink 提供的 proxy 方法将普通 callback 函数其变成一个可被远程函数使用的参数对象。这其中使用了很巧妙的处理技巧,接下来我们试着实现一个简单版本的 comlink 来剖析其实现的核心机制。

简单实现

Comlink 的核心实现基于 ProxypostMessage,而诸如:支持函数类型参数、使用 new 构造远程对象、Comlink.createEndpoint 等功能则是依赖 MessageChannel  与  MessagePortTransferable 特性实现的。

先来看一下 Comlink 所支持代理对象操作:

counter.ts

export class Counter {
    constructor(public count = 0) {}

    add() {
        this.count += 1;
    }

    subtract() {
        this.count -= 1;
    }

    async use(handler: (count: number) => number) {
        this.count = await handler(this.count);
    }
}

export type CounterType = typeof Counter;

worker.ts

import { expose } from 'comlink';
import { Counter } from './counter';
expose(Counter);

main.ts

import { wrap, proxy } from 'comlink';
import type { CounterType } from './counter';

(async function () {
    const RemoteCounter = wrap<CounterType>(new Worker('worker.ts'));
    // CONSTRUCT
    const remoteCounter = await new RemoteCounter();
    // GET
    const value = await remoteCounter.count;
    // SET
    await (remoteCounter.count = 1);
    // APPLY
    await remoteCounter.add();
    await remoteCounter.subtract();
    await remoteCounter.use(proxy((count) => count * 2));
})();
  • CONSTRUCT 构造远程对象
  • GET 远程对象取值
  • SET 远程对象赋值
  • APPLY 远程对象方法调用

目前 Comlink 支持了上述 4 种针对远程对象的操作,我们的实现也将围绕这四种操作展开。

Endpoint

既然是消息通信,首先需要确认通信双方,在 Comlink 中通信的双方被称为 Endpoint,一个 Endpoint 对象需要满足下面接口定义:

export interface EventSource {
    addEventListener(
        type: string,
        listener: EventListenerOrEventListenerObject,
        options?: {}
    ): void;

    removeEventListener(
        type: string,
        listener: EventListenerOrEventListenerObject,
        options?: {}
    ): void;
}

export interface Endpoint extends EventSource {
    postMessage(message: any, transfer?: Transferable[]): void;

    start?: () => void;
}

简单来说一个 Endpoint 对象需要支持:

  • postMessage 接口用于向通信目标发送消息
  • addEventListener('message', (evt) => {}) 用于监听通信目标发出消息
  • removeEventListener('message', (evt) => {}) 用于移除通信目标消息监听

Comlink 的通信对象需要是满足 Endpoint 接口的对象,目前可直接支持的有:

而针对一些不支持的应用环境中我们如果模拟出这一套接口,则也能完成 Comlink 的适配。

后面会给出一个 Socket.IO 适配器的实现,使之脱离 IPC 环境,实现 Comlink 的客户端与服务端 RPC 调用。

Proxy 拦截与操作序列化

Comlink 之所以能像本地对象一样的操作远程对象,其核心是消息发送方使用 Proxy 代理了针对远程对象的操作,并将此操作序列化发送到接收方,接收方根据数据反序列对应操作,并将结果返回。

这里以 GET 操作为例:

function generateUUID(): string {
    return new Array(4)
        .fill(0)
        .map(() =>
            Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString(16)
        )
        .join('-');
}

function requestResponseMessage(
    ep: Endpoint,
    msg: any,
    transfers?: Transferable[]
): Promise<any> {
    return new Promise((resolve) => {
        const id = generateUUID();
        ep.addEventListener('message', function l(ev: MessageEvent) {
            if (!ev.data || !ev.data.id || ev.data.id !== id) {
                return;
            }
            ep.removeEventListener('message', l as any);
            resolve(ev.data);
        } as any);

        if (ep.start) {
            ep.start();
        }

        ep.postMessage({ id, ...msg }, transfers);
    });
}

function fromWireValue(data: any) {
    return data.value;
}

function createProxy(
    ep: Endpoint,
    path: (string | number | symbol)[] = [],
    target: object = function () {}
) {
    const proxy: Object = new Proxy(target, {
        get(_target, prop) {
            // 只有 prop 为 then 才做真正的取值操作
            if (prop === 'then') {
                if (path.length === 0) {
                    return { then: () => proxy };
                }
                const r = requestResponseMessage(ep, {
                    type: 'GET',
                    path: path.map((p) => p.toString()),
                }).then(fromWireValue);
                return r.then.bind(r);
            }

            return createProxy(ep, [...path, prop]);
        },
    });

    return proxy;
}

export function wrap(ep: Endpoint) {
    return createProxy(ep, []);
}

export function expose(obj: any, ep: Endpoint = globalThis as any) {
    ep.addEventListener('message', function callback(ev: MessageEvent) {
        if (!ev || !ev.data) {
            return;
        }

        const { id, type, path } = {
            path: [] as string[],
            ...(ev.data as any),
        };

        let returnValue;
        const rawValue = path.reduce((obj, prop) => obj[prop], obj);

        switch (type) {
            case 'GET':
                {
                    returnValue = rawValue;
                }
                break;
            // 其他操作
            default:
                return;
        }

        Promise.resolve(returnValue).then((value) => {
            ep.postMessage({ type: 'RAW', value, id }, []);
        });
    } as any);

    if (ep.start) {
        ep.start();
    }
}

在发送方主要关注 createProxy 的实现,这里 Proxy 创建一个代理对象,而针对目标的取值是代理 GET 实现的。实现也很简单,获取目标的 path(取值路径)通过 endpoint 对象发送到接收方,接收方执行 GET 取值并再通过 postMessage 返回。

requestResponseMessage 方法在发送消息时会生成一个唯一 id 标识,当接受方回复消息也会带上这个 id,用于区分此次消息调用。

GET 操作有一点需要注意,只有 prop 为 then 时才发送消息执行取值操作,原因是我们需要明确的知道是对远程对象取值还是继续创建代理,有点绕,看个例子:

假设代理的远程对象为:

const obj = {
    props: {
        name: 'xxx',
    },
};

expose(obj);

其使用的形式有可能是:

const remoteObjProxy = wrap<typeof obj>(endpoint);
// 直接取值
const props = await remoteObj.props;

// 这里返回的是 props 的 proxy 对象
const remoteObjPropsProxy = remoteObj.props;
// 这里才是取值
const props = await remoteObjPropsProxy;
const name = await remoteObjProps.name;

很好理解,无论我们如何去抽象 IPC 的通信方式,其底层接口使用的是异步消息,那么上层代理操作的结果也只能是异步的。体现在 Comlink 上就是针对代理对象的操作都需要使用 await 获取。

await remoteObj.props 取值操作实际背后执行的是:

await Promise.resolve(remoteObj.props);

Promise.resolve(target) 会尝试调用 targetthen 方法,这就是上述代理 get 操作检查 key 为 then 的原因。

awaitPromise.resolve 的规范可参考:

这里扩展一个问题,假设 IPC 通信线程间支持同步的消息调用,那么针对远程代理对象的操作是不是可以同本地调用一样呢?

是可以的,经历过 Electron 开发的同学可能使用过 @electron/remote 库,其抹平了渲染进程到主进程的消息调用,使之可以直接在渲染进程直接“使用”主进程的暴露的对象,例如主进程中的 dialog

其背后原理和 Comlink 相似,不同于 Comlink 基于 Proxy 机制拦截对象操作,@electron/remote 是将远程对象先序列化,再根据获取到的序列化结果创建一个“影子对象”,针对影子对象的操作会通过消息通信在接收方执行。在使用上 @electron/remote 会更无感,不需要将远程对象操作都变成 Promise 调用,原因是其在底层实现中使用了 Electron 的同步消息通信,即:

由于是同步通信,在使用 @electron/remote 特别注意不要在主进程中造成阻塞,因为这会导致应用程序界面冻结。在大多数情况下,异步通信是更好的选择,因为它不会阻塞任何进程。

MessagePort 传递

GET 的操作实现展示 Comlink 最核心的思想:代理对象操作、操作序列化、反序列化操作。接下来我们将揭示 Comlink 中一些高级特性的面纱,窥一窥其中究竟。

Comlink 很神奇的一个特性就是使用函数类型的参数构造远程对象,其背后实现都是一样的:将无法序列化的对象转换成一个代理对象,对代理对象的操作变成对远程对象的消息调用。可能有点难理解,慢慢道来。

这些特性依赖于下述的接口实现:

首先是 Transferable Objects 可转移对象,在浏览器的 postMessage 接口支持不同类型的可转移对象参数,一般这个能力是用于优化两个线程之间的数据交互。例如在 worker 中可以将 TypeArray 的结果通过 transfer 参数传递给主线程,这样可以规避跨线程传递数据的克隆操作,针对线程间传递大量数据是一个很好的优化手段,原理上类似于两个线程共享同一段内存数据。

const uInt8Array = new Uint8Array(1024 * 1024 * 8).map((v, i) => i);
// Transfer the underlying buffer to a worker
worker.postMessage(uInt8Array, [uInt8Array.buffer]);

可转移对象中有个比较特殊的 MessagePort 对象,一般是通过 MessageChannel 创建一条信道并生成一对可相互通信的端口对象。

const { port1, port2 } = new MessageChannel();

port1.addEventListener('message', (evt) => {
    console.log('from port2:', evt.data);
});

port2.addEventListener('message', (evt) => {
    console.log('from port1:', evt.data);
});

port1.start();
port2.start();
port1.postMessage('i am port1 message');
port2.postMessage('i am port2 message');

JavaScript 世界中的量子纠缠(从 Comlink 谈谈 IPC 抽象封装)

由于可转移对象支持 MessagePort 对象,利用这个机制我们可以在线程中传递端口对象,在两个线程间开辟出新的信道:

worker.js

const { port1, port2 } = new MessageChannel();
self.postMessage(null, [port2]);
port1.start();
port1.postMessage('i am port1');

main.js

const worker = new Worker('./worker.js');
worker.addEventListener('message', (evt) => {
    const workerTransferPort = evt.ports[0];
    workerTransferPort.addEventListener('message', (e) => {
        console.log('from worker port1 message', e.data);
    });
});

实际上,根据业务需要 MessagePort 是跨多个线程传递下去的,例如:业务依赖两个线程 worker1worker2,在 worker1 中通过 MessageChannel 创建出 port1 与 port2,worker1 将 port2 发送到主进程,主进程再将接收到的端口对象转发给 worker2,worker2 中接收到的端口对象还是可以与 worker1 中 port1 相互通信。MessagePort 传递机制使得无论跨越多少个线程,由 MessageChannel 所创建两个端口对象都可以保持相互通信,虽然有些标题党的意思,不过确实和量子纠缠挺像(狗头)。

量子纠缠的一个简单描述:两个或多个粒子在某种意义上成为了一个系统,即使它们被分隔得很远,它们的状态仍然以某种方式相互关联。当我们测量其中一个粒子的状态时,另一个粒子的状态会立即确定,而不考虑它们之间的距离。

Comlink 高级特性的实现依赖于 MessagePort 传递,将无法通过进程传递的对象(如函数)通过 MessagePort 包装新的代理对象。这里以 APPLY 实现为例:

// 方法调用的入参,将无法序列化参数包装成 MessagePort
function processArguments(argumentList: any[]): [WireValue[], Transferable[]] {
    const processed = argumentList.map(toWireValue);
    return [processed.map((v) => v[0]), processed.map((v) => v[1]).flat()];
}

function createProxy(
    ep: Endpoint,
    path: (string | number | symbol)[] = [],
    target: object = function () {}
) {
    const proxy: Object = new Proxy(target, {
        // ...
        apply(_target, _thisArg, rawArgumentList) {
            const [argumentList, transferables] = processArguments(rawArgumentList);
            return requestResponseMessage(
                ep,
                {
                    type: 'APPLY',
                    path: path.map((p) => p.toString()),
                    argumentList,
                },
                transferables
            ).then(fromWireValue);
        },
        // ...
    });

    return proxy;
}

export function wrap(ep: Endpoint) {
    return createProxy(ep, []);
}

export function expose(obj: any, ep: Endpoint = globalThis as any) {
    ep.addEventListener('message', function callback(ev: MessageEvent) {
        if (!ev || !ev.data) {
            return;
        }

        const { id, type, path } = {
            path: [] as string[],
            ...(ev.data as Message),
        };

        // 获取到参数列表还原原始参数,将 MessagePort 包装成代理对象
        const argumentList = (ev.data.argumentList || []).map(fromWireValue);
        let returnValue;

        const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], obj);
        const rawValue = path.reduce((obj, prop) => obj[prop], obj);
        switch (type) {
            // ...
            case 'APPLY':
                {
                    returnValue = rawValue.apply(parent, argumentList);
                }
                break;
            // ...
            default:
                return;
        }

        Promise.resolve(returnValue).then((returnValue) => {
            // 返回值之前也需要做一次数据的 toWireValue 
            const [wireValue, transferables] = toWireValue(returnValue);
            ep.postMessage({ ...wireValue, id }, transferables);
        });
    } as any);

    if (ep.start) {
        ep.start();
    }
}

代理对象的方法调用实现并不复杂,一样是拦截 appy 操作,序列化操作与发送操作数据,最需要关注的是:

  • 消息发送前的数据处理(toWireValue 操作)
  • 消息接收后的数据处理(fromWireValue 操作)

具体看一下 toWireValuefromWireValue 的实现就可以知晓函数参数传递实现了。

interface RawWireValue {
    id?: string;
    type: 'RAW';
    value: {};
}

interface HandlerWireValue {
    id?: string;
    type: 'HANDLER';
    name: string;
    value: unknown;
}

type WireValue = RawWireValue | HandlerWireValue;

interface TransferHandler<T, S> {
    canHandle(value: unknown): value is T;

    serialize(value: T): [S, Transferable[]];

    deserialize(value: S): T;
}

interface ProxyMarked {
    [proxyMarker]: true;
}

// 用于标记无法被序列化需要转换成 MessagePort 的对象
const proxyMarker = Symbol('Comlink.proxy');

const isObject = (val: unknown): val is object =>
    (typeof val === 'object' && val !== null) || typeof val === 'function';

// 用于处理 proxyMarker 标记对象的序列化与反序列化
const proxyTransferHandler: TransferHandler<object, MessagePort> = {
    // 检查当前值是否能被处理
    canHandle: (val): val is object => isObject(val) && Reflect.get(val, proxyMarker),
    // 将当前无法被序列化的值,转换成 MessagePort 对象
    serialize(obj) {
        const { port1, port2 } = new MessageChannel();
        expose(obj, port1);
        return [port2, [port2]];
    },
    // 将接收的 MessagePort 包装成代理对象
    deserialize(port) {
        port.start();
        return wrap(port);
    },
};

// 不同对象处理策略
export const transferHandlers = new Map<string, TransferHandler<unknown, unknown>>([
    ['proxy', proxyTransferHandler],
]);

// 用于标记不可序列化的对象
export function proxy<T extends {}>(obj: T): T & ProxyMarked {
    return Object.assign(obj, { [proxyMarker]: true }) as any;
}

// 发送前序列化数据
function toWireValue(value: any): [WireValue, Transferable[]] {
    for (const [name, handler] of transferHandlers) {
        // 判断当前值能否被当前 handler 处理
        if (handler.canHandle(value)) {
            // 将无法序列化的对象转换成 MessagePort
            const [serializedValue, transferables] = handler.serialize(value);
            return [
                {
                    type: 'HANDLER',
                    name,
                    value: serializedValue,
                },
                transferables,
            ];
        }
    }

    return [
        {
            type: 'RAW',
            value,
        },
        [],
    ];
}

// 接收到数据后做反序列化
function fromWireValue(value: WireValue): any {
    switch (value.type) {
        case 'HANDLER':
            return transferHandlers.get(value.name)!.deserialize(value.value);
        case 'RAW':
            return value.value;
    }
}

无论 toWireValue 还是 fromWireValue 都是围绕着 transferHandlers 实现的,简单来说:

  • toWireValue 用于数据发送前将数据序列化,将不可序列化的对象(如函数)转换成可转移对象(MessagePort),并进行 expose 操作。
  • fromWireValue 用于将接收到的数据做反序列化,并将可转移对象(MessagePort)包装(wrap 操作)成代理对象。

在 Comlink 中 transferHandlers 用于配置各种对象的序列化与反序列化策略,当然也允许用户自定义策略,这里只展示 proxy 对象的 TransferHandler,函数一类的不可序列化对象都是通过此 Handler 处理的。

const proxyTransferHandler: TransferHandler<object, MessagePort> = {
    // 检查当前值是否能被处理
    canHandle: (val): val is object => isObject(val) && Reflect.get(val, proxyMarker),
    // 将当前无法被序列化的值,转换成 MessagePort 对象
    serialize(obj) {
        const { port1, port2 } = new MessageChannel();
        expose(obj, port1);
        return [port2, [port2]];
    },
    // 将接收的 MessagePort 包装成代理对象
    deserialize(port) {
        port.start();
        return wrap(port);
    },
};

通过 proxyTransferHandler 就可以解答为什么 Comlnik 可以使用函数类型的参数了,在 toWireValue 处理函数时实际上使用的是 proxyTransferHandler.serialize, 其并不是真正将函数对象发送到远程,而是在本地通过 MessageChannel 创建了一对 MessagePort,并用 expose 对象与其中一个端口,而另一个端口则作为可转移对象被发送到远程。消息接收方则使用 fromWireValue 处理接收到的数据,针对接收到的可转移对象(MessagePort)则使用 proxyTransferHandler.deserialize 将其包装(wrap 操作)成一个新的代理对象,在接收方所有针对函数参数的调用都会通过 wrap 生成代理对象反映到发送方真正的函数对象上,它们之间的消息通信则是走的 MessageChannel 所创建的信道。

同样的我们可以实现 SET 与 CONSTRUCT 操作拦截:

function createProxy<T>(
    ep: Endpoint,
    path: (string | number | symbol)[] = [],
    target: object = function () {}
): Remote<T> {
    const proxy = new Proxy(target, {
        // ...
        set(_target, prop, rawValue) {
            const [value, transferables] = toWireValue(rawValue);
            return requestResponseMessage(
                ep,
                {
                    type: 'SET',
                    path: [...path, prop].map((p) => p.toString()),
                    value,
                },
                transferables
            ).then(fromWireValue) as any;
        },
        construct(_target, rawArgumentList) {
            const [argumentList, transferables] = processArguments(rawArgumentList);
            return requestResponseMessage(
                ep,
                {
                    type: 'CONSTRUCT',
                    path: path.map((p) => p.toString()),
                    argumentList,
                },
                transferables
            ).then(fromWireValue);
        },
        // ...
    });

    return proxy as any;
}
export function expose(obj: any, ep: Endpoint = globalThis as any) {
    ep.addEventListener('message', function callback(ev: MessageEvent) {
        if (!ev || !ev.data) {
            return;
        }

        const { id, type, path } = {
            path: [] as string[],
            ...(ev.data as Message),
        };
        const argumentList = (ev.data.argumentList || []).map(fromWireValue);
        let returnValue;
        const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], obj);
        const rawValue = path.reduce((obj, prop) => obj[prop], obj);

        switch (type) {
            // ...
            case 'SET':
                {
                    parent[path.slice(-1)[0]] = fromWireValue(ev.data.value);
                    returnValue = true;
                }
                break;

            case 'CONSTRUCT':
                {
                    const value = new rawValue(...argumentList);
                    returnValue = proxy(value);
                }
                break;
            // ...
            default:
                return;
        }

        Promise.resolve(returnValue).then((returnValue) => {
            const [wireValue, transferables] = toWireValue(returnValue);
            ep.postMessage({ ...wireValue, id }, transferables);
        });
    } as any);

    if (ep.start) {
        ep.start();
    }
}

抽象与泛化

理解了 Comlink 的实现原理后可以思考一个问题,Comlink 的这套机制是否能进行抽象与泛化将其应用到其他开发环境中?答案是肯定的,但在我们考虑实践前不妨来探讨下 Comlink 的抽象。

首先是基本的功能实现,其依赖于 ProxypostMessage 实现,但只要目标环境支持或能模拟出 Endpoint 接口也是可以使用 Comlink 的。那如果目标环境不支持 Proxy 或者压根就不是 JavaScript 语言环境又该如何处理?

首先是 Proxy,在不支持 Proxy 环境我们也是有其他方法将对象操作序列化的,@electron/remote 已经给出过解决方案,其是通过创建“影子对象”解决的,所以 Proxy 不是必须的特性。

那非 JavaScript 语言环境我们又该如何处理?上面我们有说明过 Endpoint 接口要求,而实际上 Endpoint 只是描述两个可以相互通信的端点,只要目标环境支持双向通信那我们就能实现 Endpoint 接口。

这里我们就可以做个抽象:只要目标环境支持将操作序列化与端点间双向通信,就可以实现 Comlink 基本功能的迁移

至于 Comlink 的高级功能,其依赖于 MessagePort 的传递机制,而 MessagePort 传递最终目的是为了开辟出一条新的的信道。所以目标环境的端点间通信如果支持开辟多条信道则也能迁移实现 Comlink 的高级功能

这里演示一个好玩的例子,我们将跳脱浏览器(跨线程通信)环境,实现 Socket.IO 的 Comlink 适配器,将 Comlink 的能力扩展到 RPC(远程过程调用) 调用。

JavaScript 世界中的量子纠缠(从 Comlink 谈谈 IPC 抽象封装)

JavaScript 世界中的量子纠缠(从 Comlink 谈谈 IPC 抽象封装)

Socket.IO 是针对 WebSockets 的封装库,WebSockets 支持了客户端与服务端双向通信的能力,能支持两端点双向通信,那就可以实现 Comlink 的基础功能。封装也很简单实现 Endpoint 接口即可:

import type { Endpoint } from 'comlink';
import type { Socket as ServerSocket } from 'socket.io';
import type { Socket as ClientSocket } from 'socket.io-client';

const MESSAGE_CHANNEL_NAME = '__COMLINK_MESSAGE_CHANNEL__';

const MESSAGE_EVENT_NAME = 'message';

const MESSAGE_EVENT_ERROR = 'Only message event is supported';

export function socketIoEndpoint(options: {
    socket: ServerSocket | ClientSocket;
    messageChannel?: string;
}): Endpoint {
    const listeners = new WeakMap();
    const { socket, messageChannel = MESSAGE_CHANNEL_NAME } = options;

    return {
        postMessage: (message: any, _transfer: MessagePort[]) => {
            socket.emit(messageChannel, message);
        },

        addEventListener: (eventName, eventHandler) => {
            if (eventName !== MESSAGE_EVENT_NAME) {
                throw new Error(MESSAGE_EVENT_ERROR);
            }

            const handler = (data: any) => {
                if ('handleEvent' in eventHandler) {
                    eventHandler.handleEvent({
                        data,
                        ports: [],
                    } as unknown as MessageEvent);
                } else {
                    eventHandler({
                        data,
                        ports: [],
                    } as unknown as MessageEvent);
                }
            };

            socket.on(messageChannel, handler);
            listeners.set(eventHandler, handler);
        },

        removeEventListener: (eventName, eventHandler) => {
            if (eventName !== MESSAGE_EVENT_NAME) {
                throw new Error(MESSAGE_EVENT_ERROR);
            }

            const handler = listeners.get(eventHandler);

            if (handler) {
                socket.off(messageChannel, handler);
                listeners.delete(eventHandler);
            }
        },
    };
}

但目前实现的适配器还不支持 Comlink 的高级特性,函数参数与构造远程对象需要将不可序列化对象变成可转移对象,而本质是需要能开辟一条两端点间的新信道。

Comlink 的默认 proxy TransferHandler 实现如下:

const proxyTransferHandler: TransferHandler<object, MessagePort> = {
    // 检查当前值是否能被处理
    canHandle: (val): val is object => isObject(val) && Reflect.get(val, proxyMarker),
    // 将当前无法被序列化的值,转换成 MessagePort 对象
    serialize(obj) {
        const { port1, port2 } = new MessageChannel();
        expose(obj, port1);
        return [port2, [port2]];
    },
    // 将接收的 MessagePort 包装成代理对象
    deserialize(port) {
        port.start();
        return wrap(port);
    },
};

而在 Socket.IO 中,我们可以使用 socket 的通信机制再开辟一条新的信道,实现 MessageChannel 平替:

const PROXY_MESSAGE_CHANNEL_ID = '__PROXY_MESSAGE_CHANNEL_ID__';

const createProxyTransferHandler = (socket: ServerSocket | ClientSocket) => {
    const proxyTransferHandler: TransferHandler<object, any> = {
        canHandle: (val): val is ProxyMarked => {
            return isObject(val) && (val as ProxyMarked)[proxyMarker];
        },
        serialize(obj) {
            // 将原来有 MessageChannel 创建 MessagePort 的操作变成新建一个 socket 事件名(messageChannel)
            const proxyMessageChannelID = generateUUID();
            expose(
                obj,
                socketIoEndpoint({
                    socket,
                    messageChannel: proxyMessageChannelID,
                })
            );

            return [{ [PROXY_MESSAGE_CHANNEL_ID]: proxyMessageChannelID }, []];
        },
        deserialize(target) {
            return wrap(
                socketIoEndpoint({
                    socket,
                    messageChannel: Reflect.get(
                        target,
                        PROXY_MESSAGE_CHANNEL_ID
                    ),
                })
            );
        },
    };

    return proxyTransferHandler;
};

完整实现如下:

import {
    wrap,
    expose,
    proxyMarker,
    ProxyMarked,
    transferHandlers,
} from 'comlink';
import { isObject, generateUUID } from './utils';
import {
    MESSAGE_CHANNEL_NAME,
    MESSAGE_EVENT_NAME,
    MESSAGE_EVENT_ERROR,
} from './constant';

import type { Endpoint, TransferHandler } from 'comlink';
import type { Socket as ServerSocket } from 'socket.io';
import type { Socket as ClientSocket } from 'socket.io-client';

const PROXY_MESSAGE_CHANNEL_ID = '__PROXY_MESSAGE_CHANNEL_ID__';

const socketIOTransferHandlers: WeakMap<
    ServerSocket | ClientSocket,
    TransferHandler<unknown, unknown>
> = new Map();

const createProxyTransferHandler = (socket: ServerSocket | ClientSocket) => {
    const proxyTransferHandler: TransferHandler<object, any> = {
        canHandle: (val): val is ProxyMarked => {
            return isObject(val) && (val as ProxyMarked)[proxyMarker];
        },
        serialize(obj) {
            const proxyMessageChannelID = generateUUID();
            expose(
                obj,
                socketIoEndpoint({
                    socket,
                    messageChannel: proxyMessageChannelID,
                })
            );

            return [{ [PROXY_MESSAGE_CHANNEL_ID]: proxyMessageChannelID }, []];
        },
        deserialize(target) {
            return wrap(
                socketIoEndpoint({
                    socket,
                    messageChannel: Reflect.get(
                        target,
                        PROXY_MESSAGE_CHANNEL_ID
                    ),
                })
            );
        },
    };

    return proxyTransferHandler;
};

/**
 * init socketIO transferHandlers
 * @param messageChannelConstructor
 */
const initTransferHandlers = (socket: ServerSocket | ClientSocket) => {
    if (!socketIOTransferHandlers.has(socket)) {
        socketIOTransferHandlers.set(
            socket,
            createProxyTransferHandler(socket)
        );

        socket.on('disconnect', () => {
            if (socketIOTransferHandlers.has(socket)) {
                socketIOTransferHandlers.delete(socket);
            }
        });
    }
    
    // 覆盖默认的 proxy TransferHandler
    transferHandlers.set('proxy', socketIOTransferHandlers.get(socket)!);
};

export function socketIoEndpoint(options: {
    socket: ServerSocket | ClientSocket;
    messageChannel?: string;
}): Endpoint {
    const listeners = new WeakMap();
    const { socket, messageChannel = MESSAGE_CHANNEL_NAME } = options;

    initTransferHandlers(socket);

    return {
        postMessage: (message: any, _transfer: MessagePort[]) => {
            socket.emit(messageChannel, message);
        },

        addEventListener: (eventName, eventHandler) => {
            if (eventName !== MESSAGE_EVENT_NAME) {
                throw new Error(MESSAGE_EVENT_ERROR);
            }

            const handler = (data: any) => {
                if ('handleEvent' in eventHandler) {
                    eventHandler.handleEvent({
                        data,
                        ports: [],
                    } as unknown as MessageEvent);
                } else {
                    eventHandler({
                        data,
                        ports: [],
                    } as unknown as MessageEvent);
                }
            };

            socket.on(messageChannel, handler);
            listeners.set(eventHandler, handler);
        },

        removeEventListener: (eventName, eventHandler) => {
            if (eventName !== MESSAGE_EVENT_NAME) {
                throw new Error(MESSAGE_EVENT_ERROR);
            }

            const handler = listeners.get(eventHandler);

            if (handler) {
                socket.off(messageChannel, handler);
                listeners.delete(eventHandler);
            }
        },
    };
}

具体示例可以看 socket-io-demo ,其他几个应用环境的 Comlink 适配器可以戳 comlink-adapters 自取,这里不做赘述。

应用场景

Comlink 很好用,但也有很明显的限制:通信双方需要能共享服务的描述信息,简单来说就是如何共享暴露对象的类型描述。

一般我们只在 IPC 通信场景下使用 Comlink,如 iframe 通信、worker 通信、Electron 的主进程与渲染进程通信……这些场景下的通信双方的业务都由同个业务团队维护,一般都在一个项目内,很方便同时引入 Comlink 机制,只需生成服务对象的类型文件即可实现类型共享。

而在前后端分离的项目中,前后端分属于不同的业务团队,那么引入 Comlink 与共享类型描述就变得麻烦了,这时候就可能需要引入 BFF 层,并由前端团队维护后端通信的端点,并提供单独服务的类型包支持。

实际上无论是 IPC 还是 RPC 调用,服务调用方都得知晓服务描述,至于如何去共享服务描述又是另一个话题了,这块暂未研究过,有机会可以再分享一下。

参考资料

转载自:https://juejin.cn/post/7271055185416831037
评论
请登录