ReadSource¶
目录:
Gunicorn¶
Where to start¶
首先从 gunicorn 的 setup.py 开始吧. 其中 entry_points 是这样定义的:
# Fragment of gunicorn's setup.py. Each [console_scripts] line maps a
# command name to a "module:function" entry point; the
# [paste.server_runner] group presumably lets PasteDeploy use gunicorn as
# its server (via pasterapp.paste_server) — confirm.
entry_points="""
[console_scripts]
gunicorn=gunicorn.app.wsgiapp:run
gunicorn_django=gunicorn.app.djangoapp:run
gunicorn_paster=gunicorn.app.pasterapp:run
[paste.server_runner]
main=gunicorn.app.pasterapp:paste_server
"""
所以我们从最基础的 wsgiapp 看起.
WSGIApplication¶
入口就在这里:
def run():
    """Console-script entry point of the ``gunicorn`` command.

    Creates a WSGIApplication with the usage string
    "%(prog)s [OPTIONS] [APP_MODULE]" and starts it.
    """
    # Imported inside the function rather than at module level.
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
然后我们可以看到 WSGIApplication 这个类, 其实代码并不长:
class WSGIApplication(Application):
    """Application serving either a WSGI module named on the command line
    or an application described by a PasteDeploy config (``paste``)."""

    def init(self, parser, opts, args):
        """Handle the parsed command line (called from load_config()).

        :param parser: the argument parser, used to report a missing app.
        :param opts: the parsed options.
        :param args: remaining positional arguments; ``args[0]`` is the
            application URI.
        :returns: for a paste config, whatever ``paste_config`` returns
            (settings to apply); otherwise ``None``.
        :raises ConfigError: if the paste config file does not exist.
        """
        if opts.paste and opts.paste is not None:
            # "path#name" selects a named app section; default is "main".
            app_name = 'main'
            path = opts.paste
            if '#' in path:
                path, app_name = path.split('#')
            # Resolve relative to the current working directory.
            path = os.path.abspath(os.path.normpath(
                os.path.join(util.getcwd(), path)))

            if not os.path.exists(path):
                raise ConfigError("%r not found" % path)

            # paste application, load the config
            self.cfgurl = 'config:%s#%s' % (path, app_name)
            self.relpath = os.path.dirname(path)

            from .pasterapp import paste_config
            return paste_config(self.cfg, self.cfgurl, self.relpath)

        if len(args) < 1:
            parser.error("No application module specified.")

        # The app URI doubles as the default process name.
        self.cfg.set("default_proc_name", args[0])
        self.app_uri = args[0]

    def chdir(self):
        """Change to ``cfg.chdir`` and put it first on ``sys.path``."""
        # chdir to the configured path before loading,
        # default is the current dir
        os.chdir(self.cfg.chdir)

        # add the path to sys.path
        sys.path.insert(0, self.cfg.chdir)

    def load_wsgiapp(self):
        """Import and return the WSGI callable named by ``self.app_uri``."""
        self.chdir()

        # load the app
        return util.import_app(self.app_uri)

    def load_pasteapp(self):
        """Load and return the application from the paste config."""
        self.chdir()

        # load the paste app
        from .pasterapp import load_pasteapp
        return load_pasteapp(self.cfgurl, self.relpath, global_conf=self.cfg.paste_global_conf)

    def load(self):
        """Return the WSGI callable: the paste app when ``cfg.paste`` is
        set, otherwise the module given on the command line."""
        if self.cfg.paste is not None:
            return self.load_pasteapp()
        else:
            return self.load_wsgiapp()
