好好活就是有意义的事,有意义的事就是好好活
libuv : User guide » Filesystem
libuv : User guide » Filesystem

libuv : User guide » Filesystem

Filesystem

使用 uv_fs_* 函数和 uv_fs_t 结构实现简单的文件系统的读/写。

注意: libuv 文件系统操作不同于套接字操作。 套接字操作使用操作系统提供的非阻塞操作。 文件系统操作在内部使用阻塞函数,但是我们会通过线程池中的线程来调用这些函数, 从而实现异步, 同时,在需要应用程序交互时通知注册到事件循环的观察者, 从而实现与套接字操作类似的效果.

所有的文件操作都有两种方式: 异步和同步.

如果回调函数是null, 那么将自动使用同步方式, 此时会发生阻塞, 如果操作失败, 会返回错误代码, 如 Basics of libuv 中所述的那样.

如果传入了回调函数, 那么将采用异步的方式, 此时会从线程池中选择一个空闲的线程来处理请求, 函数将返回的0.

Reading/Writing files

获取一个文件描述符:

int uv_fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, int mode, uv_fs_cb cb)

flagsmode都是标准的Unix flags, libuv会将其转化为对应的Windows的flags.

关闭文件描述符:

int uv_fs_close(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb)

可以看到, uv_fs_t是请求而不是句柄!

回调函数的函数声明:

void callback(uv_fs_t* req);

让我们看一下 cat 命令的简单实现。 我们首先注册一个文件打开时的回调:

uvcat/main.c – opening a file

void on_open(uv_fs_t *req) {
    // The request passed to the callback is the same as the one the call setup
    // function was passed.
    assert(req == &open_req);
    if (req->result >= 0) {
        iov = uv_buf_init(buffer, sizeof(buffer));
        uv_fs_read(uv_default_loop(), &read_req, req->result,
                   &iov, 1, -1, on_read);
    }
    else {
        fprintf(stderr, "error opening file: %s\n", uv_strerror((int)req->result));
    }
}

req->result 是open操作返回的文件描述符, 如果打开成功, 我们将在会在open的回调函数中, 读取它.

读取的操作是 uv_fs_read 完成的, 他会从 req->result 指示的文件中读出数据, 写到iov指示的buffer中, 最大的写入长度就是iov的大小, iov后面面的参数表示有几个iov, 如果是2的话, 说明iov+iov.len也是一个可用的buffer.

当以上写入操作完成后, 将会调用read的回调函数.

uvcat/main.c – read callback

void on_read(uv_fs_t *req) {
    if (req->result < 0) {
        fprintf(stderr, "Read error: %s\n", uv_strerror(req->result));
    }
    else if (req->result == 0) {
        uv_fs_t close_req;
        // synchronous
        uv_fs_close(uv_default_loop(), &close_req, open_req.result, NULL);
    }
    else if (req->result > 0) {
        iov.len = req->result;
        uv_fs_write(uv_default_loop(), &write_req, 1, &iov, 1, -1, on_write);
    }
}

req->result 是读取操作的返回值, 也就是读了多长的数据! 我们需要把这些数据写道stdout中, 因为我们要实现的cat命令.

写入操作是 uv_fs_write 完成的. 第一个1表示stdout的文件描述符, iov是buffer, 第二个1是iov的个数!

写入完成之后, 我们需要调用 on_write , 因为文件目前仅仅读了一个iov的大小, 因此需要我们继续读, 知道文件读取完毕.

uvcat/main.c – write callback


void on_write(uv_fs_t *req) {
    if (req->result < 0) {
        fprintf(stderr, "Write error: %s\n", uv_strerror((int)req->result));
    }
    else {
        uv_fs_read(uv_default_loop(), &read_req, open_req.result, &iov, 1, -1, on_read);
    }
}

所以写入操作的回调函数就是继续执行读操作.

警告:由于文件系统和磁盘驱动器的性能配置方式,“成功”的写入可能尚未提交到磁盘。

uvcat/main.c

