ReadSource

目录:

Gunicorn

Where to start

首先从 gunicorn 的 setup.py 开始吧. 里面的 entry_points 是这样写的:

entry_points="""
[console_scripts]
gunicorn=gunicorn.app.wsgiapp:run
gunicorn_django=gunicorn.app.djangoapp:run
gunicorn_paster=gunicorn.app.pasterapp:run

[paste.server_runner]
main=gunicorn.app.pasterapp:paste_server
"""

所以我们从最基础的 wsgiapp 看起.

WSGIApplication

入口就在这里:

def run():
    """Console entry point: build a WSGIApplication and run it.

    The usage template string is handed to WSGIApplication as its first
    argument; the resulting application's ``run()`` is then invoked.
    """
    # Imported inside the function, exactly as the original excerpt does.
    from gunicorn.app.wsgiapp import WSGIApplication

    usage = "%(prog)s [OPTIONS] [APP_MODULE]"
    application = WSGIApplication(usage)
    application.run()

然后我们可以看到 WSGIApplication 这个class, 其实代码也不长:

class WSGIApplication(Application):
    """Application that loads either a Paste app or a plain WSGI app.

    ``init`` decides which of the two is used based on ``opts.paste`` and
    ``load`` returns the corresponding application object.
    """

    def init(self, parser, opts, args):
        """Validate the command line and record what has to be loaded.

        With ``opts.paste`` set, the value is read as ``path[#app_name]``
        (``app_name`` defaults to ``'main'``), the path is made absolute
        relative to ``util.getcwd()``, and the result of ``paste_config`` is
        returned.  Otherwise the first positional argument is stored as the
        application URI and nothing is returned.

        Raises ConfigError when the paste file does not exist; calls
        ``parser.error`` when no application module was given.
        """
        if opts.paste:
            app_name = 'main'
            path = opts.paste
            if '#' in path:
                # Split on the first '#' only: a second '#' in the value
                # would otherwise break the two-name unpacking.
                path, app_name = path.split('#', 1)
            path = os.path.abspath(os.path.normpath(
                os.path.join(util.getcwd(), path)))

            if not os.path.exists(path):
                raise ConfigError("%r not found" % path)

            # paste application, load the config
            self.cfgurl = 'config:%s#%s' % (path, app_name)
            self.relpath = os.path.dirname(path)

            from .pasterapp import paste_config
            return paste_config(self.cfg, self.cfgurl, self.relpath)

        if len(args) < 1:
            parser.error("No application module specified.")

        self.cfg.set("default_proc_name", args[0])
        self.app_uri = args[0]

    def chdir(self):
        """Change into ``cfg.chdir`` and put it first on ``sys.path``."""
        # chdir to the configured path before loading,
        # default is the current dir
        os.chdir(self.cfg.chdir)

        # add the path to sys.path
        sys.path.insert(0, self.cfg.chdir)

    def load_wsgiapp(self):
        """Return the object imported from ``self.app_uri``."""
        self.chdir()

        # load the app
        return util.import_app(self.app_uri)

    def load_pasteapp(self):
        """Return the Paste application described by ``self.cfgurl``."""
        self.chdir()

        # load the paste app
        from .pasterapp import load_pasteapp
        return load_pasteapp(self.cfgurl, self.relpath,
                             global_conf=self.cfg.paste_global_conf)

    def load(self):
        """Load the Paste app when ``cfg.paste`` is set, else the WSGI app."""
        if self.cfg.paste is not None:
            return self.load_pasteapp()
        return self.load_wsgiapp()

可以看到这个 WSGIApplication 类继承自 Application .

所以, 我们需要进入到这个父类里一探究竟.

Application