可以看到, 这个 WSGIApplication 类继承自 Application. 所以, 我们需要进入到这个父类里一探究竟.
Application¶
class Application(BaseApplication):
    """Application whose configuration comes from the command line and
    from an optional Python configuration file or module."""

    def get_config_from_filename(self, filename):
        """Execute the Python file *filename* and return its namespace.

        :param filename: path of a Python configuration file.
        :returns: the dict the file was executed in.
        :raises RuntimeError: if *filename* does not exist.

        If executing the file raises, the traceback is printed and the
        process exits with status 1.
        """

        if not os.path.exists(filename):
            raise RuntimeError("%r doesn't exist" % filename)

        # Fresh globals for the config file; every top-level name it
        # defines ends up in this dict.
        cfg = {
            "__builtins__": __builtins__,
            "__name__": "__config__",
            "__file__": filename,
            "__doc__": None,
            "__package__": None
        }
        try:
            execfile_(filename, cfg, cfg)
        except Exception:
            # print("Failed to read config file: %s" % filename, file=sys.stderr)
            traceback.print_exc()
            sys.stderr.flush()
            sys.exit(1)

        return cfg

    def get_config_from_module_name(self, module_name):
        """Import *module_name* and return its namespace dict."""
        return util.import_module(module_name).__dict__

    def load_config_from_module_name_or_filename(self, location):
        """Load settings from a Python module or file and apply them.

        :param location: ``"python:<module>"`` imports a module;
            ``"file:<path>"`` or a bare path executes a Python file.
        :returns: the namespace dict that was read.
        :raises RuntimeError: if the file does not exist. A file that
            fails to execute (e.g. a syntax error) stops the process.

        Only names present in ``self.cfg.settings`` are applied; every
        other name in the namespace is ignored.
        """

        if location.startswith("python:"):
            module_name = location[len("python:"):]
            cfg = self.get_config_from_module_name(module_name)
        else:
            if location.startswith("file:"):
                filename = location[len("file:"):]
            else:
                filename = location
            cfg = self.get_config_from_filename(filename)

        for k, v in cfg.items():
            # Ignore unknown names
            if k not in self.cfg.settings:
                continue
            try:
                self.cfg.set(k.lower(), v)
            except:
                # print("Invalid value for %s: %s\n" % (k, v), file=sys.stderr)
                sys.stderr.flush()
                raise

        return cfg

    def load_config_from_file(self, filename):
        """Thin wrapper around load_config_from_module_name_or_filename()."""
        return self.load_config_from_module_name_or_filename(location=filename)

    def load_config(self):
        """Populate ``self.cfg`` from every configuration source.

        Sources are applied in this order (later ``cfg.set`` calls
        presumably override earlier ones): the dict returned by
        ``init()``, then the ``--config`` file (or the default config
        file), then each command-line option whose value is not None.
        """
        # parse console args
        parser = self.cfg.parser()
        args = parser.parse_args()

        # optional settings from apps
        cfg = self.init(parser, args, args.args)

        # Load any app-specific configuration returned by init()
        if cfg and cfg is not None:
            for k, v in cfg.items():
                self.cfg.set(k.lower(), v)

        if args.config:
            self.load_config_from_file(args.config)
        else:
            default_config = get_default_config_file()
            if default_config is not None:
                self.load_config_from_file(default_config)

        # Lastly, update the configuration with any command line
        # settings.
        for k, v in args.__dict__.items():
            if v is None:
                continue
            if k == "args":
                # Positional arguments were already handed to init().
                continue
            self.cfg.set(k.lower(), v)

    def run(self):
        """Prepare the process, then start the master via BaseApplication.

        With ``check_config`` set, only try to load the app and exit
        (status 0 on success, 1 on failure).
        """
        if self.cfg.check_config:
            try:
                self.load()
            except:
                # NOTE(review): with the print below commented out in this
                # excerpt, msg is unused.
                msg = "\nError while loading the application:\n"
                # print(msg, file=sys.stderr)
                traceback.print_exc()
                sys.stderr.flush()
                sys.exit(1)
            sys.exit(0)

        if self.cfg.spew:
            debug.spew()

        if self.cfg.daemon:
            util.daemonize(self.cfg.enable_stdio_inheritance)

        # set python paths
        if self.cfg.pythonpath and self.cfg.pythonpath is not None:
            # Comma-separated list; each entry is made absolute and
            # prepended to sys.path unless it is already there.
            paths = self.cfg.pythonpath.split(",")
            for path in paths:
                pythonpath = os.path.abspath(path)
                if pythonpath not in sys.path:
                    sys.path.insert(0, pythonpath)

        super(Application, self).run()