uv_fs_t open_req;
uv_fs_t read_req;
uv_fs_t write_req;
uv_buf_t iov;
char buffer[100];
int main(int argc, char **argv) {
    uv_fs_open(uv_default_loop(), &open_req, argv[1], O_RDONLY, 0, on_open);
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);

    uv_fs_req_cleanup(&open_req);
    uv_fs_req_cleanup(&read_req);
    uv_fs_req_cleanup(&write_req);
    return 0;
}

警告:必须始终在文件系统请求上调用 uv_fs_req_cleanup() 函数以释放 libuv 中的内部内存分配。

Filesystem operations

异步支持所有标准文件系统操作,如 unlink、rmdir、stat,并具有直观的参数顺序。 它们遵循与 read/write/open 调用相同的模式,在 uv_fs_t.result 字段中返回结果。 完整函数list:

Filesystem operations:

int uv_fs_close(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb);
int uv_fs_open(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, int mode, uv_fs_cb cb);
int uv_fs_read(uv_loop_t* loop, uv_fs_t* req, uv_file file, const uv_buf_t bufs[], unsigned int nbufs, int64_t offset, uv_fs_cb cb);
int uv_fs_unlink(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb);
int uv_fs_write(uv_loop_t* loop, uv_fs_t* req, uv_file file, const uv_buf_t bufs[], unsigned int nbufs, int64_t offset, uv_fs_cb cb);
int uv_fs_copyfile(uv_loop_t* loop, uv_fs_t* req, const char* path, const char* new_path, int flags, uv_fs_cb cb);
int uv_fs_mkdir(uv_loop_t* loop, uv_fs_t* req, const char* path, int mode, uv_fs_cb cb);
int uv_fs_mkdtemp(uv_loop_t* loop, uv_fs_t* req, const char* tpl, uv_fs_cb cb);
int uv_fs_rmdir(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb);
int uv_fs_scandir(uv_loop_t* loop, uv_fs_t* req, const char* path, int flags, uv_fs_cb cb);
int uv_fs_scandir_next(uv_fs_t* req, uv_dirent_t* ent);
int uv_fs_opendir(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb);
int uv_fs_readdir(uv_loop_t* loop, uv_fs_t* req, uv_dir_t* dir, uv_fs_cb cb);
int uv_fs_closedir(uv_loop_t* loop, uv_fs_t* req, uv_dir_t* dir, uv_fs_cb cb);
int uv_fs_stat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb);
int uv_fs_fstat(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb);
int uv_fs_rename(uv_loop_t* loop, uv_fs_t* req, const char* path, const char* new_path, uv_fs_cb cb);
int uv_fs_fsync(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb);
int uv_fs_fdatasync(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_fs_cb cb);
int uv_fs_ftruncate(uv_loop_t* loop, uv_fs_t* req, uv_file file, int64_t offset, uv_fs_cb cb);
int uv_fs_sendfile(uv_loop_t* loop, uv_fs_t* req, uv_file out_fd, uv_file in_fd, int64_t in_offset, size_t length, uv_fs_cb cb);
int uv_fs_access(uv_loop_t* loop, uv_fs_t* req, const char* path, int mode, uv_fs_cb cb);
int uv_fs_chmod(uv_loop_t* loop, uv_fs_t* req, const char* path, int mode, uv_fs_cb cb);
int uv_fs_utime(uv_loop_t* loop, uv_fs_t* req, const char* path, double atime, double mtime, uv_fs_cb cb);
int uv_fs_futime(uv_loop_t* loop, uv_fs_t* req, uv_file file, double atime, double mtime, uv_fs_cb cb);
int uv_fs_lstat(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb);
int uv_fs_link(uv_loop_t* loop, uv_fs_t* req, const char* path, const char* new_path, uv_fs_cb cb);
int uv_fs_symlink(uv_loop_t* loop, uv_fs_t* req, const char* path, const char* new_path, int flags, uv_fs_cb cb);
int uv_fs_readlink(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb);
int uv_fs_realpath(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_fs_cb cb);
int uv_fs_fchmod(uv_loop_t* loop, uv_fs_t* req, uv_file file, int mode, uv_fs_cb cb);
int uv_fs_chown(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_uid_t uid, uv_gid_t gid, uv_fs_cb cb);
int uv_fs_fchown(uv_loop_t* loop, uv_fs_t* req, uv_file file, uv_uid_t uid, uv_gid_t gid, uv_fs_cb cb);
int uv_fs_lchown(uv_loop_t* loop, uv_fs_t* req, const char* path, uv_uid_t uid, uv_gid_t gid, uv_fs_cb cb);

