如何进行gunicorn Arbiter 源码解析
时间:2023-05-13 01:36
如前文所述,Arbiter是gunicorn master进程的核心。Arbiter主要负责管理worker进程,包括启动、监控、杀掉Worker进程;同时,Arbiter在某些信号发生的时候还可以热更新(reload)App应用,或者在线升级gunicorn。Arbiter的核心代码在一个文件里面,代码量也不大,源码在此:https://github.com/benoitc/gunicorn。 Arbiter主要有以下方法: setup: 处理配置项,最重要的是worker数量和worker工作模型 init_signal: 注册信号处理函数 handle_xxx: 各个信号具体的处理函数 kill_worker,kill_workers: 向worker进程发信号 spawn_worker, spawn_workers: fork出新的worker进程 murder_workers: 杀掉一段时间内未响应的worker进程 manage_workers: 根据配置文件的worker数量,以及当前active的worker数量,决定是要fork还是kill worker进程 reexec: 接收到信号SIGUSR2调用,在线升级gunicorn reload: 接收到信号SIGHUP调用,会根据新的配置新启动worker进程,并杀掉之前的worker进程 sleep: 在没有信号处理的时候,利用select的timeout进行sleep,可被唤醒 wakeup: 通过向管道写消息,唤醒进程 run: 主循环 Arbiter真正被其他代码(Application)调用的函数只有__init__和run方法,在一句代码里: Arbiter(self).run() 上面代码中的self即为Application实例,其中__init__调用setup进行配置项设置。下面是run方法伪代码 关于fork子进程 fork子进程的代码在 spawn_worker, 源码如下: Arbiter.spawn_worker 主要流程: (1)加载worker_class并实例化(默认为同步模型 SyncWorker) (2)父进程(master进程)fork之后return,之后的逻辑都在子进程中运行 (3)调用worker.init_process 进入循环,新航道雅思培训的所有工作都在这个循环中 (4)循环结束之后,调用sys.exit(0) (5)最后,在finally中,记录worker进程的退出 下面是我自己写的一点代码,把主要的fork流程简化了一下 在测试环境下输出: fork sub process 9601 fork sub process 9602 sub process will exit 9601 9600 sub process will exit 9602 9600 main process will exit 9600 需要注意的是第20行调用了sys.exit, 保证子进程的结束,否则会继续main函数中for循环,以及之后的逻辑。注释掉第19行重新运行,看输出就明白了。 关于kill子进程 master进程要kill worker进程就很简单了,直接发信号,源码如下: 关于sleep与wakeup 我们再来看看Arbiter的sleep和wakeup。Arbiter在没有信号需要处理的时候会"sleep",当然,不是真正调用time.sleep,否则信号来了也不能第一时间处理。这里得实现比较巧妙,利用了管道和select的timeout。看代码就知道了 代码里面的注释写得非常清楚,要么PIPE可读立即返回,要么等待超时。管道可读是因为有信号发生。这里看看pipe函数 Create a pipe. Return a pair of file descriptors 那我们看一下什么时候管道可读:肯定是往管道写入的东西,这就是wakeup函数的功能 最后附上Arbiter的信号处理: 退出,INT:快速关闭 TERM: 优雅关机。等待工作人员完成其当前请求,直到超时。 HUP:重新加载配置,用新配置启动新的工作进程,并优雅地关闭旧的工作进程。如果应用程序未预加载(使用--preload选项),Gunicorn也将加载新版本。 TTIN:将进程数增加一个 TTOU:将进程数减少一个 USR1:重新打开日志文件 USR2:在飞行中升级Gunicorn。应使用单独的术语信号终止旧进程。此信号也可用于使用预加载应用程序的新版本。 绞盘:当Gunicorn被守护时,优雅地关闭工作进程。 以上就是如何进行gunicorn Arbiter 源码解析的详细内容,更多请关注Gxl网其它相关文章!def run() self.init_signal() self.LISTENERS = create_sockets(self.cfg, self.log) self.manage_workers() while True: if no signal in SIG_QUEUE self.sleep() else: handle_signal()
1 # prefork.py 2 import sys 3 import socket 4 import select 5 import os 6 import time 7 8 def do_sub_process(): 9 pid = os.fork()10 if pid < 0:11 print 'fork error'12 sys.exit(-1)13 elif pid > 0:14 print 'fork sub process %d' % pid15 return16 17 # must be child process18 time.sleep(1)19 print 'sub process will exit', os.getpid(), os.getppid()20 sys.exit(0)21 22 def main():23 sub_num = 224 for i in range(sub_num):25 do_sub_process()26 time.sleep(10)27 print 'main process will exit', os.getpid()28 29 if __name__ == '__main__':30 main()
1 def kill_worker(self, pid, sig): 2 """ 3 Kill a worker 4 5 :attr pid: int, worker pid 6 :attr sig: `signal.SIG*` value 7 """ 8 try: 9 os.kill(pid, sig)10 except OSError as e:11 if e.errno == errno.ESRCH:12 try:13 worker = self.WORKERS.pop(pid)14 worker.tmp.close()15 self.cfg.worker_exit(self, worker)16 return17 except (KeyError, OSError):18 return19 raise
def sleep(self): """ Sleep until PIPE is readable or we timeout. A readable PIPE means a signal occurred. """ ready = select.select([self.PIPE[0]], [], [], 1.0) # self.PIPE = os.pipe() if not ready[0]: return while os.read(self.PIPE[0], 1): pass
os.
pipe
()(r,w)
usable for reading and writing, respectively. def wakeup(self): """ Wake up the arbiter by writing to the PIPE """ os.write(self.PIPE[1], b'.')