这个 Application 类则继承自一个基类 BaseApplication:
BaseApplication¶
class BaseApplication(object):
    """
    An application interface for configuring and loading
    the various necessities for any given web framework.
    """
    def __init__(self, usage=None, prog=None):
        """Store *usage*/*prog* and load the configuration right away.

        :param usage: usage string handed to Config.
        :param prog: program name handed to Config.
        """
        self.usage = usage
        self.cfg = None
        self.callable = None  # WSGI callable, cached by wsgi()
        self.prog = prog
        self.logger = None
        self.do_load_config()

    def do_load_config(self):
        """Load the default configuration, then the subclass's settings.

        Any Exception raised while doing so exits the process with
        status 1.
        """
        try:
            self.load_default_config()
            self.load_config()
        except Exception as e:
            # print("\nError: %s" % str(e), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)

    def load_default_config(self):
        """Create a fresh Config holding the default settings."""
        # init configuration
        self.cfg = Config(self.usage, prog=self.prog)

    def init(self, parser, opts, args):
        """Hook for subclasses: handle the parsed command line."""
        raise NotImplementedError

    def load(self):
        """Hook for subclasses: load and return the WSGI callable."""
        raise NotImplementedError

    def load_config(self):
        """
        This method is used to load the configuration from one or several input(s).
        Custom Command line, configuration file.
        You have to override this method in your class.
        """
        raise NotImplementedError

    def reload(self):
        """Re-read the whole configuration; re-enable spew if configured."""
        self.do_load_config()
        if self.cfg.spew:
            debug.spew()

    def wsgi(self):
        """Return the WSGI callable, calling load() only on first use."""
        if self.callable is None:
            self.callable = self.load()
        return self.callable

    def run(self):
        """Run the master (Arbiter) main loop.

        A RuntimeError escaping the arbiter exits with status 1.
        """
        try:
            Arbiter(self).run()
        except RuntimeError as e:
            # print("\nError: %s\n" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
所以当实例化 WSGIApplication 的时候, 第一步做的其实是 do_load_config(). 这个方法中, 首先加载了默认配置 load_default_config(), 其实就是实例化了 gunicorn.config.Config 类. 然后是加载我们实际的配置: load_config(). load_config() 中会先调用 init() 方法, 这个 init() 就是 WSGIApplication 类里的 init() 方法. 当然, 我们也可以在自己的子类中覆写这个 init() 方法.

然后就是调用 run() 这个方法. 只有在设置了 check_config 时, Application.run() 才会立即调用 self.load() 来试着加载我们的 app (对 WSGIApplication 来说, 最终调用的是 util.import_app(self.app_uri)), 加载完成后直接退出. 正常启动时, run() 只处理 spew、daemon、pythonpath 等设置. app 本身则是在 worker 进程中通过 self.app.wsgi() 延迟加载的.

最后调用 BaseApplication 类的 run() 方法. 这个 run() 方法中则启动了我们所谓的 master: Arbiter(self).run().

接下来, 我们就需要进入到 gunicorn.arbiter.Arbiter 中一探究竟了.
Main master loop¶
arbiter.Arbiter 这个类实现了 master 进程的主循环 (main master loop).
run¶
首先看 run 这个方法:
def run(self):
    "Main master loop."
    self.start()
    util._setproctitle("master [%s]" % self.proc_name)

    try:
        self.manage_workers()

        while True:
            self.maybe_promote_master()

            # Take the oldest queued signal, if any.
            sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
            if sig is None:
                # Nothing queued: sleep, kill timed-out workers and
                # restore the configured number of workers.
                self.sleep()
                self.murder_workers()
                self.manage_workers()
                continue

            if sig not in self.SIG_NAMES:
                self.log.info("Ignoring unknown signal: %s", sig)
                continue

            # Dispatch to the matching handle_<name> method
            # (e.g. handle_term).
            signame = self.SIG_NAMES.get(sig)
            handler = getattr(self, "handle_%s" % signame, None)
            if not handler:
                self.log.error("Unhandled signal: %s", signame)
                continue
            self.log.info("Handling signal: %s", signame)
            handler()
            self.wakeup()
    except StopIteration:
        # Raised by the handle_term/handle_int/handle_quit handlers.
        self.halt()
    except KeyboardInterrupt:
        self.halt()
    except HaltServer as inst:
        self.halt(reason=inst.reason, exit_status=inst.exit_status)
    except SystemExit:
        raise
    except Exception:
        # Unexpected error: log it, stop workers non-gracefully,
        # remove the pidfile and exit with -1.
        self.log.info("Unhandled exception in main loop",
                      exc_info=True)
        self.stop(False)
        if self.pidfile is not None:
            self.pidfile.unlink()
        sys.exit(-1)
