Flutter之isolate的使用及通信原理
由于Dart
是一种单线程模型语言,所以可以避免多线程环境下产生的一系列降低运行效率问题。但单线程模型却有一个非常严重的缺陷,那就是执行计算密集型任务时会阻塞当前任务的执行,从而产生不好的影响(如UI的卡顿等),这时候就需要提供一个新的线程或类似线程的东西来异步执行计算密集型任务。由于Dart
无法创建线程,所以就提供了Isolate
来异步执行计算密集型任务。
在刚开始学习Isolate
时,以为它就是一个类似线程一样的东西。但随着学习的深入,发现Isolate
远比线程复杂,甚至都可以看作一个进程。也由于isolate
比较复杂,所以本文仅分析isolate
的使用、创建及通信。
1、Isolate的使用
在Dart
中,Isolate
的使用及通信都较为复杂,主要是通过
Isolate.spawn
及Isolate.spawnUri
来创建Isolate
,ReceivePort
来进行Isolate
间通信。下面就来看如何使用Isolate
。
1.1、Isolate单向通信
先来看Isolate
间的单向通信,代码如下。
//在父Isolate中调用
Isolate isolate;
start() async {
ReceivePort receivePort = ReceivePort();
//创建子Isolate对象
isolate = await Isolate.spawn(getMsg, receivePort.sendPort);
//监听子Isolate的返回数据
receivePort.listen((data) {
print('data:$data');
receivePort.close();
//关闭Isolate对象
isolate?.kill(priority: Isolate.immediate);
isolate = null;
});
}
//子Isolate对象的入口函数,可以在该函数中做耗时操作
getMsg(sendPort) => sendPort.send("hello");
运行代码后,就会输出新创建Isolate
对象返回的数据,如下。

1.2、Isolate双向通信
再来看多个Isolate
之间的通信实现,代码如下。
//当前函数在父Isolate中
Future<dynamic> asyncFactoriali(n) async {
//父Isolate对应的ReceivePort对象
final response = ReceivePort();
//创建一个子Isolate对象
await Isolate.spawn(_isolate, response.sendPort);
final sendPort = await response.first as SendPort;
final answer = ReceivePort();
//给子Isolate发送数据
sendPort.send([n, answer.sendPort]);
return answer.first;
}
//子Isolate的入口函数,可以在该函数中做耗时操作
//_isolate必须是顶级函数(不能存在任何类中)或者是静态函数(可以存在类中)
_isolate(SendPort initialReplyTo) async {
//子Isolate对应的ReceivePort对象
final port = ReceivePort();
initialReplyTo.send(port.sendPort);
final message = await port.first as List;
final data = message[0] as int;
final send = message[1] as SendPort;
//给父Isolate的返回数据
send.send(syncFactorial(data));
}
//运行代码
start() async {
print("计算结果:${await asyncFactoriali(4)}");
}
start();
通过在新创建的Isolate
中计算并返回数据后,得到如下返回结果。

通过上面代码,我们就可以能够通过Isolate
来执行异步任务。下面再来看其具体实现原理。
2、isolate的创建与运行
先从下面的时序图来看isolate
是如何创建、初始化及运行的。