class Application(BaseApplication):
    """Application base that reads settings from the app, a file and argv.

    ``load_config`` applies the sources with ``cfg.set`` in this order:
    the mapping returned by ``init()``, the config file, and finally the
    parsed command-line values.
    """

    def get_config_from_filename(self, filename):
        """Execute the Python config file *filename*; return its namespace.

        Raises RuntimeError if the file does not exist.  If executing the
        file raises, the failure and its traceback are written to stderr and
        the process exits with status 1.
        """
        if not os.path.exists(filename):
            raise RuntimeError("%r doesn't exist" % filename)

        cfg = {
            "__builtins__": __builtins__,
            "__name__": "__config__",
            "__file__": filename,
            "__doc__": None,
            "__package__": None
        }
        try:
            # NOTE: the config file is executed as Python code, so it must
            # come from a trusted location.
            execfile_(filename, cfg, cfg)
        except Exception:
            print("Failed to read config file: %s" % filename,
                  file=sys.stderr)
            traceback.print_exc()
            sys.stderr.flush()
            sys.exit(1)

        return cfg

    def get_config_from_module_name(self, module_name):
        """Import *module_name* and return its ``__dict__``."""
        return util.import_module(module_name).__dict__

    def load_config_from_module_name_or_filename(self, location):
        """Load settings from a module (``python:NAME``) or a Python file.

        A ``file:`` prefix on *location* is optional.  Names that are not
        known settings are ignored; every other name is applied with
        ``cfg.set``.  An invalid value is reported on stderr and the
        exception is re-raised.  Returns the loaded namespace.
        """
        if location.startswith("python:"):
            module_name = location[len("python:"):]
            cfg = self.get_config_from_module_name(module_name)
        else:
            if location.startswith("file:"):
                filename = location[len("file:"):]
            else:
                filename = location
            cfg = self.get_config_from_filename(filename)

        for k, v in cfg.items():
            # Ignore unknown names
            if k not in self.cfg.settings:
                continue
            try:
                self.cfg.set(k.lower(), v)
            except Exception:
                print("Invalid value for %s: %s\n" % (k, v), file=sys.stderr)
                sys.stderr.flush()
                raise

        return cfg

    def load_config_from_file(self, filename):
        """Backward-compatible alias taking *filename* as the location."""
        return self.load_config_from_module_name_or_filename(location=filename)

    def load_config(self):
        """Parse the command line and apply every configuration source."""
        # parse console args
        parser = self.cfg.parser()
        args = parser.parse_args()

        # optional settings from apps
        cfg = self.init(parser, args, args.args)

        # Load up the any app specific configuration
        if cfg:
            for k, v in cfg.items():
                self.cfg.set(k.lower(), v)

        if args.config:
            self.load_config_from_file(args.config)
        else:
            default_config = get_default_config_file()
            if default_config is not None:
                self.load_config_from_file(default_config)

        # Lastly, update the configuration with any command line
        # settings.
        for k, v in args.__dict__.items():
            if v is None:
                continue
            if k == "args":
                continue
            self.cfg.set(k.lower(), v)

    def run(self):
        """Apply process-level options, then hand over to the base ``run``.

        With ``cfg.check_config`` set, only try to load the application and
        exit: status 0 on success, status 1 (after printing the traceback)
        on failure.
        """
        if self.cfg.check_config:
            try:
                self.load()
            except Exception:
                print("\nError while loading the application:\n",
                      file=sys.stderr)
                traceback.print_exc()
                sys.stderr.flush()
                sys.exit(1)
            sys.exit(0)

        if self.cfg.spew:
            debug.spew()

        if self.cfg.daemon:
            util.daemonize(self.cfg.enable_stdio_inheritance)

        # set python paths
        if self.cfg.pythonpath:
            for path in self.cfg.pythonpath.split(","):
                pythonpath = os.path.abspath(path)
                if pythonpath not in sys.path:
                    sys.path.insert(0, pythonpath)

        super(Application, self).run()

这个 Application 类则继承一个Base类 BaseApplication:

BaseApplication

