golang中零停机重启服务之套接字复用,endless
创始人
2024-01-23 04:03:39
0

本文借助 endless 源码来阐述。重点看 fork() 之后的逻辑。

func (srv *endlessServer) fork() (err error) {runningServerReg.Lock()defer runningServerReg.Unlock()// only one server instance should fork!if runningServersForked {return errors.New("Another process already forked. Ignoring this one.")}runningServersForked = truevar files = make([]*os.File, len(runningServers))var orderArgs = make([]string, len(runningServers))// get the accessor socket fds for _all_ server instancesfor _, srvPtr := range runningServers {// introspect.PrintTypeDump(srvPtr.EndlessListener)switch srvPtr.EndlessListener.(type) {case *endlessListener:// normal listenerfiles[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File()default:// tls listenerfiles[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File()}orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr}env := append(os.Environ(),"ENDLESS_CONTINUE=1",)if len(runningServers) > 1 {env = append(env, fmt.Sprintf(`ENDLESS_SOCKET_ORDER=%s`, strings.Join(orderArgs, ",")))}path := os.Args[0]var args []stringif len(os.Args) > 1 {args = os.Args[1:]}cmd := exec.Command(path, args...)cmd.Stdout = os.Stdoutcmd.Stderr = os.Stderrcmd.ExtraFiles = filescmd.Env = enverr = cmd.Start()if err != nil {log.Fatalf("Restart: Failed to launch, error: %v", err)}return
}func (el *endlessListener) File() *os.File {// returns a dup(2) - FD_CLOEXEC flag *not* set// 这个注释不对tl := el.Listener.(*net.TCPListener)fl, _ := tl.File()return fl
}// File returns a copy of the underlying os.File.
// It is the caller's responsibility to close f when finished.
// Closing l does not affect f, and closing f does not affect l.
//
// The returned os.File's file descriptor is different from the
// connection's. Attempting to change properties of the original
// using this duplicate may or may not have the desired effect.
func (l *TCPListener) File() (f *os.File, err error) {if !l.ok() {return nil, syscall.EINVAL}f, err = l.file()if err != nil {return nil, &OpError{Op: "file", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}}return
}// dup 得到的是 fd 的一个副本
func (ln *TCPListener) file() (*os.File, error) {f, err := ln.fd.dup()if err != nil {return nil, err}return f, nil
}// Network file descriptor.
type netFD struct {pfd poll.FD// immutable until Closefamily      intsotype      intisConnected bool // handshake completed or use of association with peernet         stringladdr       Addrraddr       Addr
}// net/fd_unix.go
func (fd *netFD) dup() (f *os.File, err error) {ns, call, err := fd.pfd.Dup()if err != nil {if call != "" {err = os.NewSyscallError(call, err)}return nil, err}return os.NewFile(uintptr(ns), fd.name()), nil
}// internal/poll/fd_mutex.go
// incref adds a reference to mu.
// It reports whether mu is available for reading or writing.// decref removes a reference from mu.
// It reports whether there is no remaining reference.// internal/poll/fd_unix.go
// Dup duplicates the file descriptor.
func (fd *FD) Dup() (int, string, error) {if err := fd.incref(); err != nil {return -1, "", err}defer fd.decref()return DupCloseOnExec(fd.Sysfd)
}// dupCloseOnExecOld is the traditional way to dup an fd and
// set its O_CLOEXEC bit, using two system calls.
func dupCloseOnExecOld(fd int) (int, string, error) {syscall.ForkLock.RLock()defer syscall.ForkLock.RUnlock()newfd, err := syscall.Dup(fd)if err != nil {return -1, "dup", err}// 为新的fd设置close_on_exec标志位syscall.CloseOnExec(newfd)return newfd, "", nil
}

fork() 调用 File() 方法得到的是 listener fd 的一个副本 fd-1,并且 fdfd-1 都被设置了 O_CLOEXEC 标志位。