2.1、isolate的创建
首先来看isolate
的创建,在上面例子中是通过Isolate.spawn
来创建Isolate
对象。
class Isolate {
//声明外部实现
external static Future<Isolate> spawn<T>(
void entryPoint(T message), T message,
{bool paused: false,
bool errorsAreFatal,
SendPort onExit,
SendPort onError,
@Since("2.3") String debugName});
}
这里的external
关键字主要是声明spawn
这个函数,具体实现由外部提供。在Dart
中,该函数的具体实现是在isolate_patch.dart
中。先来看spawn
的具体实现。
@patch
class Isolate {
@patch
static Future<Isolate> spawn<T>(void entryPoint(T message), T message,
{bool paused: false,
bool errorsAreFatal,
SendPort onExit,
SendPort onError,
String debugName}) async {
// `paused` isn't handled yet.
RawReceivePort readyPort;
try {
//该函数执行是异步的
_spawnFunction(
readyPort.sendPort,
script.toString(),
entryPoint,
message,
paused,
errorsAreFatal,
onExit,
onError,
null,
packageConfig,
debugName);
return await _spawnCommon(readyPort);
} catch (e, st) {
...
}
}
static Future<Isolate> _spawnCommon(RawReceivePort readyPort) {
Completer completer = new Completer<Isolate>.sync();
//监听Isolate是否创建完毕,当子Isolate创建完毕后会通知父Isolate
readyPort.handler = (readyMessage) {
//关闭端口
readyPort.close();
if (readyMessage is List && readyMessage.length == 2) {//子Isolate创建成功
SendPort controlPort = readyMessage[0];
List capabilities = readyMessage[1];
completer.complete(new Isolate(controlPort,
pauseCapability: capabilities[0],
terminateCapability: capabilities[1]));
} else if (readyMessage is String) {...} else {...}
};
return completer.future;
}
......
//调用虚拟机中的Isolate_spawnFunction函数
static void _spawnFunction(
SendPort readyPort,
String uri,
Function topLevelFunction,
var message,
bool paused,
bool errorsAreFatal,
SendPort onExit,
SendPort onError,
String packageRoot,
String packageConfig,
String debugName) native "Isolate_spawnFunction";
......
}
这里的_spawnFunction
调用的是Dart VM中的Isolate_spawnFunction
函数,该函数就是把Isolate
对象的创建交给线程池执行,所以Isolate
对象的创建是异步的。这里的线程池是在Dart VM初始化的时候创建的。
[->third_party/dart/runtime/lib/isolate.cc]
DEFINE_NATIVE_ENTRY(Isolate_spawnFunction, 0, 11) {
...
if (closure.IsClosure()) {
...
//异步执行,thread_pool是一个线程池,该线程池是在Dart VM创建时创建的
Dart::thread_pool()->Run<SpawnIsolateTask>(isolate, std::move(state));
return Object::null();
}
}
...
return Object::null();
}
SpawnIsolateTask
是一个类似Java中实现了Runable
接口的类,在该类中主要是进行子Isolate
对象的创建及运行,来看其具体实现。
[->third_party/dart/runtime/lib/isolate.cc]
//在子线程中执行
class SpawnIsolateTask : public ThreadPool::Task {
void Run() override {
auto group = state_->isolate_group();
// create_group_callback是在Dart VM创建时初始化的。
Dart_IsolateGroupCreateCallback create_group_callback =
Isolate::CreateGroupCallback();
...
// OnIsolateInitialize是在Dart VM初始化时设置的
Dart_InitializeIsolateCallback initialize_callback =
Isolate::InitializeCallback();
...
char* error = nullptr;
Isolate* isolate = nullptr;
//group及initialize_callback都是在虚拟机初始化的时候设置的
if (!FLAG_enable_isolate_groups || group == nullptr ||
initialize_callback == nullptr) {
Dart_IsolateFlags api_flags = *(state_->isolate_flags());
//创建一个新的isolate
isolate = reinterpret_cast<Isolate*>((create_group_callback)(
state_->script_url(), name, nullptr, state_->package_config(),
&api_flags, parent_isolate_->init_callback_data(), &error));
parent_isolate_->DecrementSpawnCount();
parent_isolate_ = nullptr;
} else {
...
}
...
// isolate是否是可运行的
// 是在OnIsolateInitialize中设置的
if (isolate->is_runnable()) {
//运行isolate
//[见2.3小节]
isolate->Run();
}
}
};
默认情况下,FLAG_enable_isolate_groups
为false,group
及initialize_callback
都不为null。所以新的isolate
是通过create_group_callback
来创建的。
[->third_party/dart/runtime/vm/dart.cc]
char* Dart::Init(const uint8_t* vm_isolate_snapshot,
const uint8_t* instructions_snapshot,
Dart_IsolateGroupCreateCallback create_group,
Dart_InitializeIsolateCallback initialize_isolate,
Dart_IsolateShutdownCallback shutdown,
Dart_IsolateCleanupCallback cleanup,
Dart_IsolateGroupCleanupCallback cleanup_group,
Dart_ThreadExitCallback thread_exit,
Dart_FileOpenCallback file_open,
Dart_FileReadCallback file_read,
Dart_FileWriteCallback file_write,
Dart_FileCloseCallback file_close,
Dart_EntropySource entropy_source,
Dart_GetVMServiceAssetsArchive get_service_assets,
bool start_kernel_isolate,
Dart_CodeObserver* observer) {
...
Isolate::SetCreateGroupCallback(create_group);
Isolate::SetInitializeCallback_(initialize_isolate);
...
}
这里来看create_group_callback
对应的函数实现,它是在调用init
函数时传递过来的。根据调用链来看,在DartVM
对象初始化时,会将调用init
函数。
[->flutter/runtime/dart_vm.cc]
DartVM::DartVM(std::shared_ptr<const DartVMData> vm_data,
std::shared_ptr<IsolateNameServer> isolate_name_server)
: settings_(vm_data->GetSettings()),
concurrent_message_loop_(fml::ConcurrentMessageLoop::Create()),
skia_concurrent_executor_(
[runner = concurrent_message_loop_->GetTaskRunner()](
fml::closure work) { runner->PostTask(work); }),
vm_data_(vm_data),
isolate_name_server_(std::move(isolate_name_server)),
service_protocol_(std::make_shared<ServiceProtocol>()) {
...
{
Dart_InitializeParams params = {};
...
//create_group_callback对应函数实现
params.create_group = reinterpret_cast<decltype(params.create_group)>(
DartIsolate::DartIsolateGroupCreateCallback);
//initialize_isolate对应函数实现
params.initialize_isolate =
reinterpret_cast<decltype(params.initialize_isolate)>(
DartIsolate::DartIsolateInitializeCallback);
...
char* init_error = Dart_Initialize(¶ms);
...
}
...
}
下面再来看DartIsolateGroupCreateCallback
的实现。
[->flutter/runtime/dart_isolate.cc]
Dart_Isolate DartIsolate::DartIsolateGroupCreateCallback(
const char* advisory_script_uri,
const char* advisory_script_entrypoint,
const char* package_root,
const char* package_config,
Dart_IsolateFlags* flags,
std::shared_ptr<DartIsolate>* parent_isolate_data,
char** error) {
...
//创建DartIsolate对象,此时DartIsolate对象处于Uninitialized状态
auto isolate_data = std::make_unique<std::shared_ptr<DartIsolate>>(
std::shared_ptr<DartIsolate>(new DartIsolate(
(*isolate_group_data)->GetSettings(), // settings
null_task_runners, // task_runners
fml::WeakPtr<SnapshotDelegate>{}, // snapshot_delegate
fml::WeakPtr<IOManager>{}, // io_manager
fml::RefPtr<SkiaUnrefQueue>{}, // unref_queue
fml::WeakPtr<ImageDecoder>{}, // image_decoder
advisory_script_uri, // advisory_script_uri
advisory_script_entrypoint, // advisory_script_entrypoint
false)));
//创建isolate对象
Dart_Isolate vm_isolate = CreateDartIsolateGroup(
std::move(isolate_group_data), std::move(isolate_data), flags, error);
...
return vm_isolate;
}
Dart_Isolate DartIsolate::CreateDartIsolateGroup(
std::unique_ptr<std::shared_ptr<DartIsolateGroupData>> isolate_group_data,
std::unique_ptr<std::shared_ptr<DartIsolate>> isolate_data,
Dart_IsolateFlags* flags,
char** error) {
// 创建Isoalte对象
Dart_Isolate isolate = Dart_CreateIsolateGroup(
(*isolate_group_data)->GetAdvisoryScriptURI().c_str(),
(*isolate_group_data)->GetAdvisoryScriptEntrypoint().c_str(),
(*isolate_group_data)->GetIsolateSnapshot()->GetDataMapping(),
(*isolate_group_data)->GetIsolateSnapshot()->GetInstructionsMapping(),
flags, isolate_group_data.get(), isolate_data.get(), error);
if (isolate == nullptr) {
return nullptr;
}
// 将Isolate的控制权交给Dart VM
std::shared_ptr<DartIsolate> embedder_isolate(*isolate_data);
isolate_group_data.release();
isolate_data.release();
//初始化isoalte
if (!InitializeIsolate(std::move(embedder_isolate), isolate, error)) {
return nullptr;
}
return isolate;
}
在DartIsolateGroupCreateCallback
中通过CreateDartIsolateGroup
来调用Dart VM中的Dart_CreateIsolateGroup
函数实现Isolate
的创建,代码实现如下。
[->third_party/dart/runtime/vm/dart_api_impl.cc]
DART_EXPORT Dart_Isolate
Dart_CreateIsolateGroup(const char* script_uri,
const char* name,
const uint8_t* snapshot_data,
const uint8_t* snapshot_instructions,
Dart_IsolateFlags* flags,
void* isolate_group_data,
void* isolate_data,
char** error) {
Dart_IsolateFlags api_flags;
if (flags == nullptr) {
Isolate::FlagsInitialize(&api_flags);
flags = &api_flags;
}
const char* non_null_name = name == nullptr ? "isolate" : name;
std::unique_ptr<IsolateGroupSource> source(
new IsolateGroupSource(script_uri, non_null_name, snapshot_data,
snapshot_instructions, nullptr, -1, *flags));
auto group = new IsolateGroup(std::move(source), isolate_group_data);
IsolateGroup::RegisterIsolateGroup(group);
//创建新的isolate
Dart_Isolate isolate =
CreateIsolate(group, non_null_name, isolate_data, error);
if (isolate != nullptr) {
group->set_initial_spawn_successful();
}
return isolate;
}
...
static Dart_Isolate CreateIsolate(IsolateGroup* group,
const char* name,
void* isolate_data,
char** error) {
auto source = group->source();
Isolate* I = Dart::CreateIsolate(name, source->flags, group);
...
Dart::ShutdownIsolate();
return reinterpret_cast<Dart_Isolate>(NULL);
}
经过一系列调用,最终调用dart.cc中的CreateIsolate
函数,该函数很简单,就是创建一个新的Isolate
对象。
[->third_party/dart/runtime/vm/dart.cc]
Isolate* Dart::CreateIsolate(const char* name_prefix,
const Dart_IsolateFlags& api_flags,
IsolateGroup* isolate_group) {
// Create a new isolate.
Isolate* isolate =
Isolate::InitIsolate(name_prefix, isolate_group, api_flags);
return isolate;
}
[->third_party/dart/runtime/vm/isolate.cc]
//初始化Isolate
Isolate* Isolate::InitIsolate(const char* name_prefix,
IsolateGroup* isolate_group,
const Dart_IsolateFlags& api_flags,
bool is_vm_isolate) {
//1、创建一个Isolate对象
Isolate* result = new Isolate(isolate_group, api_flags);
...
//2、创建Isolate对应的堆空间,在该堆空间中,存在对象的分配,垃圾回收等。
Heap::Init(result,
is_vm_isolate
? 0 // New gen size 0; VM isolate should only allocate in old.
: FLAG_new_gen_semi_max_size * MBInWords,//MBInWords值是128kb,
(is_service_or_kernel_isolate ? kDefaultMaxOldGenHeapSize
: FLAG_old_gen_heap_size) *
MBInWords);
//3、将Isolate与Thread相关联
if (!Thread::EnterIsolate(result)) {
// We failed to enter the isolate, it is possible the VM is shutting down,
// return back a NULL so that CreateIsolate reports back an error.
if (KernelIsolate::IsKernelIsolate(result)) {
KernelIsolate::SetKernelIsolate(nullptr);
}
if (ServiceIsolate::IsServiceIsolate(result)) {
ServiceIsolate::SetServiceIsolate(nullptr);
}
delete result;
return nullptr;
}
// Setup the isolate message handler.
//4、设置isolate的消息处理器
MessageHandler* handler = new IsolateMessageHandler(result);
result->set_message_handler(handler);
// Setup the Dart API state.
//5、启动Dart API状态
ApiState* state = new ApiState();
result->set_api_state(state);
//6、设置主端口
result->set_main_port(PortMap::CreatePort(result->message_handler()));
// Add to isolate list. Shutdown and delete the isolate on failure.
//7、将当前的Isolate添加到链表中(一个单链表)
if (!AddIsolateToList(result)) {
//添加失败,销毁该Isolate
result->LowLevelShutdown();
//取消线程与Isolate的关联
Thread::ExitIsolate();
//如果是虚拟机内部的Isolate
if (KernelIsolate::IsKernelIsolate(result)) {
KernelIsolate::SetKernelIsolate(nullptr);
}
//如果是Service Isolate
if (ServiceIsolate::IsServiceIsolate(result)) {
ServiceIsolate::SetServiceIsolate(nullptr);
}
//删除当前Isolate对象
delete result;
return nullptr;
}
return result;
}
InitIsolate
函数比较重要,主要做了以下事情。
- 创建
Isolate
对象 - 创建
Isolate
中的堆空间,在Isolate
仅有一块堆空间。存在堆空间也就会存在对象分配、垃圾回收等。 - 将
Isolate
对象与一个线程进行关联,也就是可以说一个线程对应着一个Isolate
对象。 - 设置消息处理器(
IsolateMessageHandler
),主要是对于Isolate
中的消息处理。子Isolate
可以通过端口向父Isolate
的MessageHandler
中添加消息,反之亦然。这也是Isolate
间的通信的实现。 - 设置api state,暂时没搞懂这个是干啥的。
- 设置主端口。
- 将当前
Isolate
添加到链表中。
当上面的一些操作执行完毕后,一个Isolate
对象就创建成功了。

