Golang 基于信号的异步抢占与处理
By admin
- 5 minutes read - 1054 words在Go1.14版本开始实现了 基于信号的协程抢占调度
模式,在此版本以前执行以下代码是永远也无法执行最后一条println语句。
本文基于go version 1.16
package main
import (
"runtime"
"time"
)
func main() {
runtime.GOMAXPROCS(1)
go func() {
for {
}
}()
time.Sleep(time.Millisecond)
println("OK")
}
原因很简单:在main函数里只有一个CPU,从上到下执行到 time.Sleep()
函数的时候,会将 main goroutine
放入运行队列,出让了P,开始执行匿名函数,但匿名函数是一个for循环,没有任何 IO
语句,也就无法引起对 G
的调度,所以当前仅有的一个 P
永远被其占用,导致无法打印OK。
这个问题在1.14版本开始有所改变,主要是因为引入了基于信号的抢占模式
。在程序启动时,初始化信号,并在 runtime.sighandler
函数注册了 SIGURG
信号的处理函数 runtime.doSigPreempt
,然后在触发垃圾回收的栈扫描
时或执行 sysmon
监控线程时,调用函数挂起goroutine,并向M发送信号,M收到信号后,会让当前goroutine陷入休眠继续执行其他的goroutine。
本篇从发送与接收信号并处理两方面来看一下它是如何实现的。
发送信号
在上篇文章( 认识sysmon监控线程)介绍 sysmon 的时候,我们知道监控线程会在无P的情况下一直运行,定期扫描所有的P,将长时间运行的G 进行解除。
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
......
for {
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)
......
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
}
......
}
通过 retake()
函数对所有 P
进行检查。
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms
func retake(now int64) uint32 {
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
// 如果 P 运行得太久, 则抢占 G
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
// 如果超过了10ms就需要进行抢占了
preemptone(_p_)
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
sysretake = true
}
}
......
}
}
如果一个 P
的 pd.schedwhen+forcePreemptNS <= now
,则说明P上的G运行的时间太长,则需要通过函数 [preemptone()](https://github.com/golang/go/blob/go1.16/src/runtime/proc.go#L5350-L5385)
进行抢占,其实就是将当前P绑定的G 中的 g.preempt
和 p.preempt
设置为 true
。
// src/runtime/proc.go
// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can send inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being
// Grunning
func preemptone(_p_ *p) bool {
// 获取p绑定的m
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
// 当前m绑定的g
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
// 设置g的抢占标识,表示当前的g允许被抢占,下面也会设置 p.preempt =true,这样其它G就可以和这个P进行绑定,重新生成一个 GMP
gp.preempt = true
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
// 设置栈抢占 stackPreempt,这是一个很大的值比任何栈都大,
// 在 goroutine 内部的每次调用都会比较栈顶指针和 g.stackguard0,用以判断是否发生了栈溢出。
gp.stackguard0 = stackPreempt
// Request an async preemption of this P.
// 对P发一个异步抢占请示
if preemptMSupported && debug.asyncpreemptoff == 0 {
// 设置 p 的抢占标点
_p_.preempt = true
preemptM(mp)
}
return true
}
这里主要对 g 和 p 都设置了抢占标识位 preempt=true
,接着调用 [preemptM()](https://github.com/golang/go/blob/go1.16/src/runtime/signal_unix.go#L346-L375)
函数发送一个抢占请求到m。
// src/runtime/signal_unix.go
const preemptMSupported = true
// preemptM sends a preemption request to mp. This request may be
// handled asynchronously and may be coalesced with other requests to
// the M. When the request is received, if the running G or P are
// marked for preemption and the goroutine is at an asynchronous
// safe-point, it will preempt the goroutine. It always atomically
// increments mp.preemptGen after handling a preemption request.
func preemptM(mp *m) {
......
if atomic.Cas(&mp.signalPending, 0, 1) {
if GOOS == "darwin" || GOOS == "ios" {
atomic.Xadd(&pendingPreemptSignals, 1)
}
// If multiple threads are preempting the same M, it may send many
// signals to the same M such that it hardly make progress, causing
// live-lock problem. Apparently this could happen on darwin. See
// issue #37741.
// Only send a signal if there isn't already one pending.
signalM(mp, sigPreempt)
}
......
}
这里又调用了 signalM()
函数。
// src/runtime/os_darwin.go
func signalM(mp *m, sig int) {
pthread_kill(pthread(mp.procid), uint32(sig))
}
对于 pthread_kill()
函数我们不再详细介绍。
以上就是发送抢占信号的基本流程,下面我们再看看处理抢占信号的逻辑。
处理信号
给m发送的信息是 [sigPreempt](https://github.com/golang/go/blob/go1.16/src/runtime/signal_unix.go#L41-L71)
,它是一个常量
const sigPreempt = _SIGURG
对于它的详细说明,可以参考官方注释文档。
程序在启动的时候会把所有的信号都注册一次。
// Initialize signals.
// Called by libpreinit so runtime may not be initialized.
//go:nosplit
//go:nowritebarrierrec
func initsig(preinit bool) {
if !preinit {
// It's now OK for signal handlers to run.
signalsOK = true
}
// For c-archive/c-shared this is called by libpreinit with
// preinit == true.
if (isarchive || islibrary) && !preinit {
return
}
for i := uint32(0); i < _NSIG; i++ {
t := &sigtable[i]
if t.flags == 0 || t.flags&_SigDefault != 0 {
continue
}
// We don't need to use atomic operations here because
// there shouldn't be any other goroutines running yet.
fwdSig[i] = getsig(i)
if !sigInstallGoHandler(i) {
// Even if we are not installing a signal handler,
// set SA_ONSTACK if necessary.
if fwdSig[i] != _SIG_DFL && fwdSig[i] != _SIG_IGN {
setsigstack(i)
} else if fwdSig[i] == _SIG_IGN {
sigInitIgnored(i)
}
continue
}
handlingSig[i] = 1
setsig(i, funcPC(sighandler)) // 注册信号对应的回调方法
}
}
再通过函数 [sighandler()](https://github.com/golang/go/blob/go1.16/src/runtime/signal_unix.go#L525-L697)
进行 _SIGURG
信号回调机制的注册。
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
......
// 如果是抢占信号
if sig == sigPreempt && debug.asyncpreemptoff == 0 {
// Might be a preemption signal.
doSigPreempt(gp, c)
// Even if this was definitely a preemption signal, it
// may have been coalesced with another signal, so we
// still let it through to the application.
}
......
}
然后是执行抢占信号事件
// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
// 执行抢占
ctxt.pushCall(funcPC(asyncPreempt), newpc)
}
}
// Acknowledge the preemption.
atomic.Xadd(&gp.m.preemptGen, 1)
atomic.Store(&gp.m.signalPending, 0)
if GOOS == "darwin" || GOOS == "ios" {
atomic.Xadd(&pendingPreemptSignals, -1)
}
}
会先判断异步安全点 isAsyncSafePoint()
, 然后返回一个可以插入调用 asyncPreempt()
的PC。
总结
可以看到对于抢占过程大体上可以分两个步骤
runtime
在初始化时,注册一系列的信号量,并注册相应的回调函数,其中有一个抢占信号量_SIGURG
( 信号量介绍)- 在
sysmon
线程中调用retake()
函数来遍历 P 的执行时间是否过长(pd.schedwhen+forcePreemptNS <= now
),如果过长,则对这个G
和P
中的premmept
字段设置为true
表示允许被抢占标志;这时将自动 触发执行第一步中注册的回调函数doSigPreempt(gp, c)
参考资料
- https://mp.weixin.qq.com/s/59eBnrnoigz9A_J5uxIqRg
- https://zhuanlan.zhihu.com/p/216118842
- sysmon 后台监控线程
- https://cloud.tencent.com/developer/article/1450290
- http://xiaorui.cc/archives/6535
- https://www.jianshu.com/p/84b72ee645c2
- https://blog.csdn.net/Windgs_YF/article/details/93204185