一. 前言

上文中我们介绍了进程间通信的方法之一:信号,本文将继续介绍另一种进程间通信的方法,即管道。管道是Linux中使用shell经常用到的一个技术,本文将深入剖析管道的实现和运行逻辑。

二. 管道简介

在Linux的日常使用中,我们常常会用到管道,如下所示

ps -ef | grep 关键字 | awk '{print $2}' | xargs kill -9

这里面的竖线|就是一个管道。它会将前一个命令的输出,作为后一个命令的输入。从管道的这个名称可以看出来,管道是一种单向传输数据的机制,它其实是一段缓存,里面的数据只能从一端写入,从另一端读出。如果想互相通信,我们需要创建两个管道才行。

管道分为两种类型,| 表示的管道称为匿名管道,意思就是这个类型的管道没有名字,用完了就销毁了。就像上面那个命令里面的一样,竖线代表的管道随着命令的执行自动创建、自动销毁。用户甚至都不知道自己在用管道这种技术,就已经解决了问题。另外一种类型是命名管道。这个类型的管道需要通过 mkfifo 命令显式地创建。

mkfifo hello

我们可以往管道里面写入东西。例如,写入一个字符串。

# echo "hello world" > hello

这个时候管道里面的内容没有被读出,这个命令就会停在这里。这个时候,我们就需要重新连接一个终端。在终端中用下面的命令读取管道里面的内容:

# cat < hello hello world

一方面,我们能够看到,管道里面的内容被读取出来,打印到了终端上;另一方面,echo 那个命令正常退出了。这就是有名管道的执行流程。

三. 匿名管道创建

实际管道的创建调用的是系统调用pipe(),该函数建了一个管道 pipe,返回了两个文件描述符,这表示管道的两端,一个是管道的读取端描述符 fd[0],另一个是管道的写入端描述符 fd[1]。

int pipe(int fd[2])

其内核实现如下所示,pipe2 ()调用 __do_pipe_flags() 创建一个数组 files来存放管道的两端的打开文件,另一个数组 fd 存放管道的两端的文件描述符。如果 __do_pipe_flags() 没有错误,那就调用 fd_install()将两个 fd 和两个 struct file 关联起来,这一点和打开一个文件的过程类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
SYSCALL_DEFINE1(pipe, int __user *, fildes)
{
return sys_pipe2(fildes, 0);
}

SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags)
{
struct file *files[2];
int fd[2];
int error;

error = __do_pipe_flags(fd, files, flags);
if (!error) {
if (unlikely(copy_to_user(fildes, fd, sizeof(fd)))) {
......
error = -EFAULT;
} else {
fd_install(fd[0], files[0]);
fd_install(fd[1], files[1]);
}
}
return error;
}

__do_pipe_flags()调用了create_pipe_files()生成fd,然后调用get_unused_fd_flags()赋值fdr和fdw,即读文件描述符和写文件描述符。由此也可以看出管道的特性:由一端写入,由另一端读出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int __do_pipe_flags(int *fd, struct file **files, int flags)
{
int error;
int fdw, fdr;
......
error = create_pipe_files(files, flags);
......
error = get_unused_fd_flags(flags);
......
fdr = error;
error = get_unused_fd_flags(flags);
......
fdw = error;
audit_fd_pair(fdr, fdw);
fd[0] = fdr;
fd[1] = fdw;
return 0;
......
}

create_pipe_files()是管道创建的关键逻辑,从这里可以看出来管道实际上也是一种抽象的文件系统pipefs,有着对应的特殊文件以及inode。这里首先通过get_pipe_inode()获取特殊inode,然后调用alloc_file_pseudo()通过inode以及对应的挂载结构体pipe_mnt,文件操作结构体pipefifo_fops创建关联的dentry并以此创建文件结构体并分配内存,通过alloc_file_clone()创建一份新的file后将两个文件分别保存在res[0]和res[1]中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int create_pipe_files(struct file **res, int flags)
{
struct inode *inode = get_pipe_inode();
struct file *f;
if (!inode)
return -ENFILE;
f = alloc_file_pseudo(inode, pipe_mnt, "",
O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT)),
&pipefifo_fops);
if (IS_ERR(f)) {
free_pipe_info(inode->i_pipe);
iput(inode);
return PTR_ERR(f);
}
f->private_data = inode->i_pipe;
res[0] = alloc_file_clone(f, O_RDONLY | (flags & O_NONBLOCK),
&pipefifo_fops);
if (IS_ERR(res[0])) {
put_pipe_info(inode, inode->i_pipe);
fput(f);
return PTR_ERR(res[0]);
}
res[0]->private_data = inode->i_pipe;
res[1] = f;
return 0;
}

其虚拟文件系统pipefs对应的结构体和操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
static struct file_system_type pipe_fs_type = {
.name = "pipefs",
.mount = pipefs_mount,
.kill_sb = kill_anon_super,
};

static int __init init_pipe_fs(void)
{
int err = register_filesystem(&pipe_fs_type);

if (!err) {
pipe_mnt = kern_mount(&pipe_fs_type);
}
......
}