class BaseApplication(object):
    """
    An application interface for configuring and loading
    the various necessities for any given web framework.

    Subclasses must implement ``init``, ``load`` and ``load_config``.
    """
    def __init__(self, usage=None, prog=None):
        """Store *usage* and *prog*, then load the configuration."""
        self.usage = usage
        self.cfg = None
        self.callable = None
        self.prog = prog
        self.logger = None
        self.do_load_config()

    def do_load_config(self):
        """
        Loads the configuration.

        Any exception raised while loading is reported on stderr and the
        process exits with status 1.
        """
        try:
            self.load_default_config()
            self.load_config()
        except Exception as e:
            # Report the reason before exiting; without this the process
            # would stop with status 1 and no explanation.
            print("\nError: %s" % str(e), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)

    def load_default_config(self):
        """Create the default ``Config`` object for this application."""
        # init configuration
        self.cfg = Config(self.usage, prog=self.prog)

    def init(self, parser, opts, args):
        """Hook for subclasses; called by ``Application.load_config``."""
        raise NotImplementedError

    def load(self):
        """Return the application object; must be overridden."""
        raise NotImplementedError

    def load_config(self):
        """
        This method is used to load the configuration from one or several input(s).
        Custom Command line, configuration file.
        You have to override this method in your class.
        """
        raise NotImplementedError

    def reload(self):
        """Load the configuration again and re-apply the ``spew`` option."""
        self.do_load_config()
        if self.cfg.spew:
            debug.spew()

    def wsgi(self):
        """Return the loaded application, loading it on first use."""
        if self.callable is None:
            self.callable = self.load()
        return self.callable

    def run(self):
        """Run the arbiter; report a RuntimeError and exit with status 1."""
        try:
            Arbiter(self).run()
        except RuntimeError as e:
            print("\nError: %s\n" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)

所以当实例化 WSGIApplication 的时候, 第一步做的其实是 do_load_config(). 这个方法中, 首先加载了默认配置 load_default_config(), 其实就是实例化了 gunicorn.config.Config() 类. 然后则是加载我们实际的配置: load_config(), load_config() 的时候做了一步 init() 的操作. 这个 init() 就是 WSGIApplication() 类里的 init() 方法. 当然, 我们可以覆写这个 init() 方法. 实例化完成之后就是调用 run() 这个方法. 在 Application.run() 方法里, 如果开启了 check_config, 则只尝试加载我们的 app (self.load(), 最终调用 util.import_app(self.app_uri)), 然后直接退出; 否则依次处理 spew, daemon 和 pythonpath 这几个配置. 最后调用 BaseApplication 类的 run() 方法. 这个 run() 方法中则启动了我们所谓的 master: Arbiter(self).run().

接下来, 我们就需要进入到 gunicorn.arbiter.Arbiter 中一探究竟了.

Main master loop

arbiter.Arbiter 这个类就是一个 main master loop.

run

首先看 run 这个方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
 def run(self):
     """Main master loop.

     Starts the arbiter, then loops forever: with no queued signal it
     sleeps and does worker housekeeping, otherwise it dispatches the
     signal to the matching ``handle_<name>`` method.
     """
     self.start()
     util._setproctitle("master [%s]" % self.proc_name)

     try:
         self.manage_workers()

         while True:
             self.maybe_promote_master()

             signum = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
             if signum is None:
                 # Nothing to dispatch: idle, then look after the workers.
                 self.sleep()
                 self.murder_workers()
                 self.manage_workers()
                 continue

             if signum not in self.SIG_NAMES:
                 self.log.info("Ignoring unknown signal: %s", signum)
                 continue

             signame = self.SIG_NAMES.get(signum)
             handler = getattr(self, "handle_%s" % signame, None)
             if handler:
                 self.log.info("Handling signal: %s", signame)
                 handler()
                 self.wakeup()
             else:
                 self.log.error("Unhandled signal: %s", signame)
     except (StopIteration, KeyboardInterrupt):
         self.halt()
     except HaltServer as inst:
         self.halt(reason=inst.reason, exit_status=inst.exit_status)
     except SystemExit:
         raise
     except Exception:
         self.log.info("Unhandled exception in main loop",
                       exc_info=True)
         self.stop(False)
         if self.pidfile is not None:
             self.pidfile.unlink()
         sys.exit(-1)