Buffers and Streams

流 (uv_stream_t)是libuv 中一个基本的I/O 句柄。 TCP 套接字、UDP 套接字 、管道和 IPC的文件 I/O 都被视为流子类。

使用每个子类的自定义函数初始化流,然后使用.

int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
int uv_read_stop(uv_stream_t*);
int uv_write(uv_write_t* req, uv_stream_t* handle,
             const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb);

基于流的函数比文件系统的函数更容易使用,并且当 uv_read_start() 被调用一次时,libuv 将自动继续从流中读取,直到 uv_read_stop() 被调用。

数据的单元是缓冲区——uv_buf_t。 这只是一个指向字节 (uv_buf_t.base) 和长度 (uv_buf_t.len) 的指针的集合。 uv_buf_t 是轻量级的,按值传递。 真正需要管理的是实际字节,它们必须由应用程序分配和释放。

流和普通文件不同, 普通文件的大小是固定的, 因此用户只需要定义一个确定的 uv_buf_t ,然后将文件的数据读入即可, 如果文件太大, 那么久分批次读入; 但是流的大小是不固定的, 每次读多少取决于发送者, 因此每次都会给出一个建议值, 表明我这次读了多少数据, 然后再由用户根据建议值创建buf, 然后将数据读到buf中, 之所以叫建议值 , 就是说用户定义的buf的大小也是可以随意, 太小的话, 也可以分批读.

为了演示流,我们需要使用 uv_pipe_t。 它允许流式传输本地文件。 这是一个使用 libuv 的简单 tee 实用程序。 异步执行所有操作显示了事件 I/O 的强大功能。 两次写入不会相互阻塞,但我们必须小心复制缓冲区数据,以确保在写入之前不会释放缓冲区。

该程序的使用方式:

./uvtee <output_file>
# 此程序会将管道输入的数据, 同时输出到stdout和指定的file

我们我们需要的文件上打开管道。 默认情况下,指向文件的 libuv 管道以双向方式打开。

uvtee/main.c – read on pipes

int main(int argc, char **argv) {
    loop = uv_default_loop();

    uv_pipe_init(loop, &stdin_pipe, 0);
    uv_pipe_open(&stdin_pipe, 0);

    uv_pipe_init(loop, &stdout_pipe, 0);
    uv_pipe_open(&stdout_pipe, 1);
    
    uv_fs_t file_req;
    int fd = uv_fs_open(loop, &file_req, argv[1], O_CREAT | O_RDWR, 0644, NULL);
    uv_pipe_init(loop, &file_pipe, 0);
    uv_pipe_open(&file_pipe, fd);

    uv_read_start((uv_stream_t*)&stdin_pipe, alloc_buffer, read_stdin);

    uv_run(loop, UV_RUN_DEFAULT);
    return 0;
}

uv_pipe_init () 用于创建管道, 第三个参数表示匿名管道, 如果要创建命名管道需要设置为1.

然后我们将管道和文件描述符相关联, 分别关联了0(sdtin), 1(stdout), fd(打开的文件)

然后我们开始读与stdin关联的管道, 此过程中, 输入端每输入一个回车, 管道就会收到一次数据, 然后先触发 alloc_buffer 的回调函数, 再触发 read_stdin 的回调函数.

uvtee/main.c – reading buffers

void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
    *buf = uv_buf_init((char*) malloc(suggested_size), suggested_size);
}