这个方法就是一个死循环: 不断地从信号队列中取出信号, 分派给对应的 handle_* 方法处理. 没有信号时, 则检测和管理 worker. 所以 master 本身并不处理请求, 只是用来管理 worker 的, 而 worker 才是真正干活的. 这个后面会详细说到.
self.start¶
现在我们来看 run 函数的第三行: self.start(). 这个 start 方法的作用正如它的文档字符串所说: Initialize the arbiter. Start listening and set pidfile if needed. (初始化 arbiter, 开始监听, 并在需要时写入 pidfile.)
def start(self):
    """\
    Initialize the arbiter. Start listening and set pidfile if needed.
    """
    self.log.info("Starting gunicorn %s", __version__)

    if 'GUNICORN_PID' in os.environ:
        # NOTE(review): presumably set when this master was started by an
        # existing master. Its pid is recorded, and this master's names
        # (and pidfile below) get a ".2" suffix — confirm.
        self.master_pid = int(os.environ.get('GUNICORN_PID'))
        self.proc_name = self.proc_name + ".2"
        self.master_name = "Master.2"

    self.pid = os.getpid()
    if self.cfg.pidfile is not None:
        pidname = self.cfg.pidfile
        if self.master_pid != 0:
            pidname += ".2"
        self.pidfile = Pidfile(pidname)
        self.pidfile.create(self.pid)
    self.cfg.on_starting(self)

    self.init_signals()
    # Keep sockets that already exist; otherwise create them.
    if not self.LISTENERS:
        self.LISTENERS = create_sockets(self.cfg, self.log)

    listeners_str = ",".join([str(l) for l in self.LISTENERS])
    self.log.debug("Arbiter booted")
    self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
    self.log.info("Using worker: %s", self.cfg.worker_class_str)

    # check worker class requirements
    if hasattr(self.worker_class, "check_config"):
        self.worker_class.check_config(self.cfg, self.log)

    # The master is fully initialized: run the user's when_ready hook.
    self.cfg.when_ready(self)
这个函数主要看最后一行代码: master 初始化完成后, 会执行 when_ready 这个钩子. 我们可以利用这个钩子做一些自己的事情, 举个例子:
# this is your application file
from gunicorn.app.base import Application


def when_ready(server):
    """Called once the master has finished starting up.

    :param server: the Arbiter instance (it calls
        ``self.cfg.when_ready(self)`` at the end of ``start()``).
    """
    server.log.info("Server is ready. Spawning workers")


class MyApp(Application):
    """Example application that installs a custom when_ready hook.

    NOTE: a usable subclass must also implement init() and load();
    BaseApplication's versions raise NotImplementedError.
    """

    def install_hooks(self):
        """Register the custom hooks on this application's config."""
        self.cfg.set('when_ready', when_ready)

    def load_config(self):
        """Load the configuration as usual, then install the hooks.

        Nothing in BaseApplication/Application calls install_hooks() by
        itself, so it is called here, after all other settings have been
        applied.
        """
        super(MyApp, self).load_config()
        self.install_hooks()
self.manage_workers()¶
def manage_workers(self):
    """\
    Maintain the number of workers by spawning or killing
    as required.
    """
    # Too few workers: spawn the missing ones.
    if len(self.WORKERS.keys()) < self.num_workers:
        self.spawn_workers()

    # Too many workers: send SIGTERM to the surplus, lowest age first
    # (age comes from the increasing worker_age counter, so these are the
    # earliest-spawned workers).
    workers = self.WORKERS.items()
    workers = sorted(workers, key=lambda w: w[1].age)
    while len(workers) > self.num_workers:
        (pid, _) = workers.pop(0)
        self.kill_worker(pid, signal.SIGTERM)

    # Log the worker count (as a "gauge" metric) only when it changes.
    active_worker_count = len(workers)
    if self._last_logged_active_worker_count != active_worker_count:
        self._last_logged_active_worker_count = active_worker_count
        self.log.debug("{0} workers".format(active_worker_count),
                       extra={"metric": "gunicorn.workers",
                              "value": active_worker_count,
                              "mtype": "gauge"})