这个方法就是一个死循环, 不断的检测和管理 worker. 所以 master 其实没干什么事. 只是用来管理 worker 的. 而 worker 才是真正干活的. 这个后面会详细说到.

self.start

现在我们来看 run 函数的第三行: self.start(), 这个 start 就是用来 Initialize the arbiter. Start listening and set pidfile if needed.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
 def start(self):
     """\
     Prepare the arbiter: record pids, write the pidfile if one is
     configured, install signal handlers, open the listeners and run
     the ``on_starting`` / ``when_ready`` hooks.
     """
     self.log.info("Starting gunicorn %s", __version__)

     inherited_pid = os.environ.get('GUNICORN_PID')
     if inherited_pid is not None:
         self.master_pid = int(inherited_pid)
         self.proc_name += ".2"
         self.master_name = "Master.2"

     self.pid = os.getpid()
     pidfile_path = self.cfg.pidfile
     if pidfile_path is not None:
         if self.master_pid != 0:
             pidfile_path += ".2"
         self.pidfile = Pidfile(pidfile_path)
         self.pidfile.create(self.pid)
     self.cfg.on_starting(self)

     self.init_signals()
     if not self.LISTENERS:
         self.LISTENERS = create_sockets(self.cfg, self.log)

     listeners_str = ",".join(str(sock) for sock in self.LISTENERS)
     self.log.debug("Arbiter booted")
     self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
     self.log.info("Using worker: %s", self.cfg.worker_class_str)

     # check worker class requirements
     worker_class = self.worker_class
     if hasattr(worker_class, "check_config"):
         worker_class.check_config(self.cfg, self.log)

     self.cfg.when_ready(self)

这个函数主要是看最后一段代码, 这个时候是 master 初始化后, 执行 when_ready 这个钩子. 这个钩子, 我们可以做一些事情, 给个例子:

# this is your application file

from gunicorn.app.base import Application


def when_ready(server):
    """Hook called as ``cfg.when_ready(arbiter)`` at the end of ``start()``.

    ``server`` is the arbiter instance; put your own start-up work here.
    """


class MyApp(Application):
    def install_hooks(self):
        """Register ``when_ready`` as the ``when_ready`` setting."""
        # NOTE(review): nothing in the excerpts shown here calls
        # install_hooks(); confirm where it is invoked.
        self.cfg.set('when_ready', when_ready)
self.manage_workers()
def manage_workers(self):
    """\
    Keep the worker count at ``self.num_workers``.

    Spawns workers when there are too few, sends SIGTERM to the workers
    with the lowest ``age`` when there are too many, and logs the
    resulting count whenever it differs from the last logged one.
    """
    if len(self.WORKERS) < self.num_workers:
        self.spawn_workers()

    by_age = sorted(self.WORKERS.items(), key=lambda item: item[1].age)
    while len(by_age) > self.num_workers:
        surplus_pid, _ = by_age.pop(0)
        self.kill_worker(surplus_pid, signal.SIGTERM)

    active = len(by_age)
    if self._last_logged_active_worker_count == active:
        return

    self._last_logged_active_worker_count = active
    metric = {"metric": "gunicorn.workers",
              "value": active,
              "mtype": "gauge"}
    self.log.debug("{0} workers".format(active), extra=metric)

这个函数就是用来保证 worker 数量的, 主要是通过 spawn_workers() 和 kill_worker() 这两个函数.

Graceful timeout

本章主要介绍 gunicorn 是如何 gracefully 退出的.

gunicorn 需要优雅退出必须是 master 接受到一个 SIGTERM 信号( man kill 查看相关信息).