void read_stdin(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
    if (nread < 0){
        if (nread == UV_EOF){
            // end of file
            uv_close((uv_handle_t *)&stdin_pipe, NULL);
            uv_close((uv_handle_t *)&stdout_pipe, NULL);
            uv_close((uv_handle_t *)&file_pipe, NULL);
        }
    } else if (nread > 0) {
        write_data((uv_stream_t *)&stdout_pipe, nread, *buf, on_stdout_write);
        write_data((uv_stream_t *)&file_pipe, nread, *buf, on_file_write);
    }

    // OK to free buffer as write_data copies it.
    if (buf->base)
        free(buf->base);
}

在 alloc_buffer 的回调函数处理中, 给定的参数包括建议值的大小, 和 uv_buf_t 的指针.

此时我们需要给 uv_buf_t 指向的变量赋值, 也就是我们提供多大的一个buffer空间.

在 read_stdin 的回调函数中, 数据已经被读取到了 uv_buf_t 中, 这个时候,我们要把数据copy到file和stdout中.

参数 nread 表示读了多少数据, buf 是我们在 alloc_buffer 的回调 中初始化的buffer空间.

在执行完写入操作之后, 需要执行写入的回调函数. 最后要释放掉这次的buffer.

标准 malloc 在这里就足够了,但您可以使用任何内存分配方案。例如,node.js 使用它自己的slab 分配器,它将缓冲区与V8 对象相关联。

任何错误的读取回调 nread 参数都小于 0。此错误可能是 EOF,在这种情况下,我们使用通用关闭函数 uv_close() 关闭所有流,该函数根据其内部类型处理句柄。否则 nread 是一个非负数,我们可以尝试将那么多字节写入输出流。最后记住缓冲区分配和释放是应用程序的责任,所以我们释放数据。

如果分配内存失败,分配回调可能会返回长度为零的缓冲区。在这种情况下,调用读取回调时出现错误 UV_ENOBUFS。 libuv 将继续尝试读取流,因此如果您想在分配失败时停止,则必须显式调用 uv_close()。

读取回调可能会在 nread = 0 时调用,表示此时没有要读取的内容。大多数应用程序会忽略这一点。

uvtee/main.c – Write to pipe

typedef struct {
    uv_write_t req;
    uv_buf_t buf;
} write_req_t;

void free_write_req(uv_write_t *req) {
    write_req_t *wr = (write_req_t*) req;
    free(wr->buf.base);
    free(wr);
}

void on_stdout_write(uv_write_t *req, int status) {
    free_write_req(req);
}

void on_file_write(uv_write_t *req, int status) {
    free_write_req(req);
}

void write_data(uv_stream_t *dest, size_t size, uv_buf_t buf, uv_write_cb cb) {
    write_req_t *req = (write_req_t*) malloc(sizeof(write_req_t));
    req->buf = uv_buf_init((char*) malloc(size), size);
    memcpy(req->buf.base, buf.base, size);
    uv_write((uv_write_t*) req, (uv_stream_t*)dest, &req->buf, 1, cb);
}

这里的代码非常有意思, write_data 是我们自己写的一个函数, 它首先将创建buf的一个副本, 然后调用了 uv_write , 为什么不直接调用呢? 因为 uv_write 是一个异步的操作, 也就是立即返回的, 但是我们在 read_stdin 中, 调用完 write_data 后就直接释放了 buf , 而此时 uv_write 还没有开始执行呢! 因此我们在 uv_write 之前创建了一个buf的副本!

但是我能的副本也是需要在 uv_write 之后释放, 也就是需要在 uv_write 的回调函数中进行释放, 但是回调函数的参数是不会让你额外传入参数的! 于是就使用了一个非常妙的方法, 我们构造了一个 write_req_t 的结构体, 其第一个成员是 uv_write_t ,这样 uv_write_t 的指针和 write_req_t 的指针就是重合的, 最后只要通过指针类型转化就做到了在传递 uv_write_t 的同时传递了 write_req_t , 这样就把buf的地址一起传过来了!