2.2、DartIsolate初始化
在CreateDartIsolateGroup
中调用Dart_CreateIsolateGroup
函数创建isolate
对象成功后,还会进行DartIsolate
对象的初始化,其初始化是在InitializeIsolate
函数中实现的。
[->flutter/runtime/dart_isolate.cc]
bool DartIsolate::InitializeIsolate(
std::shared_ptr<DartIsolate> embedder_isolate,
Dart_Isolate isolate,
char** error) {
//isolate的初始化,此时DartIsolate对象处于Initialized状态
if (!embedder_isolate->Initialize(isolate)) {
return false;
}
//加载library,此时DartIsolate对象处于LibrariesSetup状态
if (!embedder_isolate->LoadLibraries()) {
return false;
}
//如果是非RootIsolate,那么DartIsolate对象将处于Ready状态,也只有DartIsolate对象处于Ready状态时,Isolate才可以运行。
if (!embedder_isolate->IsRootIsolate()) {
auto child_isolate_preparer =
embedder_isolate->GetIsolateGroupData().GetChildIsolatePreparer();
if (!child_isolate_preparer(embedder_isolate.get())) {
return false;
}
}
return true;
}
在DartIsolate
初始化后,DartIsolate
对象就由Uninitialized
状态切换为Ready
状态,这时候isolate
也就是可运行的。
2.3、isolate的运行
isolate
可运行后,接下来就会来运行isolate
。在SpawnIsolateTask
类中,运行isolate
调用的是其Run
函数,实现如下。
[->third_party/dart/runtime/vm/isolate.cc]
void Isolate::Run() {
//向消息处理器中添加的第一个消息
//记住该RunIsolate函数,在后面会说到
message_handler()->Run(Dart::thread_pool(), RunIsolate, ShutdownIsolate,
reinterpret_cast<uword>(this));
}
[->third_party/dart/runtime/vm/MessageHandler.cc]
void MessageHandler::Run(ThreadPool* pool,
StartCallback start_callback,
EndCallback end_callback,
CallbackData data) {
MonitorLocker ml(&monitor_);
pool_ = pool;
start_callback_ = start_callback;
end_callback_ = end_callback;
callback_data_ = data;
task_running_ = true;
//在线程池中执行任务
const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
}
然后继续异步执行,但这次是在子Isolate
中执行的。下面再来看MessageHandlerTask
,在MessageHandlerTask
的run
函数中执行的是TaskCallback
函数。
[->third_party/dart/runtime/vm/message_handler.cc]
void MessageHandler::TaskCallback() {
MessageStatus status = kOK;
bool run_end_callback = false;
bool delete_me = false;
EndCallback end_callback = NULL;
CallbackData callback_data = 0;
{
...
if (status == kOK) {
//仅当子Isolate第一次运行时,start_callback_才不为null
if (start_callback_ != nullptr) {
ml.Exit();
//调用Isolate的第一个函数(允许多线程并发执行)
status = start_callback_(callback_data_);
ASSERT(Isolate::Current() == NULL);
start_callback_ = NULL;
ml.Enter();
}
...
}
...
}
...
}
先不管消息处理[见小结3],这里重点来看start_callback_
,它对应着RunIsolate
这个函数。
[->third_party/dart/runtime/vm/isolate.cc]
//运行Isolate
static MessageHandler::MessageStatus RunIsolate(uword parameter) {
...
{
...
//args是调用Dart层_startIsolate函数所需的参数集合
const Array& args = Array::Handle(Array::New(7));
args.SetAt(0, SendPort::Handle(SendPort::New(state->parent_port())));
args.SetAt(1, Instance::Handle(func.ImplicitStaticClosure()));
args.SetAt(2, Instance::Handle(state->BuildArgs(thread)));
args.SetAt(3, Instance::Handle(state->BuildMessage(thread)));
args.SetAt(4, is_spawn_uri ? Bool::True() : Bool::False());
args.SetAt(5, ReceivePort::Handle(ReceivePort::New(
isolate->main_port(), true /* control port */)));
args.SetAt(6, capabilities);
//调用Dart层的_startIsolate函数,该函数在isolate_patch.dart文件中
const Library& lib = Library::Handle(Library::IsolateLibrary());
const String& entry_name = String::Handle(String::New("_startIsolate"));
const Function& entry_point =
Function::Handle(lib.LookupLocalFunction(entry_name));
ASSERT(entry_point.IsFunction() && !entry_point.IsNull());
result = DartEntry::InvokeFunction(entry_point, args);
if (result.IsError()) {
return StoreError(thread, Error::Cast(result));
}
}
return MessageHandler::kOK;
}
在RunIsolate
中,会调用isolate_patch.dart
中的_startIsolate
函数,从而调用创建Isolate
对象时传递的初始化函数。
@pragma("vm:entry-point", "call")
void _startIsolate(
SendPort parentPort,
Function entryPoint,
List<String> args,
var message,
bool isSpawnUri,
RawReceivePort controlPort,
List capabilities) {
// The control port (aka the main isolate port) does not handle any messages.
if (controlPort != null) {
controlPort.handler = (_) {}; // Nobody home on the control port.
}
if (parentPort != null) {
// Build a message to our parent isolate providing access to the
// current isolate's control port and capabilities.
//
// TODO(floitsch): Send an error message if we can't find the entry point.
var readyMessage = new List(2);
readyMessage[0] = controlPort.sendPort;
readyMessage[1] = capabilities;
// Out of an excess of paranoia we clear the capabilities from the
// stack. Not really necessary.
capabilities = null;
//告诉父Isolate,当前`Isolate`已经创建成功
parentPort.send(readyMessage);
}
// Delay all user code handling to the next run of the message loop. This
// allows us to intercept certain conditions in the event dispatch, such as
// starting in paused state.
RawReceivePort port = new RawReceivePort();
port.handler = (_) {
port.close();
if (isSpawnUri) {
if (entryPoint is _BinaryFunction) {
(entryPoint as dynamic)(args, message);
} else if (entryPoint is _UnaryFunction) {
(entryPoint as dynamic)(args);
} else {
entryPoint();
}
} else {
//初始化函数
entryPoint(message);
}
};
// Make sure the message handler is triggered.
port.sendPort.send(null);
}
在_startIsolate
函数中主要是做了以下几件事。
- 告诉父
Isolate
,子Isolate
已经创建成功。 - 调用子
Isolate
的初始化函数,也就是入口函数。
到此,一个新的Isolate
就已经创建完毕。在创建过程中,会从Dart SDK调用虚拟机函数,然后在新的Isolate
对象中通过异步的方式调用入口函数。
注意:主Isolate
的入口函数就是熟悉的main
函数。
3、isolate之间的通信原理
通过前面一节,知道了Dart
是如何创建一个新的Isolate
对象的。但也还是省略了很多东西的,比如子Isolate
通知父Isolate
的原理,也就是Isolate
间的通信原理。
3.1、ReceivePort与SendPort
在Isolate
给另外一个Isolate
发送消息之前,需要先来熟悉ReceivePort
及SendPort
。代码如下。
abstract class ReceivePort implements Stream {
//声明外部实现
external factory ReceivePort();
}
//在isolate_patch.dart中
@patch
class ReceivePort {
@patch
factory ReceivePort() => new _ReceivePortImpl();
@patch
factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) {
return new _ReceivePortImpl.fromRawReceivePort(rawPort);
}
}
class _ReceivePortImpl extends Stream implements ReceivePort {
_ReceivePortImpl() : this.fromRawReceivePort(new RawReceivePort());
_ReceivePortImpl.fromRawReceivePort(this._rawPort) {
_controller = new StreamController(onCancel: close, sync: true);
_rawPort.handler = _controller.add;
}
//返回一个SendPort对象
SendPort get sendPort {
return _rawPort.sendPort;
}
//监听发送的消息
StreamSubscription listen(void onData(var message),
{Function onError, void onDone(), bool cancelOnError}) {
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
...
}
@patch
class RawReceivePort {
@patch
factory RawReceivePort([Function handler]) {
_RawReceivePortImpl result = new _RawReceivePortImpl();
result.handler = handler;
return result;
}
}
@pragma("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {
factory _RawReceivePortImpl() native "RawReceivePortImpl_factory";
...
SendPort get sendPort {
return _get_sendport();
}
...
/**** Internal implementation details ****/
_get_id() native "RawReceivePortImpl_get_id";
_get_sendport() native "RawReceivePortImpl_get_sendport";
...
}
在代码中,一个ReceivePort
对象包含一个RawReceivePort
对象及SendPort
对象。其中RawReceivePort
对象是在虚拟机中创建的,它对应着虚拟机中的ReceivePort
类。代码如下。
[->third_party/dart/runtime/lib.isolate.cc]
DEFINE_NATIVE_ENTRY(RawReceivePortImpl_factory, 0, 1) {
ASSERT(
TypeArguments::CheckedHandle(zone, arguments->NativeArgAt(0)).IsNull());
//创建一个Entry对象并返回一个端口号。
Dart_Port port_id = PortMap::CreatePort(isolate->message_handler());
//创建ReceivePort对象
return ReceivePort::New(port_id, false /* not control port */);
}
在创建ReceivePort
对象对象之前,首先会将当前Isolate
中的MessageHandler
对象添加到map中。这里是一个全局的map,在Dart VM初始化的时候创建,每个元素都是一个Entry
对象,在Entry
中,有一个MessageHandler
对象,一个端口号及该端口的状态。
typedef struct {
//端口号
Dart_Port port;
//消息处理器
MessageHandler* handler;
//端口号状态
PortState state;
} Entry;
[->third_party/dart/runtime/vm/port.cc]
Dart_Port PortMap::CreatePort(MessageHandler* handler) {
...
Entry entry;
//分配一个端口号
entry.port = AllocatePort();
//设置消息处理器
entry.handler = handler;
//端口号状态
entry.state = kNewPort;
//查找当前entry的位置
intptr_t index = entry.port % capacity_;
Entry cur = map_[index];
// Stop the search at the first found unused (free or deleted) slot.
//找到空闲或将要被删除的Entry。
while (cur.port != 0) {
index = (index + 1) % capacity_;
cur = map_[index];
}
if (map_[index].handler == deleted_entry_) {
// Consuming a deleted entry.
deleted_--;
}
//插入到map中
map_[index] = entry;
// Increment number of used slots and grow if necessary.
used_++;
//检查是否需要扩容
MaintainInvariants();
...
//返回端口号
return entry.port;
}
注意: 这里的map的初始容量是8,当达到容量的3/4时,会进行扩容,新的容量是旧的容量2倍。熟悉Java的就知道,这跟HashMap
类似,初始容量为8,加载因子为0.75,扩容是指数级增长。
再来看ReceivePort
对象的创建。
[->third_party/dart/runtime/vm/object.cc]
RawReceivePort* ReceivePort::New(Dart_Port id,
bool is_control_port,
Heap::Space space) {
Thread* thread = Thread::Current();
Zone* zone = thread->zone();
const SendPort& send_port =
//创建SendPort对象
SendPort::Handle(zone, SendPort::New(id, thread->isolate()->origin_id()));
ReceivePort& result = ReceivePort::Handle(zone);
{
//创建ReceivePort对象
RawObject* raw = Object::Allocate(ReceivePort::kClassId,//classId
ReceivePort::InstanceSize(),//对象大小
space);
NoSafepointScope no_safepoint;
result ^= raw;
result.StorePointer(&result.raw_ptr()->send_port_, send_port.raw());
}
if (is_control_port) {
//更新端口的状态,设为kControlPort
PortMap::SetPortState(id, PortMap::kControlPort);
} else {
//更新端口的状态,设为kLivePort
PortMap::SetPortState(id, PortMap::kLivePort);
}
return result.raw();
}
[->third_party/dart/runtime/vm/object.cc]
RawSendPort* SendPort::New(Dart_Port id,
Dart_Port origin_id,
Heap::Space space) {
SendPort& result = SendPort::Handle();
{
//创建SendPort对象
RawObject* raw =
Object::Allocate(SendPort::kClassId, //classId
SendPort::InstanceSize(), //对象ID
space);
NoSafepointScope no_safepoint;
result ^= raw;
result.StoreNonPointer(&result.raw_ptr()->id_, id);
result.StoreNonPointer(&result.raw_ptr()->origin_id_, origin_id);
}
return result.raw();
}
这里创建对象时传递的classId是在Isolate
对象初始化时注册的,然后根据该classId来创建相应的对象。在这里,ReceivePort
对应着Dart SDK中的_RawReceivePortImpl
对象,SendPort
对应着Dart SDK中的_SendPortImpl
对象。
也就是当创建ReceivePort
对象时,会通过Dart VM来创建对应的_RawReceivePortImpl
对象及SendPort
对应的_SendPortImpl
对象。
3.2、isolate间通信
当ReceivePort
创建成功后,就可以通过调用_SendPortImpl
的send
函数来发送消息。
@pragma("vm:entry-point")
class _SendPortImpl implements SendPort {
...
/*--- public interface ---*/
@pragma("vm:entry-point", "call")
void send(var message) {
_sendInternal(message);
}
...
// Forward the implementation of sending messages to the VM.
void _sendInternal(var message) native "SendPortImpl_sendInternal_";
}
_sendInternal
的具体实现在Dart VM中。
[->third_party/dart/runtime/lib/isolate.cc]
DEFINE_NATIVE_ENTRY(SendPortImpl_sendInternal_, 0, 2) {
...
//目标Isolate所对应端口号
const Dart_Port destination_port_id = port.Id();
const bool can_send_any_object = isolate->origin_id() == port.origin_id();
if (ApiObjectConverter::CanConvert(obj.raw())) {//如果发送消息为null或者发送消息不是堆对象
PortMap::PostMessage(
Message::New(destination_port_id, obj.raw(), Message::kNormalPriority));
} else {
//创建一个MessageWriter对象——writer
MessageWriter writer(can_send_any_object);
// TODO(turnidge): Throw an exception when the return value is false?
PortMap::PostMessage(writer.WriteMessage(obj, destination_port_id,
Message::kNormalPriority));
}
return Object::null();
}
[->third_party/dart/runtime/vm/port.cc]
bool PortMap::PostMessage(std::unique_ptr<Message> message,
bool before_events) {
MutexLocker ml(mutex_);
//在map中根据目标端口号寻找Entry所在的位置
intptr_t index = FindPort(message->dest_port());
if (index < 0) {
return false;
}
//从map中拿到Entry对象并取出MessageHandler对象
MessageHandler* handler = map_[index].handler;
//这里的handler是目标Isolate中的MessageHandler
handler->PostMessage(std::move(message), before_events);
return true;
}
到这里就已经成功将消息加入到了目标Isolate
的MessageHandler
中,成功完成了Isolate
间消息的传递,但还尚未对消息进行处理。
再来看Isolate
对于消息的处理。
[->third_party/dart/runtime/vm/message_handler.cc]
void MessageHandler::PostMessage(std::unique_ptr<Message> message,
bool before_events) {
Message::Priority saved_priority;
{
MonitorLocker ml(&monitor_);
...
saved_priority = message->priority();
if (message->IsOOB()) {
//加入到OOB类型消息的队列中
oob_queue_->Enqueue(std::move(message), before_events);
} else {
//加入到普通消息队列中
queue_->Enqueue(std::move(message), before_events);
}
if (paused_for_messages_) {
ml.Notify();
}
if (pool_ != nullptr && !task_running_) {
task_running_ = true;
//异步处理
const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
}
}
// Invoke any custom message notification.
//如果自定义了消息通知函数,那么在消息处理完毕后会调用该函数
MessageNotify(saved_priority);
}
在PostMessage
中主要是做了以下操作。
- 根据消息级别将消息加入到不同的队列中。主要有OOB消息及普通消息两个级别,OOB消息在队列
oob_queue_
中,普通消息在队列queue_
中。OOB消息级别高于普通消息,会立即处理。 - 将一个消息处理任务
MessageHandlerTask
加入到线程中。
这里的线程池是在Dart VM创建的时候创建的,在Isolate
运行时传递给MessageHandler
的。
下面再来看MessageHandlerTask
,在MessageHandlerTask
的run
函数中执行的是TaskCallback
函数。
[->third_party/dart/runtime/vm/message_handler.cc]
void MessageHandler::TaskCallback() {
MessageStatus status = kOK;
bool run_end_callback = false;
bool delete_me = false;
EndCallback end_callback = NULL;
CallbackData callback_data = 0;
{
...
if (status == kOK) {
...
bool handle_messages = true;
while (handle_messages) {
handle_messages = false;
// Handle any pending messages for this message handler.
if (status != kShutdown) {
//处理消息
status = HandleMessages(&ml, (status == kOK), true);
}
if (status == kOK && HasLivePorts()) {
handle_messages = CheckIfIdleLocked(&ml);
}
}
}
...
}
...
}
消息的处理是在HandleMessages
函数中进行的。
[->third_party/dart/runtime/vm/message_handler.cc]
MessageHandler::MessageStatus MessageHandler::HandleMessages(
MonitorLocker* ml,
bool allow_normal_messages,
bool allow_multiple_normal_messages) {
...
//从队列中获取一个消息,优先OOB消息
std::unique_ptr<Message> message = DequeueMessage(min_priority);
//没有消息时退出循环,停止消息的处理
while (message != nullptr) {
//获取消息的长度
intptr_t message_len = message->Size();
...
//获取消息级别
Message::Priority saved_priority = message->priority();
Dart_Port saved_dest_port = message->dest_port();
MessageStatus status = kOK;
{
DisableIdleTimerScope disable_idle_timer(idle_time_handler);
//消息的处理
status = HandleMessage(std::move(message));
}
...
//如果是已关闭状态,将清除OOB类型消息
if (status == kShutdown) {
ClearOOBQueue();
break;
}
...
//继续从队列中获取消息
message = DequeueMessage(min_priority);
}
return max_status;
}
在HandleMessages
函数中会根据消息的优先级别来遍历所有消息并一一处理,直至处理完毕。具体消息处理是在HandleMessage
函数中进行的。该函数在其子类IsolateMessageHandler
中实现。
[->third_party/dart/runtime/vm/isolate.cc]
MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(
std::unique_ptr<Message> message) {
...
//如果是普通消息
if (!message->IsOOB() && (message->dest_port() != Message::kIllegalPort)) {
//调用Dart层的_lookupHandler函数,返回该函数在isolate_patch.dart中
msg_handler = DartLibraryCalls::LookupHandler(message->dest_port());
...
}
...
MessageStatus status = kOK;
if (message->IsOOB()) {//处理OOB消息
...
} else if (message->dest_port() == Message::kIllegalPort) {//处理OOB消息,主要是处理延迟OOB消息
...
} else {//处理普通消息
...
//调用Dart层的_RawReceivePortImpl对象中的_handleMessage函数,该函数在isolate_patch.dart中
const Object& result =
Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
if (result.IsError()) {
status = ProcessUnhandledException(Error::Cast(result));
} else {
...
}
}
return status;
}
在这里先暂时不管OOB消息的处理,来看普通消息的处理。
- 首先调用Dart SDK中
_RawReceivePortImpl
对象的_lookupHandler
函数,返回一个在创建_RawReceivePortImpl
对象时注册的一个自定义函数。 - 调用Dart SDK中
_RawReceivePortImpl
对象的_handleMessage
函数并传入1中返回的自定义函数,通过该自定义函数将消息分发出去。

至此,一个Isolate
就已经成功的向另外一个Isolate
成功发送并接收消息。而双向通信也很简单,在父Isolate
中创建一个端口,并在创建子Isolate
时,将这个端口传递给子Isolate
。然后在子Isolate
调用其入口函数时也创建一个新端口,并通过父Isolate
传递过来的端口把子Isolate
创建的端口传递给父Isolate
,这样父Isolate
与子Isolate
分别拥有对方的一个端口号,从而实现了通信。具体代码[见小节1.2]。
4、总结
通过上面的内容就可以对isolate
的的创建、运行及通信的实现有了一个基本的了解,当然isolate
也不仅仅只有上述的一些东西。但由于篇幅限制,其他内容(堆的内存分配、对象的垃圾回收等),后面再来一一分析。
【参考资料】
转载自:https://juejin.cn/post/6844904148303872013