看一下 master 的信号处理器(信号处理函数):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  def handle_term(self):
      """SIGTERM handling: raise StopIteration to the caller."""
      raise StopIteration()

  def handle_int(self):
      """SIGINT handling: call ``stop(False)``, then raise StopIteration."""
      self.stop(False)
      raise StopIteration()

  def handle_quit(self):
      """SIGQUIT handling: call ``stop(False)``, then raise StopIteration."""
      self.stop(False)
      raise StopIteration()

  def stop(self, graceful=True):
      """\
      Stop workers

      :attr graceful: boolean. If true (the default) the workers are first
      sent SIGTERM, otherwise SIGQUIT. Once no workers remain, or
      ``cfg.graceful_timeout`` seconds have passed, SIGKILL is sent.
      """
      self.LISTENERS = []
      first_signal = signal.SIGTERM if graceful else signal.SIGQUIT
      deadline = time.time() + self.cfg.graceful_timeout

      # instruct the workers to exit
      self.kill_workers(first_signal)

      # poll until every worker is gone or the deadline has passed
      while True:
          if not self.WORKERS or time.time() >= deadline:
              break
          time.sleep(0.1)

      self.kill_workers(signal.SIGKILL)

上面只列出了几个关键函数,可以看到,master 在收到 SIGTERM 信号之后是直接 raise 一个 StopIteration , 在 master 的主循环中是有 catch 这个异常的

1
2
3
4
5
6
7
8
9
  while True:
      try:
          # 省略一些代码
      except StopIteration:
          self.halt()
      except KeyboardInterrupt:
          self.halt()

      # 省略一些代码

所以 master 在收到 SIGTERM 信号之后调用 self.halt() 函数,看一看这个函数:

def halt(self, reason=None, exit_status=0):
    """Stop the workers, clean up and exit the process.

    Logs the shutdown (and *reason* when given), removes the pidfile if
    there is one, runs the ``on_exit`` hook and finally calls
    ``sys.exit(exit_status)``.
    """
    self.stop()

    log = self.log
    log.info("Shutting down: %s", self.master_name)
    if reason is not None:
        log.info("Reason: %s", reason)

    pidfile = self.pidfile
    if pidfile is not None:
        pidfile.unlink()

    self.cfg.on_exit(self)
    sys.exit(exit_status)

跟收到其他信号一样,调用 self.stop() , 这个函数已经在上方代码里列出了。

  1. 清空 self.LISTENERS, 这个属性里存放的就是建立的 socket 实例,一般来说这个列表里只有一个 socket
  2. 设置默认发给 worker 的信号为 SIGTERM, 如果参数 graceful 为 False (默认为 True), 则改为 SIGQUIT 信号.
  3. 给所有 worker 发送第二步中的信号.
  4. 如果有 worker 还没有退出则等待, 直到所有 worker 退出或者等待时间超过 config 中配置的 graceful_timeout (默认为30秒)
  5. 给所有超过 graceful_timeout 还没退出的 worker 发送 SIGKILL

至此, master 是如何通知 worker 优雅退出的已经讲完,下一章将剖析 worker 收到 master 发过来的信号后是如何处理的。

Workers

