uwsgi代码学习#
概况#
https://uwsgi-docs.readthedocs.io/en/latest/
unbit/uwsgi
uwsgi项目的目标是开发全栈web服务。
主要是一个wsgi服务器,并包含了各种相关功能。实际使用中有茫茫多的选项。
囊括的知识点可以说是巨多。
包括linux进程/线程/协程相关,网络相关,python的c接口相关,等等。
很多功能做成了plugin的形式。
代码不算庞大,花一定时间是可以看个大概的。
可以学到uwsgi的各种功能和用法,更好地干活。
可以学到很多linux/c/python/工程知识。
下面把web服务相关的流程大概过一遍。
只看主要流程,忽略细节。
wsgi
https://peps.python.org/pep-3333/
https://wsgi.readthedocs.io/en/latest/learn.html
python
https://docs.python.org/3/c-api/
https://docs.python.org/3/extending/embedding.html
pthread
http://lemuria.cis.vtc.edu/~pchapin/TutorialPthread/pthread-Tutorial.pdf
https://docs.oracle.com/cd/E53394_01/html/E54803/tlib-1.html
https://www.ibm.com/docs/en/i/7.2?topic=category-pthread-apis
https://man7.org/linux/man-pages/man7/pthreads.7.html
https://www.cs.cmu.edu/afs/cs/academic/class/15492-f07/www/pthreads.html
https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/pthread.h.html
环境和配置#
linux
uwsgi 2.0.21
python 3.1x
实际环境中的主要组件
nginx
uwsgi
gevent/libev
flask
以下述配置看代码
[uwsgi]
# python插件参数
module = main
callable = main_app
# gevent插件参数
gevent = 256
gevent-early-monkey-patch = 1
# uwsgi的参数
chdir = /xxx/xxx/projects/xxx
touch-reload = /xxx/xxx/projects/xxx/main.py
socket = 0.0.0.0:10000
#location of log files
logto = /xxx/xxx/log/xxx.log
log-maxsize = 10000000 # 10m
single-interpreter = true
#run on subdir. really hard to find the solution
mount = /api/=main.py
manage-script-name = True
processes = 4
listen = 1024
reload-mercy = 20
worker-reload-mercy = 20
stats=0.0.0.0:10010
stats-http = True
# 其他应用
attach-daemon = python /xxx/xxx/projects/xxx/deploy/uwsgi_monitor.py
看代码时打开宏
UWSGI_EVENT_USE_EPOLL
PYTHREE
__linux__
UWSGI_LOCK_USE_MUTEX
代码分析#
整体结构主要是一个master进程,和多个worker进程。
几乎所有数据都在struct uwsgi_server,超大的结构体。
其中一些数据用mmap分配内存,可在进程间方便地共享。
master负责管理worker的状态。
worker负责接受大量请求并生成协程进行处理。
主要结构体#
// plugin是非常重要的内容。这个软件许多功能都是做成plugin的形式。
// plugin分为request plugin和generic plugin
// 主要看python和gevent两个plugin
/*
uwsgi --plugin-list 可列出加载的plugin
--show-config 可列出最终的配置
--logger-list
--cheaper-algo-list
--router-list
--loop-list
--imperial-monitor-list
--clock-list
--alarms-list
一起打印出来看看有个大概映像
uwsgi --plugin-list --show-config --logger-list --cheaper-algo-list --router-list --loop-list --imperial-monitor-list --clock-list --alarms-list
*/
// 我是用pip安装的uwsgi,列出来有四五十个plugin。
// 找了一圈没找到加载这些plugin的代码。估计是pip的版本编译时内置的,暂不纠结。
// 见 ULEP宏 UWSGI_LOAD_EMBEDDED_PLUGINS UWSGI_EMBED_PLUGINS
// 见buildconf文件夹
struct uwsgi_plugin {
const char *name; // 名字
// 各种回调
int (*init) (void);
// ...
}
// 一个app。例如一个flask项目。
struct uwsgi_app {
void *callable; // wsgi的callable名字
void **args; // 参数
void **environ; // 参数/环境
// 处理请求的回调
void *(*request_subhandler) (struct wsgi_request *, struct uwsgi_app *);
int (*response_subhandler) (struct wsgi_request *);
// ...
}
// 一个请求相关的数据
struct wsgi_request {
int fd;
int app_id;
int async_id; // 根据core数量从0开始编号。
void *async_args; // 参数
void *async_environ; // 参数/环境
// ...
}
// uwsgi_core用来承载一个请求
struct uwsgi_core {
int in_request; // 是否正在处理请求
struct wsgi_request req;
// ...
}
struct uwsgi_worker {
int id;
pid_t pid;
uint64_t status;
int manage_next_request;
int apps_cnt;
struct uwsgi_app *apps;
struct uwsgi_core *cores;
// ...
}
// mule功能
// https://uwsgi-docs.readthedocs.io/en/latest/Mules.html
struct uwsgi_mule {
int id;
pid_t pid;
}
// 某些plugin会用到mule,
// daemon是master另外维护的进程
// https://uwsgi-docs.readthedocs.io/en/latest/AttachingDaemons.html
// attach-daemon参数添加daemon程序
struct uwsgi_daemon {
char *command; // 要执行的命令
}
// uwsgi大状态
struct uwsgi_instance_status {
int gracefully_reloading;
int brutally_reloading;
int gracefully_destroying;
int brutally_destroying;
int chain_reloading;
int workers_reloading;
int is_cheap; // spawn workers only after the first request
int is_cleaning;
int dying_for_need_app;
};
// mmap分配各种共享数据
struct uwsgi_shared {
}
struct uwsgi_socket {
// 各种socket基本数据
int fd;
int family;
// https://man7.org/linux/man-pages/man7/epoll.7.html
// https://en.wikipedia.org/wiki/Epoll
// https://stackoverflow.com/questions/9162712/what-is-the-purpose-of-epolls-edge-triggered-option
// https://lwn.net/Articles/864947/
int edge_trigger; // epoll的trigger模式。是个大课题。可默认为level trigger
// 简单来说。编程模型不一样。(我没有系统地写过这种事件处理程序。可能理解有误)
// edge trigger是告诉你状态发生变化了,可读写了。
// 只通知一次,此时你要记住这次通知,尽可能多读/写,直到读写接口返回EAGAIN等。然后等待下次触发。
// level trigger是告知目前的状态是可读写数据。
// 我们可以更随意地安排读写,即使一次读写不完,过会再poll,还是能继续读写。
// edge trigger的区别在于,如果一次不读写完,你再poll,可能就poll不到事件,因为此时状态可能没有改变。
// 实际的例子。监听socket,有新客户端连上来,走py_uwsgi_gevent_main。
// 假设此时有3个客户端连上来。如果level trigger,可以accept一个就退出,再poll,还是能poll到后面的客户端。
// 而如果是edge trigger,就得不断尝试accept,直到确实没有新客户端。
// 如果此时accept第1个就退出,那么再poll就poll不到剩下的2个客户端,直到又有第4个客户端连上来才能再次poll到。那么就造成了第2/3个卡住得不到accept。
// 各种基本操作的配置。比如accept时用什么函数
int (*proto_accept) (struct wsgi_request *, int);
int (*proto) (struct wsgi_request *);
// ...
}
// uwsgi_server包含服务器各种主要数据,有将近1000个member。
struct uwsgi_server {
char hostname[256];
int hostname_len;
int need_app; // need-app参数。必须要有app,如果没有,kill_them_all()。
// 存一堆协议处理函数
int (*proto_hooks[UWSGI_PROTO_MAX_CHECK]) (struct wsgi_request *, char *, char *, uint16_t);
int numproc; // worker进程数量。processes或workers参数可设置。我们设置processes=4。
int async; // async参数或gevent参数。我们设置gevent=256
int threads; // threads参数。我们不设置,默认为1。
int cores; // = uwsgi.async或uwsgi.threads。gevent=256所以cores=256
struct uwsgi_instance_status status;
struct uwsgi_plugin *p[256]; // request plugin
struct uwsgi_plugin *gp[MAX_GENERIC_PLUGINS]; // generic plugin
int gp_cnt;
struct uwsgi_worker *workers;
pid_t mypid; // 我的pid
int mywid; // 我的worker id
char *loop; // 指定loop
// mule
// https://uwsgi-docs.readthedocs.io/en/latest/Mules.html
int muleid;
int mules_cnt;
int farms_cnt;
struct uwsgi_mule *mules;
struct uwsgi_farm *farms;
int listen_queue; // listen参数。set the socket listen queue size
int manage_next_request; // 指示worker是否还要处理下个请求。用在启动和停止时。
struct uwsgi_daemon *daemons;
int master_process; // 貌似。是否起主控进程。如果配置的选项有UWSGI_OPT_MASTER,就会打开。较多选项都是如此。
int master_queue; // io消息系统实例的fd。比如epoll。
char *zerg_server; // zerg模式
char *stats; // states服务。用于外部查询server状态
struct uwsgi_shared *shared; // mmap共享内存
struct uwsgi_protocol *protocols;
struct uwsgi_socket *sockets;
struct uwsgi_socket *shared_sockets;
// ...
}
// python plugin的主数据
struct uwsgi_python {
char *callable; // callable参数
// ...
}
// gevent plugin数据
struct uwsgi_gevent {
PyObject **watchers;
// ...
}
master#
uwsgi_server uwsgi; // server主数据
main
uwsgi_setup
signal(SIGCHLD, SIG_DFL); // 子进程结束。默认忽略
signal(SIGSEGV, uwsgi_segfault); // Invalid memory reference/Segmentation Fault
uwsgi_log("!!! uWSGI process %d got Segmentation Fault !!!\n", (int) getpid());
signal(signum, SIG_DFL);
signal(SIGFPE, uwsgi_fpe); // Floating-point exception
uwsgi_log("!!! uWSGI process %d got Floating Point Exception !!!\n", (int) getpid());
kill(getpid(), signum);
signal(SIGHUP, SIG_IGN); // Hangup detected on controlling terminal or death of controlling process
signal(SIGTERM, SIG_IGN); // Termination signal
signal(SIGPIPE, SIG_IGN); // Broken pipe: write to pipe with no readers
masterpid = getpid();
uwsgi_proto_hooks_setup
// 配置一些http协议的处理函数
getcwd
uwsgi_setup_schemes
// ...
uwsgi_register_clock
uwsgi_set_clock
// 注册进程结束时的回调
// fallback config
atexit(uwsgi_fallback_config);
// 貌似一个用一个备用的配置再次启动
// manage/flush logs
atexit(uwsgi_flush_logs);
// 貌似尝试读完pipe
// clear sockets, pidfiles// ...
atexit(vacuum);
// 各种清理/unlink
// call user scripts
atexit(uwsgi_exec_atexit);
// 自定义回调
// 创建进程间共享内存
uwsgi.shared = (struct uwsgi_shared *) uwsgi_calloc_shared(sizeof(struct uwsgi_shared));
uwsgi_malloc_shared
void *addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
// mmap创建共享内存。须重点学习
// plugin初始化为空
for (i = 0; i < 256; i++) {
uwsgi.p[i] = &unconfigured_plugin;
}
uwsgi_init_default
// 设置大量默认参数
uwsgi_check_emperor // 帝王模式。可忽略
uwsgi_setup_reload
uwsgi_register_base_hooks
// 注册各种基本操作
uwsgi_register_logchunks
// ???
uwsgi_log_encoders_register_embedded
uwsgi_metrics_collectors_setup
UWSGI_LOAD_EMBEDDED_PLUGINS
build_options
// uwsgi_base_options是写死的选项
// 把这些选项和custom选项整理放到uwsgi.options
parse_sys_envs(UWSGI_ENVIRON);
uwsgi_commandline_config
// 对整理好的选项,用getopt_long解析命令行参数。
add_exported_option(optname, optarg, 0);
// 添加
uwsgi_configure
// 看情况对每个选项调用对应的uwsgi_option.func
// 执行一些需要先执行的流程比如exec_asap
// logto参数设置uwsgi.logfile
// 可做daemonize
//
uwsgi_setup_log
daemonize
logto
logto
// 可选log打到文件或udp socket
// setup offload engines
uwsgi_offload_engines_register_all();
// setup main loops
// worker的loop。重点。后续详细看流程
uwsgi_register_loop("simple", simple_loop);
uwsgi_register_loop("async", async_loop);
// setup cheaper algos
// cheaper机制。需要研究
// https://uwsgi-docs.readthedocs.io/en/latest/Cheaper.html
uwsgi_register_cheaper_algo("spare", uwsgi_cheaper_algo_spare);
uwsgi_register_cheaper_algo("backlog", uwsgi_cheaper_algo_backlog);
uwsgi_register_cheaper_algo("manual", uwsgi_cheaper_algo_manual);
// setup imperial monitors
uwsgi_register_imperial_monitor("dir", uwsgi_imperial_monitor_directory_init, uwsgi_imperial_monitor_directory);
uwsgi_register_imperial_monitor("glob", uwsgi_imperial_monitor_glob_init, uwsgi_imperial_monitor_glob);
// setup stats pushers
// 状态推送
// https://uwsgi-docs.readthedocs.io/en/latest/PushingStats.html
uwsgi_stats_pusher_setup();
// register embedded alarms
uwsgi_register_embedded_alarms();
// 可选择列出各种配置
show_config
plugins_list
loggers_list
cheaper_algo_list
router_list
loop_list
imperial_monitor_list
clocks_list
alarms_list
// 打印各种重要信息
uwsgi_log_initial("*** Starting uWSGI %s (%dbit) on [%.*s] ***\n", UWSGI_VERSION, (int) (sizeof(void *)) * 8, 24, ctime((const time_t *) &uwsgi.start_tv.tv_sec));
// 初始化shared_sockets
// shared-socket选项
uwsgi_setup_shared_sockets
uwsgi_emperor_start // 帝王模式。忽略
// jail系统
// https://uwsgi-docs.readthedocs.io/en/latest/Namespaces.html
uwsgi_start
uwsgi_set_cgroup // cgroup参数。忽略
// jail相关。namespace相关参数。忽略
// ...
uwsgi_log_initial("your memory page size is %d bytes\n", uwsgi.page_size);
uwsgi_setup_locking
// 配置uwsgi.lock_ops相关的操作
// 宏走的 #define UWSGI_LOCK_ENGINE_NAME "pthread robust mutexes"
// 惊群问题Thundering Herd
// https://uwsgi-docs.readthedocs.io/en/latest/articles/SerializingAccept.html
uwsgi_rpc_init
// uwsgi_calloc_shared创建rpc_table等共享数据
// 初始化所有uwsgi.sharedareas。忽略
uwsgi_sharedareas_init
uwsgi_sharedarea_init
// queue框架
// https://uwsgi-docs.readthedocs.io/en/latest/Queue.html
uwsgi_init_queue // 没用到
// caching框架
// https://uwsgi-docs.readthedocs.io/en/latest/Caching.html
uwsgi_cache_create_all // 没用到
uwsgi_alarms_init
// exception-handler选项
uwsgi_exception_setup_handlers
// 注册uwsgi/http等协议处理
uwsgi_protocols_register
uwsgi_register_protocol("uwsgi", uwsgi_proto_uwsgi_setup);
// gp的init
// no-server选项
uwsgi_setup_systemd();
uwsgi_setup_upstart();
uwsgi_setup_zerg();
uwsgi_setup_emperor();
uwsgi_setup_inherited_sockets
// 上面的一些操作可能会创建socket初始数据,放入uwsgi.sockets。
// 配置中的socket参数指定了主接收socket。也已经创建。
uwsgi_bind_sockets
// 进行实际的socket创建
// 对每个socket
if (tcp_port == NULL) {
uwsgi_sock->fd = bind_to_unix(uwsgi_sock->name, uwsgi.listen_queue, uwsgi.chmod_socket, uwsgi.abstract_socket);
// 分配sockaddr_un内存
serverfd = create_server_socket(AF_UNIX, SOCK_STREAM);
// 创建socket
// https://linux.die.net/man/2/socket
int serverfd = socket(domain, type, 0);
// setsockopt各种设置
// https://linux.die.net/man/2/bind
bind(serverfd, (struct sockaddr *) uws_addr, len + ((void *) uws_addr->sun_path - (void *) uws_addr))
// https://linux.die.net/man/2/listen
// uwsgi的listen参数是直接传到listen接口
listen(serverfd, listen_queue)
} else {
uwsgi_sock->fd = bind_to_tcp(uwsgi_sock->name, uwsgi.listen_queue, tcp_port);
socket_to_in_addr(socket_name, tcp_port, 0, (struct sockaddr_in *) &uws_addr);
uwsgi_resolve_ip
gethostbyname
inet_ntoa
serverfd = create_server_socket(AF_INET, SOCK_DGRAM);
// 根据配置进行各种setsockopt
bind(serverfd, (struct sockaddr *) &uws_addr, addr_len)
listen(serverfd, listen_queue)
}
uwsgi_set_sockets_protocols
// uwsgi_protocols_register中注册了各种协议
// 这里调用注册的setup函数
uwsgi_socket_setup_protocol
uwsgi_proto_uwsgi_setup
// 设置socket的各种基本操作函数。例如accept等等,对这些c库的socket操作进行封装。
// p plugin初始化
// plugin的post_init
// uwsgi.has_threads相关处理
pthread_mutex_init
// cron相关
// https://uwsgi-docs.readthedocs.io/en/latest/Cron.html
uwsgi_setup_workers
// numproc是processes或workers参数配置的
// 每个worker都会起进程
// 这里给uwsgi.workers起各种数据
// 用uwsgi_calloc_shared分配共享内存
// 那么master和各个worker(各进程间)就能读写同一份数据了
uwsgi.workers = (struct uwsgi_worker *) uwsgi_calloc_shared(sizeof(struct uwsgi_worker) * (uwsgi.numproc + 1));
for (i = 0; i <= uwsgi.numproc; i++) {
// allocate memory for apps
uwsgi.workers[i].apps = (struct uwsgi_app *) uwsgi_calloc_shared(sizeof(struct uwsgi_app) * uwsgi.max_apps);
// allocate memory for cores
uwsgi.workers[i].cores = (struct uwsgi_core *) uwsgi_calloc_shared(sizeof(struct uwsgi_core) * uwsgi.cores);
}
create_signal_pipe
uwsgi_setup_mules_and_farms
uwsgi_log("*** Operational MODE: preforking+async ***\n");
uwsgi_setup_metrics
// uwsgi.logformat
// spoolers
// https://uwsgi-docs.readthedocs.io/en/latest/Spooler.html
// plugin的preinit_apps
// lazy和lazy-apps参数
if (!uwsgi.lazy && !uwsgi.lazy_apps) {
uwsgi_init_all_apps();
// exec-pre-app参数
uwsgi_run_command_and_wait
fork
// 父进程
waitpid
// 子进程
uwsgi_run_command_do
execvp
// call-pre-app参数
uwsgi_call_symbol
dlsym
func
// plugin的init_apps()
uwsgi_python_init_apps
// 见下面python plugin流程
// mount参数
每个uwsgi.mounts
每个uwsgi.p
if (uwsgi.p[j]->mount_app)
uwsgi_log("mounting %s on %s\n", what, app_mps->value);
mount_app
uwsgi_python_mount_app
// 见下面的python plugin流程
// exec-post-app参数
// call-post-app参数
uwsgi_call_symbol
}
atexit(uwsgi_plugins_atexit);
// plugin的atexit()
// plugin的postinit_apps
// uwsgi.daemonize2
// 检查plugin是否已经配置
uwsgi_alarm_thread_start();
uwsgi_exceptions_handler_thread_start();
// plugin的master_fixup
spooler_start
uwsgi_notify_ready
// 生成worker。workers参数或processes参数。
uwsgi_respawn_worker(int wid)
// 设置uwsgi.workers[wid]的各种数据
// plugin的pre_uwsgi_fork
uwsgi_fork
if(master) // 父进程
plugin的post_uwsgi_fork
uwsgi.workers[wid].pid = pid; // 记录worker的pid
elif(worker) // 子进程
// plugin的post_uwsgi_fork
signal(SIGWINCH, worker_wakeup);
signal(SIGTSTP, worker_wakeup);
// 设置/初始化worker的各种数据
uwsgi.mywid = wid;
uwsgi.mypid = getpid();
uwsgi.workers[uwsgi.mywid].id = uwsgi.mywid;
// ...
uwsgi_fixup_fds
// plugin的master_fixup
uwsgi_run
master_loop // (用masterpid确定)
uwsgi_init_rb_timer // 初始化timer的红黑树
uwsgi_unix_signal(SIGTSTP, suspend_resume_them_all);
suspend_resume_them_all
// toggle所有worker的suppend状态?
// subscribe系列参数
uwsgi_subscribe_all(suspend, 1);
// 给每个worker发SIGTSTP
kill(uwsgi.workers[i].pid, SIGTSTP)
uwsgi_unix_signal(SIGHUP, grace_them_all);
grace_them_all
uwsgi.status.gracefully_reloading = 1;
uwsgi_destroy_processes
uwsgi_detach_daemons
对uwsgi.daemons中的所有
// https://www.man7.org/linux/man-pages/man2/kill.2.html
// 检测不正常的daemon。signal发0。
kill(ud->pid, 0)
// 如果做相应处理
// 发SIGKILL等等
// kill所有gateway
kill(ushared->gateways[i].pid, SIGKILL);
uwsgi_log("// ...gracefully killing workers// ...\n");
// 每个worker走uwsgi_curse(i, SIGHUP);
// 每个mule走uwsgi_curse_mule(i, SIGHUP);
uwsgi_unix_signal(SIGTERM, reap_them_all);
uwsgi_destroy_processes
// 每个worker走uwsgi_curse(i, SIGTERM);
// 每个mule走uwsgi_curse_mule(i, SIGTERM);
uwsgi_unix_signal(SIGQUIT, kill_them_all);
uwsgi_unix_signal(SIGINT, kill_them_all);
kill_them_all
uwsgi.status.brutally_destroying = 1;
// 每个worker走uwsgi_curse(i, SIGINT);
// 每个mule走uwsgi_curse_mule(i, SIGINT);
uwsgi_destroy_processes
uwsgi_unix_signal(SIGUSR1, stats);
stats
// 打印config和worker信息
atexit(uwsgi_master_cleanup_hooks);
uwsgi_master_cleanup_hooks
// 所有plugin调master_cleanup
uwsgi.master_queue = event_queue_init();
epfd = epoll_create(256);
event_queue_add_fd_read // 监听各种fd
epoll_ctl(eq, EPOLL_CTL_ADD, fd, &ee)
uwsgi_metrics_start_collector
pthread_create
uwsgi_metrics_loop
for(;;) {
// 收集metrics信息
sleep(1);
}
uwsgi_add_reload_fds
uwsgi_cache_start_sweepers
uwsgi_cache_start_sync_servers
// 各种功能初始化
// zerg_server初始化
// stats服务初始化
// stats参数和stats-http参数配置
uwsgi.stats_fd = bind_to_tcp(uwsgi.stats, uwsgi.listen_queue, tcp_port);
event_queue_add_fd_read(uwsgi.master_queue, uwsgi.stats_fd);
uwsgi_log("*** Stats server enabled on %s fd: %d ***\n", uwsgi.stats, uwsgi.stats_fd);
// uwsgi.stats_pusher_instances
// uwsgi.udp_socket
// udp服务器
uwsgi_setup_snmp
// uwsgi.status.is_cheap
uwsgi_mule // 初始化mule
uwsgi_fork
// 子进程
uwsgi_close_all_sockets
uwsgi_mule_run
uwsgi_mule_handler
for (;;) {
}
gateway_respawn
uwsgi_fork
// 子进程
ug->loop(id, ug->data);
// attach-daemon参数配置另外的daemon程序。在此启动
uwsgi_daemons_spawn_all
uwsgi_spawn_daemon
uwsgi_fork
// 子进程
daemon_spawn
uwsgi_exec_command_with_args
execvp
uwsgi_cache_sync_all
msync
// 处理各种touch
uwsgi_check_touches(uwsgi.touch_reload);
stat
// fs monitor
uwsgi_fsmon_setup
// uwsgi.requested_cheaper_algo
// master主循环
for (;;) {
uwsgi_master_check_death
uwsgi_master_check_reload
uwsgi_master_check_chain
uwsgi_master_check_mercy
uwsgi_daemons_smart_check
// https://linux.die.net/man/2/waitpid
// 等待某个子进程状态发生变化
diedpid = waitpid(WAIT_ANY, &waitpid_status, WNOHANG);
if (diedpid == -1) {
// 出错
}
if (diedpid == 0) { // 无改变。执行master的日常工作。
// 处理ushared->files_monitored
// 如果没注册进行注册
// 处理ushared->timers的注册
// 处理ushared->rb_timers的注册
// 检测ushared->rb_timers的超时
uwsgi_route_signal // 超时发送signal
uwsgi_signal_send // 往各种signal_pipe发
// 等一个事件。比如
rlen = event_queue_wait(uwsgi.master_queue, check_interval, &interesting_fd);
epoll_wait
uwsgi_update_load_counters
uwsgi_manage_signal_cron
// cron参数。可配置cron
uwsgi_manage_command_cron
uwsgi_cron_task_needs_execution
// 检查时间
uwsgi_run_command
fork
execvp
// 如果有事件,处理事件。
if (rlen > 0) {
// if the following function returns -1, a new worker has just spawned
if (uwsgi_master_manage_events(interesting_fd)) {
// 处理各种事件
// 比如我们配置了stats参数,从外部用http查询状态。会走uwsgi_send_stats传回状态。
return 0;
}
}
// logrotate操作
// logto参数设置
uwsgi_check_logrotate
uwsgi_master_check_idle
// idle参数和die-on-idle参数
// 检查idle状态。做一些操作。可能设为cheap mode。
master_check_listen_queue
// 检查socket状态
// getsockopt得到queue和max_queue
// 如果queue满了。打印。
// 这个出现在大量请求处理不过来的场景。
if (uwsgi_sock->queue > 0 && uwsgi_sock->queue >= uwsgi_sock->max_queue) {
uwsgi_log_verbose("*** uWSGI listen queue of socket \"%s\" (fd: %d) full !!! (%llu/%llu) ***\n", uwsgi_sock->name, uwsgi_sock->fd, (unsigned long long) uwsgi_sock->queue, (unsigned long long) uwsgi_sock->max_queue);
}
// check各种deadline
uwsgi_master_check_workers_deadline
// 各种原因导致需要杀死之类的操作。执行
// check-mountpoint参数。如果mountpoint不存在了。
uwsgi_master_check_mountpoints
uwsgi_check_mountpoint
statfs // 检查文件。如果不存在,uwsgi_nuclear_blast全完蛋。
// resubscribe every 10 cycles by default
uwsgi_subscribe_all
uwsgi_cache_sync_all
// https://man7.org/linux/man-pages/man2/msync.2.html
// 同步mmap共享数据
msync // synchronize a file with a memory map。
// 检查touch_reload
if (!uwsgi_instance_is_reloading && !uwsgi_instance_is_dying) {
char *touched = uwsgi_check_touches(uwsgi.touch_reload);
// 通过时间戳等手段检查是否touch
if (touched) {
uwsgi_log_verbose("*** %s has been touched// ... grace them all !!! ***\n", touched);
uwsgi_block_signal(SIGHUP);
grace_them_all(0);
uwsgi_unblock_signal(SIGHUP);
continue;
}
}
// 如果没有child状态改变,直接continue。
continue
}
// 有child状态改变
uwsgi_deadlock_check(diedpid); // lock相关暂未研究
// check各种death
uwsgi_master_check_daemons_death(diedpid)
uwsgi_master_check_cron_death(diedpid
// ...
int thewid = find_worker_id(diedpid);
if (thewid <= 0) { // 不是worker。基本只是uwsgi_log一下?
// 检查是否uwsgi.spoolers
// 检查是否uwsgi.mules
// 检查是否ushared->gateways
if (uwsgi_daemon_check_pid_death(diedpid))
continue
continue
}
// 是worker进程结束
uwsgi.workers[thewid].pid = 0;
if (WIFEXITED(waitpid_status) && WEXITSTATUS(waitpid_status) == UWSGI_FAILED_APP_CODE) {
if (uwsgi.lazy_apps && uwsgi.need_app) {
kill_them_all
}
}
// 处理各种waitpid_status
// 准备重新生成worker
// avoid fork bombing
// 如发现check_interval间隔内过于频繁地重启worker,会sleep一会。
// 见forkbomb-delay参数和check-interval参数
uwsgi_respawn_worker
// 见之前的分析
}
worker#
main
uwsgi_setup
流程见上面的master
uwsgi_run
此时进程身份是worker
// eventually maps (or disable) sockets for the worker
// map-socket参数。忽略。
uwsgi_map_sockets();
// worker-exec参数
if (uwsgi.worker_exec) {
execvp
exit(1);
}
// offload-threads参数
// https://uwsgi-docs.readthedocs.io/en/latest/OffloadSubsystem.html
if (uwsgi.offload_threads > 0) {
uwsgi_offload_thread_start
uwsgi_thread_new(uwsgi_offload_loop)
for (;;) {
event_queue_wait_multi
event_func
}
}
// plugin的post_fork
// worker-exec2参数
if (uwsgi.worker_exec2) {
execvp
exit(1);
}
// plugin的worker()。只有python的plugin用到。
uwsgi_python_worker
// 见下面的plugin流程。实际没什么用
uwsgi_worker_run
// lazy和lazy-apps参数
if (uwsgi.lazy || uwsgi.lazy_apps) {
uwsgi_init_all_apps();
// 见master
}
// 初始化请求的承载资源
// async_queue_unused里的每个wsgi_request对应一个http请求。
// 如果用尽,就不接受新的请求。
if (uwsgi.async > 1) { // gevent参数设置为256
// a stack of unused cores
uwsgi.async_queue_unused = uwsgi_malloc(sizeof(struct wsgi_request *) * uwsgi.async);
// fill it with default values
for (i = 0; i < uwsgi.async; i++) {
uwsgi.async_queue_unused[i] = &uwsgi.workers[uwsgi.mywid].cores[i].req;
}
// the first available core is the last one
uwsgi.async_queue_unused_ptr = uwsgi.async - 1;
}
uwsgi_unix_signal(SIGHUP, gracefully_kill);
// master通知worker正常停止。发SIGHUP
gracefully_kill
uwsgi.workers[uwsgi.mywid].manage_next_request = 0;
if (uwsgi.threads > 1) { // 这里应该是不走
wait_for_threads
pthread_mutex_lock(&uwsgi.six_feet_under_lock); // 争锁
遍历uwsgi.workers[uwsgi.mywid].cores
确认自己的thread_id存在
pthread_cancel(thread_id) // 尝试cancel线程。
pthread_join(thread_id) // join
}
if (uwsgi.async > 1) {
if (uwsgi.workers[uwsgi.mywid].shutdown_sockets)
uwsgi_shutdown_all_sockets();
对于所有uwsgi.sockets
shutdown
close
exit(UWSGI_RELOAD_CODE);
}
uwsgi_unix_signal(SIGINT, end_me);
uwsgi_unix_signal(SIGTERM, end_me);
end_me
exit // 直接exit
uwsgi_unix_signal(SIGUSR1, stats);
stats
打印信息
signal(SIGUSR2, (void *) &what_i_am_doing);
what_i_am_doing
打印信息
// 所有plugin->fixup
uwsgi.chdir2
// 清空req
// async_id按index设置
for (i = 0; i < uwsgi.cores; i++) {
memset(&uwsgi.workers[uwsgi.mywid].cores[i].req, 0, sizeof(struct wsgi_request));
uwsgi.workers[uwsgi.mywid].cores[i].req.async_id = i;
}
// 每个worker的cores[0]的thread_id,设为当前的thread_id。
if (uwsgi.cores > 1) {
uwsgi.workers[uwsgi.mywid].cores[0].thread_id = pthread_self();
pthread_mutex_init(&uwsgi.six_feet_under_lock, NULL);
}
uwsgi_ignition
// plugin的hijack_worker
uwsgi.workers[uwsgi.mywid].accepting = 1;
// 执行一些hook
uwsgi_hooks_run(uwsgi.hook_accepting, "accepting", 1);
// 三种loop
// 主要看gevent的loop
if (uwsgi.loop) {
void (*u_loop) (void) = uwsgi_get_loop(uwsgi.loop);
// 运行plugin的loop比如gevent/asyncio
u_loop
gevent_loop
// 见下方流程
} else {
// 其他loop。未详细看,可能有错。
if (uwsgi.async < 2) {
simple_loop(); // 简单loop
uwsgi_loop_cores_run(simple_loop_run);
pthread_create启动线程
simple_loop_run
event_queue_init
// 根据选择(epoll/kqueue/poll)初始化event_queue
uwsgi_add_sockets_to_queue(main_queue, core_id);
// 相关的socket进queue。比如epoll开始监听。
// 死循环监听事件/接收数据/处理请求
while (uwsgi.workers[uwsgi.mywid].manage_next_request) {
wsgi_req_setup
// 准备一个新的wsgi请求
wsgi_req_accept // 等待新链接
thunder_lock
event_queue_wait
epoll_wait
proto_accept
uwsgi_proto_base_accept
accept
wsgi_req_recv // 等待对端数据
proto
uwsgi_proto_uwsgi_parser
read
uwsgi_wait_read_req
wait_read_hook
uwsgi_simple_wait_read_hook
uwsgi.p[wsgi_req->uh->modifier1]->request(wsgi_req);
// wsgi协议处理
uwsgi_request_wsgi
init_uwsgi_app
wsgi_env_create
request_subhandler
uwsgi_request_subhandler_wsgi
python_call
PyObject_CallObject
uwsgi_close_request
}
}
else {
async_loop(); // 异步loop
uwsgi_async_init
event_queue_init
uwsgi_add_sockets_to_queue
// 死循环监听事件/接收数据/处理请求
while (uwsgi.workers[uwsgi.mywid].manage_next_request) {
event_queue_wait_multi // 等待事件
for(所有事件){
if(is_a_new_connection){
wsgi_req_setup
wsgi_req_simple_accept
proto_accept
wsgi_req_async_recv
}
if(!is_a_new_connection){
proto
uwsgi_proto_uwsgi_parser
read
runqueue_push // 插入可能的req
}
}
// 协程处理堆积的req
}
}
}
// 正常一直在上面的loop中运行。走不到这。
python plugin#
// 在uwsgi中嵌入python,执行python代码。
// https://docs.python.org/3/extending/embedding.html
init
uwsgi_python_init
// 在c环境中起python环境
// 然后可用python的c接口对python环境进行各种操作,执行外部python代码等等。
Py_GetVersion
Py_GetCompiler
// 检已初始化。获取gil。
if (Py_IsInitialized()) {
uwsgi_log("--- Python VM already initialized ---\n");
PyGILState_Ensure();
goto ready;
}
Py_SetProgramName // 没完全懂
Py_Initialize
preinit_apps
uwsgi_python_preinit_apps
init_pyargv
PySys_SetArgv(up.argc, up.py_argv);
PyDict_SetItemString(sys_dict, "executable", PyUnicode_FromString(up.executable));
init_uwsgi_embedded_module
// 主要初始化python的uwsgi module环境。
// 这样后续运行的外部python文件也也已调用uwsgi的各种函数。
// https://docs.python.org/3/c-api/typeobj.html
// https://docs.python.org/3/c-api/allocation.html#c.PyObject_New
// 初始化uwsgi_InputType配置。相当于创建一个新python类型uwsgi._Input。
// 在uwsgi_request_subhandler_wsgi里会PyObject_New创建实例。
// 实际的数据包含struct wsgi_request。基本包含了一个请求的各种数据。
PyType_Ready(&uwsgi_InputType)
// 做一个workers的tuple。每个存一个dict。
up.workers_tuple = PyTuple_New(uwsgi.numproc);
for (i = 0; i < uwsgi.numproc; i++) {
zero = PyDict_New();
Py_INCREF(zero);
PyTuple_SetItem(up.workers_tuple, i, zero);
}
// 创建uwsgi module
PyImport_AppendInittab("uwsgi", init_uwsgi3);
init_uwsgi3
PyModule_Create
// 获取uwsgi
new_uwsgi_module = PyImport_AddModule("uwsgi");
// 获取uwsgi.__dict__
up.embedded_dict = PyModule_GetDict(new_uwsgi_module);
// 对up.embedded_dict操作。往module里加各种参数/函数
// 处理与python plugin相关的配置参数。比如module和callable。参数添加到up.embedded_dict的opt
// uwsgi.sockets里的sockets添加到sockets
// magic_table等等
init_uwsgi_module_advanced
// 添加uwsgi_advanced_methods和uwsgi_metrics_methods
// 包含较多辅助功能。比如查询worker状态等
init_uwsgi_module_cache
// 添加uwsgi_cache_methods
// https://uwsgi-docs.readthedocs.io/en/latest/Queue.html
if (uwsgi.queue_size > 0) {
init_uwsgi_module_queue(new_uwsgi_module);
}
uwsgi_init_symbol_import
PyImport_ImportModule("uwsgi");
// 继续初始化一些类型,放到uwsgi模块。
init_uwsgi_vars
// 配置一些环境。path之类。不纠结
init_apps
// 初始化app,即Flask项目
uwsgi_python_init_apps
// 初始化up.loaders
// 我们配置中的module参数设置了up.wsgi_config。走uwsgi_uwsgi_loader
if (up.wsgi_config != NULL) {
init_uwsgi_app(LOADER_UWSGI, up.wsgi_config, uwsgi.wsgi_req, up.main_thread, PYTHON_APP_TYPE_WSGI);
// 这个函数新增一个app。可有多个app。
// 新增id
int id = uwsgi_apps_cnt;
wi = &uwsgi_apps[id];
// 按配置拿到flask项目的main.py里的main_app
wi->callable = up.loaders[loader](arg1);
uwsgi_uwsgi_loader(arg1)
quick_callable = up.callable; // 设置为main_app
// arg1为配置中的module=main。此时需要存在一个main.py作为入口module。
wsgi_dict = get_uwsgi_pydict(module);
wsgi_module = PyImport_ImportModule(module);
// 此时import了main.py。就已经实际运行了main.py。
// 我的main.py里触发一系列初始化动作,有打印log。此时可以看到log。
// 按wsgi协议,flask需要返回一个callable给uwsgi调用。
// main.py里最终要有个类似main_app = Flask("myapp")的流程。即用flask接口生成一个callable。
// 拿到main.py的__dict__
wsgi_dict = PyModule_GetDict(wsgi_module);
// 返回main.py里的main_app
return PyDict_GetItemString(wsgi_dict, quick_callable);
if (PyDict_Check((PyObject *)wi->callable)) {
// callable为dict。多个app。暂时忽略。
}
// 起uwsgi.cores个environ
wi->environ = malloc(sizeof(PyObject*)*uwsgi.cores);
if (!wi->environ) {
uwsgi_error("malloc()");
exit(1);
}
// 每个cores起dict
// environ后续存放一个请求的参数和一些环境数据
for(i=0;i<uwsgi.cores;i++) {
wi->environ[i] = PyDict_New();
if (!wi->environ[i]) {
uwsgi_log("unable to allocate new env dictionary for app\n");
exit(1);
}
}
if (app_type == PYTHON_APP_TYPE_WSGI) {
wi->request_subhandler = uwsgi_request_subhandler_wsgi;
wi->response_subhandler = uwsgi_response_subhandler_wsgi;
wi->argc = 2;
}
// 起uwsgi.cores个wi->args
// 每个arg设置为(0, wsgi_spitout)
// 处理请求时会把0设置为wsgi_req->async_environ。最后传给flask流程。
if (app_type == PYTHON_APP_TYPE_WSGI) {
// prepare sendfile() for WSGI app
wi->sendfile = PyCFunction_New(uwsgi_sendfile_method, NULL);
wi->eventfd_read = PyCFunction_New(uwsgi_eventfd_read_method, NULL);
wi->eventfd_write = PyCFunction_New(uwsgi_eventfd_write_method, NULL);
}
// 设置wi->gateway_version
// 设置wi->uwsgi_version
// wi->uwsgi_node
// 打印
if (app_type == PYTHON_APP_TYPE_WSGI) {
uwsgi_log( "WSGI app %d (mountpoint='%.*s') ready in %d seconds on interpreter %p pid: %d%s\n", id, wi->mountpoint_len, wi->mountpoint, (int) wi->startup_time, wi->interpreter, (int) getpid(), default_app);
}
// emulate COW
// copy on write?
uwsgi_emulate_cow_for_apps(id);
}
mount_app
// 和uwsgi_python_init_apps基本一个意思。加载app。
uwsgi_python_mount_app(char *mountpoint, char *app)
// 配置中 mount = /api/=main.py
// 那么app=main.py。mountpoint=/api/
// 设置uwsgi.wsgi_req的一些数据
id = init_uwsgi_app(LOADER_MOUNT, app, uwsgi.wsgi_req, up.main_thread, PYTHON_APP_TYPE_WSGI);
// 和上面的LOADER_UWSGI大同小异。主要是loader的区别
uwsgi_mount_loader
callable = uwsgi_file_loader((void *)what);
wsgi_file_module = uwsgi_pyimport_by_filename(py_filename, filename);
// 读py文件
// parse and compile
py_compiled_node = Py_CompileString(pycontent, real_filename, Py_file_input);
// 运行。返回module。
py_file_module = PyImport_ExecCodeModule(name, py_compiled_node);
return py_file_module;
wsgi_file_dict = PyModule_GetDict(wsgi_file_module);
wsgi_file_callable = PyDict_GetItemString(wsgi_file_dict, callable);
return wsgi_file_callable;
worker
uwsgi_python_worker()
// 一般忽略
if (!up.worker_override)
return 0;
配置中同时配置了module和mount。其实是错的,这样实际会起2个uwsgi的app,走同一个flask项目。
可把module参数去掉。
多个app的例子?
https://uwsgi-docs.readthedocs.io/en/latest/Snippets.html#multiple-flask-apps-in-different-mountpoints
gevent plugin#
// worker最后走gevent_loop
gevent_loop
PyObject *gevent_dict = get_uwsgi_pydict("gevent"); // import gevent
PyImport_ImportModule
PyModule_GetDict
// 检查version
if (ugevent.monkey) {
monkey_patch();
}
// PyDict_GetItemString
// PyCFunction_New
// 获取和定义各种功能
ugevent.spawn = PyDict_GetItemString(gevent_dict, "spawn");
// 直接用sleep触发切换,把自己切换出去。
ugevent.greenlet_switch = PyDict_GetItemString(gevent_dict, "sleep");
if (!ugevent.greenlet_switch) uwsgi_pyexit;
PyObject *gevent_get_hub = PyDict_GetItemString(gevent_dict, "get_hub");
ugevent.hub = python_call(gevent_get_hub, PyTuple_New(0), 0, NULL);
// https://www.gevent.org/api/gevent.hub.html#gevent.hub.get_hub
// ugevent.hub是默认打头的协程,相当于根协程,在它基础上不断派生新的协程。
// 可作为调度者
// 当请求处理的协程需要等待io而切换时,就切换回ugevent.hub。
// https://www.gevent.org/api/gevent.hub.html#the-event-loop
ugevent.hub_loop = PyObject_GetAttrString(ugevent.hub, "loop");
PyObject *uwsgi_gevent_main = PyCFunction_New(uwsgi_gevent_main_def, NULL);
// ...
// greenlet to run at each request
// 设置greenlet的处理函数。处理一个http请求。
PyObject *uwsgi_request_greenlet = PyCFunction_New(uwsgi_gevent_request_def, NULL);
// 设置第一个参数为处理函数
ugevent.greenlet_args = PyTuple_New(2);
PyTuple_SetItem(ugevent.greenlet_args, 0, uwsgi_request_greenlet);
py_uwsgi_gevent_request
// 初始化watcher
ugevent.watchers = uwsgi_malloc(sizeof(PyObject *) * uwsgi_count_sockets(uwsgi.sockets));
while(uwsgi_sock) {
// https://www.gevent.org/api/gevent.hub.html#gevent._interfaces.ILoop.io
// 调用gevent的hub_loop的io函数。创建watcher。
// PyObject_CallMethod的参数说明见https://docs.python.org/3/c-api/arg.html
ugevent.watchers[i] = PyObject_CallMethod(ugevent.hub_loop, "io", "ii", uwsgi_sock->fd, 1);
// start启动watcher
// https://www.gevent.org/api/gevent.hub.html#gevent._interfaces.IWatcher
PyObject_CallMethod(ugevent.watchers[i], "start", "Ol", uwsgi_gevent_main, (long)uwsgi_sock);
// 这里设置了watcher的回调。如果有客户端连上服务器,socket可accept,就走uwsgi_gevent_main。
py_uwsgi_gevent_main
// 申请一个wsgi_req资源
wsgi_req = find_first_available_wsgi_req();
// 取uwsgi.async_queue_unused中最后一个
// async_queue_unused是看作资源队列而不是任务队列
wsgi_req_setup(wsgi_req, wsgi_req->async_id, uwsgi_sock);
// 给wsgi_req填各种数据
// async_id是资源的index编号,定死的。
// 这个请求就绑定在async_id了。
// 也就绑定在当前worker的cores[async_id]这个core了。
// 设置core的in_request=1
// 实际accept新的socket
wsgi_req_simple_accept(wsgi_req, uwsgi_sock->fd)
wsgi_req->fd = wsgi_req->socket->proto_accept(wsgi_req, fd);
uwsgi_proto_base_accept
accept
// 设置第二个参数
PyTuple_SetItem(ugevent.greenlet_args, 1, PyLong_FromLong((long)wsgi_req));
// spawn协程。传入greenlet_args
PyObject *new_gl = python_call(ugevent.spawn, ugevent.greenlet_args, 0, NULL);
py_uwsgi_gevent_request
// 取出wsgi_req
PyObject *py_wsgi_req = PyTuple_GetItem(args, 0);
// 到此wsgi_req只是做了初始化,分配资源等。
// 不断读socket。并尝试解析请求。
for(;;) {
int ret = uwsgi.wait_read_hook(wsgi_req->fd, uwsgi.socket_timeout);
// 准备读。等到真的可读或者超时时,切回来处理。
// 有异常的话跳出循环
uwsgi_gevent_wait_read_hook
// 给wsgi_req->fd创建io watcher监听io数据
PyObject *watcher = PyObject_CallMethod(ugevent.hub_loop, "io", "ii", fd, 1);
// 创建time watcher监听超时
PyObject *timer = PyObject_CallMethod(ugevent.hub_loop, "timer", "i", timeout);
PyObject *current_greenlet = GET_CURRENT_GREENLET;
PyObject *current = PyObject_GetAttrString(current_greenlet, "switch");
// 启动watcher。
// 回调操作为current_greenlet.switch。
// 即切换到current_greenlet也就是切换回此协程进行下一步操作。
ret = PyObject_CallMethod(watcher, "start", "OO", current, watcher);
// 同样启动timer
ret = PyObject_CallMethod(timer, "start", "OO", current, timer);
// 此时切换回hub。当socket有数据或者timer超时时切回来。
ret = PyObject_CallMethod(ugevent.hub, "switch", NULL);
// 回到此地。清理watcher
stop_the_watchers_and_clear
// 走到这说明socket有数据或超时。
// 尝试初步解析。根据数据长度判断是否为合法的wsgi请求。
// 如果有错跳出循环
// 如果数据不全继续循环读
// 如果成功解析,跳出并处理请求。
int status = wsgi_req->socket->proto(wsgi_req);
uwsgi_proto_uwsgi_setup里设置了uwsgi_sock->proto = uwsgi_proto_uwsgi_parser;
// 解析wsgi请求
uwsgi_proto_uwsgi_parser
// 读socket
read(wsgi_req->fd, ptr + wsgi_req->proto_parser_pos, (uwsgi.buffer_size + 4) - wsgi_req->proto_parser_pos);
// 判断长度
}
// 如果是一个合法的请求
for(;;) {
// plugin的request
if (uwsgi.p[wsgi_req->uh->modifier1]->request(wsgi_req) <= UWSGI_OK) {
// python plugin的request
uwsgi_request_wsgi
uwsgi_parse_vars(wsgi_req)
// 相当多的具体数据解析。不细看了。
wsgi_req->async_environ = up.wsgi_env_create(wsgi_req, wi);
// 处理请求
request_subhandler
uwsgi_request_subhandler_wsgi
// 各种数据填到wsgi_req->async_environ
// 最后放到wsgi_req->async_args
// 涉及wsgi协议
wsgi_req->async_app = wi->callable;
// 最后调用callable。也就是配置里的main_app。
// 到此请求就转到flask项目了。
return python_call(wsgi_req->async_app, wsgi_req->async_args, uwsgi.catch_exceptions, wsgi_req);
PyObject_CallObject
// 返回请求
response_subhandler
uwsgi_response_subhandler_wsgi
int ret = uwsgi_python_send_body(wsgi_req, pychunk);
uwsgi_response_write_body_do
// 各种写socket。不细看
// 包括返回文件uwsgi_proto_base_sendfile的情况
// 最后走linux的sendfile()
goto end;
}
wsgi_req->switches++;
// switch after each yield
GEVENT_SWITCH
// 即sleep(0)。切换出去。
}
uwsgi_close_request(wsgi_req);
// 收尾工作
// 设置状态
// close socket
// 各种free
free_req_queue // 释放req资源
// 其他收尾工作
// 到此一个http请求结束了
}
// 一些signal配置
python_call(ugevent.signal, ge_signal_tuple, 0, NULL);
// 体面地结束worker
py_uwsgi_gevent_graceful
// uwsgi_log("Gracefully killing worker %d (pid: %d)// ...\n", uwsgi.mywid, uwsgi.mypid);
// 停止所有watcher
for(i=0;i<count;i++) {
PyObject_CallMethod(ugevent.watchers[i], "stop", NULL);
}
// 等待所有core完成本次请求
// 杀掉主控协程。那么下面对wait_for_me的join就会成功,worker进程就结束了。
PyObject_CallMethod(ugevent.ctrl_gl, "kill", NULL);
// 暴力结束worker
py_uwsgi_gevent_int
// uwsgi_log("Brutally killing worker %d (pid: %d)// ...\n", uwsgi.mywid, uwsgi.mypid);
// 停止所有watcher
for(i=0;i<count;i++) {
PyObject_CallMethod(ugevent.watchers[i], "stop", NULL);
}
// 不等待请求结束。直接杀
PyObject_CallMethod(ugevent.ctrl_gl, "kill", NULL);
// 配置wait_for_me。默认等hub。
PyObject *wait_for_me = ugevent.hub;
// wait_for_hub参数默认关闭
// 改为等ugevent.ctrl_gl
if (!ugevent.wait_for_hub) {
// 起一个控制协程
// 每当子协程切换时
PyObject *uwsgi_greenlet_ctrl_gl_handler = PyCFunction_New(uwsgi_gevent_ctrl_gl_def, NULL);
Py_INCREF(uwsgi_greenlet_ctrl_gl_handler);
PyObject *ctrl_gl_args = PyTuple_New(1);
PyTuple_SetItem(ctrl_gl_args, 0, uwsgi_greenlet_ctrl_gl_handler);
// spawn控制协程。在这里死循环。
// 实际是个dummy。啥也不干。就死循环不断sleep60秒(当然,会切换协程)。
ugevent.ctrl_gl = python_call(ugevent.spawn, ctrl_gl_args, 0, NULL);
py_uwsgi_gevent_ctrl_gl
for(;;) {
// sleep60秒
PyObject_CallObject(ugevent.greenlet_switch, gevent_sleep_args);
}
// 配置为控制协程
wait_for_me = ugevent.ctrl_gl;
}
for(;;) {
// 对wait_for_me = ugevent.ctrl_gl进行join
// 正常情况时join不了的。那么卡在这,除非控制协程结束,意味着worker结束。
// gevent的Greentlet最终继承自greenlet库的greenlet类。
// 它的join调用hub的switch(),最终调用greenlet类的switch,再调run,也就是gevent.Hub的run。
// 最终走libev.ev_run,启动事件循环。
// 这个流程在gevent代码里也看不出来,执行run的流程在greenlet代码里。
if (!PyObject_CallMethod(wait_for_me, "join", NULL)) {
PyErr_Print();
}
else {
break;
}
}
http流程#
// gevent参数
// 指定loop框架为gevent。采用gevent的协程调度。底层事件循环为libev。
// socket参数。服务器的监听socket。客户端或者nginx往这上连。
// 使用default protocol。uwsgi_socket_setup_protocol中会默认设置为uwsgi。
uwsgi_opt_add_socket
uwsgi_new_socket
添加到uwsgi.sockets末尾
// uwsgi_bind_sockets里进行创建/bind/listen
worker的gevent_loop中
// 获取uwsgi.sockets
// 对每个socket的fd创建watcher
watcher绑定py_uwsgi_gevent_main
// mount参数
// mount = /api/=main.py
uwsgi_setup
uwsgi_start
uwsgi_init_all_apps
// 处理参数,找=号,用=号隔开。
mount_app("/api/", "main.py")
uwsgi_python_mount_app
// 加载flask项目,获取flask的app以便后续调用。
// 见上面的plugin流程
// callable 参数
// 上述uwsgi_python_mount_app中指定flask代码中的app名字。
worker起socket并进入事件循环监听
客户端发起或者nginx转发一个新http请求
socket可accept
来到py_uwsgi_gevent_main进行accept
创建一个协程
读socket数据
解析请求
交给flask项目处理
得到flask的结果
结果通过socket发回给客户端或者nginx
请求结束/协程结束