本文借助 endless 源码来阐述。重点看 fork() 之后的逻辑。
func (srv *endlessServer) fork() (err error) {runningServerReg.Lock()defer runningServerReg.Unlock()// only one server instance should fork!if runningServersForked {return errors.New("Another process already forked. Ignoring this one.")}runningServersForked = truevar files = make([]*os.File, len(runningServers))var orderArgs = make([]string, len(runningServers))// get the accessor socket fds for _all_ server instancesfor _, srvPtr := range runningServers {// introspect.PrintTypeDump(srvPtr.EndlessListener)switch srvPtr.EndlessListener.(type) {case *endlessListener:// normal listenerfiles[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File()default:// tls listenerfiles[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File()}orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr}env := append(os.Environ(),"ENDLESS_CONTINUE=1",)if len(runningServers) > 1 {env = append(env, fmt.Sprintf(`ENDLESS_SOCKET_ORDER=%s`, strings.Join(orderArgs, ",")))}path := os.Args[0]var args []stringif len(os.Args) > 1 {args = os.Args[1:]}cmd := exec.Command(path, args...)cmd.Stdout = os.Stdoutcmd.Stderr = os.Stderrcmd.ExtraFiles = filescmd.Env = enverr = cmd.Start()if err != nil {log.Fatalf("Restart: Failed to launch, error: %v", err)}return
}func (el *endlessListener) File() *os.File {// returns a dup(2) - FD_CLOEXEC flag *not* set// 这个注释不对tl := el.Listener.(*net.TCPListener)fl, _ := tl.File()return fl
}// File returns a copy of the underlying os.File.
// It is the caller's responsibility to close f when finished.
// Closing l does not affect f, and closing f does not affect l.
//
// The returned os.File's file descriptor is different from the
// connection's. Attempting to change properties of the original
// using this duplicate may or may not have the desired effect.
func (l *TCPListener) File() (f *os.File, err error) {if !l.ok() {return nil, syscall.EINVAL}f, err = l.file()if err != nil {return nil, &OpError{Op: "file", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}}return
}// dup 得到的是 fd 的一个副本
func (ln *TCPListener) file() (*os.File, error) {f, err := ln.fd.dup()if err != nil {return nil, err}return f, nil
}// Network file descriptor.
type netFD struct {
	pfd poll.FD

	// immutable until Close
	family      int
	sotype      int
	isConnected bool // handshake completed or use of association with peer
	net         string
	laddr       Addr
	raddr       Addr
}

// net/fd_unix.go
func (fd *netFD) dup() (f *os.File, err error) {ns, call, err := fd.pfd.Dup()if err != nil {if call != "" {err = os.NewSyscallError(call, err)}return nil, err}return os.NewFile(uintptr(ns), fd.name()), nil
}// internal/poll/fd_mutex.go
// incref adds a reference to mu.
// It reports whether mu is available for reading or writing.

// decref removes a reference from mu.
// It reports whether there is no remaining reference.