这个函数用来保证 worker 的数量, 主要是通过 spawn_workers() 和 kill_worker() 这两个函数.
Graceful timeout¶
本章主要介绍 gunicorn 是如何优雅地 (gracefully) 退出的.
gunicorn 要优雅退出, master 必须接收到一个 SIGTERM 信号 (可用 man kill 查看相关信息).
看一下 master 的信号处理器 (信号处理函数):
def handle_term(self):
    """SIGTERM handling: leave the master main loop.

    The StopIteration is caught by the main loop, which then calls
    halt() and stops the workers gracefully.
    """
    raise StopIteration
def handle_int(self):
    """SIGINT handling.

    Stop the workers non-gracefully first (stop(False) sends SIGQUIT),
    then leave the main loop via StopIteration.
    """
    self.stop(False)
    raise StopIteration
def handle_quit(self):
    """SIGQUIT handling.

    Same as SIGINT: stop the workers non-gracefully (SIGQUIT), then leave
    the main loop via StopIteration.
    """
    self.stop(False)
    raise StopIteration
def stop(self, graceful=True):
    """Stop all workers.

    :param graceful: if True (the default) send SIGTERM, so workers try to
        finish their current connection; if False send SIGQUIT.

    Waits up to ``cfg.graceful_timeout`` seconds for ``self.WORKERS`` to
    empty, then sends SIGKILL.
    """
    # Drop the listening sockets.
    self.LISTENERS = []
    sig = signal.SIGTERM
    if not graceful:
        sig = signal.SIGQUIT
    limit = time.time() + self.cfg.graceful_timeout
    # instruct the workers to exit
    self.kill_workers(sig)
    # wait until the graceful timeout
    while self.WORKERS and time.time() < limit:
        time.sleep(0.1)

    # Force-kill whatever did not exit in time.
    self.kill_workers(signal.SIGKILL)
上面只列出了几个关键函数. 可以看到, master 在收到 SIGTERM 信号之后直接 raise 一个 StopIteration, 而 master 的主循环中会捕获 (catch) 这个异常:
# Simplified from Arbiter.run(): the try statement wraps the whole loop,
# so raising StopIteration in a signal handler leaves the loop.
try:
    # ... (some code omitted)
    while True:
        ...  # pop a queued signal and dispatch it to its handle_* method
except StopIteration:
    self.halt()
except KeyboardInterrupt:
    self.halt()
# ... (remaining except clauses omitted)
所以 master 在收到 SIGTERM 信号之后调用 self.halt() 函数,看一看这个函数:
def halt(self, reason=None, exit_status=0):
    """Stop the arbiter and exit the process.

    Stops the workers via ``stop()`` with its default (graceful), logs the
    optional *reason*, removes the pidfile, runs the ``on_exit`` hook and
    finally calls ``sys.exit(exit_status)``.
    """
    self.stop()
    self.log.info("Shutting down: %s", self.master_name)
    if reason is not None:
        self.log.info("Reason: %s", reason)
    if self.pidfile is not None:
        self.pidfile.unlink()
    self.cfg.on_exit(self)
    sys.exit(exit_status)