const struct file_operations pipefifo_fops = {
.open = fifo_open,
.llseek = no_llseek,
.read_iter = pipe_read,
.write_iter = pipe_write,
.poll = pipe_poll,
.unlocked_ioctl = pipe_ioctl,
.release = pipe_release,
.fasync = pipe_fasync,
};

static struct inode * get_pipe_inode(void)
{
struct inode *inode = new_inode_pseudo(pipe_mnt->mnt_sb);
struct pipe_inode_info *pipe;
......
inode->i_ino = get_next_ino();

pipe = alloc_pipe_info();
......
inode->i_pipe = pipe;
pipe->files = 2;
pipe->readers = pipe->writers = 1;
inode->i_fop = &pipefifo_fops;
inode->i_state = I_DIRTY;
inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR;
inode->i_uid = current_fsuid();
inode->i_gid = current_fsgid();
inode->i_atime = inode->i_mtime = inode->i_ctime = current_time(inode);

return inode;
......
}

至此,一个匿名管道就创建成功了。如果对于 fd[1]写入,调用的是 pipe_write(),向 pipe_buffer 里面写入数据;如果对于 fd[0]的读入,调用的是 pipe_read(),也就是从 pipe_buffer 里面读取数据。至此,我们在一个进程内创建了管道,但是尚未实现进程间通信。

四. 匿名管道通信

在上文中我们提到了匿名管道通过|符号实现进程间的通信,传递输入给下一个进程作为输出,其实现原理如下:

  • 利用fork创建子进程,复制file_struct会同样复制fd输入输出数组,但是fd指向的文件仅有一份,即两个进程间可以通过fd数组实现对同一个管道文件的跨进程读写操作
  • 禁用父进程的读,禁用子进程的写,即从父进程写入从子进程读出,从而实现了单向管道,避免了混乱
  • 对于A|B来说,shell首先创建子进程A,接着创建子进程B,由于二者均从shell创建,因此共用fd数组。shell关闭读写,A开写B开读,从而实现了A 和B之间的通信。

image

接着我们需要调用dup2()实现输入输出和管道两端的关联,该函数会将fd赋值给fd2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Duplicate FD to FD2, closing the old FD2 and making FD2 be
open the same file as FD is. Return FD2 or -1. */
int
__dup2 (int fd, int fd2)
{
if (fd < 0 || fd2 < 0)
{
__set_errno (EBADF);
return -1;
}
if (fd == fd2)
/* No way to check that they are valid. */
return fd2;
__set_errno (ENOSYS);
return -1;
}

在 files_struct 里面,有这样一个表,下标是 fd,内容指向一个打开的文件 struct file。在这个表里面,前三项是定下来的,其中第零项 STDIN_FILENO 表示标准输入,第一项 STDOUT_FILENO 表示标准输出,第三项 STDERR_FILENO 表示错误输出。

1
2
3
struct files_struct {
struct file __rcu * fd_array[NR_OPEN_DEFAULT];
}
  • 在 A 进程写入端通过dup2(fd[1],STDOUT_FILENO)将 STDOUT_FILENO(也即第一项)不再指向标准输出,而是指向创建的管道文件,那么以后往标准输出写入的任何东西,都会写入管道文件。
  • 在 B 进程中读取端通过dup2(fd[0],STDIN_FILENO)将 STDIN_FILENO 也即第零项不再指向标准输入,而是指向创建的管道文件,那么以后从标准输入读取的任何东西,都来自于管道文件。

至此,我们将 A|B 的功能完成。

image

五. 有名管道

对于有名管道,我们需要通过mkfifo创建,实际调用__xmknod()函数,最终调用mknod(),和字符设备创建一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* Create a named pipe (FIFO) named PATH with protections MODE.  */
int
mkfifo (const char *path, mode_t mode)
{
dev_t dev = 0;
return __xmknod (_MKNOD_VER, path, mode | S_IFIFO, &dev);
}

/* Create a device file named PATH, with permission and special bits MODE
and device number DEV (which can be constructed from major and minor
device numbers with the `makedev' macro above). */
int
__xmknod (int vers, const char *path, mode_t mode, dev_t *dev)
{
unsigned long long int k_dev;
if (vers != _MKNOD_VER)
return INLINE_SYSCALL_ERROR_RETURN_VALUE (EINVAL);
/* We must convert the value to dev_t type used by the kernel. */
k_dev = (*dev) & ((1ULL << 32) - 1);
if (k_dev != *dev)
return INLINE_SYSCALL_ERROR_RETURN_VALUE (EINVAL);
return INLINE_SYSCALL (mknod, 3, path, mode, (unsigned int) k_dev);
}

mknod 在字符设备那一节已经解析过了,先是通过 user_path_create() 对于这个管道文件创建一个 dentry,然后因为是 S_IFIFO,所以调用 vfs_mknod()。由于这个管道文件是创建在一个普通文件系统上的,假设是在 ext4 文件上,于是 vfs_mknod 会调用 ext4_dir_inode_operations 的 mknod,也即会调用 ext4_mknod()。