1、关于dup系统调用syscall.Dup(fd)

  • dup系统调用会创建文件描述符的一个拷贝。
  • 新生成的文件描述符是进程当前可用的文件描述符编号的最小值。
  • 如果拷贝成功,两者都指向同一个打开的文件,因此共享所有的锁定,读写指针,和各项权限或标志位,因此可能存在交叉使用。
  • 如果拷贝错误则返回-1,错误代码存入errno中。
  • 调用dup族类函数得到的新文件描述符将清除O_CLOEXEC模式,这也是为什么源码中在dup之后重新设置O_CLOEXEC

2、关于 O_CLOEXEC

  • 在一个进程中通过exec调用启动一个子进程,这个子进程会继承父进程打开的fd以及进行操作,出于安全考虑,有些fd,我们要求在子进程中无法继承,就需要设置这些fd的 O_CLOEXEC ,那么在启动子进程成功之后,内核会关闭子进程中的这些fd。
  • 调用open函数 O_CLOEXEC 模式打开的文件描述符在执行exec调用得到的新程序中关闭,且为原子操作。
  • 调用open函数不使用 O_CLOEXEC 模式打开的文件描述符,然后调用 fcntl 函数设置 FD_CLOEXEC 选项,效果和使用 O_CLOEXEC 选项open函数相同,但分别调用open、fcnt两个l函数,不是原子操作,多线程环境中存在竞态条件,故用open函数O_CLOEXEC选项代替之。

在golang中,出于安全考虑,所有打开的文件描述符都会被设置 O_CLOEXEC,这使得子进程中继承过去的 fd 都会被内核关掉(这和传统的 exec 的效果不一样,默认子进程会继承父进程打开的 fd),因此,想要传递过去 fd ,就需要手动来操作,比如赋值 Cmd.ExtraFiles。比如打开文件操作,以及上面的 socket fd 皆是如此。

func openFile(name string, flag int, perm FileMode) (file *File, err error) {r, e := syscall.Open(fixLongPath(name), flag|syscall.O_CLOEXEC, syscallMode(perm))if e != nil {return nil, e}return newFile(r, name, "file"), nil
}

在golang中,对于 Stdin, Stdout, Stderr 三个,可以通过 Cmd 结构体来传递,如果不传,会默认填充为/dev/null

这也解释了,为什么golang中可以使用 3+i来给Cmd.ExtraFiles中的文件编号,因为0-stdin,1-stdout,2-stderr,多余的 fd 都被清掉了,剩下的就是Cmd.ExtraFiles,而 fd 的编号是从可以使用的最小的开始,比如现在有0,1,2,3,4,5,6,然后4被释放了,新来的 fd 就是4,而不是7,

至此,fd-1 文件描述符就被成功传入子进程中,它是 fd 的副本,文件描述符都会对应一个内核中已经打开的文件,只不过 socket fd 比较特殊,它不是指向文件系统,而是保存在内存中的结构体SOCKET,在这个结构体里面有一个发送队列和一个接收队列。fd, fd-1 目前同时指向了这个SOCKET。

下面进入子进程的启动流程,首先是getListener()

func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {if srv.isChild {var ptrOffset uint = 0runningServerReg.RLock()defer runningServerReg.RUnlock()if len(socketPtrOffsetMap) > 0 {ptrOffset = socketPtrOffsetMap[laddr]}// 使用 fd 编号构建 *os.File 对象f := os.NewFile(uintptr(3+ptrOffset), "")// 使用此 *os.File 对象构建 net.Listener 对象l, err = net.FileListener(f)if err != nil {err = fmt.Errorf("net.FileListener error: %v", err)return}} else {l, err = net.Listen("tcp", laddr)if err != nil {err = fmt.Errorf("net.Listen error: %v", err)return}}return
}

fd-1 已经传递了进来,编号为3,但是还需要从代码层面进行包装才能被使用,首先构建成*os.File对象,然后构建成net.Listener对象,这里只是做了包装操作,并没有其他操作,因为 fd-1 对应的SOCKET已经处于监听状态了,这是父进程中net.Listen()完成的。

至此,就变成了两个进程来消费SOCKET中的一个接收队列,这两个进程可以同时存在,以轮训的方式来消费。

// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)if err != nil {return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}}sl := &sysListener{ListenConfig: *lc,network:      network,address:      address,}var l Listenerla := addrs.first(isIPv4)switch la := la.(type) {case *TCPAddr:l, err = sl.listenTCP(ctx, la) // *******case *UnixAddr:l, err = sl.listenUnix(ctx, la)default:return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}}if err != nil {return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer}return l, nil
}func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)if err != nil {return nil, err}return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}// 在这里完成了 socker_create, socker_bind, socket_listen 等基础工作
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {raddr = raddr.toLocal(net)}family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

