Implicit suspend checks in the ART virtual machine
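A few years ago I came across a piece of code containing a Thread.sleep(0) call that left me thoroughly confused. After some googling I learned it was there to keep Java code from running uninterrupted for so long that GC could not run in time. Thread.sleep is a native method, and when the virtual machine reaches a native method the thread state changes from runnable to native (the native state is itself a safepoint); a suspend check is performed during that state transition.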
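Suspend checks are mainly how the GC implements stop-the-world. When the GC thread needs exclusive access to certain objects (GC roots, for example), it has to wait until every worker thread has reached a safe state, also called a safepoint. At a safepoint the worker handles requests sent by the GC thread (such as suspend), which keeps the access thread-safe. So if a worker thread takes a long time to reach a safepoint, the GC thread is left waiting and cannot proceed.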
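The effect of a missing safepoint can be shown with a small single-threaded model. This is only a sketch under stated assumptions: WorkerThread, kSuspendRequest, SuspendCheck and RunLoop are hypothetical names, not ART or JDK APIs, and the request from the "GC thread" is simulated by raising the flag at a chosen iteration.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical per-thread state; ART's real Thread layout differs.
struct WorkerThread {
  std::atomic<uint32_t> flags{0};  // non-zero: stop at the next safepoint
  uint64_t work_done = 0;
  bool parked = false;             // set once the thread has honoured a request
};

constexpr uint32_t kSuspendRequest = 1u;  // illustrative flag bit

// The poll placed at a safepoint: one load and one test when nothing is pending.
inline bool SuspendCheck(WorkerThread& self) {
  if ((self.flags.load(std::memory_order_acquire) & kSuspendRequest) != 0) {
    self.parked = true;  // a real VM would block here until resumed
    return true;
  }
  return false;
}

// Runs up to `iterations` loop iterations and returns the work completed.
// `request_at` stands in for the GC thread raising the request while the loop
// is at that iteration; `poll` selects whether the back edge has a check.
inline uint64_t RunLoop(WorkerThread& self, uint64_t iterations, bool poll,
                        uint64_t request_at) {
  for (uint64_t i = 0; i < iterations; ++i) {
    if (i == request_at) {
      self.flags.fetch_or(kSuspendRequest, std::memory_order_release);
    }
    ++self.work_done;
    if (poll && SuspendCheck(self)) {
      break;  // reached a safepoint: the GC can proceed now
    }
  }
  return self.work_done;
}
```

With a check on the back edge the loop stops in the same iteration in which the request was raised; without one it runs all remaining iterations while the requester waits.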
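To solve this, modern virtual machines place safepoints at regular intervals in the code and on every transition from a non-runnable state back to runnable, so the Thread.sleep(0) trick is redundant on current JDKs. ART performs suspend checks at method entry and at branch instructions. In the ART interpreter, for example, a backward branch (negative or zero offset) runs a suspend check: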
.macro BRANCH
    leaq (rPC, rINSTq, 2), rPC
    // Update method counter and do a suspend check if the branch is negative or zero.
    testq rINSTq, rINSTq
    jle 3f                      // backward branch (offset <= 0): go to label 3
2:  // We use 2 and not 1 for this local label as the users of the BRANCH macro have a 1 label.
    FETCH_INST                  // forward path: no suspend check needed
    GOTO_NEXT
3:
    movq (%rsp), %rdi
    movzwl ART_METHOD_HOTNESS_COUNT_OFFSET(%rdi), %esi
#if (NTERP_HOTNESS_VALUE != 0)
#error Expected 0 for hotness value
#endif
    // If the counter is at zero, handle this in the runtime.
    testw %si, %si
    je NterpHandleHotnessOverflow
    // Update counter.
    addl $$-1, %esi
    movw %si, ART_METHOD_HOTNESS_COUNT_OFFSET(%rdi)   // store the updated hotness counter
    DO_SUSPEND_CHECK continue_label=2b                // the suspend check itself
    jmp 2b
.endm
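The control flow of the BRANCH macro can be restated in C++ as a rough sketch. BranchResult and HandleBranch are hypothetical names introduced only for illustration; the real interpreter works on registers and jumps to NterpHandleHotnessOverflow rather than returning a struct.

```cpp
#include <cstdint>

// Hypothetical summary of what one branch did; not an ART type.
struct BranchResult {
  bool suspend_checked;   // DO_SUSPEND_CHECK was reached
  bool hotness_overflow;  // counter was zero: handled by the runtime instead
  uint16_t hotness;       // hotness counter after the branch
};

// `offset` is the signed branch offset (rINSTq in the macro).
inline BranchResult HandleBranch(int64_t offset, uint16_t hotness) {
  BranchResult result{false, false, hotness};
  if (offset > 0) {
    return result;  // forward branch: fetch the next instruction, no check
  }
  if (hotness == 0) {
    result.hotness_overflow = true;  // mirrors `je NterpHandleHotnessOverflow`
    return result;
  }
  result.hotness = static_cast<uint16_t>(hotness - 1);  // mirrors `addl $$-1`
  result.suspend_checked = true;
  return result;
}
```

Only backward branches pay for the counter update and the check, so forward branches add no cost while loops are still guaranteed to reach a safepoint.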
Older versions of the AOT compiler insert extra check instructions at loop back edges and at method entry:
void InstructionCodeGeneratorX86_64::HandleGoto(HInstruction* got, HBasicBlock* successor) {
  if (successor->IsExitBlock()) {
    DCHECK(got->GetPrevious()->AlwaysThrows());
    return;  // no code needed
  }
  HBasicBlock* block = got->GetBlock();
  HInstruction* previous = got->GetPrevious();
  HLoopInformation* info = block->GetLoopInformation();
  if (info != nullptr && info->IsBackEdge(*block) && info->HasSuspendCheck()) {  // loop back edge
    codegen_->MaybeIncrementHotness(/* is_frame_entry= */ false);
    GenerateSuspendCheck(info->GetSuspendCheck(), successor);  // emit the suspend check
    return;
  }
  if (block->IsEntryBlock() && (previous != nullptr) && previous->IsSuspendCheck()) {
    GenerateSuspendCheck(previous->AsSuspendCheck(), nullptr);
  }
  if (!codegen_->GoesToNextBlock(got->GetBlock(), successor)) {
    __ jmp(codegen_->GetLabelOf(successor));
  }
}
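This approach adds a fair amount of code, though. The following method, for example, gains two test-suspend checks: one at method entry and one on the loop back edge.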
public class SuspendCheck {
    public static void main(String[] args) {
        String s = args.length == 1 ? args[0] : null; // test suspend
        for (String arg : args) {
            if (arg.contains(s)) {
                break;
            }
        } // test suspend
    }
}
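The generated machine code contains 10 extra instructions and 2 extra stack maps; the 10 instructions (40 bytes) account for 20% of the 200-byte method.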
CODE: (code_offset=0x00001020 size=200)...
0x00001020: d1400bf0 sub x16, sp, #0x2000 (8192)
0x00001024: b940021f ldr wzr, [x16]
StackMap[0] (native_pc=0x1028, dex_pc=0x0, register_mask=0x0, stack_mask=0b)
0x00001028: f81c0fe0 str x0, [sp, #-64]!
0x0000102c: a9015ff6 stp x22, x23, [sp, #16]
0x00001030: a90267f8 stp x24, x25, [sp, #32]
0x00001034: a9037bfa stp x26, lr, [sp, #48]
0x00001038: b9400270 ldr w16, [tr] ; state_and_flags // check the low 3 bits of state_and_flags
0x0000103c: 72000a1f tst w16, #0x7 // if any of them is set, branch to the test-suspend entry
0x00001040: 540004c1 b.ne #+0x98 (addr 0x10d8)
0x00001044: b9400836 ldr w22, [x1, #8]
StackMap[1] (native_pc=0x1048, dex_pc=0x0, register_mask=0x0, stack_mask=0b)
...
0x000010b4: b9400270 ldr w16, [tr] ; state_and_flags // second suspend check
0x000010b8: 72000a1f tst w16, #0x7
0x000010bc: 54fffd80 b.eq #-0x50 (addr 0x106c)
...
0x000010d4: d65f03c0 ret
0x000010d8: 94000022 bl #+0x88 (addr 0x1160) ; pTestSuspend // test-suspend entry
StackMap[4] (native_pc=0x10dc, dex_pc=0x0, register_mask=0x2, stack_mask=0b) // the GC thread may walk the stack of a thread that is inside pTestSuspend, so a stack map is recorded for the call site
0x000010dc: 17ffffda b #-0x98 (addr 0x1044)
0x000010e0: 94000020 bl #+0x80 (addr 0x1160) ; pTestSuspend // second test-suspend entry
StackMap[5] (native_pc=0x10e4, dex_pc=0xa, register_mask=0x6, stack_mask=0b)
0x000010e4: 17ffffe2 b #-0x78 (addr 0x106c)
To reduce code size, the ARM64 compiler introduced implicit suspend checks starting with Android 10. The commit message explains the mechanism in some detail:
Vladimir Marko, 6 years ago (April 6th, 2018 12:59 AM) ARM64: Implicit suspend checks using LDR.
Implement implicit suspend checks in compiled managed code. Use a single instruction
ldr x21, [x21, #0]
for the check where x21 points to a field in Thread that points to itself until we request a checkpoint or suspension and set it to null. After the null becomes visible to a running thread, it requires two loads to get a segmentation fault that is intercepted and redirected to a suspend check.
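In short, it uses the segmentation-fault signal to interrupt running code. This is a classic virtual-machine optimization: ART's implicit null checks and its Concurrent Mark Compact GC likewise rely on signals to monitor managed code, which reduces code size and improves performance.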
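The "two loads" mentioned in the commit message can be modelled without raising a real signal. The following is a minimal sketch, assuming a stand-in FakeThread with a self-pointing field and a SimulatedX21 struct that plays the role of the register; both are hypothetical, and a null load is reported as a return value instead of a SIGSEGV.

```cpp
// Stand-in for the Thread field used by the check (hypothetical type).
struct FakeThread {
  void* suspend_trigger;
  FakeThread() { RemoveSuspendTrigger(); }
  FakeThread(const FakeThread&) = delete;  // a copy would not point to itself
  FakeThread& operator=(const FakeThread&) = delete;
  void RemoveSuspendTrigger() { suspend_trigger = &suspend_trigger; }
  void TriggerSuspend() { suspend_trigger = nullptr; }
};

// Models register x21 and the instruction `ldr x21, [x21]`.
struct SimulatedX21 {
  void** value;
  explicit SimulatedX21(FakeThread& thread) : value(&thread.suspend_trigger) {}

  // Returns false where real hardware would fault on a load from null.
  bool ImplicitSuspendCheck() {
    if (value == nullptr) {
      return false;  // second load after the trigger: this is the fault
    }
    value = static_cast<void**>(*value);  // self-pointer keeps x21 unchanged
    return true;
  }
};
```

While the field points to itself, every check reloads the same address into the register. After TriggerSuspend, the first check only copies null into the register, and the next one is the load that faults.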
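Since the mechanism relies on a segmentation fault, what triggers it? Naturally, each place where a test suspend used to be inserted now gets an instruction that can fault: ldr x21, [x21, #0]. As soon as register x21 holds an invalid address the load raises a segmentation fault, so a single instruction does the job that previously took five. Code generation looks like this: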
void InstructionCodeGeneratorARM64::GenerateSuspendCheck(HSuspendCheck* instruction,
                                                         HBasicBlock* successor) {
  ...
  if (codegen_->CanUseImplicitSuspendCheck()) {
    // Implicit suspend check: emit a single ldr, then record a stack map for GC stack walking.
    // kImplicitSuspendCheckRegister is register x21.
    __ Ldr(kImplicitSuspendCheckRegister, MemOperand(kImplicitSuspendCheckRegister));
    codegen_->RecordPcInfo(instruction, instruction->GetDexPc());
    if (successor != nullptr) {
      __ B(codegen_->GetLabelOf(successor));
    }
    return;
  }
  // Otherwise emit a slow path plus the instructions that test the thread flags.
  SuspendCheckSlowPathARM64* slow_path =
      down_cast<SuspendCheckSlowPathARM64*>(instruction->GetSlowPath());
  ...
  UseScratchRegisterScope temps(codegen_->GetVIXLAssembler());
  Register temp = temps.AcquireW();
  __ Ldr(temp, MemOperand(tr, Thread::ThreadFlagsOffset<kArm64PointerSize>().SizeValue()));
  __ Tst(temp, Thread::SuspendOrCheckpointRequestFlags());  // emit the thread-flag test
  if (successor == nullptr) {
    __ B(ne, slow_path->GetEntryLabel());  // a flag is set: branch to the slow path entry
    __ Bind(slow_path->GetReturnLabel());
  }
  ...
}
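Compiled with implicit suspend checks, the Java code above needs 8 fewer instructions: the machine code is only 168 bytes, 16% smaller than the original 200 bytes. The code is smaller and the CPU also executes fewer instructions.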
CODE: (code_offset=0x00001020 size=168)...
0x00001020: d1400bf0 sub x16, sp, #0x2000 (8192)
0x00001024: b940021f ldr wzr, [x16]
StackMap[0] (native_pc=0x1028, dex_pc=0x0, register_mask=0x0, stack_mask=0b)
0x00001028: f81c0fe0 str x0, [sp, #-64]!
0x0000102c: a9015ff6 stp x22, x23, [sp, #16]
0x00001030: a90267f8 stp x24, x25, [sp, #32]
0x00001034: a9037bfa stp x26, lr, [sp, #48]
0x00001038: f94002b5 ldr x21, [x21] // implicit suspend check
StackMap[1] (native_pc=0x103c, dex_pc=0x0, register_mask=0x2, stack_mask=0b)
0x0000103c: b9400836 ldr w22, [x1, #8]
...
0x000010ac: f94002b5 ldr x21, [x21] // implicit suspend check
StackMap[5] (native_pc=0x10b0, dex_pc=0xa, register_mask=0x6, stack_mask=0b)
0x000010b0: 17ffffed b #-0x4c (addr 0x1064)
0x000010b4: a9415ff6 ldp x22, x23, [sp, #16]
0x000010b8: a94267f8 ldp x24, x25, [sp, #32]
0x000010bc: a9437bfa ldp x26, lr, [sp, #48]
0x000010c0: 910103ff add sp, sp, #0x40 (64)
0x000010c4: d65f03c0 ret
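The size figures quoted for the two listings can be cross-checked with a few lines of compile-time arithmetic. This is just a sanity check, assuming fixed 4-byte AArch64 instructions and the per-check instruction counts visible in the listings; the constant and function names are made up for the example.

```cpp
#include <cstdint>

constexpr uint32_t kInsnBytes = 4;              // every AArch64 instruction is 4 bytes
constexpr uint32_t kExplicitInsnsPerCheck = 5;  // ldr, tst, b.cond, plus bl and b in the slow path
constexpr uint32_t kImplicitInsnsPerCheck = 1;  // ldr x21, [x21]
constexpr uint32_t kChecks = 2;                 // method entry and loop back edge
constexpr uint32_t kExplicitSize = 200;         // bytes, from the first listing

constexpr uint32_t SavedInstructions() {
  return kChecks * (kExplicitInsnsPerCheck - kImplicitInsnsPerCheck);
}
constexpr uint32_t SavedBytes() { return SavedInstructions() * kInsnBytes; }
constexpr uint32_t ImplicitSize() { return kExplicitSize - SavedBytes(); }
constexpr uint32_t SavedPercent() { return SavedBytes() * 100 / kExplicitSize; }

static_assert(SavedInstructions() == 8, "two checks, four instructions saved each");
static_assert(ImplicitSize() == 168, "matches size=168 in the second listing");
static_assert(SavedPercent() == 16, "32 of 200 bytes");
```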
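That covers code generation. At run time, how does the GC make register x21 point to an invalid address, and how is it restored once the GC is done? First, the logic that triggers a suspend: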
void RemoveSuspendTrigger() {
  tlsPtr_.suspend_trigger = reinterpret_cast<uintptr_t*>(&tlsPtr_.suspend_trigger);
}
void TriggerSuspend() {
  tlsPtr_.suspend_trigger = nullptr;
}
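All that is needed is to have register x21 mirror tlsPtr_.suspend_trigger, because once tlsPtr_.suspend_trigger is nullptr the register ends up holding an invalid address. The suspend request submitted by the GC thread is then serviced from the segmentation-fault signal handler: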
bool SuspensionHandler::Action([[maybe_unused]] int sig,
                               [[maybe_unused]] siginfo_t* info,
                               void* context) {
  constexpr uint32_t kSuspendCheckRegister = 21;  // register x21
  constexpr uint32_t checkinst =
      0xf9400000 | (kSuspendCheckRegister << 5) | (kSuspendCheckRegister << 0);
  // mc is the machine context taken from `context`; its setup is omitted in this excerpt.
  uint32_t inst = *reinterpret_cast<uint32_t*>(mc->pc);
  VLOG(signals) << "checking suspend; inst: " << std::hex << inst << " checkinst: " << checkinst;
  if (inst != checkinst) {  // was the fault raised by a suspend check, i.e. by `ldr x21, [x21,#0]`?
    // The instruction is not good, not ours.
    return false;
  }
  ...
  // Once the handler returns, execution resumes in art_quick_implicit_suspend,
  // which services the suspend request.
  mc->pc = reinterpret_cast<uintptr_t>(art_quick_implicit_suspend);
  ...
  return true;
}
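The checkinst constant can be verified against the disassembly shown earlier, where the implicit check appears as f94002b5. A small sketch follows; EncodeLdrSelf and IsImplicitSuspendCheck are hypothetical helper names, and only the bit pattern itself comes from the handler above.

```cpp
#include <cstdint>

// Builds the encoding of `ldr xN, [xN, #0]` the same way the handler does:
// base opcode 0xf9400000, Rn in bits 5..9, Rt in bits 0..4.
constexpr uint32_t EncodeLdrSelf(uint32_t reg) {
  return 0xf9400000u | (reg << 5) | (reg << 0);
}

constexpr uint32_t kSuspendCheckRegister = 21;  // x21

// True if `inst` is exactly the implicit suspend check instruction.
constexpr bool IsImplicitSuspendCheck(uint32_t inst) {
  return inst == EncodeLdrSelf(kSuspendCheckRegister);
}

static_assert(EncodeLdrSelf(21) == 0xf94002b5u,
              "same word as `f94002b5 ldr x21, [x21]` in the listing");
```

Any other faulting load, such as b9400836 (ldr w22, [x1, #8] in the same listing), does not match, so the handler returns false and leaves the fault to other handlers.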
Reposted from: https://juejin.cn/post/7352091152583884811