/* SPDX-License-Identifier: MIT */
/*
 * Description: UBLK_F_BATCH_IO buffer management
 */

#include "kublk.h"

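/*
 * Translate a thread-wide commit buffer index into the buffer address in
 * this thread's commit buffer area; returns NULL if the index is out of
 * range.
 */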
static inline void *ublk_get_commit_buf(struct ublk_thread *t,
					unsigned short buf_idx)
{
	unsigned idx;

	if (buf_idx < t->commit_buf_start ||
	    buf_idx >= t->commit_buf_start + t->nr_commit_buf)
		return NULL;
	idx = buf_idx - t->commit_buf_start;
	return t->commit_buf + idx * t->commit_buf_size;
}

/*
 * Allocate one buffer for UBLK_U_IO_PREP_IO_CMDS or UBLK_U_IO_COMMIT_IO_CMDS.
 *
 * Returns the buffer index on success, or UBLKS_T_COMMIT_BUF_INV_IDX if no
 * buffer is available.
 */
static inline unsigned short ublk_alloc_commit_buf(struct ublk_thread *t)
{
	int idx = allocator_get(&t->commit_buf_alloc);

	if (idx >= 0)
		return idx + t->commit_buf_start;
	return UBLKS_T_COMMIT_BUF_INV_IDX;
}

/*
 * Free one commit buffer used by UBLK_U_IO_PREP_IO_CMDS or
 * UBLK_U_IO_COMMIT_IO_CMDS
 */
static inline void ublk_free_commit_buf(struct ublk_thread *t,
					unsigned short i)
{
	unsigned short idx = i - t->commit_buf_start;

	ublk_assert(idx < t->nr_commit_buf);
	ublk_assert(allocator_get_val(&t->commit_buf_alloc, idx) != 0);

	allocator_put(&t->commit_buf_alloc, idx);
}

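/*
 * Size of one commit element: 8 bytes when no userspace buffer address has
 * to be carried (zero copy, user copy or auto buffer registration),
 * otherwise 16 bytes with the extra 8 bytes holding the buffer address.
 */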
static unsigned char ublk_commit_elem_buf_size(struct ublk_dev *dev)
{
	if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_USER_COPY |
				UBLK_F_AUTO_BUF_REG))
		return 8;

	/* one extra 8 bytes for carrying the buffer address */
	return 16;
}

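/* one commit buffer covers the whole queue depth, rounded up to page size */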
static unsigned ublk_commit_buf_size(struct ublk_thread *t)
{
	struct ublk_dev *dev = t->dev;
	unsigned elem_size = ublk_commit_elem_buf_size(dev);
	unsigned int total = elem_size * dev->dev_info.queue_depth;
	unsigned int page_sz = getpagesize();

	return round_up(total, page_sz);
}

static void free_batch_commit_buf(struct ublk_thread *t)
{
	if (t->commit_buf) {
		unsigned buf_size = ublk_commit_buf_size(t);
		unsigned int total = buf_size * t->nr_commit_buf;

		munlock(t->commit_buf, total);
		free(t->commit_buf);
	}
	allocator_deinit(&t->commit_buf_alloc);
	free(t->commit);
}

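/*
 * Allocate per-thread commit state: one batch_commit_buf slot for each
 * handled queue, plus a page-aligned, mlocked area holding all commit
 * buffers.
 */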
static int alloc_batch_commit_buf(struct ublk_thread *t)
{
	unsigned buf_size = ublk_commit_buf_size(t);
	unsigned int total = buf_size * t->nr_commit_buf;
	unsigned int page_sz = getpagesize();
	void *buf = NULL;
	int i, ret, j = 0;

	t->commit = calloc(t->nr_queues, sizeof(*t->commit));
	if (!t->commit)
		return -ENOMEM;
	for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) {
		if (t->q_map[i])
			t->commit[j++].q_id = i;
	}

	allocator_init(&t->commit_buf_alloc, t->nr_commit_buf);

	t->commit_buf = NULL;
	ret = posix_memalign(&buf, page_sz, total);
	if (ret || !buf) {
		ret = -ENOMEM;
		goto fail;
	}

	t->commit_buf = buf;

	/* lock commit buffer pages for fast access */
	if (mlock(t->commit_buf, total))
		ublk_err("%s: can't lock commit buffer %s\n", __func__,
				strerror(errno));

	return 0;

fail:
	free_batch_commit_buf(t);
	return ret;
}

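/* count how many queues this thread handles, according to q_map */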
static unsigned int ublk_thread_nr_queues(const struct ublk_thread *t)
{
	int i;
	int ret = 0;

	for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++)
		ret += !!t->q_map[i];

	return ret;
}

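/*
 * Initialize per-thread batch state: reserve commit buffer indexes right
 * after the already-reserved buffers, and derive the uring_cmd flags shared
 * by all batch commands issued from this thread.
 */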
void ublk_batch_prepare(struct ublk_thread *t)
{
	/*
	 * We only handle a single device in this thread context.
	 *
	 * All queues share the same feature flags, so use queue 0's flags
	 * for calculating the uring_cmd flags.
	 *
	 * This isn't elegant, but it works so far.
	 */
	struct ublk_queue *q = &t->dev->q[0];

	/* cache nr_queues because we don't support dynamic load-balance yet */
	t->nr_queues = ublk_thread_nr_queues(t);

	t->commit_buf_elem_size = ublk_commit_elem_buf_size(t->dev);
	t->commit_buf_size = ublk_commit_buf_size(t);
	t->commit_buf_start = t->nr_bufs;
	t->nr_commit_buf = 2 * t->nr_queues;
	t->nr_bufs += t->nr_commit_buf;

	t->cmd_flags = 0;
	if (ublk_queue_use_auto_zc(q)) {
		if (ublk_queue_auto_zc_fallback(q))
			t->cmd_flags |= UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK;
	} else if (!ublk_queue_no_buf(q))
		t->cmd_flags |= UBLK_BATCH_F_HAS_BUF_ADDR;

	t->state |= UBLKS_T_BATCH_IO;

	ublk_log("%s: thread %d commit(nr_bufs %u, buf_size %u, start %u)\n",
			__func__, t->idx,
			t->nr_commit_buf, t->commit_buf_size,
			t->commit_buf_start);
}

static void free_batch_fetch_buf(struct ublk_thread *t)
{
	int i;

	for (i = 0; i < t->nr_fetch_bufs; i++) {
		io_uring_free_buf_ring(&t->ring, t->fetch[i].br, 1, i);
		munlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size);
		free(t->fetch[i].fetch_buf);
	}
	free(t->fetch);
}

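/*
 * Allocate two fetch buffers for each handled queue, and register each one
 * as a single-entry incremental provided-buffer ring.
 */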
static int alloc_batch_fetch_buf(struct ublk_thread *t)
{
	/* page-aligned fetch buffer, mlocked to speed up delivery */
	unsigned pg_sz = getpagesize();
	/* each fetched element is a 2-byte tag, so one buffer covers the queue depth */
	unsigned buf_size = round_up(t->dev->dev_info.queue_depth * 2, pg_sz);
	int ret;
	int i = 0;

	/* two fetch buffers for each queue */
	t->nr_fetch_bufs = t->nr_queues * 2;
	t->fetch = calloc(t->nr_fetch_bufs, sizeof(*t->fetch));
	if (!t->fetch)
		return -ENOMEM;

	/* allocate, lock and register every fetch buffer */
	for (i = 0; i < t->nr_fetch_bufs; i++) {
		t->fetch[i].fetch_buf_size = buf_size;

		if (posix_memalign((void **)&t->fetch[i].fetch_buf, pg_sz,
					t->fetch[i].fetch_buf_size))
			return -ENOMEM;

		/* lock fetch buffer pages for fast fetching */
		if (mlock(t->fetch[i].fetch_buf, t->fetch[i].fetch_buf_size))
			ublk_err("%s: can't lock fetch buffer %s\n", __func__,
					strerror(errno));
		t->fetch[i].br = io_uring_setup_buf_ring(&t->ring, 1,
				i, IOU_PBUF_RING_INC, &ret);
		if (!t->fetch[i].br) {
			ublk_err("Buffer ring register failed %d\n", ret);
			return ret;
		}
	}

	return 0;
}

int ublk_batch_alloc_buf(struct ublk_thread *t)
{
	int ret;

	ublk_assert(t->nr_commit_buf <= 2 * UBLK_MAX_QUEUES);

	ret = alloc_batch_commit_buf(t);
	if (ret)
		return ret;
	return alloc_batch_fetch_buf(t);
}

void ublk_batch_free_buf(struct ublk_thread *t)
{
	free_batch_commit_buf(t);
	free_batch_fetch_buf(t);
}

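/*
 * Initialize one batch uring_cmd SQE: fill the ublk_batch_io header and
 * encode the buffer index, command, element count and queue id into
 * user_data so the completion handler can recover them.
 */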
static void ublk_init_batch_cmd(struct ublk_thread *t, __u16 q_id,
				struct io_uring_sqe *sqe, unsigned op,
				unsigned short elem_bytes,
				unsigned short nr_elem,
				unsigned short buf_idx)
{
	struct ublk_batch_io *cmd;
	__u64 user_data;

	cmd = (struct ublk_batch_io *)ublk_get_sqe_cmd(sqe);

	ublk_set_sqe_cmd_op(sqe, op);

	sqe->fd = 0;	/* dev->fds[0] */
	sqe->opcode = IORING_OP_URING_CMD;
	sqe->flags = IOSQE_FIXED_FILE;

	cmd->q_id = q_id;
	cmd->flags = 0;
	cmd->reserved = 0;
	cmd->elem_bytes = elem_bytes;
	cmd->nr_elem = nr_elem;

	user_data = build_user_data(buf_idx, _IOC_NR(op), nr_elem, q_id, 0);
	io_uring_sqe_set_data64(sqe, user_data);

	t->cmd_inflight += 1;

	ublk_dbg(UBLK_DBG_IO_CMD, "%s: thread %u qid %d cmd_op %x data %llx "
			"nr_elem %u elem_bytes %u buf_size %u buf_idx %d "
			"cmd_inflight %u\n",
			__func__, t->idx, q_id, op,
			(unsigned long long)user_data,
			cmd->nr_elem, cmd->elem_bytes,
			nr_elem * elem_bytes, buf_idx, t->cmd_inflight);
}

static void ublk_setup_commit_sqe(struct ublk_thread *t,
				  struct io_uring_sqe *sqe,
				  unsigned short buf_idx)
{
	struct ublk_batch_io *cmd;

	cmd = (struct ublk_batch_io *)ublk_get_sqe_cmd(sqe);

	/*
	 * Apply the thread-wide batch flags: UBLK_BATCH_F_HAS_BUF_ADDR means
	 * plain user buffers are used instead of fixed/registered buffers.
	 */
	cmd->flags |= t->cmd_flags;
}

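/*
 * (Re)arm one multishot FETCH command: provide the fetch buffer to its
 * buffer ring again, then submit UBLK_U_IO_FETCH_IO_CMDS with buffer select
 * so the kernel can keep delivering 2-byte tags into the buffer.
 */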
static void ublk_batch_queue_fetch(struct ublk_thread *t,
				   struct ublk_queue *q,
				   unsigned short buf_idx)
{
	unsigned short nr_elem = t->fetch[buf_idx].fetch_buf_size / 2;
	struct io_uring_sqe *sqe;

	io_uring_buf_ring_add(t->fetch[buf_idx].br, t->fetch[buf_idx].fetch_buf,
			t->fetch[buf_idx].fetch_buf_size,
			0, 0, 0);
	io_uring_buf_ring_advance(t->fetch[buf_idx].br, 1);

	ublk_io_alloc_sqes(t, &sqe, 1);

	ublk_init_batch_cmd(t, q->q_id, sqe, UBLK_U_IO_FETCH_IO_CMDS, 2, nr_elem,
			buf_idx);

	sqe->rw_flags = IORING_URING_CMD_MULTISHOT;
	sqe->buf_group = buf_idx;
	sqe->flags |= IOSQE_BUFFER_SELECT;

	t->fetch[buf_idx].fetch_buf_off = 0;
}

void ublk_batch_start_fetch(struct ublk_thread *t)
{
	int i;
	int j = 0;

	for (i = 0; i < t->dev->dev_info.nr_hw_queues; i++) {
		if (t->q_map[i]) {
			struct ublk_queue *q = &t->dev->q[i];

			/* submit two fetch commands for each queue */
			ublk_batch_queue_fetch(t, q, j++);
			ublk_batch_queue_fetch(t, q, j++);
		}
	}
}

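/*
 * Handle one FETCH CQE: walk the tags delivered since the last CQE, queue
 * each one to the target, then advance the per-buffer offset. Returns the
 * index of the fetch buffer this CQE belongs to.
 */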
static unsigned short ublk_compl_batch_fetch(struct ublk_thread *t,
					     struct ublk_queue *q,
					     const struct io_uring_cqe *cqe)
{
	unsigned short buf_idx = user_data_to_tag(cqe->user_data);
	unsigned start = t->fetch[buf_idx].fetch_buf_off;
	unsigned end = start + cqe->res;
	void *buf = t->fetch[buf_idx].fetch_buf;
	int i;

	if (cqe->res < 0)
		return buf_idx;

	if ((end - start) / 2 > q->q_depth) {
		ublk_err("%s: fetch duplicated ios offset %u count %u\n",
				__func__, start, cqe->res);

		for (i = start; i < end; i += 2) {
			unsigned short tag = *(unsigned short *)(buf + i);

			ublk_err("%u ", tag);
		}
		ublk_err("\n");
	}

	for (i = start; i < end; i += 2) {
		unsigned short tag = *(unsigned short *)(buf + i);

		if (tag >= q->q_depth)
			ublk_err("%s: bad tag %u\n", __func__, tag);

		if (q->tgt_ops->queue_io)
			q->tgt_ops->queue_io(t, q, tag);
	}
	t->fetch[buf_idx].fetch_buf_off = end;
	return buf_idx;
}

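/*
 * Build and queue one UBLK_U_IO_PREP_IO_CMDS command covering the whole
 * queue depth, with one commit element prepared per tag.
 */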
static int __ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
{
	unsigned short nr_elem = q->q_depth;
	unsigned short buf_idx = ublk_alloc_commit_buf(t);
	struct io_uring_sqe *sqe;
	void *buf;
	int i;

	ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX);

	ublk_io_alloc_sqes(t, &sqe, 1);

	ublk_assert(nr_elem == q->q_depth);
	buf = ublk_get_commit_buf(t, buf_idx);
	for (i = 0; i < nr_elem; i++) {
		struct ublk_batch_elem *elem = (struct ublk_batch_elem *)(
				buf + i * t->commit_buf_elem_size);
		struct ublk_io *io = &q->ios[i];

		elem->tag = i;
		elem->result = 0;

		if (ublk_queue_use_auto_zc(q))
			elem->buf_index = ublk_batch_io_buf_idx(t, q, i);
		else if (!ublk_queue_no_buf(q))
			elem->buf_addr = (__u64) (uintptr_t) io->buf_addr;
	}

	sqe->addr = (__u64) (uintptr_t) buf;
	sqe->len = t->commit_buf_elem_size * nr_elem;

	ublk_init_batch_cmd(t, q->q_id, sqe, UBLK_U_IO_PREP_IO_CMDS,
			t->commit_buf_elem_size, nr_elem, buf_idx);
	ublk_setup_commit_sqe(t, sqe, buf_idx);
	return 0;
}

int ublk_batch_queue_prep_io_cmds(struct ublk_thread *t, struct ublk_queue *q)
{
	int ret = 0;

	pthread_spin_lock(&q->lock);
	if (q->flags & UBLKS_Q_PREPARED)
		goto unlock;
	ret = __ublk_batch_queue_prep_io_cmds(t, q);
	if (!ret)
		q->flags |= UBLKS_Q_PREPARED;
unlock:
	pthread_spin_unlock(&q->lock);

	return ret;
}

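/*
 * PREP/COMMIT completion: check the expected result, then release the
 * commit buffer whose index is carried in user_data.
 */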
static void ublk_batch_compl_commit_cmd(struct ublk_thread *t,
					const struct io_uring_cqe *cqe,
					unsigned op)
{
	unsigned short buf_idx = user_data_to_tag(cqe->user_data);

	if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS)) {
		ublk_assert(cqe->res == 0);
	} else if (op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) {
		int nr_elem = user_data_to_tgt_data(cqe->user_data);

		ublk_assert(cqe->res == t->commit_buf_elem_size * nr_elem);
	} else {
		ublk_assert(0);
	}

	ublk_free_commit_buf(t, buf_idx);
}

void ublk_batch_compl_cmd(struct ublk_thread *t,
			  const struct io_uring_cqe *cqe)
{
	unsigned op = user_data_to_op(cqe->user_data);
	struct ublk_queue *q;
	unsigned buf_idx;
	unsigned q_id;

	if (op == _IOC_NR(UBLK_U_IO_PREP_IO_CMDS) ||
	    op == _IOC_NR(UBLK_U_IO_COMMIT_IO_CMDS)) {
		t->cmd_inflight--;
		ublk_batch_compl_commit_cmd(t, cqe, op);
		return;
	}

	/* FETCH command is per queue */
	q_id = user_data_to_q_id(cqe->user_data);
	q = &t->dev->q[q_id];
	buf_idx = ublk_compl_batch_fetch(t, q, cqe);

	if (cqe->res < 0 && cqe->res != -ENOBUFS) {
		t->cmd_inflight--;
		t->state |= UBLKS_T_STOPPING;
	} else if (!(cqe->flags & IORING_CQE_F_MORE) || cqe->res == -ENOBUFS) {
		t->cmd_inflight--;
		ublk_batch_queue_fetch(t, q, buf_idx);
	}
}

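/* queue one COMMIT_IO_CMDS command for the elements staged in @cb */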
static void __ublk_batch_commit_io_cmds(struct ublk_thread *t,
					struct batch_commit_buf *cb)
{
	struct io_uring_sqe *sqe;
	unsigned short buf_idx;
	unsigned short nr_elem = cb->done;

	/* nothing to commit */
	if (!nr_elem) {
		ublk_free_commit_buf(t, cb->buf_idx);
		return;
	}

	ublk_io_alloc_sqes(t, &sqe, 1);
	buf_idx = cb->buf_idx;
	sqe->addr = (__u64) (uintptr_t) cb->elem;
	sqe->len = nr_elem * t->commit_buf_elem_size;

	/* COMMIT_IO_CMDS is a per-queue command, issued with cb->q_id */
	ublk_init_batch_cmd(t, cb->q_id, sqe, UBLK_U_IO_COMMIT_IO_CMDS,
			t->commit_buf_elem_size, nr_elem, buf_idx);
	ublk_setup_commit_sqe(t, sqe, buf_idx);
}

void ublk_batch_commit_io_cmds(struct ublk_thread *t)
{
	int i;

	for (i = 0; i < t->nr_queues; i++) {
		struct batch_commit_buf *cb = &t->commit[i];

		if (cb->buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX)
			__ublk_batch_commit_io_cmds(t, cb);
	}
}

static void __ublk_batch_init_commit(struct ublk_thread *t,
				     struct batch_commit_buf *cb,
				     unsigned short buf_idx)
{
	/* so far only support 1:1 queue/thread mapping */
	cb->buf_idx = buf_idx;
	cb->elem = ublk_get_commit_buf(t, buf_idx);
	cb->done = 0;
	cb->count = t->commit_buf_size / t->commit_buf_elem_size;
}

/* COMMIT_IO_CMDS is a per-queue command, so each queue uses its own commit buffer */
static void ublk_batch_init_commit(struct ublk_thread *t,
				   struct batch_commit_buf *cb)
{
	unsigned short buf_idx = ublk_alloc_commit_buf(t);

	ublk_assert(buf_idx != UBLKS_T_COMMIT_BUF_INV_IDX);
	ublk_assert(!ublk_batch_commit_prepared(cb));

	__ublk_batch_init_commit(t, cb, buf_idx);
}

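/* mark every per-queue commit buffer as unused before reaping a new round of completions */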
void ublk_batch_prep_commit(struct ublk_thread *t)
{
	int i;

	for (i = 0; i < t->nr_queues; i++)
		t->commit[i].buf_idx = UBLKS_T_COMMIT_BUF_INV_IDX;
}

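/*
 * Stage one completed io into its queue's commit buffer. The buffer is
 * allocated lazily on the first completion of the current round and is
 * submitted later by ublk_batch_commit_io_cmds().
 */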
void ublk_batch_complete_io(struct ublk_thread *t, struct ublk_queue *q,
			    unsigned tag, int res)
{
	unsigned q_t_idx = ublk_queue_idx_in_thread(t, q);
	struct batch_commit_buf *cb = &t->commit[q_t_idx];
	struct ublk_batch_elem *elem;
	struct ublk_io *io = &q->ios[tag];

	if (!ublk_batch_commit_prepared(cb))
		ublk_batch_init_commit(t, cb);

	ublk_assert(q->q_id == cb->q_id);

	elem = (struct ublk_batch_elem *)(cb->elem + cb->done * t->commit_buf_elem_size);
	elem->tag = tag;
	elem->buf_index = ublk_batch_io_buf_idx(t, q, tag);
	elem->result = res;

	if (!ublk_queue_no_buf(q))
		elem->buf_addr = (__u64) (uintptr_t) io->buf_addr;

	cb->done += 1;
	ublk_assert(cb->done <= cb->count);
}

void ublk_batch_setup_map(unsigned char (*q_thread_map)[UBLK_MAX_QUEUES],
			  int nthreads, int queues)
{
	int i, j;

	/*
	 * Set up round-robin queue-to-thread mapping for arbitrary N:M
	 * combinations.
	 *
	 * This algorithm distributes queues across threads (and threads
	 * across queues) in a balanced round-robin fashion to ensure even
	 * load distribution.
	 *
	 * Examples:
	 * - 2 threads, 4 queues: T0=[Q0,Q2], T1=[Q1,Q3]
	 * - 4 threads, 2 queues: T0=[Q0], T1=[Q1], T2=[Q0], T3=[Q1]
	 * - 3 threads, 3 queues: T0=[Q0], T1=[Q1], T2=[Q2] (1:1 mapping)
	 *
	 * Phase 1: mark which queues each thread handles (boolean mapping)
	 */
	for (i = 0, j = 0; i < queues || j < nthreads; i++, j++)
		q_thread_map[j % nthreads][i % queues] = 1;

	/*
	 * Phase 2: convert the boolean mapping to sequential indices within
	 * each thread.
	 *
	 * Transform from: q_thread_map[thread][queue] = 1 (handles queue)
	 * To: q_thread_map[thread][queue] = N (queue index within thread)
	 *
	 * This allows each thread to know the local index of each queue it
	 * handles, which is essential for buffer allocation and management.
	 * For example:
	 * - Thread 0 handling queues [0,2] becomes: q_thread_map[0][0]=1,
	 *   q_thread_map[0][2]=2
	 * - Thread 1 handling queues [1,3] becomes: q_thread_map[1][1]=1,
	 *   q_thread_map[1][3]=2
	 */
	for (j = 0; j < nthreads; j++) {
		unsigned char seq = 1;

		for (i = 0; i < queues; i++) {
			if (q_thread_map[j][i])
				q_thread_map[j][i] = seq++;
		}
	}

#if 0	/* debug helper: dump the queue/thread mapping */
	for (j = 0; j < nthreads; j++) {
		printf("thread %0d: ", j);
		for (i = 0; i < queues; i++) {
			if (q_thread_map[j][i])
				printf("%03u ", i);
		}
		printf("\n");
	}
	printf("\n");
	for (j = 0; j < nthreads; j++) {
		for (i = 0; i < queues; i++) {
			printf("%03u ", q_thread_map[j][i]);
		}
		printf("\n");
	}
#endif
}