然后去发信号关闭父进程。

func (el *endlessListener) Close() error {if el.stopped {return syscall.EINVAL}el.stopped = truereturn el.Listener.Close()
}
// internal/poll/fd_unix.go
// Close closes the FD. The underlying file descriptor is closed by the
// destroy method when there are no remaining references.
func (fd *FD) Close() error {if !fd.fdmu.increfAndClose() {return errClosing(fd.isFile)}// Unblock any I/O.  Once it all unblocks and returns,// so that it cannot be referring to fd.sysfd anymore,// the final decref will close fd.sysfd. This should happen// fairly quickly, since all the I/O is non-blocking, and any// attempts to block in the pollDesc will return errClosing(fd.isFile).fd.pd.evict()// The call to decref will call destroy if there are no other// references.err := fd.decref()// Wait until the descriptor is closed. If this was the only// reference, it is already closed. Only wait if the file has// not been set to blocking mode, as otherwise any current I/O// may be blocking, and that would block the Close.// No need for an atomic read of isBlocking, increfAndClose means// we have exclusive access to fd.if fd.isBlocking == 0 {runtime_Semacquire(&fd.csema)}return err
}

Listener.Close()只是关闭了fd,使其不再accept,而底层的SOCKET是由内核来回收的,当没有fd再指向它的时候就会被销毁。在这之前其实 fd-1 就已经指向了它,所以监听继续进行。

关于查看进程的fd

lsof -p 1299
COMMAND   PID USER   FD      TYPE DEVICE SIZE/OFF     NODE NAME
testproc 1299 root  cwd       DIR   0,58     4096      415 /data/www/golang/project/hashcompare
testproc 1299 root  rtd       DIR  253,0     4096      128 /
testproc 1299 root  txt       REG   0,58  6223068      433 /data/www/golang/project/hashcompare/testproc
testproc 1299 root  mem       REG  253,0  2156240 33632456 /usr/lib64/libc-2.17.so
testproc 1299 root  mem       REG  253,0   142144 34128089 /usr/lib64/libpthread-2.17.so
testproc 1299 root  mem       REG  253,0   163312 34127918 /usr/lib64/ld-2.17.so
testproc 1299 root    0u      CHR  136,0      0t0        3 /dev/pts/0
testproc 1299 root    1u      CHR  136,0      0t0        3 /dev/pts/0
testproc 1299 root    2u      CHR  136,0      0t0        3 /dev/pts/0
testproc 1299 root    3u      REG   0,58      166      435 /data/www/golang/project/hashcompare/testproc_f1.txt
testproc 1299 root    4u  a_inode    0,9        0     4554 [eventpoll]
testproc 1299 root    5r     FIFO    0,8      0t0   936660 pipe
testproc 1299 root    6w     FIFO    0,8      0t0   936660 pipe
testproc 1299 root    7u     IPv6 936664      0t0      TCP *:6060 (LISTEN)

或者