当然, 上述代码其实我们也可以不用创建副本, 我们可以用类似的方法, 把释放buf的过程, 不要放在 read_stdin 中, 而是放在 write_req_t 的回调函数中.

write_data() 复制从读取中获得的缓冲区。 此缓冲区不会传递到写完成时触发的写回调。 为了解决这个问题,我们在 write_req_t 中包装了一个写请求和一个缓冲区,并在回调中解包。 我们制作了一个副本,这样我们就可以从两个相互独立的 write_data 调用中释放两个缓冲区。 虽然这样的演示程序可以接受,但您可能需要更智能的内存管理,例如引用计数缓冲区或任何主要应用程序中的缓冲区池。

警告 : 如果您的程序打算与其他程序一起使用,它可能有意或无意地写入管道。 这使得它容易在接收到 SIGPIPE 时中止。 在应用程序的初始化阶段插入:signal(SIGPIPE, SIG_IGN) 是个好主意。

File change events

所有现代操作系统都提供 API 来监视单个文件或目录,并在文件被修改时得到通知。 libuv 包装了常见的文件更改通知库。 这是 libuv 中比较不一致的部分之一。 文件更改通知系统本身在不同平台上的差异很大,因此很难让所有东西都能在任何地方工作。 为了演示,我将构建一个简单的实用程序,它在任何被监视的文件发生更改时运行用户指定的 command :

./onchange <command> <file1> [file2] ...

使用 uv_fs_event_init() 启动文件更改通知:

onchange/main.c – The setup

int main(int argc, char **argv) {
    if (argc <= 2) {
        fprintf(stderr, "Usage: %s <command> <file1> [file2 ...]\n", argv[0]);
        return 1;
    }

    loop = uv_default_loop();
    command = argv[1];

    while (argc-- > 2) {
        fprintf(stderr, "Adding watch on %s\n", argv[argc]);
        uv_fs_event_t *fs_event_req = malloc(sizeof(uv_fs_event_t));
        uv_fs_event_init(loop, fs_event_req);
        // The recursive flag watches subdirectories too.
        uv_fs_event_start(fs_event_req, run_command, argv[argc], UV_FS_EVENT_RECURSIVE);
    }

    return uv_run(loop, UV_RUN_DEFAULT);
}

第三个参数是要监控的实际文件或目录。 最后一个参数 flags 可以是:

/*
* Flags to be passed to uv_fs_event_start().
*/
enum uv_fs_event_flags {
    UV_FS_EVENT_WATCH_ENTRY = 1,
    UV_FS_EVENT_STAT = 2,
    UV_FS_EVENT_RECURSIVE = 4
};

UV_FS_EVENT_WATCH_ENTRY 和 UV_FS_EVENT_STAT (还)不做任何事情。 UV_FS_EVENT_RECURSIVE 也将在支持的平台上开始监视子目录。

回调将接收以下参数:

  1. uv_fs_event_t *handle – 句柄。 句柄的path字段是设置监视的文件。
  2. const char *filename – 如果正在监视目录,则这是已更改的文件。 仅在 Linux 和 Windows 上非空, 在其他平台可能为空。
  3. int flags – UV_RENAME 或 UV_CHANGE 之一, 或两者按位或。
  4. int status – 当前为 0。

在我们的示例中,我们只需打印参数并使用 system() 运行 command 。

void run_command(uv_fs_event_t *handle, const char *filename, int events, int status) {
    char path[1024];
    size_t size = 1023;
    // Does not handle error if path is longer than 1023.
    uv_fs_event_getpath(handle, path, &size);
    path[size] = '\0';

    fprintf(stderr, "Change detected in %s: ", path);
    if (events & UV_RENAME)
        fprintf(stderr, "renamed");
    if (events & UV_CHANGE)
        fprintf(stderr, "changed");

    fprintf(stderr, " %s\n", filename ? filename : "");
    system(command);
}

一条评论

  1. Pingback:libuv : User guide » Utilities – Kingdo Station

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注