文章系列(部分):
Processes
libuv 提供了许多的子进程管理相关的方法,屏蔽了平台的差异,并且允许使用流或命名管道与子进程进行通讯。
Unix 中的一个常见说法是,“每个进程都做一件事,并将它做好”。 在这种情况下,一个进程通常会使用多个子进程来完成任务(类似于在 shell 中使用管道)。 与基于共享内存的多线程模型相比,基于消息机制地多进程模型可能 easier to reason。
A common refrain against event-based programs is that they cannot take advantage of multiple cores in modern computers. In a multi-threaded program the kernel can perform scheduling and assign different threads to different cores, improving performance. But an event loop has only one thread. T
libuv 提供了相当多的子进程管理,抽象了平台差异并允许使用流或命名管道与子进程进行通信。
Unix 中的一个常见习语是每个进程都做一件事并且做得很好。 在这种情况下,一个进程通常会使用多个子进程来完成任务(类似于在 shell 中使用管道)。 与具有线程和共享内存的多进程模型相比,具有消息的多进程模型也可能更容易推理。
基于事件的程序的一个常见问题是它们无法利用现代计算机中的多核。 在多线程程序中,内核可以执行调度并将不同的线程分配给不同的内核,从而提高性能。 但是一个事件循环只有一个线程。 解决方法可以是启动多个进程,每个进程运行一个事件循环,每个进程被分配到一个单独的 CPU 内核。
Spawning 子进程
最简单的情况是您只想启动一个进程并知道它何时退出。 这是使用 uv_spawn 实现的。
spawn/main.c
uv_loop_t *loop;
uv_process_t child_req;
uv_process_options_t options;
int main() {
loop = uv_default_loop();
char* args[3];
args[0] = "mkdir";
args[1] = "test-dir";
args[2] = NULL;
options.exit_cb = on_exit;
options.file = "mkdir";
options.args = args;
int r;
if ((r = uv_spawn(loop, &child_req, &options))) {
fprintf(stderr, "%s\n", uv_strerror(r));
return 1;
} else {
fprintf(stderr, "Launched process with ID %d\n", child_req.pid);
}
return uv_run(loop, UV_RUN_DEFAULT);
}
Note: options 被隐式初始化为零,因为它是一个全局变量。 如果您将选项更改为局部变量,请记住将其初始化为清空所有未使用的字段:
uv_process_options_t options = {0};
uv_process_t
结构体仅充当句柄,所有选项都通过 uv_process_options_t
设置. 要简单地启动一个进程,您只需要设置 file
和 args
字段. file
是可执行文件的路径. 因为 uv_spawn
在内部使用了 execvp(3) , 因此不需要使用完整的绝对路径. 最后根据约定, 参数数组应当比参数实际数目大一,末尾使用nullptr.
调用 uv_spawn
之后, uv_process_t.pid
将会存储子进程的PID.
进程退出时,将会调用推出的回调函数,参数包括退出状态exit_status
,和导致进程退出的信号类型term_signal
spawn/main.c
void on_exit(uv_process_t *req, int64_t exit_status, int term_signal) {
fprintf(stderr, "Process exited with status %" PRId64 ", signal %d\n", exit_status, term_signal);
uv_close((uv_handle_t*) req, NULL);
进程退出后需要关闭句柄
修改进程参数
在启动子进程之前,您可以使用 uv_process_options_t
中的字段来控制执行环境。
更改执行目录
Set uv_process_options_t.cwd
to the corresponding directory.
设置环境变量
uv_process_options_t.env
是一个以 null 结尾的字符串数组,每个 都以 VAR=VALUE 的形式设置进程的环境变量。设置为 NULL 将继承父进程的环境变量(即当前进程)。
Option flags
通过将 uv_process_options_t.flags
设置为以下值的按位或形式, 可以控制子进程的行为:
UV_PROCESS_SETUID
– 将子进程的UID设置为uv_process_options_t.uid
所指定的值.UV_PROCESS_SETGID
– 将子进程的GID设置为uv_process_options_t.gid
所指定的值.
仅仅Unix支持设置GID/UID, uv_spawn
will fail on Windows with UV_ENOTSUP
.
UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS
– No quoting or escaping ofuv_process_options_t.args
is done on Windows. Ignored on Unix.UV_PROCESS_DETACHED
– 子进程设置为DETACHED
,即父进程推出后,子进程依然可以运行,如下所示:
Detaching processes
传递标志 UV_PROCESS_DETACHED 可用于启动守护进程或独立于父进程的子进程,以便父进程退出不会影响它。
detach/main.c
int main() {
loop = uv_default_loop();
char* args[3];
args[0] = "sleep";
args[1] = "100";
args[2] = NULL;
options.exit_cb = NULL;
options.file = "sleep";
options.args = args;
options.flags = UV_PROCESS_DETACHED;
int r;
if ((r = uv_spawn(loop, &child_req, &options))) {
fprintf(stderr, "%s\n", uv_strerror(r));
return 1;
}
fprintf(stderr, "Launched sleep with PID %d\n", child_req.pid);
uv_unref((uv_handle_t*) &child_req);
return uv_run(loop, UV_RUN_DEFAULT);
J请记住,句柄仍在监视孩子,因此您的程序不会退出. Use uv_unref()
if you want to be more fire-and-forget.
发送信号给子进程
liblibuv 包装了 Unix 上的标准 kill(2) 系统调用,并在 Windows 上实现了一个具有类似语义的系统调用,但需要注意:所有 SIGTERM、SIGINT 和 SIGKILL 都会导致进程终止。 uv_kill 的函数签名是:
uv_err_t uv_kill(int pid, int signum);
对于使用 libuv 启动的进程,您可以改用 uv_process_kill,它接受 uv_process_t 作为第一个参数,而不是 pid。 在这种情况下,要记得调用uv_close关闭句柄。
Signals
libuv 提供了对于Unix信号的封装,以及对 Windows 的支持.
使用uv_signal_init()
初始化相关的句柄并关联到loop中,可以使用 uv_signal_start()
来监听指定的信号并执行对应的处理函数。每个handler只能关联一个信号值,并且再次调用 uv_signal_start()
将覆盖原来的关联。使用uv_signal_stop()
停止关联,一个可能的示例如下:
signal/main.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <uv.h>
uv_loop_t* create_loop()
{
uv_loop_t *loop = malloc(sizeof(uv_loop_t));
if (loop) {
uv_loop_init(loop);
}
return loop;
}
void signal_handler(uv_signal_t *handle, int signum)
{
printf("Signal received: %d\n", signum);
uv_signal_stop(handle);
}
// two signal handlers in one loop
void thread1_worker(void *userp)
{
uv_loop_t *loop1 = create_loop();
uv_signal_t sig1a, sig1b;
uv_signal_init(loop1, &sig1a);
uv_signal_start(&sig1a, signal_handler, SIGUSR1);
uv_signal_init(loop1, &sig1b);
uv_signal_start(&sig1b, signal_handler, SIGUSR1);
uv_run(loop1, UV_RUN_DEFAULT);
}
// two signal handlers, each in its own loop
void thread2_worker(void *userp)
{
uv_loop_t *loop2 = create_loop();
uv_loop_t *loop3 = create_loop();
uv_signal_t sig2;
uv_signal_init(loop2, &sig2);
uv_signal_start(&sig2, signal_handler, SIGUSR1);
uv_signal_t sig3;
uv_signal_init(loop3, &sig3);
uv_signal_start(&sig3, signal_handler, SIGUSR1);
// 这里每个loop都会一直循环执行,直到收到信号
while (uv_run(loop2, UV_RUN_NOWAIT) || uv_run(loop3, UV_RUN_NOWAIT)) {
}
}
int main()
{
printf("PID %d\n", getpid());
uv_thread_t thread1, thread2;
uv_thread_create(&thread1, thread1_worker, 0);
uv_thread_create(&thread2, thread2_worker, 0);
uv_thread_join(&thread1);
uv_thread_join(&thread2);
return 0;
}
Note: uv_run(loop, UV_RUN_NOWAIT) 与 uv_run(loop, UV_RUN_ONCE) 类似,它只处理一个事件。 如果没有未决事件,UV_RUN_ONCE 会阻塞,而 UV_RUN_NOWAIT 将立即返回。 我们使用 NOWAIT 以便其中一个循环不会因为另一个循环没有待处理的活动而被饿死。.
将 SIGUSR1 发送到进程,您会发现处理程序被调用 4 次,每个 uv_signal_t 调用一次。 处理程序只是停止每个句柄,以便程序退出。 这种对所有处理程序的分派非常有用。 使用多个事件循环的服务器可以确保在终止之前安全保存所有数据,只需为每个循环添加一个 SIGINT 观察程序即可。
Child Process I/O
一个正常的、新生成的进程有自己的一组文件描述符,其中 0、1 和 2 分别是标准输入、标准输出和标准错误。 有时您可能希望与孩子共享文件描述符。 例如,也许您的应用程序启动了一个子命令,并且您希望任何错误都进入日志文件,但忽略 stdout。 为此,您希望孩子的标准错误与父母的标准错误相同。 在这种情况下,libuv 支持继承文件描述符。
proc-streams/test.c
#include <stdio.h>
int main()
{
fprintf(stderr, "This is stderr\n");
printf("This is stdout\n");
return 0;
}
The actual program proc-streams
runs this while sharing only stderr
. 子进程的文件描述符,使用 uv_process_options_t
的stdio
字段进行配置,首先设置 stdio_count
字段配置文件描述的数目。uv_process_options_t.stdio
是 uv_stdio_container_t
类型的数组:
typedef struct uv_stdio_container_s {
uv_stdio_flags flags;
union {
uv_stream_t* stream;
int fd;
} data;
} uv_stdio_container_t;
flags 字段有多个值, UV_IGNORE
表示该字段不会被使用.如果前三个项目被标志为UV_IGNORE
,则他们被重定向为/dev/null
.
由于我们想传递现有的描述符,我们将使用 UV_INHERIT_FD。 然后我们将 fd 设置为 stderr。
proc-streams/main.c
int main() {
loop = uv_default_loop();
/* ... */
options.stdio_count = 3;
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_IGNORE;
child_stdio[1].flags = UV_IGNORE;
child_stdio[2].flags = UV_INHERIT_FD;
child_stdio[2].data.fd = 2;
options.stdio = child_stdio;
options.exit_cb = on_exit;
options.file = args[0];
options.args = args;
int r;
if ((r = uv_spawn(loop, &child_req, &options))) {
fprintf(stderr, "%s\n", uv_strerror(r));
return 1;
}
return uv_run(loop, UV_RUN_DEFAULT);
}
运行 proc-stream
程序将只会看到 “This is stderr” 被打印. 可以尝试将 stdout
标记为UV_INHERIT_FD.
将此重定向应用于流非常简单。 通过将标志设置为 UV_INHERIT_STREAM 并将 data.stream 设置为父进程中的流,子进程可以将该流视为标准 I/O。 这可以用来实现像 CGI 这样的东西。
A sample CGI script/executable is:
cgi/tick.c
#include <stdio.h>
#include <unistd.h>
int main() {
int i;
for (i = 0; i < 10; i++) {
printf("tick\n");
fflush(stdout);
sleep(1);
}
printf("BOOM!\n");
return 0;
}
CGI 服务器结合了本章和网络中的概念,因此每个客户端都会收到十个滴答声,然后关闭该连接。
cgi/main.c
void on_new_connection(uv_stream_t *server, int status) {
if (status == -1) {
// error!
return;
}
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if (uv_accept(server, (uv_stream_t*) client) == 0) {
invoke_cgi_script(client);
}
else {
uv_close((uv_handle_t*) client, NULL);
}
H这里我们简单地接受 TCP 连接并将套接字(流)传递给 invoke_cgi_script
。
cgi/main.c
args[1] = NULL;
/* ... finding the executable path and setting up arguments ... */
options.stdio_count = 3;
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_IGNORE;
child_stdio[1].flags = UV_INHERIT_STREAM;
child_stdio[1].data.stream = (uv_stream_t*) client;
child_stdio[2].flags = UV_IGNORE;
options.stdio = child_stdio;
options.exit_cb = cleanup_handles;
options.file = args[0];
options.args = args;
// Set this so we can close the socket after the child process exits.
child_req.data = (void*) client;
int r;
if ((r = uv_spawn(loop, &child_req, &options))) {
fprintf(stderr, "%s\n", uv_strerror(r));
CGI 脚本的标准输出设置为套接字,这样无论我们的滴答脚本打印什么,都会发送到客户端。 通过使用进程,我们可以将读/写缓冲卸载到操作系统,因此在方便方面这非常好。 请注意,创建流程是一项代价高昂的任务。
Parent-child IPC
通过将 uv_stdio_container_t.flags 设置为 UV_CREATE_PIPE 和 UV_READABLE_PIPE 或 UV_WRITABLE_PIPE 的按位组合,父子节点可以通过管道进行一种或两种方式的通信。 读/写标志是从子进程的角度来看的。 在这种情况下,必须将 uv_stream_t* 流字段设置为指向一个已初始化、未打开的 uv_pipe_t 实例。
New stdio Pipes
uv_pipe_t
不仅仅 pipe(7) (or |
), 还支持任何流文件类对象. 在 Windows上, 他只能是 命名管道. 在 Unix中, 可以是任意 Unix Domain Socket, 或者是 mkfifo(1) 的派生, 或者是 pipe(7). 当 uv_spawn
初始化 uv_pipe_t
时,将创建一个socketpair(2).
这是为了允许多个 libuv 进程与 IPC 通信。 这将在下面讨论。
Arbitrary process IPC
由于域套接字可以在文件系统中具有众所周知的名称和位置,因此它们可以用于不相关进程之间的 IPC。 开源桌面环境使用的 D-BUS 系统使用域套接字进行事件通知。 当联系人上线或检测到新硬件时,各种应用程序可以做出反应。 MySQL 服务器还运行一个域套接字,客户端可以在该套接字上与之交互。
使用域套接字时,通常遵循客户端-服务器模式,套接字的创建者/所有者充当服务器。 在初始设置之后,消息传递与 TCP 没有什么不同,因此我们将重新使用回显服务器示例。
pipe-echo-server/main.c
void remove_sock(int sig) {
uv_fs_t req;
uv_fs_unlink(loop, &req, PIPENAME, NULL);
exit(0);
}
int main() {
loop = uv_default_loop();
uv_pipe_t server;
uv_pipe_init(loop, &server, 0);
signal(SIGINT, remove_sock);
int r;
if ((r = uv_pipe_bind(&server, PIPENAME))) {
fprintf(stderr, "Bind error %s\n", uv_err_name(r));
return 1;
}
if ((r = uv_listen((uv_stream_t*) &server, 128, on_new_connection))) {
fprintf(stderr, "Listen error %s\n", uv_err_name(r));
return 2;
}
return uv_run(loop, UV_RUN_DEFAULT);
}
我们将套接字命名为 echo.sock,这意味着它将在本地目录中创建。 就流 API 而言,此套接字现在的行为与 TCP 套接字没有什么不同。 您可以使用 socat 测试此服务器:
$ socat - /path/to/socket
想要连接到域套接字的客户端将使用:
void uv_pipe_connect(uv_connect_t *req, uv_pipe_t *handle, const char *name, uv_connect_cb cb);
where name
will be echo.sock
or similar. On Unix systems, name
must point to a valid file (e.g. /tmp/echo.sock
). On Windows, name
follows a \\?\pipe\echo.sock
format.
Sending file descriptors over pipes
域套接字很酷的一点是,文件描述符可以通过域套接字在进程之间进行交换。 这允许进程将其 I/O 移交给其他进程。 应用程序包括负载平衡服务器、工作进程和其他优化 CPU 使用的方法。 libuv 目前仅支持通过管道发送 TCP 套接字或其他管道。
为了演示,我们将查看一个回显服务器实现,它以循环方式将客户端传递给工作进程。 这个程序有点复杂,虽然书中只包含了一些片段,但建议阅读完整的代码才能真正理解它。
worker进程非常简单,因为文件描述符由主进程交给它。
multi-echo-server/worker.c
uv_loop_t *loop;
uv_pipe_t queue;
int main() {
loop = uv_default_loop();
uv_pipe_init(loop, &queue, 1 /* ipc */);
uv_pipe_open(&queue, 0 /*std io*/);
uv_read_start((uv_stream_t*)&queue, alloc_buffer, on_new_connection);
return uv_run(loop, UV_RUN_DEFAULT);
}
queue 是连接到另一端主进程的管道,新的文件描述符沿着该管道发送。 将 uv_pipe_init 的 ipc 参数设置为 1 以指示此管道将用于进程间通信,这一点很重要! 由于 master 会将文件句柄写入 worker 的标准输入,我们使用 uv_pipe_open 将管道连接到 stdin。
multi-echo-server/worker.c
void on_new_connection(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
if (nread < 0) {
if (nread != UV_EOF)
fprintf(stderr, "Read error %s\n", uv_err_name(nread));
uv_close((uv_handle_t*) q, NULL);
return;
}
uv_pipe_t *pipe = (uv_pipe_t*) q;
if (!uv_pipe_pending_count(pipe)) {
fprintf(stderr, "No pending count\n");
return;
}
uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP);
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if (uv_accept(q, (uv_stream_t*) client) == 0) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*) client, &fd);
fprintf(stderr, "Worker %d: Accepted fd %d\n", getpid(), fd);
uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}
else {
uv_close((uv_handle_t*) client, NULL);
}
}
First we call uv_pipe_pending_count()
to ensure that a handle is available to read out. If your program could deal with different types of handles, uv_pipe_pending_type()
can be used to determine the type. Although accept
seems odd in this code, it actually makes sense. What accept
traditionally does is get a file descriptor (the client) from another file descriptor (The listening socket). Which is exactly what we do here. Fetch the file descriptor (client
) from queue
. From this point the worker does standard echo server stuff.
Turning now to the master, let’s take a look at how the workers are launched to allow load balancing.
multi-echo-server/main.c
struct child_worker {
uv_process_t req;
uv_process_options_t options;
uv_pipe_t pipe;
} *workers;
The child_worker
structure wraps the process, and the pipe between the master and the individual process.
multi-echo-server/main.c
void setup_workers() {
round_robin_counter = 0;
// ...
// launch same number of workers as number of CPUs
uv_cpu_info_t *info;
int cpu_count;
uv_cpu_info(&info, &cpu_count);
uv_free_cpu_info(info, cpu_count);
child_worker_count = cpu_count;
workers = calloc(cpu_count, sizeof(struct child_worker));
while (cpu_count--) {
struct child_worker *worker = &workers[cpu_count];
uv_pipe_init(loop, &worker->pipe, 1);
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
child_stdio[0].data.stream = (uv_stream_t*) &worker->pipe;
child_stdio[1].flags = UV_IGNORE;
child_stdio[2].flags = UV_INHERIT_FD;
child_stdio[2].data.fd = 2;
worker->options.stdio = child_stdio;
worker->options.stdio_count = 3;
worker->options.exit_cb = close_process_handle;
worker->options.file = args[0];
worker->options.args = args;
uv_spawn(loop, &worker->req, &worker->options);
fprintf(stderr, "Started worker %d\n", worker->req.pid);
}
}
In setting up the workers, we use the nifty libuv function uv_cpu_info
to get the number of CPUs so we can launch an equal number of workers. Again it is important to initialize the pipe acting as the IPC channel with the third argument as 1. We then indicate that the child process’ stdin
is to be a readable pipe (from the point of view of the child). Everything is straightforward till here. The workers are launched and waiting for file descriptors to be written to their standard input.
It is in on_new_connection
(the TCP infrastructure is initialized in main()
), that we accept the client socket and pass it along to the next worker in the round-robin.
multi-echo-server/main.c
void on_new_connection(uv_stream_t *server, int status) {
if (status == -1) {
// error!
return;
}
uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if (uv_accept(server, (uv_stream_t*) client) == 0) {
uv_write_t *write_req = (uv_write_t*) malloc(sizeof(uv_write_t));
dummy_buf = uv_buf_init("a", 1);
struct child_worker *worker = &workers[round_robin_counter];
uv_write2(write_req, (uv_stream_t*) &worker->pipe, &dummy_buf, 1, (uv_stream_t*) client, NULL);
round_robin_counter = (round_robin_counter + 1) % child_worker_count;
}
else {
uv_close((uv_handle_t*) client, NULL);
}
}
The uv_write2
call handles all the abstraction and it is simply a matter of passing in the handle (client
) as the right argument. With this our multi-process echo server is operational.
Thanks to Kyle for pointing out that uv_write2()
requires a non-empty buffer even when sending handles.
Pingback:libuv : User guide » Networking – Kingdo Station
Pingback:libuv : User guide » Utilities – Kingdo Station