在 ext4_mknod() 中,ext4_new_inode_start_handle() 会调用 __ext4_new_inode(),在 ext4 文件系统上真的创建一个文件,但是会调用 init_special_inode(),创建一个内存中特殊的 inode,这个函数我们在字符设备文件中也遇到过,只不过当时 inode 的 i_fop 指向的是 def_chr_fops,这次换成管道文件了,inode 的 i_fop 变成指向 pipefifo_fops,这一点和匿名管道是一样的。这样,管道文件就创建完毕了。

接下来,要打开这个管道文件,我们还是会调用文件系统的 open() 函数。还是沿着文件系统的调用方式,一路调用到 pipefifo_fops 的 open() 函数,也就是 fifo_open()。在 fifo_open() 里面会创建 pipe_inode_info,这一点和匿名管道也是一样的。这个结构里面有个成员是 struct pipe_buffer *bufs。我们可以知道,所谓的命名管道,其实是也是内核里面的一串缓存。接下来,对于命名管道的写入,我们还是会调用 pipefifo_fops 的 pipe_write() 函数,向 pipe_buffer 里面写入数据。对于命名管道的读入,我们还是会调用 pipefifo_fops 的 pipe_read(),也就是从 pipe_buffer 里面读取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
static int fifo_open(struct inode *inode, struct file *filp)
{
struct pipe_inode_info *pipe;
bool is_pipe = inode->i_sb->s_magic == PIPEFS_MAGIC;
int ret;
filp->f_version = 0;
spin_lock(&inode->i_lock);
if (inode->i_pipe) {
pipe = inode->i_pipe;
pipe->files++;
spin_unlock(&inode->i_lock);
} else {
spin_unlock(&inode->i_lock);
pipe = alloc_pipe_info();
if (!pipe)
return -ENOMEM;
pipe->files = 1;
spin_lock(&inode->i_lock);
if (unlikely(inode->i_pipe)) {
inode->i_pipe->files++;
spin_unlock(&inode->i_lock);
free_pipe_info(pipe);
pipe = inode->i_pipe;
} else {
inode->i_pipe = pipe;
spin_unlock(&inode->i_lock);
}
}
filp->private_data = pipe;
/* OK, we have a pipe and it's pinned down */
__pipe_lock(pipe);
/* We can only do regular read/write on fifos */
filp->f_mode &= (FMODE_READ | FMODE_WRITE);
switch (filp->f_mode) {
case FMODE_READ:
/*
* O_RDONLY
* POSIX.1 says that O_NONBLOCK means return with the FIFO
* opened, even when there is no process writing the FIFO.
*/
pipe->r_counter++;
if (pipe->readers++ == 0)
wake_up_partner(pipe);
if (!is_pipe && !pipe->writers) {
if ((filp->f_flags & O_NONBLOCK)) {
/* suppress EPOLLHUP until we have
* seen a writer */
filp->f_version = pipe->w_counter;
} else {
if (wait_for_partner(pipe, &pipe->w_counter))
goto err_rd;
}
}
break;

case FMODE_WRITE:
/*
* O_WRONLY
* POSIX.1 says that O_NONBLOCK means return -1 with
* errno=ENXIO when there is no process reading the FIFO.
*/
ret = -ENXIO;
if (!is_pipe && (filp->f_flags & O_NONBLOCK) && !pipe->readers)
goto err;
pipe->w_counter++;
if (!pipe->writers++)
wake_up_partner(pipe);
if (!is_pipe && !pipe->readers) {
if (wait_for_partner(pipe, &pipe->r_counter))
goto err_wr;
}
break;

case FMODE_READ | FMODE_WRITE:
/*
* O_RDWR
* POSIX.1 leaves this case "undefined" when O_NONBLOCK is set.
* This implementation will NEVER block on a O_RDWR open, since
* the process can at least talk to itself.
*/
pipe->readers++;
pipe->writers++;
pipe->r_counter++;
pipe->w_counter++;
if (pipe->readers == 1 || pipe->writers == 1)
wake_up_partner(pipe);
break;
default:
ret = -EINVAL;
goto err;
}
/* Ok! */
__pipe_unlock(pipe);
return 0;
......
}

总结

无论是匿名管道还是命名管道,在内核都是一个文件。只要是文件就要有一个 inode。在这种特殊的 inode 里面,file_operations 指向管道特殊的 pipefifo_fops,这个 inode 对应内存里面的缓存。当我们用文件的 open 函数打开这个管道设备文件的时候,会调用 pipefifo_fops 里面的方法创建 struct file 结构,他的 inode 指向特殊的 inode,也对应内存里面的缓存,file_operations 也指向管道特殊的 pipefifo_fops。写入一个 pipe 就是从 struct file 结构找到缓存写入,读取一个 pipe 就是从 struct file 结构找到缓存读出。匿名管道和命名管道区别就在于匿名管道会通过dup2()指定输入输出源,完成之后立即释放,而命名管道通过mkfifo创建挂载后,需要手动调用pipe_read()和pipe_write()来完成其功能,表现到用户端即为前面提到的例子。