ls -l /proc/1299/fd
总用量 0
lrwx------ 1 root root 64 11月 16 09:53 0 -> /dev/pts/0
lrwx------ 1 root root 64 11月 16 09:53 1 -> /dev/pts/0
lrwx------ 1 root root 64 11月 16 09:53 2 -> /dev/pts/0
lrwx------ 1 root root 64 11月 16 09:53 3 -> /data/www/golang/project/hashcompare/testproc_f1.txt
lrwx------ 1 root root 64 11月 16 09:53 4 -> anon_inode:[eventpoll]
lr-x------ 1 root root 64 11月 16 09:53 5 -> pipe:[936660]
l-wx------ 1 root root 64 11月 16 09:53 6 -> pipe:[936660]
lrwx------ 1 root root 64 11月 16 09:53 7 -> socket:[936664]

相关内容

热门资讯

画虎画皮难画骨的近义词 画虎画皮难画骨的近义词有:知人知面不知心,画虎画皮难画骨[huà hǔ huà pí nán huà...
胡言乱语的近义词 胡言乱语的近义词有:一片胡言,一簧两舌,乱语胡言,信口开河,信口雌黄,口不择言,天花乱坠,奇谈怪论,...
光天化日,光天化日的意思,光... 光天化日guāng tiān huà rì [释义]充满阳光的天空;化生万物的太阳。旧时比喻太...
特意的近义词   一、【近义词】特地  二、【基本解释】  [释义](副)特地。(同音)特地。  [构成]偏正式:...
官高爵显的近义词 官高爵显的近义词有:高官显爵,官高爵显[guān gāo jué xiǎn]的意思:爵:爵位,官爵;...
切要关头的近义词 切要关头的近义词有:紧要关头,切要关头[qiè yào guān tóu]的意思:关头:关口。比喻有...
面面俱到,面面俱到的意思,面... 面面俱到miàn miàn jù dào [释义]俱:都。各方面都照顾到。也指虽然各方面都照顾...
人单势孤的近义词 人单势孤的近义词有:势单力薄,人单势孤[rén dān shì gū]的意思:人数少,力量单薄。出自...
间断的近义词   一、【近义词】  中断、中止、拆开、休止  二、【基本解释】  [释义](动)(连续的事情)中间...
乡壁虚造的近义词 乡壁虚造的近义词有:凭空捏造,无中生有,面壁虚构,乡壁虚造[xiāng bì xū zào]的意思:...
盎盂相敲的近义词 盎盂相敲的近义词有:盎盂相击,盎盂相敲[àng yú xiāng qiāo]的意思:比喻一家人争吵。...
高枕而卧的近义词 高枕而卧的近义词有:无忧无虑,高枕安卧,高枕安寝,高枕无忧,高枕而卧[gāo zhěn ér wò]...
破题儿第一遭的近义词 破题儿第一遭的近义词有:破题儿,破题儿头一遭,破题儿第一遭[pò tí ér dì yī zāo]的...
如堕烟海的近义词 如堕烟海的近义词有:如坐云雾,如堕烟雾,雾里看花,如堕烟海[rú duò yān hǎi]的意思:好...
雁杳鱼沉的近义词 雁杳鱼沉的近义词有:信断音绝,雁断鱼沉,雁逝鱼沉,雁杳鱼沉[yàn yǎo yú chén]的意思:...
汪洋大海的近义词 汪洋大海的近义词有:东洋大海,声势浩大,波澜壮阔,汪洋大海[wāng yáng dà hǎi]的意思...
明效大验的近义词 明效大验的近义词有:明验大效,明效大验[míng xiào dà yà]的意思:显著而又巨大的效验。...
难乎为继的近义词 难乎为继的近义词有:难以为继,难乎为继[nán hū wéi jì]的意思:难于继续下去。出自:清 ...
在所难免的近义词 在所难免的近义词有:在劫难逃,在所不免,在所无免,在所难免[zài suǒ nán miǎn]的意思...
恨入心髓的近义词 恨入心髓的近义词有:恨之入骨,恨之切骨,恨入骨髓,恨入心髓[hèn rù xīn suǐ]的意思:恨...