跟收到其他信号一样, 这里调用了 self.stop() (graceful 参数取默认值 True). 这个函数已经在上方代码里列出了, 它做了以下几件事:
- 清空 self.LISTENERS. 这个属性里存放的就是建立的 socket 实例, 一般来说这个列表里只有一个 socket.
- 设置默认发给 worker 的信号为 SIGTERM; 如果参数 graceful 为 False (默认为 True), 则改为 SIGQUIT 信号.
- 给所有 worker 发送第二步中的信号.
- 如果有 worker 还没有退出则等待, 直到所有 worker 退出, 或者等待时间超过 config 中配置的 graceful_timeout (默认为 30 秒).
- 给所有超过 graceful_timeout 还没退出的 worker 发送 SIGKILL.
至此, master 是如何通知 worker 优雅退出的已经讲完,下一章将剖析 worker 收到 master 发过来的信号后是如何处理的。
Workers¶
spawn_worker¶
def spawn_worker(self):
    """Fork one new worker process.

    In the parent (master), records the worker in ``self.WORKERS`` and
    returns its pid. In the child, runs ``worker.init_process()``; the
    child never returns from this method and always leaves via
    ``sys.exit``.
    """
    self.worker_age += 1
    # NOTE(review): the worker gets half the configured timeout,
    # presumably so it notifies well within the master's deadline.
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                               self.app, self.timeout / 2.0,
                               self.cfg, self.log)
    self.cfg.pre_fork(self, worker)
    pid = os.fork()
    if pid != 0:
        # Parent: remember the child and return.
        self.WORKERS[pid] = worker
        return pid

    # Process Child
    worker_pid = os.getpid()
    try:
        util._setproctitle("worker [%s]" % self.proc_name)
        self.log.info("Booting worker with pid: %s", worker_pid)
        if self.cfg.reuseport:
            # With reuseport each worker opens its own listening sockets.
            worker.sockets = create_sockets(self.cfg, self.log)
            listeners_str = ",".join([str(l) for l in worker.sockets])
            self.log.info("Listening at: %s (%s) using reuseport",
                          listeners_str,
                          worker_pid)
        self.cfg.post_fork(self, worker)
        worker.init_process()
        sys.exit(0)
    except SystemExit:
        raise
    except AppImportError as e:
        self.log.debug("Exception while loading the application: \n%s",
                       traceback.format_exc())
        print("%s" % e, file=sys.stderr)
        sys.stderr.flush()
        sys.exit(self.APP_LOAD_ERROR)
    except:
        self.log.exception("Exception in worker process:\n%s",
                           traceback.format_exc())
        # Distinguish a failure during boot from one after booting.
        if not worker.booted:
            sys.exit(self.WORKER_BOOT_ERROR)
        sys.exit(-1)
    finally:
        self.log.info("Worker exiting (pid: %s)", worker_pid)
        try:
            worker.tmp.close()
            self.cfg.worker_exit(self, worker)
        except:
            # Best-effort cleanup while the child is exiting.
            pass
