文章系列(部分):
Threads
Wait a minute? Why are we on threads? Aren’t event loops supposed to be the way to do web-scale programming? Well… no. Threads are still the medium in which processors do their jobs. Threads are therefore mighty useful sometimes, even though you might have to wade through various synchronization primitives.
线程, 是用来实现异步的另一种方式, 通过线程, 我们可以让所有的系统调用都变成异步执行. 与麻烦的epoll不同, 我们只需要启动一个线程去执行同步的或者是阻塞的任务, 然后在其执行完毕后, 收集其执行结果就可以了, 这样就可以异步的执行任何任务.
今天有两个主要的线程库:Windows 线程实现和 POSIX 的 pthreads(7)。 libuv 的线程 API 类似于 pthreads API,并且通常具有相似的语义。
毕竟Linux的C编程是最自然的一件事情, 而libuv之所以出现, 就是为了让nodejs兼容Windows.
libuv 的线程模块是 libuv 中的一个独立部分。 虽然其他特性密切依赖于事件循环和回调原则,但线程是完全不可知的,它们会在需要阻塞时阻塞,并直接通过返回值发出错误信号,如下文中第一个示例所示,它甚至不需要运行事件循环。
libuv 的线程 API 也非常有限,因为线程的语义和语法在所有平台上都是不同的,具有不同的完整性级别。
本章做了以下假设:只有一个事件循环,在一个线程(主线程)中运行。 没有其他线程与事件循环交互(使用 uv_async_send 除外)。
Core thread operations
你只需使用 uv_thread_create() 启动一个线程,然后使用 uv_thread_join() 等待它关闭。
thread-create/main.c
int main() {
int tracklen = 10;
uv_thread_t hare_id;
uv_thread_t tortoise_id;
uv_thread_create(&hare_id, hare, &tracklen);
uv_thread_create(&tortoise_id, tortoise, &tracklen);
uv_thread_join(&hare_id);
uv_thread_join(&tortoise_id);
return 0;
}
提示 : uv_thread_t 只是 Unix 上 pthread_t 的别名,but this is an implementation detail, avoid depending on it to always be true.
第二个参数是作为线程入口点的函数,最后一个参数的类型时 void * ,可用于将自定义参数传递给线程。 函数 hare 将在一个新启动的单独的线程中运行,由操作系统进行调度:
thread-create/main.c
void hare(void *arg) {
int tracklen = *((int *) arg);
while (tracklen) {
tracklen--;
sleep(1);
fprintf(stderr, "Hare ran another step\n");
}
fprintf(stderr, "Hare done running!\n");
}
pthread_join()
允许目标线程通过第二个参数传递数据, 但是uv_thread_join()
并不允许, 线程间的通讯见下文.
Synchronization Primitives
Mutexes
mutex函数, 完全等效于pthread中的相关的各个函数.
libuv mutex functions
int uv_mutex_init(uv_mutex_t* handle);
int uv_mutex_init_recursive(uv_mutex_t* handle);
void uv_mutex_destroy(uv_mutex_t* handle);
void uv_mutex_lock(uv_mutex_t* handle);
int uv_mutex_trylock(uv_mutex_t* handle);
void uv_mutex_unlock(uv_mutex_t* handle);
uv_mutex_init()、uv_mutex_init_recursive() 和 uv_mutex_trylock() 函数将在成功时返回 0,否则返回错误代码。
如果 libuv 已在启用调试的情况下编译,uv_mutex_destroy()
、uv_mutex_lock()
和 uv_mutex_unlock()
将在出错时abort()。 同样,如果错误不是 EAGAIN 或 EBUSY,uv_mutex_trylock()
将 abort 。
支持递归互斥锁,但您不应依赖它们。 此外,它们不应与 uv_cond_t 变量一起使用。
如果已锁定互斥锁的线程尝试再次锁定它,默认的 BSD 互斥锁实现将引发错误。 例如,像这样的构造:
uv_mutex_init(a_mutex);
uv_mutex_lock(a_mutex);
uv_thread_create(thread_id, entry, (void *)a_mutex);
uv_mutex_lock(a_mutex);
// more things here
can be used to wait until another thread initializes some stuff and then unlocks a_mutex
but will lead to your program crashing if in debug mode, or return an error in the second call to uv_mutex_lock()
.
注意: Mutexes on Windows are always recursive.
Locks
读写锁是一种更细粒度的访问机制。 两个读者可以同时访问共享内存。 当读者持有时,写者无法获得锁。 当写者持有锁时,读者或写者不会获得锁。 数据库中经常使用读写锁。
locks/main.c – simple rwlocks
#include <stdio.h>
#include <uv.h>
uv_barrier_t blocker;
uv_rwlock_t numlock;
int shared_num;
void reader(void *n)
{
int num = *(int *)n;
int i;
for (i = 0; i < 20; i++) {
uv_rwlock_rdlock(&numlock);
printf("Reader %d: acquired lock\n", num);
printf("Reader %d: shared num = %d\n", num, shared_num);
uv_rwlock_rdunlock(&numlock);
printf("Reader %d: released lock\n", num);
}
uv_barrier_wait(&blocker);
}
void writer(void *n)
{
int num = *(int *)n;
int i;
for (i = 0; i < 20; i++) {
uv_rwlock_wrlock(&numlock);
printf("Writer %d: acquired lock\n", num);
shared_num++;
printf("Writer %d: incremented shared num = %d\n", num, shared_num);
uv_rwlock_wrunlock(&numlock);
printf("Writer %d: released lock\n", num);
}
uv_barrier_wait(&blocker);
}
int main()
{
uv_barrier_init(&blocker, 4);
shared_num = 0;
uv_rwlock_init(&numlock);
uv_thread_t threads[3];
int thread_nums[] = {1, 2, 1};
uv_thread_create(&threads[0], reader, &thread_nums[0]);
uv_thread_create(&threads[1], reader, &thread_nums[1]);
uv_thread_create(&threads[2], writer, &thread_nums[2]);
uv_barrier_wait(&blocker);
uv_barrier_destroy(&blocker);
uv_rwlock_destroy(&numlock);
return 0;
}
运行它并观察读者是如何重叠执行的。 如果有多个写者,调度程序通常会给它们更高的优先级,因此如果添加两个写入者,您会发现两个写者往往在读者再次获得机会之前先完成。
我们在上面的例子中也使用了 barrier ,这样主线程就可以等待所有的读者和写者都结束。
Others
libuv also supports semaphores, condition variables and barriers with APIs very similar to their pthread counterparts.
In addition, libuv provides a convenience function uv_once()
. Multiple threads can attempt to call uv_once()
with a given guard and a function pointer, only the first one will win, the function will be called once and only once:
/* Initialize guard */
static uv_once_t once_only = UV_ONCE_INIT;
int i = 0;
void increment() {
i++;
}
void thread1() {
/* ... work */
uv_once(once_only, increment);
}
void thread2() {
/* ... work */
uv_once(once_only, increment);
}
int main() {
/* ... spawn threads */
}
After all threads are done, i == 1
.
libuv v0.11.11 onwards also added a uv_key_t
struct and api for thread-local storage.
libuv work queue
uv_queue_work() 是一个方便的函数,它允许应用程序在单独的线程中运行任务,并在任务完成时触发回调。最重要的是,uv_queue_work() 允许潜在的任何第三方库与事件循环范例一起使用。当使用事件循环时,必须确保在执行 I/O 时, 循环线程中定期运行的函数不会阻塞或严重占用 CPU,因为这意味着循环变慢并且没有满负荷的处理事件。
但是,如果您想要响应性(经典的“每个客户端一个线程”服务器模型),那么许多现有代码都具有与线程一起使用的阻塞函数(例如,在后台执行 I/O 的例程),并获得他们使用事件循环库通常涉及滚动您自己的系统,在单独的线程中运行任务。 libuv 只是为此提供了一个方便的抽象。
一般来说, 对于用户的请求, 应该是安排一个线程来处理, 因为线程中不可避免的会存在阻塞的IO的代码, 特别是对于第三方的代码而言, 因为他们不可能按照事件循环加回调的模式去编程, 因此libuv提供的 work queue 其实就是用线程去执行一个任务, 以抽象上述过程.
下面这个简单的实例将计算斐波那契数,并在中途睡眠,但将会在在单独的线程中运行它,这样阻塞型任务和 CPU 任务不会妨碍事件循环执行其他活动。
queue-work/main.c – lazy fibonacci
void fib(uv_work_t *req) {
int n = *(int *) req->data;
if (random() % 2)
sleep(1);
else
sleep(3);
long fib = fib_(n);
fprintf(stderr, "%dth fibonacci is %lu\n", n, fib);
}
void after_fib(uv_work_t *req, int status) {
fprintf(stderr, "Done calculating %dth fibonacci\n", *(int *) req->data);
}
fib() 函数将会在单独的线程中执行, 执行完毕后, 会在事件循环中调用 after_fib() 函数, after_fib() 就可以认为是异步执行fib()的回调函数.
从 libuv 0.9.4 版开始,一个附加函数 uv_cancel() 可用。这允许您取消 libuv 工作队列上的任务。只有尚未开始的任务才能被取消。如果一个任务已经开始执行,或者它已经完成执行,uv_cancel() 将失败。
如果用户请求终止,uv_cancel() 对清理挂起的任务很有用。例如,音乐播放器可能会排队多个目录以扫描音频文件。如果用户终止程序,它应该快速退出,而不是等到所有挂起的请求都运行后。
让我们修改斐波那契示例来演示 uv_cancel()。我们首先为终止设置了一个信号处理程序。
queue-cancel/main.c
int main() {
loop = uv_default_loop();
int data[FIB_UNTIL];
int i;
for (i = 0; i < FIB_UNTIL; i++) {
data[i] = i;
fib_reqs[i].data = (void *) &data[i];
uv_queue_work(loop, &fib_reqs[i], fib, after_fib);
}
uv_signal_t sig;
uv_signal_init(loop, &sig);
uv_signal_start(&sig, signal_handler, SIGINT);
return uv_run(loop, UV_RUN_DEFAULT);
}
当用户按下 Ctrl+C 触发信号时,我们将 uv_cancel() 发送给所有workers。 uv_cancel() 将为已经执行或完成的那些返回 0。
queue-cancel/main.c
void signal_handler(uv_signal_t *req, int signum)
{
printf("Signal received!\n");
int i;
for (i = 0; i < FIB_UNTIL; i++) {
uv_cancel((uv_req_t*) &fib_reqs[i]);
}
uv_signal_stop(req);
}
对于成功取消的任务,调用 after 函数时, 第二个参数status将被设置为 UV_ECANCELED。
queue-cancel/main.c
void after_fib(uv_work_t *req, int status) {
if (status == UV_ECANCELED)
fprintf(stderr, "Calculation of %d cancelled.\n", *(int *) req->data);
}
uv_cancel()
can also be used with uv_fs_t
and uv_getaddrinfo_t
requests. For the filesystem family of functions, uv_fs_t.errorno
will be set to UV_ECANCELED.
上述任务, 都是通过线程实现的, 但是线程池的大小是固定的, 如果任务超过了线程数目, 那么自然就需要排队, 如果正在排队的任务被执行 uv_cancel()
, 就会结束.
提示: 一个设计良好的应用程序应该可以自定义的终止已经开始运行的任务, 比如可以让任务定期的检查一个有主线程设置的变量, 如果主线程要求其终止则自动终止.
Inter-thread communication
有时,需要各个线程在运行时相互发送消息。 例如,在单独的线程中运行一些持续时间较长的任务(可能使用 uv_queue_work),但希望将进度通知给主线程。 这是一个让下载管理器通知用户下载状态的简单示例。
progress/main.c
uv_loop_t *loop;
uv_async_t async;
int main() {
loop = uv_default_loop();
uv_work_t req;
int size = 10240;
req.data = (void*) &size;
uv_async_init(loop, &async, print_progress);
uv_queue_work(loop, &req, fake_download, after);
return uv_run(loop, UV_RUN_DEFAULT);
}
尽管任何线程都可以发送消息, 但是只有loop主线程可以接收消息. 当 async 接收到消息后, 会调用回调函数 print_progress() .
警告: 重要的是要意识到,由于消息发送是异步的,回调可能会在另一个线程中调用 uv_async_send 后立即调用,也可能在一段时间后调用。 libuv 还可以组合对 uv_async_send 的多次调用,并且只调用一次回调。 libuv 做出的唯一保证是——回调函数在调用 uv_async_send 后至少被调用一次。 如果没有发起 uv_async_send 调用,则不会调用回调。 如果你进行了两次或多次调用,而 libuv 还没有机会运行回调,它可能只会调用一次你的回调,用于 uv_async_send 的多次调用。 您的回调永远不会因为一个事件而被调用两次。
我来总结一下这个警告: 一次 uv_async_send 调用最多触发一次回调函数, libuv可能会将多次 uv_async_send 调用整合到一起, 仅仅执行一次回调.
progress/main.c
double percentage;
void fake_download(uv_work_t *req) {
int size = *((int*) req->data);
int downloaded = 0;
while (downloaded < size) {
percentage = downloaded*100.0/size;
async.data = (void*) &percentage;
uv_async_send(&async);
sleep(1);
downloaded += (200+random())%1000; // can only download max 1000bytes/sec,
// but at least a 200;
}
}
在下载函数中,我们修改了进度指示器,并使用 uv_async_send 发送进度的信息。 请记住:uv_async_send 也是非阻塞的,并且会立即返回。
progress/main.c
void print_progress(uv_async_t *handle) {
double percentage = *((double*) handle->data);
fprintf(stderr, "Downloaded %.2f%%\n", percentage);
}
回调是标准的 libuv 模式,从观察者中提取数据。
最后,重要的是要记住关闭观察者。
void after(uv_work_t *req, int status) {
fprintf(stderr, "Download complete\n");
uv_close((uv_handle_t*) &async, NULL);
}
After this example, which showed the abuse of the data
field, bnoordhuis pointed out that using the data
field is not thread safe, and uv_async_send()
is actually only meant to wake up the event loop. Use a mutex or rwlock to ensure accesses are performed in the right order.
注意, 互斥锁和 读写锁不能在信号处理程序中使用,而 uv_async_send 可以。
One use case where uv_async_send
is required is when interoperating with libraries that require thread affinity for their functionality. For example in node.js, a v8 engine instance, contexts and its objects are bound to the thread that the v8 instance was started in. Interacting with v8 data structures from another thread can lead to undefined results. Now consider some node.js module which binds a third party library. It may go something like this:
- In node, the third party library is set up with a JavaScript callback to be invoked for more information:
var lib = require('lib');
lib.on_progress(function() {
console.log("Progress");
});
lib.do();
// do other stuff
lib.do
is supposed to be non-blocking but the third party lib is blocking, so the binding usesuv_queue_work
.- The actual work being done in a separate thread wants to invoke the progress callback, but cannot directly call into v8 to interact with JavaScript. So it uses
uv_async_send
. - The async callback, invoked in the main loop thread, which is the v8 thread, then interacts with v8 to invoke the JavaScript callback.
Pingback:libuv : User guide » Utilities – Kingdo Station