// internal/poll/fd_unix.go
// Dup duplicates the file descriptor.
func (fd *FD) Dup() (int, string, error) {if err := fd.incref(); err != nil {return -1, "", err}defer fd.decref()return DupCloseOnExec(fd.Sysfd)
}// dupCloseOnExecOld is the traditional way to dup an fd and
// set its O_CLOEXEC bit, using two system calls.
func dupCloseOnExecOld(fd int) (int, string, error) {
	// The read side of ForkLock is held across both system calls, from the
	// dup until the close-on-exec flag has been set.
	syscall.ForkLock.RLock()
	defer syscall.ForkLock.RUnlock()

	newfd, err := syscall.Dup(fd)
	if err != nil {
		// The second result names the system call that failed.
		return -1, "dup", err
	}

	// Set the close-on-exec flag on the new fd.
	syscall.CloseOnExec(newfd)
	return newfd, "", nil
}
fork() 调用 File() 方法得到的是 listener fd 的一个副本 fd-1,并且 fd 和 fd-1 都被设置了 O_CLOEXEC 标志位。
1、关于 dup 系统调用 syscall.Dup(fd):它返回一个新的文件描述符,新旧两个描述符指向内核中同一个已打开的文件(对 socket 来说就是同一个 SOCKET)。
2、关于 O_CLOEXEC
在 golang 中,出于安全考虑,标准库打开的文件描述符都会被设置 O_CLOEXEC,这使得这些 fd 在子进程执行 exec 时被内核自动关闭(这和传统 fork/exec 的默认行为不一样:默认情况下子进程会继承父进程打开的 fd)。因此,想要把 fd 传递给子进程,就需要手动来操作,比如赋值给 Cmd.ExtraFiles。打开文件的操作,以及上面的 socket fd 皆是如此。
func openFile(name string, flag int, perm FileMode) (file *File, err error) {r, e := syscall.Open(fixLongPath(name), flag|syscall.O_CLOEXEC, syscallMode(perm))if e != nil {return nil, e}return newFile(r, name, "file"), nil
}
在 golang 中,对于 Stdin、Stdout、Stderr 三个,可以通过 Cmd 结构体来传递;如果不传,会默认填充为 /dev/null。
这也解释了,为什么 golang 中可以使用 3+i 来给 Cmd.ExtraFiles 中的文件编号:0-stdin、1-stdout、2-stderr 之外多余的 fd 都被清掉了,剩下的就是 Cmd.ExtraFiles。而 fd 的编号总是从可以使用的最小值开始,比如现在有 0,1,2,3,4,5,6,然后 4 被释放了,新来的 fd 就是 4,而不是 7。
至此,fd-1 文件描述符就被成功传入子进程中,它是 fd 的副本,文件描述符都会对应一个内核中已经打开的文件,只不过 socket fd 比较特殊,它不是指向文件系统,而是保存在内存中的结构体SOCKET,在这个结构体里面有一个发送队列和一个接收队列。fd, fd-1 目前同时指向了这个SOCKET。
下面进入子进程的启动流程,首先是getListener()
func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {if srv.isChild {var ptrOffset uint = 0runningServerReg.RLock()defer runningServerReg.RUnlock()if len(socketPtrOffsetMap) > 0 {ptrOffset = socketPtrOffsetMap[laddr]}// 使用 fd 编号构建 *os.File 对象f := os.NewFile(uintptr(3+ptrOffset), "")// 使用此 *os.File 对象构建 net.Listener 对象l, err = net.FileListener(f)if err != nil {err = fmt.Errorf("net.FileListener error: %v", err)return}} else {l, err = net.Listen("tcp", laddr)if err != nil {err = fmt.Errorf("net.Listen error: %v", err)return}}return
}
fd-1 已经传递了进来,编号为 3,但是还需要从代码层面进行包装才能被使用:首先构建成 *os.File 对象,然后构建成 net.Listener 对象。这里只是做了包装操作,并没有其他操作,因为 fd-1 对应的 SOCKET 已经处于监听状态了,这是父进程中 net.Listen() 完成的。
至此,就变成了两个进程来消费 SOCKET 中的一个接收队列,这两个进程可以同时存在,以轮询的方式来消费。
// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)if err != nil {return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}}sl := &sysListener{ListenConfig: *lc,network: network,address: address,}var l Listenerla := addrs.first(isIPv4)switch la := la.(type) {case *TCPAddr:l, err = sl.listenTCP(ctx, la) // *******case *UnixAddr:l, err = sl.listenUnix(ctx, la)default:return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}}if err != nil {return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer}return l, nil
}func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)if err != nil {return nil, err}return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}// 在这里完成了 socker_create, socker_bind, socket_listen 等基础工作
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {raddr = raddr.toLocal(net)}family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}
然后去发信号关闭父进程。
func (el *endlessListener) Close() error {if el.stopped {return syscall.EINVAL}el.stopped = truereturn el.Listener.Close()
}
// internal/poll/fd_unix.go
// Close closes the FD. The underlying file descriptor is closed by the
// destroy method when there are no remaining references.
func (fd *FD) Close() error {
	if !fd.fdmu.increfAndClose() {
		return errClosing(fd.isFile)
	}

	// Unblock any I/O. Once it all unblocks and returns,
	// so that it cannot be referring to fd.sysfd anymore,
	// the final decref will close fd.sysfd. This should happen
	// fairly quickly, since all the I/O is non-blocking, and any
	// attempts to block in the pollDesc will return errClosing(fd.isFile).
	fd.pd.evict()

	// The call to decref will call destroy if there are no other
	// references.
	err := fd.decref()

	// Wait until the descriptor is closed. If this was the only
	// reference, it is already closed. Only wait if the file has
	// not been set to blocking mode, as otherwise any current I/O
	// may be blocking, and that would block the Close.
	// No need for an atomic read of isBlocking, increfAndClose means
	// we have exclusive access to fd.
	if fd.isBlocking == 0 {
		runtime_Semacquire(&fd.csema)
	}

	return err
}
Listener.Close() 只是关闭了父进程中的 fd,使其不再 accept;而底层的 SOCKET 是由内核来回收的,只有当没有任何 fd 再指向它的时候才会被销毁。在这之前 fd-1 就已经指向了它,所以监听继续进行。
关于查看进程的fd
lsof -p 1299
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
testproc 1299 root cwd DIR 0,58 4096 415 /data/www/golang/project/hashcompare
testproc 1299 root rtd DIR 253,0 4096 128 /
testproc 1299 root txt REG 0,58 6223068 433 /data/www/golang/project/hashcompare/testproc
testproc 1299 root mem REG 253,0 2156240 33632456 /usr/lib64/libc-2.17.so
testproc 1299 root mem REG 253,0 142144 34128089 /usr/lib64/libpthread-2.17.so
testproc 1299 root mem REG 253,0 163312 34127918 /usr/lib64/ld-2.17.so
testproc 1299 root 0u CHR 136,0 0t0 3 /dev/pts/0
testproc 1299 root 1u CHR 136,0 0t0 3 /dev/pts/0
testproc 1299 root 2u CHR 136,0 0t0 3 /dev/pts/0
testproc 1299 root 3u REG 0,58 166 435 /data/www/golang/project/hashcompare/testproc_f1.txt
testproc 1299 root 4u a_inode 0,9 0 4554 [eventpoll]
testproc 1299 root 5r FIFO 0,8 0t0 936660 pipe
testproc 1299 root 6w FIFO 0,8 0t0 936660 pipe
testproc 1299 root 7u IPv6 936664 0t0 TCP *:6060 (LISTEN)
或者
ls -l /proc/1299/fd
总用量 0
lrwx------ 1 root root 64 11月 16 09:53 0 -> /dev/pts/0
lrwx------ 1 root root 64 11月 16 09:53 1 -> /dev/pts/0
lrwx------ 1 root root 64 11月 16 09:53 2 -> /dev/pts/0
lrwx------ 1 root root 64 11月 16 09:53 3 -> /data/www/golang/project/hashcompare/testproc_f1.txt
lrwx------ 1 root root 64 11月 16 09:53 4 -> anon_inode:[eventpoll]
lr-x------ 1 root root 64 11月 16 09:53 5 -> pipe:[936660]
l-wx------ 1 root root 64 11月 16 09:53 6 -> pipe:[936660]
lrwx------ 1 root root 64 11月 16 09:53 7 -> socket:[936664]
上一篇:SiPM使用总结