spawn_worker

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
 def spawn_worker(self):
     """Fork one worker process.

     In the parent the new worker is recorded in ``self.WORKERS`` under
     the child's pid and that pid is returned.  In the child this method
     does not return: every path ends in ``sys.exit``.
     """
     self.worker_age += 1
     # The worker is built before forking and receives half of
     # self.timeout as its timeout.
     worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                self.app, self.timeout / 2.0,
                                self.cfg, self.log)
     self.cfg.pre_fork(self, worker)
     pid = os.fork()
     if pid != 0:
         # Parent process: remember the worker and return.
         self.WORKERS[pid] = worker
         return pid

     # Process Child
     worker_pid = os.getpid()
     try:
         util._setproctitle("worker [%s]" % self.proc_name)
         self.log.info("Booting worker with pid: %s", worker_pid)
         if self.cfg.reuseport:
             # With reuseport enabled the child creates its own sockets
             # in place of the listeners passed to the constructor.
             worker.sockets = create_sockets(self.cfg, self.log)
             listeners_str = ",".join([str(l) for l in worker.sockets])
             self.log.info("Listening at: %s (%s) using reuseport",
                           listeners_str,
                           worker_pid)
         self.cfg.post_fork(self, worker)
         worker.init_process()
         # Reached only when init_process() returns.
         sys.exit(0)
     except SystemExit:
         # Let deliberate exits through unchanged.
         raise
     except AppImportError as e:
         self.log.debug("Exception while loading the application: \n%s",
                 traceback.format_exc())
         print("%s" % e, file=sys.stderr)
         sys.stderr.flush()
         sys.exit(self.APP_LOAD_ERROR)
     except:
         self.log.exception("Exception in worker process:\n%s",
                 traceback.format_exc())
         # A failure before the worker finished booting gets its own
         # exit status.
         if not worker.booted:
             sys.exit(self.WORKER_BOOT_ERROR)
         sys.exit(-1)
     finally:
         # Runs on every child exit path above.
         self.log.info("Worker exiting (pid: %s)", worker_pid)
         try:
             worker.tmp.close()
             self.cfg.worker_exit(self, worker)
         except:
             # NOTE(review): errors during cleanup, including ones raised
             # by the worker_exit hook, are silently discarded here.
             pass

首先, 实例化 worker_class, 执行 pre_fork 这个钩子, 然后就是fork子进程, 在父进程中记录 worker进程的pid和worker实例, self.WORKERS[pid] = worker. 在子进程中, 设置进程名记录logger等, 然后执行 post_fork 钩子, 最后这个子进程进入自己的处理函数, worker.init_process.