首先, 实例化 worker_class, 执行 pre_fork 这个钩子, 然后 fork 出子进程. 在父进程中, 记录 worker 进程的 pid 和 worker 实例 (self.WORKERS[pid] = worker), 然后返回 pid.
在子进程中, 设置进程名、记录日志等 (如果开启了 reuseport, 还会为该 worker 单独创建监听 socket), 然后执行 post_fork 钩子. 最后这个子进程进入自己的处理函数 worker.init_process().
worker¶
class Worker(object):
    """Base class of gunicorn worker processes.

    ``init_process()`` prepares the forked child (reloader, environment,
    pipes, signal handlers, loading the WSGI app) and then calls ``run()``,
    which subclasses must implement.
    """

    # Signals reset to their default disposition in init_signals().
    SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]

    # Replaced by an os.pipe() (read_fd, write_fd) pair in init_process().
    PIPE = []

    def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
        """\
        This is called pre-fork so it shouldn't do anything to the
        current process. If there's a need to make process wide
        changes you'll want to do that in ``self.init_process()``.
        """
        self.age = age
        self.ppid = ppid
        self.sockets = sockets
        self.app = app
        self.timeout = timeout
        self.cfg = cfg
        self.booted = False
        self.aborted = False
        self.reloader = None

        self.nr = 0
        # Add a random jitter to max_requests (presumably so workers do
        # not all reach the limit at once); a total of 0 means MAXSIZE,
        # i.e. effectively no limit.
        jitter = randint(0, cfg.max_requests_jitter)
        self.max_requests = cfg.max_requests + jitter or MAXSIZE
        self.alive = True

        self.log = log
        self.tmp = WorkerTmp(cfg)

    def __str__(self):
        return "<Worker %s>" % self.pid

    @property
    def pid(self):
        """PID of the current process (the worker, once forked)."""
        return os.getpid()

    def notify(self):
        """\
        Your worker subclass must arrange to have this method called
        once every ``self.timeout`` seconds. If you fail in accomplishing
        this task, the master process will murder your workers.
        """
        self.tmp.notify()

    def run(self):
        """\
        This is the mainloop of a worker process. You should override
        this method in a subclass to provide the intended behaviour
        for your particular evil schemes.
        """
        raise NotImplementedError()

    def init_process(self):
        """\
        If you override this method in a subclass, the last statement
        in the function should be to call this method with
        super(MyWorkerClass, self).init_process() so that the ``run()``
        loop is initiated.
        """

        # start the reloader
        if self.cfg.reload:
            def changed(fname):
                # On a source change, send SIGQUIT to ourselves
                # (handled by handle_quit).
                self.log.info("Worker reloading: %s modified", fname)
                os.kill(self.pid, signal.SIGQUIT)
            self.reloader = Reloader(callback=changed).start()

        # set environment variables
        if self.cfg.env:
            for k, v in self.cfg.env.items():
                os.environ[k] = v

        util.set_owner_process(self.cfg.uid, self.cfg.gid)

        # Reseed the random number generator
        util.seed()

        # For waking ourselves up
        self.PIPE = os.pipe()
        for p in self.PIPE:
            util.set_non_blocking(p)
            util.close_on_exec(p)

        # Prevent fd inheritance
        [util.close_on_exec(s) for s in self.sockets]
        util.close_on_exec(self.tmp.fileno())

        self.log.close_on_exec()

        self.init_signals()

        # The WSGI app is loaded here, inside the worker process.
        self.wsgi = self.app.wsgi()

        self.cfg.post_worker_init(self)

        # Enter main run loop
        self.booted = True
        self.run()

    def init_signals(self):
        """Reset SIGNALS to their defaults, then install worker handlers."""
        # reset signaling
        [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
        # init new signaling
        signal.signal(signal.SIGQUIT, self.handle_quit)
        signal.signal(signal.SIGTERM, self.handle_exit)
        signal.signal(signal.SIGINT, self.handle_quit)
        signal.signal(signal.SIGWINCH, self.handle_winch)
        signal.signal(signal.SIGUSR1, self.handle_usr1)
        signal.signal(signal.SIGABRT, self.handle_abort)

        # Don't let SIGTERM and SIGUSR1 disturb active requests
        # by interrupting system calls
        if hasattr(signal, 'siginterrupt'):  # python >= 2.6
            signal.siginterrupt(signal.SIGTERM, False)
            signal.siginterrupt(signal.SIGUSR1, False)

    def handle_usr1(self, sig, frame):
        """SIGUSR1: reopen the log files."""
        self.log.reopen_files()

    def handle_exit(self, sig, frame):
        """SIGTERM: graceful stop; only clears the ``alive`` flag."""
        self.alive = False

    def handle_quit(self, sig, frame):
        """SIGQUIT/SIGINT: run the worker_int hook, pause 0.1s, exit(0)."""
        self.alive = False
        # worker_int callback
        self.cfg.worker_int(self)
        time.sleep(0.1)
        sys.exit(0)

    def handle_abort(self, sig, frame):
        """SIGABRT: run the worker_abort hook and exit with status 1."""
        self.alive = False
        self.cfg.worker_abort(self)
        sys.exit(1)

    def handle_error(self, req, client, addr, exc):
        """Log *exc* and try to send an HTTP error response to *client*.

        Request-parsing and proxy-protocol errors become a 400 (403 for
        ForbiddenProxyRequest). Any other exception is logged with its
        traceback and becomes a 500. When a request object is available,
        an access-log entry is written too.
        """
        request_start = datetime.now()
        addr = addr or ('', -1)  # unix socket case
        if isinstance(exc, (InvalidRequestLine, InvalidRequestMethod,
                InvalidHTTPVersion, InvalidHeader, InvalidHeaderName,
                LimitRequestLine, LimitRequestHeaders,
                InvalidProxyLine, ForbiddenProxyRequest)):

            status_int = 400
            reason = "Bad Request"

            if isinstance(exc, InvalidRequestLine):
                mesg = "Invalid Request Line '%s'" % str(exc)
            elif isinstance(exc, InvalidRequestMethod):
                mesg = "Invalid Method '%s'" % str(exc)
            elif isinstance(exc, InvalidHTTPVersion):
                mesg = "Invalid HTTP Version '%s'" % str(exc)
            elif isinstance(exc, (InvalidHeaderName, InvalidHeader,)):
                mesg = "%s" % str(exc)
                if not req and hasattr(exc, "req"):
                    req = exc.req  # for access log
            elif isinstance(exc, LimitRequestLine):
                mesg = "%s" % str(exc)
            elif isinstance(exc, LimitRequestHeaders):
                mesg = "Error parsing headers: '%s'" % str(exc)
            elif isinstance(exc, InvalidProxyLine):
                mesg = "'%s'" % str(exc)
            elif isinstance(exc, ForbiddenProxyRequest):
                reason = "Forbidden"
                mesg = "Request forbidden"
                status_int = 403

            msg = "Invalid request from ip={ip}: {error}"
            self.log.debug(msg.format(ip=addr[0], error=str(exc)))
        else:
            self.log.exception("Error handling request")

            status_int = 500
            reason = "Internal Server Error"
            mesg = ""

        if req is not None:
            # NOTE(review): request_start is taken at the top of this
            # method, so request_time only covers the error handling.
            request_time = datetime.now() - request_start
            environ = default_environ(req, client, self.cfg)
            environ['REMOTE_ADDR'] = addr[0]
            environ['REMOTE_PORT'] = str(addr[1])
            resp = Response(req, client, self.cfg)
            resp.status = "%s %s" % (status_int, reason)
            resp.response_length = len(mesg)
            self.log.access(resp, req, environ, request_time)

        try:
            util.write_error(client, status_int, reason, mesg)
        except:
            # Best effort: a failure to send the error page is only logged.
            self.log.debug("Failed to send error message.")

    def handle_winch(self, sig, fname):
        # Ignore SIGWINCH in worker. Fixes a crash on OpenBSD.
        # (The second argument receives the signal frame despite its name.)
        return
这个类中, 我们重点看一下信号的初始化:
def init_signals(self):
    """Install the worker's signal handlers.

    Every signal in ``self.SIGNALS`` is first reset to SIG_DFL (presumably
    discarding dispositions inherited from the master across fork), then
    the worker-specific handlers are installed.
    """
    # reset signaling
    [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
    # init new signaling
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)  # graceful: alive = False
    signal.signal(signal.SIGINT, self.handle_quit)
    signal.signal(signal.SIGWINCH, self.handle_winch)
    signal.signal(signal.SIGUSR1, self.handle_usr1)
    signal.signal(signal.SIGABRT, self.handle_abort)

    # Don't let SIGTERM and SIGUSR1 disturb active requests
    # by interrupting system calls
    if hasattr(signal, 'siginterrupt'):  # python >= 2.6
        signal.siginterrupt(signal.SIGTERM, False)
        signal.siginterrupt(signal.SIGUSR1, False)
需要注意的是 signal.SIGTERM 这个信号, 它的处理函数是 handle_exit:
def handle_exit(self, sig, frame):
    """SIGTERM handler: request a graceful stop.

    Only clears ``self.alive``; the process does not exit here. It is up
    to the worker's run loop to notice the flag and finish (depends on
    the worker class in use — confirm for that class).
    """
    self.alive = False
它只是将 worker 的状态标记为 self.alive = False, 而不会立即退出进程. 这就使得 worker 能够在退出前处理完未完成的请求.
在 init_process 的最后, 会调用 self.run() 运行 worker 的主循环. self.run() 的具体实现取决于 worker 的类型 (self.worker_class), 支持的类型定义在 gunicorn.workers.__init__.SUPPORTED_WORKERS 里.
Redis-py¶
Gevent¶
Flask¶
- config原理
- 路由原理
- Wsgi接口调用
- 理解session
- 理解threading.local
- 理解flask自己封装的thread local
- 理解g和request
- 理解app context和request context