worker

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
 class Worker(object):
     """Base class for worker processes.

     Holds the per-worker state, installs the worker's signal handlers and
     prepares the process in ``init_process``; subclasses provide the
     actual request loop by overriding ``run``.
     """

     # Signals whose handlers are reset to the default in init_signals().
     SIGNALS = [getattr(signal, "SIG%s" % x)
             for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]

     # Replaced in init_process() by the pair returned from os.pipe().
     PIPE = []

     def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
         """\
         This is called pre-fork so it shouldn't do anything to the
         current process. If there's a need to make process wide
         changes you'll want to do that in ``self.init_process()``.
         """
         self.age = age
         self.ppid = ppid
         self.sockets = sockets
         self.app = app
         self.timeout = timeout
         self.cfg = cfg
         self.booted = False
         self.aborted = False
         self.reloader = None

         self.nr = 0
         jitter = randint(0, cfg.max_requests_jitter)
         # "+" binds tighter than "or": MAXSIZE is used only when
         # cfg.max_requests + jitter evaluates to 0.
         self.max_requests = cfg.max_requests + jitter or MAXSIZE
         self.alive = True
         self.log = log
         self.tmp = WorkerTmp(cfg)

     def __str__(self):
         """Return ``<Worker PID>`` using the current process id."""
         return "<Worker %s>" % self.pid

     @property
     def pid(self):
         """Pid of the process this is evaluated in (``os.getpid()``)."""
         return os.getpid()

     def notify(self):
         """\
         Your worker subclass must arrange to have this method called
         once every ``self.timeout`` seconds. If you fail in accomplishing
         this task, the master process will murder your workers.
         """
         self.tmp.notify()

     def run(self):
         """\
         This is the mainloop of a worker process. You should override
         this method in a subclass to provide the intended behaviour
         for your particular evil schemes.
         """
         raise NotImplementedError()

     def init_process(self):
         """\
         If you override this method in a subclass, the last statement
         in the function should be to call this method with
         super(MyWorkerClass, self).init_process() so that the ``run()``
         loop is initiated.
         """

         # start the reloader
         if self.cfg.reload:
             def changed(fname):
                 # A changed file makes the worker send SIGQUIT to itself.
                 self.log.info("Worker reloading: %s modified", fname)
                 os.kill(self.pid, signal.SIGQUIT)
             # NOTE(review): this stores the return value of start(); if
             # start() returns None the Reloader object itself is not kept
             # on self.reloader -- confirm against the Reloader class.
             self.reloader = Reloader(callback=changed).start()

         # set environment variables
         if self.cfg.env:
             for k, v in self.cfg.env.items():
                 os.environ[k] = v

         util.set_owner_process(self.cfg.uid, self.cfg.gid)

         # Reseed the random number generator
         util.seed()

         # For waking ourselves up
         self.PIPE = os.pipe()
         for p in self.PIPE:
             util.set_non_blocking(p)
             util.close_on_exec(p)

         # Prevent fd inheritance
         [util.close_on_exec(s) for s in self.sockets]
         util.close_on_exec(self.tmp.fileno())

         self.log.close_on_exec()

         self.init_signals()

         # Obtain the WSGI callable from the application object.
         self.wsgi = self.app.wsgi()

         self.cfg.post_worker_init(self)

         # Enter main run loop
         self.booted = True
         self.run()

     def init_signals(self):
         """Reset the handlers for ``SIGNALS``, then install the worker's."""
         # reset signaling
         [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
         # init new signaling
         signal.signal(signal.SIGQUIT, self.handle_quit)
         signal.signal(signal.SIGTERM, self.handle_exit)
         signal.signal(signal.SIGINT, self.handle_quit)
         signal.signal(signal.SIGWINCH, self.handle_winch)
         signal.signal(signal.SIGUSR1, self.handle_usr1)
         signal.signal(signal.SIGABRT, self.handle_abort)

         # Don't let SIGTERM and SIGUSR1 disturb active requests
         # by interrupting system calls
         if hasattr(signal, 'siginterrupt'):  # python >= 2.6
             signal.siginterrupt(signal.SIGTERM, False)
             signal.siginterrupt(signal.SIGUSR1, False)

     def handle_usr1(self, sig, frame):
         """SIGUSR1: reopen the log files."""
         self.log.reopen_files()

     def handle_exit(self, sig, frame):
         """SIGTERM: only clear ``alive``; the process is not exited here."""
         self.alive = False

     def handle_quit(self, sig, frame):
         """SIGQUIT/SIGINT: run the ``worker_int`` hook and exit with 0."""
         self.alive = False
         # worker_int callback
         self.cfg.worker_int(self)
         time.sleep(0.1)
         sys.exit(0)

     def handle_abort(self, sig, frame):
         """SIGABRT: run the ``worker_abort`` hook and exit with 1."""
         self.alive = False
         self.cfg.worker_abort(self)
         sys.exit(1)

     def handle_error(self, req, client, addr, exc):
         """Log *exc*, write an access-log entry and send an error reply.

         The listed parser/limit/proxy exceptions produce a 400 response
         (403 for ForbiddenProxyRequest); anything else produces a 500.
         """
         # Taken on entry, so request_time below covers only the time
         # spent inside this method.
         request_start = datetime.now()
         addr = addr or ('', -1)  # unix socket case
         if isinstance(exc, (InvalidRequestLine, InvalidRequestMethod,
                 InvalidHTTPVersion, InvalidHeader, InvalidHeaderName,
                 LimitRequestLine, LimitRequestHeaders,
                 InvalidProxyLine, ForbiddenProxyRequest)):

             status_int = 400
             reason = "Bad Request"

             if isinstance(exc, InvalidRequestLine):
                 mesg = "Invalid Request Line '%s'" % str(exc)
             elif isinstance(exc, InvalidRequestMethod):
                 mesg = "Invalid Method '%s'" % str(exc)
             elif isinstance(exc, InvalidHTTPVersion):
                 mesg = "Invalid HTTP Version '%s'" % str(exc)
             elif isinstance(exc, (InvalidHeaderName, InvalidHeader,)):
                 mesg = "%s" % str(exc)
                 if not req and hasattr(exc, "req"):
                     req = exc.req  # for access log
             elif isinstance(exc, LimitRequestLine):
                 mesg = "%s" % str(exc)
             elif isinstance(exc, LimitRequestHeaders):
                 mesg = "Error parsing headers: '%s'" % str(exc)
             elif isinstance(exc, InvalidProxyLine):
                 mesg = "'%s'" % str(exc)
             elif isinstance(exc, ForbiddenProxyRequest):
                 reason = "Forbidden"
                 mesg = "Request forbidden"
                 status_int = 403

             msg = "Invalid request from ip={ip}: {error}"
             self.log.debug(msg.format(ip=addr[0], error=str(exc)))
         else:
             self.log.exception("Error handling request")

             status_int = 500
             reason = "Internal Server Error"
             mesg = ""

         # The access-log entry is written only when a request object is
         # available.
         if req is not None:
             request_time = datetime.now() - request_start
             environ = default_environ(req, client, self.cfg)
             environ['REMOTE_ADDR'] = addr[0]
             environ['REMOTE_PORT'] = str(addr[1])
             resp = Response(req, client, self.cfg)
             resp.status = "%s %s" % (status_int, reason)
             resp.response_length = len(mesg)
             self.log.access(resp, req, environ, request_time)

         try:
             util.write_error(client, status_int, reason, mesg)
         except:
             # Best effort: a failure to send the reply is only logged.
             self.log.debug("Failed to send error message.")

     def handle_winch(self, sig, fname):
         # Ignore SIGWINCH in worker. Fixes a crash on OpenBSD.
         # NOTE(review): the second parameter is named ``fname`` here but
         # ``frame`` in the other handlers.
         return

这个类中, 主要是初始化了一些信号

def init_signals(self):
    """Reset the handlers for ``self.SIGNALS``, then install the worker's.

    SIGTERM and SIGUSR1 are additionally marked as non-interrupting when
    the ``signal`` module provides ``siginterrupt``.
    """
    # reset signaling
    for signum in self.SIGNALS:
        signal.signal(signum, signal.SIG_DFL)

    # init new signaling
    handlers = (
        (signal.SIGQUIT, self.handle_quit),
        (signal.SIGTERM, self.handle_exit),
        (signal.SIGINT, self.handle_quit),
        (signal.SIGWINCH, self.handle_winch),
        (signal.SIGUSR1, self.handle_usr1),
        (signal.SIGABRT, self.handle_abort),
    )
    for signum, handler in handlers:
        signal.signal(signum, handler)

    # Don't let SIGTERM and SIGUSR1 disturb active requests
    # by interrupting system calls
    if not hasattr(signal, 'siginterrupt'):  # python >= 2.6
        return
    for signum in (signal.SIGTERM, signal.SIGUSR1):
        signal.siginterrupt(signum, False)

需要注意的是, signal.SIGTERM 这个信号, 它的处理函数是 handle_exit.

def handle_exit(self, sig, frame):
    """Signal handler: clear ``self.alive``; *sig* and *frame* are unused."""
    self.alive = False

只是将 worker 的状态标志为 self.alive = False. 这就会使得worker能够在退出前处理未完成的请求. 在 init_process 的最后, 调用 self.run() 函数运行worker进程. self.run() 具体的实现需要看 worker的类型 self.worker_class. 支持的类型定义在 gunicorn.workers.__init__.SUPPORTED_WORKERS 里.

Redis-py

Gevent

Flask

  1. config原理
  2. 路由原理
  3. Wsgi接口调用
  4. 理解session
  5. 理解threading.local
  6. 理解flask自己封装的thread local
  7. 理解g和request
  8. 理解app context和request context

werkzeug

Celery

Kazoo