diff --git a/fs/fuse/control.c b/fs/fuse/control.c index 5247df896c5d01..10d6a08e6a7b02 100644 --- a/fs/fuse/control.c +++ b/fs/fuse/control.c @@ -203,6 +203,125 @@ static const struct file_operations fuse_conn_congestion_threshold_ops = { .write = fuse_conn_congestion_threshold_write, }; +static ssize_t fuse_conn_writethrough_threshold_read(struct file *file, + char __user *buf, + size_t len, loff_t *ppos) +{ + struct fuse_conn *fc; + unsigned val; + + fc = fuse_ctl_file_conn_get(file); + if (!fc) + return 0; + + val = READ_ONCE(fc->writethrough_threshold); + fuse_conn_put(fc); + + return fuse_conn_limit_read(file, buf, len, ppos, val); +} + +static ssize_t fuse_conn_writethrough_threshold_write(struct file *file, + const char __user *buf, + size_t count, loff_t *ppos) +{ + struct fuse_conn *fc; + char kbuf[32]; + unsigned long long val; + char *end; + + if (*ppos) + return -EINVAL; + if (count == 0 || count >= sizeof(kbuf)) + return -EINVAL; + if (copy_from_user(kbuf, buf, count)) + return -EFAULT; + kbuf[count] = '\0'; + + /* memparse accepts a bare suffix without a digit; require a digit */ + if (kbuf[0] < '0' || kbuf[0] > '9') + return -EINVAL; + + val = memparse(kbuf, &end); + end = skip_spaces(end); + if (*end) + return -EINVAL; + if (val > UINT_MAX) + return -EINVAL; + + fc = fuse_ctl_file_conn_get(file); + if (!fc) + return -ENOENT; + + WRITE_ONCE(fc->writethrough_threshold, (unsigned int)val); + fuse_conn_put(fc); + + return count; +} + +static const struct file_operations fuse_conn_writethrough_threshold_ops = { + .open = nonseekable_open, + .read = fuse_conn_writethrough_threshold_read, + .write = fuse_conn_writethrough_threshold_write, +}; + +static ssize_t fuse_conn_force_dio_on_contention_read(struct file *file, + char __user *buf, + size_t len, loff_t *ppos) +{ + struct fuse_conn *fc; + char tmp[32]; + size_t size; + int val; + + fc = fuse_ctl_file_conn_get(file); + if (!fc) + return 0; + + val = READ_ONCE(fc->force_dio_on_contention); + 
fuse_conn_put(fc); + + size = sprintf(tmp, "%d\n", val); + return simple_read_from_buffer(buf, len, ppos, tmp, size); +} + +static ssize_t fuse_conn_force_dio_on_contention_write(struct file *file, + const char __user *buf, + size_t count, loff_t *ppos) +{ + struct fuse_conn *fc; + int val; + int err; + + if (*ppos) + return -EINVAL; + + /* + * -1 disables; otherwise the number of other writers that must already + * hold the inode before it is latched into direct IO (0 = first writer, + * 1 = second writer, ...). + */ + err = kstrtoint_from_user(buf, count, 0, &val); + if (err) + return err; + if (val < -1) + return -EINVAL; + + fc = fuse_ctl_file_conn_get(file); + if (!fc) + return -ENOENT; + + WRITE_ONCE(fc->force_dio_on_contention, val); + fuse_conn_put(fc); + + return count; +} + +static const struct file_operations fuse_conn_force_dio_on_contention_ops = { + .open = nonseekable_open, + .read = fuse_conn_force_dio_on_contention_read, + .write = fuse_conn_force_dio_on_contention_write, +}; + static struct dentry *fuse_ctl_add_dentry(struct dentry *parent, struct fuse_conn *fc, const char *name, int mode, @@ -269,7 +388,13 @@ int fuse_ctl_add_conn(struct fuse_conn *fc) NULL, &fuse_conn_max_background_ops) || !fuse_ctl_add_dentry(parent, fc, "congestion_threshold", S_IFREG | 0600, NULL, - &fuse_conn_congestion_threshold_ops)) + &fuse_conn_congestion_threshold_ops) || + !fuse_ctl_add_dentry(parent, fc, "writethrough_threshold", + S_IFREG | 0600, NULL, + &fuse_conn_writethrough_threshold_ops) || + !fuse_ctl_add_dentry(parent, fc, "force_dio_on_contention", + S_IFREG | 0600, NULL, + &fuse_conn_force_dio_on_contention_ops)) goto err; return 0; diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c index f235417dad466b..4a25601ce93d29 100644 --- a/fs/fuse/dev_uring.c +++ b/fs/fuse/dev_uring.c @@ -22,12 +22,6 @@ MODULE_PARM_DESC(enable_uring, #define FUSE_RING_HEADER_PG 0 #define FUSE_RING_PAYLOAD_PG 1 -/* Threshold that determines if a better queue should be searched 
for */ -#define FUSE_URING_Q_THRESHOLD 2 - -/* Number of (re)tries to find a better queue */ -#define FUSE_URING_Q_TRIES 3 - /* redfs only to allow patch backports */ #define IO_URING_F_TASK_DEAD (1 << 13) @@ -43,6 +37,16 @@ static inline void io_uring_cmd_private_sz_check(size_t cmd_sz) ) #endif +/* Number of queued fuse requests up to which a queue counts as available. + * Basically no entries, as synchronization is done with lockless bitmaps, + * i.e. it is not accurate - queues always get a few more requests that way. + * Lightly loaded queues are wanted to reduce kernel/userspace switches. + */ +#define FUSE_URING_QUEUE_THRESHOLD 0 + +static unsigned int fuse_uring_get_random_qid(struct fuse_ring *ring, + const struct cpumask *mask); + bool fuse_uring_enabled(void) { return enable_uring; @@ -225,22 +229,20 @@ static void io_pages_free(struct page ***pages, int npages) *pages = NULL; } -static void fuse_ring_destruct_q_map(struct fuse_queue_map *q_map) -{ - free_cpumask_var(q_map->registered_q_mask); - kfree(q_map->cpu_to_qid); -} - -static void fuse_uring_destruct_q_masks(struct fuse_ring *ring) +static void fuse_ring_destruct_q_masks(struct fuse_ring *ring) { - int node; - - fuse_ring_destruct_q_map(&ring->q_map); + free_cpumask_var(ring->avail_q_mask); + if (ring->per_numa_avail_q_mask) { + for (int node = 0; node < ring->nr_numa_nodes; node++) + free_cpumask_var(ring->per_numa_avail_q_mask[node]); + kfree(ring->per_numa_avail_q_mask); + } - if (ring->numa_q_map) { - for (node = 0; node < ring->nr_numa_nodes; node++) - fuse_ring_destruct_q_map(&ring->numa_q_map[node]); - kfree(ring->numa_q_map); + free_cpumask_var(ring->registered_q_mask); + if (ring->numa_registered_q_mask) { + for (int node = 0; node < ring->nr_numa_nodes; node++) + free_cpumask_var(ring->numa_registered_q_mask[node]); + kfree(ring->numa_registered_q_mask); } } @@ -278,44 +280,41 @@ void fuse_uring_destruct(struct fuse_conn *fc) ring->queues[qid] = NULL; } - fuse_uring_destruct_q_masks(ring); + 
fuse_ring_destruct_q_masks(ring); kfree(ring->queues); kfree(ring); fc->ring = NULL; } -static int fuse_uring_init_q_map(struct fuse_queue_map *q_map, size_t nr_cpu) +static int fuse_ring_create_q_masks(struct fuse_ring *ring, int nr_queues) { - if (!zalloc_cpumask_var(&q_map->registered_q_mask, GFP_KERNEL_ACCOUNT)) + if (!zalloc_cpumask_var(&ring->avail_q_mask, GFP_KERNEL_ACCOUNT)) return -ENOMEM; - q_map->cpu_to_qid = kcalloc(nr_cpu, sizeof(*q_map->cpu_to_qid), - GFP_KERNEL_ACCOUNT); - if (!q_map->cpu_to_qid) + if (!zalloc_cpumask_var(&ring->registered_q_mask, GFP_KERNEL_ACCOUNT)) return -ENOMEM; - return 0; -} - -static int fuse_uring_create_q_masks(struct fuse_ring *ring, size_t nr_queues) -{ - int err, node; - - err = fuse_uring_init_q_map(&ring->q_map, nr_queues); - if (err) - return err; - - ring->numa_q_map = kcalloc(ring->nr_numa_nodes, - sizeof(*ring->numa_q_map), - GFP_KERNEL_ACCOUNT); - if (!ring->numa_q_map) + ring->per_numa_avail_q_mask = kcalloc(ring->nr_numa_nodes, + sizeof(*ring->per_numa_avail_q_mask), + GFP_KERNEL_ACCOUNT); + if (!ring->per_numa_avail_q_mask) return -ENOMEM; - for (node = 0; node < ring->nr_numa_nodes; node++) { - err = fuse_uring_init_q_map(&ring->numa_q_map[node], - nr_queues); - if (err) - return err; + for (int node = 0; node < ring->nr_numa_nodes; node++) + if (!zalloc_cpumask_var(&ring->per_numa_avail_q_mask[node], + GFP_KERNEL_ACCOUNT)) + return -ENOMEM; + + ring->numa_registered_q_mask = kcalloc(ring->nr_numa_nodes, + sizeof(*ring->numa_registered_q_mask), + GFP_KERNEL_ACCOUNT); + if (!ring->numa_registered_q_mask) + return -ENOMEM; + for (int node = 0; node < ring->nr_numa_nodes; node++) { + if (!zalloc_cpumask_var(&ring->numa_registered_q_mask[node], + GFP_KERNEL_ACCOUNT)) + return -ENOMEM; } + return 0; } @@ -344,7 +343,7 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc) max_payload_size = max(FUSE_MIN_READ_BUFFER, fc->max_write); max_payload_size = max(max_payload_size, fc->max_pages * PAGE_SIZE); - 
err = fuse_uring_create_q_masks(ring, nr_queues); + err = fuse_ring_create_q_masks(ring, nr_queues); if (err) goto out_err; @@ -367,42 +366,18 @@ static struct fuse_ring *fuse_uring_create(struct fuse_conn *fc) return ring; out_err: - fuse_uring_destruct_q_masks(ring); + fuse_ring_destruct_q_masks(ring); kfree(ring->queues); kfree(ring); return res; } -static void fuse_uring_cpu_qid_mapping(struct fuse_ring *ring, int qid, - struct fuse_queue_map *q_map, - int node) -{ - int cpu, qid_idx, mapping_count = 0; - size_t nr_queues; - - cpumask_set_cpu(qid, q_map->registered_q_mask); - nr_queues = cpumask_weight(q_map->registered_q_mask); - for (cpu = 0; cpu < ring->max_nr_queues; cpu++) { - if (node != -1 && cpu_to_node(cpu) != node) - continue; - - qid_idx = mapping_count % nr_queues; - q_map->cpu_to_qid[cpu] = cpumask_nth(qid_idx, - q_map->registered_q_mask); - mapping_count++; - pr_debug("%s node=%d qid=%d qid_idx=%d nr_queues=%zu %d->%d\n", - __func__, node, qid, qid_idx, nr_queues, cpu, - q_map->cpu_to_qid[cpu]); - } -} - static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, int qid) { struct fuse_conn *fc = ring->fc; struct fuse_ring_queue *queue; struct list_head *pq; - int node; queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT); if (!queue) @@ -415,6 +390,7 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, queue->qid = qid; queue->ring = ring; + queue->numa_node = cpu_to_node(qid); spin_lock_init(&queue->lock); INIT_LIST_HEAD(&queue->ent_avail_queue); @@ -441,21 +417,6 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring, */ WRITE_ONCE(ring->queues[qid], queue); - /* Static mapping from cpu to per numa queues */ - node = cpu_to_node(qid); - fuse_uring_cpu_qid_mapping(ring, qid, &ring->numa_q_map[node], node); - - /* - * smp_store_release, as the variable is read without fc->lock and - * we need to avoid compiler re-ordering of updating the nr_queues - * and setting 
ring->numa_queues[node].cpu_to_qid above - */ - smp_store_release (&ring->numa_q_map[node].nr_queues, - ring->numa_q_map[node].nr_queues + 1); - - /* global mapping */ - fuse_uring_cpu_qid_mapping(ring, qid, &ring->q_map, -1); - spin_unlock(&fc->lock); return queue; @@ -625,13 +586,16 @@ void fuse_uring_stop_queues(struct fuse_ring *ring) fuse_uring_abort_end_queue_requests(queue); fuse_uring_teardown_entries(queue); - } - /* Reset all queue masks, we won't process any more IO */ - cpumask_clear(ring->q_map.registered_q_mask); - for (node = 0; node < ring->nr_numa_nodes; node++) { - if (ring->numa_q_map) - cpumask_clear(ring->numa_q_map[node].registered_q_mask); + cpumask_clear_cpu(qid, ring->registered_q_mask); + cpumask_clear_cpu(qid, ring->avail_q_mask); + for (node = 0; node < ring->nr_numa_nodes; node++) { + /* Clear the queue from all masks */ + cpumask_clear_cpu(qid, + ring->numa_registered_q_mask[node]); + cpumask_clear_cpu(qid, + ring->per_numa_avail_q_mask[node]); + } } if (atomic_read(&ring->queue_refs) > 0) { @@ -973,9 +937,17 @@ static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ent, static void fuse_uring_ent_avail(struct fuse_ring_ent *ent, struct fuse_ring_queue *queue) { + struct fuse_ring *ring = queue->ring; + int node = queue->numa_node; + WARN_ON_ONCE(!ent->cmd); list_move(&ent->list, &queue->ent_avail_queue); ent->state = FRRS_AVAILABLE; + + if (queue->nr_reqs <= FUSE_URING_QUEUE_THRESHOLD) { + cpumask_set_cpu(queue->qid, ring->avail_q_mask); + cpumask_set_cpu(queue->qid, ring->per_numa_avail_q_mask[node]); + } } /* Used to find the request on SQE commit */ @@ -998,6 +970,8 @@ static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent, struct fuse_req *req) { struct fuse_ring_queue *queue = ent->queue; + struct fuse_ring *ring = queue->ring; + int node = queue->numa_node; lockdep_assert_held(&queue->lock); @@ -1012,6 +986,16 @@ static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ent, ent->state = 
FRRS_FUSE_REQ; list_move_tail(&ent->list, &queue->ent_w_req_queue); fuse_uring_add_to_pq(ent, req); + + /* + * If there are no more available entries, mark the queue as unavailable + * in both global and per-NUMA node masks + */ + if (list_empty(&queue->ent_avail_queue)) { + cpumask_clear_cpu(queue->qid, ring->avail_q_mask); + cpumask_clear_cpu(queue->qid, + ring->per_numa_avail_q_mask[node]); + } } /* Fetch the next fuse request if available */ @@ -1401,6 +1385,10 @@ static int fuse_uring_register(struct io_uring_cmd *cmd, /* Marks the ring entry as ready */ fuse_uring_next_fuse_req(ent, queue, issue_flags); + cpumask_set_cpu(queue->qid, ring->registered_q_mask); + cpumask_set_cpu(queue->qid, + ring->numa_registered_q_mask[queue->numa_node]); + return 0; } @@ -1500,80 +1488,109 @@ static void fuse_uring_send_in_task(struct io_uring_cmd *cmd, fuse_uring_send(ent, cmd, err, issue_flags); } -static struct fuse_ring_queue *fuse_uring_select_queue(struct fuse_ring *ring, - bool background) +static struct fuse_ring_queue * +fuse_uring_get_first_queue(struct fuse_ring *ring, const struct cpumask *mask) { - unsigned int qid; - int node, tries = 0; - unsigned int nr_queues; - unsigned int cpu = task_cpu(current); - struct fuse_ring_queue *queue, *primary_queue = NULL; - - /* - * Background requests result in better performance on a different - * CPU, unless CPUs are already busy. 
- */ - if (background) - cpu++; + int qid; -retry: - cpu = cpu % ring->max_nr_queues; - - /* numa local registered queue bitmap */ - node = cpu_to_node(cpu); - if (WARN_ONCE(node >= ring->nr_numa_nodes, - "Node number (%d) exceeds nr nodes (%d)\n", - node, ring->nr_numa_nodes)) { - node = 0; - } + /* Find the first available CPU in this mask */ + qid = cpumask_first(mask); - nr_queues = READ_ONCE(ring->numa_q_map[node].nr_queues); - if (nr_queues) { - /* prefer the queue that corresponds to the current cpu */ - queue = READ_ONCE(ring->queues[cpu]); - if (queue) { - if (queue->nr_reqs <= FUSE_URING_Q_THRESHOLD) - return queue; - primary_queue = queue; - } + /* Check if we found a valid CPU */ + if (qid >= ring->max_nr_queues) + return NULL; /* No available queues */ - qid = ring->numa_q_map[node].cpu_to_qid[cpu]; - if (WARN_ON_ONCE(qid >= ring->max_nr_queues)) - return NULL; - if (qid != cpu) { - queue = READ_ONCE(ring->queues[qid]); + /* Mask bits are indexed by global qid, so use it directly */ + return ring->queues[qid]; +} - /* Might happen on teardown */ - if (unlikely(!queue)) - return NULL; +/* + * Return a random queue id from the given queue mask + * + * Picks the position of a set bit pseudo-randomly, so the result varies + * between calls. Returns UINT_MAX if no bit is set in the mask. 
+ */ +static unsigned int fuse_uring_get_random_qid(struct fuse_ring *ring, + const struct cpumask *mask) +{ + unsigned int nr_bits = cpumask_weight(mask); + unsigned int nth, cpu; - if (queue->nr_reqs <= FUSE_URING_Q_THRESHOLD) - return queue; - } + if (nr_bits == 0) + return UINT_MAX; - /* Retries help for load balancing */ - if (tries < FUSE_URING_Q_TRIES && tries + 1 < nr_queues) { - if (!primary_queue) - primary_queue = queue; + /* Fast path: only one bit is set */ + if (nr_bits == 1) + return cpumask_first(mask); - /* Increase cpu, assuming it will map to a different qid*/ - cpu++; - tries++; - goto retry; - } + /* + * Mix a random u32 with the low bits of jiffies and the task PID + * and reduce it modulo nr_bits to pick the position of a set bit + */ + nth = (get_random_u32() ^ (jiffies & 0xFFFF) ^ + (current->pid & 0xFFFF)) % + nr_bits; + + /* Return the nth set bit of the mask */ + for_each_cpu(cpu, mask) { + if (nth-- == 0) + return cpu; } - /* Retries exceeded, take the primary target queue */ - if (primary_queue) - return primary_queue; + return UINT_MAX; +} - /* global registered queue bitmap */ - qid = ring->q_map.cpu_to_qid[cpu]; - if (WARN_ON_ONCE(qid >= ring->max_nr_queues)) { - /* Might happen on teardown */ - return NULL; +/* + * Get the best queue for the current CPU + */ +static struct fuse_ring_queue *fuse_uring_get_queue(struct fuse_ring *ring) +{ + unsigned int qid; + struct fuse_ring_queue *queue, *local_queue = NULL; + int local_node; + struct cpumask *mask; + struct fuse_conn *fc = ring->fc; + + qid = task_cpu(current); + local_node = cpu_to_node(qid); + if (WARN_ON_ONCE(local_node >= ring->nr_numa_nodes || local_node < 0)) + local_node = 0; + + /* First check if current CPU's queue is available */ + if (qid < ring->max_nr_queues) { + local_queue = queue = ring->queues[qid]; + if (queue && queue->nr_reqs <= FUSE_URING_QUEUE_THRESHOLD) + return queue; } - return READ_ONCE(ring->queues[qid]); + + /* Second check if there are any available queues on the local 
node */ + mask = ring->per_numa_avail_q_mask[local_node]; + queue = fuse_uring_get_first_queue(ring, mask); + if (queue) + return queue; + + /* Third check if there are any available queues on any node */ + queue = fuse_uring_get_first_queue(ring, ring->avail_q_mask); + if (queue) + return queue; + + /* No free queue, use the local queue if it exists */ + if (local_queue) + return local_queue; + + /* Try to use a random queue from the local NUMA node, if there is one */ + mask = ring->numa_registered_q_mask[local_node]; + qid = fuse_uring_get_random_qid(ring, mask); + if (qid < ring->max_nr_queues) + return ring->queues[qid]; + + /* Finally, use a random queue among all queues that are registered */ + qid = fuse_uring_get_random_qid(ring, ring->registered_q_mask); + if (qid < ring->max_nr_queues) + return ring->queues[qid]; + + WARN_ON_ONCE(fc->connected); + return NULL; } static void fuse_uring_dispatch_ent(struct fuse_ring_ent *ent, bool bg) @@ -1613,7 +1630,7 @@ void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req) int err; err = -EINVAL; - queue = fuse_uring_select_queue(ring, false); + queue = fuse_uring_get_queue(ring); if (!queue) goto err; @@ -1628,6 +1645,19 @@ void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req) struct fuse_ring_ent, list); queue->nr_reqs++; + /* + * Update queue availability based on number of requests + * A queue is considered busy if it has more than + * FUSE_URING_QUEUE_THRESHOLD requests + */ + if (queue->nr_reqs == FUSE_URING_QUEUE_THRESHOLD + 1) { + /* Queue just became busy */ + cpumask_clear_cpu(queue->qid, ring->avail_q_mask); + cpumask_clear_cpu( + queue->qid, + ring->per_numa_avail_q_mask[queue->numa_node]); + } + if (ent) fuse_uring_add_req_to_ring_ent(ent, req); else @@ -1655,7 +1685,7 @@ bool fuse_uring_queue_bq_req(struct fuse_req *req) struct fuse_ring_queue *queue; struct fuse_ring_ent *ent = NULL; - queue = fuse_uring_select_queue(ring, true); + queue = 
fuse_uring_get_queue(ring); if (!queue) return false; @@ -1701,12 +1731,25 @@ bool fuse_uring_queue_bq_req(struct fuse_req *req) bool fuse_uring_remove_pending_req(struct fuse_req *req) { struct fuse_ring_queue *queue = req->ring_queue; + struct fuse_ring *ring = queue->ring; + int node = queue->numa_node; bool removed = fuse_remove_pending_req(req, &queue->lock); if (removed) { /* Update counters after successful removal */ spin_lock(&queue->lock); queue->nr_reqs--; + + /* + * Update queue availability based on number of requests + * A queue is considered available if it has FUSE_URING_QUEUE_THRESHOLD or fewer requests + */ + if (queue->nr_reqs == FUSE_URING_QUEUE_THRESHOLD) { + /* Queue just became available */ + cpumask_set_cpu(queue->qid, ring->avail_q_mask); + cpumask_set_cpu(queue->qid, + ring->per_numa_avail_q_mask[node]); + } spin_unlock(&queue->lock); } diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h index 667f668ef3277f..d0b6dd1e85ff49 100644 --- a/fs/fuse/dev_uring_i.h +++ b/fs/fuse/dev_uring_i.h @@ -70,6 +70,9 @@ struct fuse_ring_queue { /* queue id, corresponds to the cpu core */ unsigned int qid; + /* NUMA node this queue belongs to */ + int numa_node; + /* * queue lock, taken when any value in the queue changes _and_ also * a ring entry state changes. 
@@ -108,17 +111,6 @@ struct fuse_ring_queue { bool stopped; }; -struct fuse_queue_map { - /* Tracks which queues are registered */ - cpumask_var_t registered_q_mask; - - /* number of registered queues */ - size_t nr_queues; - - /* cpu to qid mapping */ - int *cpu_to_qid; -}; - /** * Describes if uring is for communication and holds alls the data needed * for uring communication @@ -143,11 +135,17 @@ struct fuse_ring { */ unsigned int stop_debug_log : 1; - /* per numa node queue tracking */ - struct fuse_queue_map *numa_q_map; + /* Tracks which queues are available (empty) globally */ + cpumask_var_t avail_q_mask; + + /* Tracks which queues are available per NUMA node */ + cpumask_var_t *per_numa_avail_q_mask; + + /* Tracks which queues are registered */ + cpumask_var_t registered_q_mask; - /* all queue tracking */ - struct fuse_queue_map q_map; + /* Tracks which queues are registered per NUMA node */ + cpumask_var_t *numa_registered_q_mask; wait_queue_head_t stop_waitq; diff --git a/fs/fuse/file.c b/fs/fuse/file.c index 467093acadbb89..2bc0b9806e5206 100644 --- a/fs/fuse/file.c +++ b/fs/fuse/file.c @@ -350,6 +350,8 @@ static int fuse_open(struct inode *inode, struct file *file) bool is_truncate = (file->f_flags & O_TRUNC) && fc->atomic_o_trunc; bool is_wb_truncate = is_truncate && fc->writeback_cache; bool dax_truncate = is_truncate && FUSE_IS_DAX(inode); + bool force_dio = false; + int dio_threshold = READ_ONCE(fc->force_dio_on_contention); if (fuse_is_bad(inode)) return -EIO; @@ -358,7 +360,26 @@ static int fuse_open(struct inode *inode, struct file *file) if (err) return err; - if (is_wb_truncate || dax_truncate) + /* + * If this file is opened for writing while another writer already has + * the inode open, the cached write path (fuse_cache_write_iter()) + * serializes them on the exclusive inode lock. Latch the inode into + * direct IO so writes use the shared-lock parallel dio path instead. 
+ * The page cache is flushed and dropped below (fuse_launder_folio() + * writes back dirty folios during the invalidate), making the switch + * coherent. Disabled for DAX, passthrough/backing and mmapped inodes, + * which cannot bypass the page cache. + */ + if (dio_threshold >= 0 && + (file->f_mode & FMODE_WRITE) && fc->writeback_cache && + !FUSE_IS_DAX(inode) && !fuse_inode_backing(fi) && + !mapping_mapped(inode->i_mapping)) { + spin_lock(&fi->lock); + force_dio = fuse_writers_contended(fi, dio_threshold); + spin_unlock(&fi->lock); + } + + if (is_wb_truncate || dax_truncate || force_dio) inode_lock(inode); if (dax_truncate) { @@ -368,7 +389,7 @@ static int fuse_open(struct inode *inode, struct file *file) goto out_inode_unlock; } - if (is_wb_truncate || dax_truncate) + if (is_wb_truncate || dax_truncate || force_dio) fuse_set_nowrite(inode); err = fuse_do_open(fm, get_node_id(inode), file, false); @@ -381,18 +402,33 @@ static int fuse_open(struct inode *inode, struct file *file) fuse_truncate_update_attr(inode, file); } - if (is_wb_truncate || dax_truncate) + if (is_wb_truncate || dax_truncate || force_dio) fuse_release_nowrite(inode); if (!err) { + /* + * Latch into forced direct IO whenever a second writer is + * contending, independently of the O_TRUNC page-cache handling + * below -- an O_TRUNC multi-writer open must switch to direct IO + * too. Drop the page cache regardless of FOPEN_KEEP_CACHE. + * Still under inode_lock, so a writer contending in + * fuse_cache_write_iter() re-checks the latch after it acquires + * the inode lock and re-routes to direct IO instead of + * repopulating the cache dropped here. 
+ */ + bool latch = force_dio && !fuse_inode_backing(fi) && + !mapping_mapped(inode->i_mapping); + + if (latch) + set_bit(FUSE_I_FORCE_DIO, &fi->state); if (is_truncate) truncate_pagecache(inode, 0); - else if (!(ff->open_flags & FOPEN_KEEP_CACHE)) + else if (latch || !(ff->open_flags & FOPEN_KEEP_CACHE)) invalidate_inode_pages2(inode->i_mapping); } if (dax_truncate) filemap_invalidate_unlock(inode->i_mapping); out_inode_unlock: - if (is_wb_truncate || dax_truncate) + if (is_wb_truncate || dax_truncate || force_dio) inode_unlock(inode); return err; @@ -411,6 +447,17 @@ static void fuse_prepare_release(struct fuse_inode *fi, struct fuse_file *ff, if (likely(fi)) { spin_lock(&fi->lock); list_del(&ff->write_entry); + /* + * Leave forced direct IO mode once the last writer is gone; + * a lone (or no) writer no longer contends on the inode lock. + * Restore FUSE_I_CACHE_IO_MODE for any frozen cached opens. + */ + if (test_bit(FUSE_I_FORCE_DIO, &fi->state) && + list_empty(&fi->write_files)) { + clear_bit(FUSE_I_FORCE_DIO, &fi->state); + if (fi->iocachectr > 0) + set_bit(FUSE_I_CACHE_IO_MODE, &fi->state); + } spin_unlock(&fi->lock); } spin_lock(&fc->lock); @@ -449,9 +496,25 @@ void fuse_file_release(struct inode *inode, struct fuse_file *ff, struct fuse_inode *fi = get_fuse_inode(inode); struct fuse_release_args *ra = &ff->args->release_args; int opcode = isdir ? FUSE_RELEASEDIR : FUSE_RELEASE; + bool was_force_dio = test_bit(FUSE_I_FORCE_DIO, &fi->state); fuse_prepare_release(fi, ff, open_flags, opcode, false); + /* + * If this release dropped the last writer, fuse_prepare_release() + * cleared the forced-direct-IO latch (under fi->lock). Drop any clean + * folios a read racing the latch may have repopulated, so they cannot + * be served stale once caching mode resumes. No inode lock or + * wb_inval_rwsem: release may run on the fuse server thread (async + * fput from aio completion), where blocking on a contended inode lock + * could stall the connection. 
Writes were routed direct while + * latched, so only clean folios exist and this invalidate is + * server-free; the last writer is gone, so no forced-dio writer can + * race the drop. + */ + if (was_force_dio && !test_bit(FUSE_I_FORCE_DIO, &fi->state)) + invalidate_inode_pages2(inode->i_mapping); + if (ra && ff->flock) { ra->inarg.release_flags |= FUSE_RELEASE_FLOCK_UNLOCK; ra->inarg.lock_owner = fuse_lock_owner_id(ff->fm->fc, id); @@ -1497,13 +1560,6 @@ static ssize_t fuse_perform_write(struct kiocb *iocb, struct iov_iter *ii) return res; } -static bool fuse_io_past_eof(struct kiocb *iocb, struct iov_iter *iter) -{ - struct inode *inode = file_inode(iocb->ki_filp); - - return iocb->ki_pos + iov_iter_count(iter) > i_size_read(inode); -} - /* * @return true if an exclusive lock for direct IO writes is needed */ @@ -1513,9 +1569,15 @@ static bool fuse_dio_wr_exclusive_lock(struct kiocb *iocb, struct iov_iter *from struct fuse_file *ff = file->private_data; struct inode *inode = file_inode(iocb->ki_filp); struct fuse_inode *fi = get_fuse_inode(inode); + bool force_dio = test_bit(FUSE_I_FORCE_DIO, &fi->state); - /* Server side has to advise that it supports parallel dio writes. */ - if (!(ff->open_flags & FOPEN_PARALLEL_DIRECT_WRITES)) + /* + * Server side has to advise that it supports parallel dio writes. + * When the inode is latched into forced direct IO due to multi-opener + * contention, parallel writes are used unconditionally: the page cache + * has been flushed and is bypassed for this inode. + */ + if (!force_dio && !(ff->open_flags & FOPEN_PARALLEL_DIRECT_WRITES)) return true; /* @@ -1526,11 +1588,7 @@ static bool fuse_dio_wr_exclusive_lock(struct kiocb *iocb, struct iov_iter *from return true; /* shared locks are not allowed with parallel page cache IO */ - if (test_bit(FUSE_I_CACHE_IO_MODE, &fi->state)) - return true; - - /* Parallel dio beyond EOF is not supported, at least for now. 
*/ - if (fuse_io_past_eof(iocb, from)) + if (!force_dio && test_bit(FUSE_I_CACHE_IO_MODE, &fi->state)) return true; return false; @@ -1551,10 +1609,14 @@ static void fuse_dio_lock(struct kiocb *iocb, struct iov_iter *from, * New parallal dio allowed only if inode is not in caching * mode and denies new opens in caching mode. This check * should be performed only after taking shared inode lock. - * Previous past eof check was without inode lock and might - * have raced, so check it again. + * + * Under the forced-dio latch the uncached-io accounting is + * bypassed entirely -- fuse_dio_unlock() likewise skips + * fuse_inode_uncached_io_end() -- so do not take a reference + * here. An unbalanced start would drive fi->iocachectr + * permanently negative and hang the next caching-mode open. */ - if (fuse_io_past_eof(iocb, from) || + if (!test_bit(FUSE_I_FORCE_DIO, &fi->state) && fuse_inode_uncached_io_start(fi, NULL) != 0) { inode_unlock_shared(inode); inode_lock(inode); @@ -1571,8 +1633,13 @@ static void fuse_dio_unlock(struct kiocb *iocb, bool exclusive) if (exclusive) { inode_unlock(inode); } else { - /* Allow opens in caching mode after last parallel dio end */ - fuse_inode_uncached_io_end(fi); + /* + * Allow opens in caching mode after last parallel dio end. + * Skipped under the forced-dio latch, which never took an + * uncached_io reference in fuse_dio_lock(). + */ + if (!test_bit(FUSE_I_FORCE_DIO, &fi->state)) + fuse_inode_uncached_io_end(fi); inode_unlock_shared(inode); } } @@ -1666,6 +1733,51 @@ static ssize_t fuse_writeback_write_iter(struct kiocb *iocb, return written < 0 ? written : total_written; } +/* + * With writeback caching the request size seen by the server depends on + * how many contiguous dirty pages the flusher finds, which is bounded by + * dirty throttling: with BDI_CAP_STRICTLIMIT the dirty window can degrade + * to a single page under streaming writes, turning large application + * writes into page-sized requests. 
+ * + * Writes that already match the server's preferred alignment gain + * nothing from accumulating in the page cache, so send them through + * fuse_perform_write() instead, which packs requests up to max_write. + * They create no dirty pages, hence no DLM write lock needs to be cached + * for them. Unaligned writes keep using the writeback cache, where they + * can merge with neighbouring data. + * + * A non-zero writethrough_threshold additionally forces any write at or + * above that size through fuse_perform_write() regardless of alignment. + */ +static bool fuse_use_writeback_cache(struct fuse_conn *fc, struct kiocb *iocb, + struct iov_iter *from) +{ + size_t count = iov_iter_count(from); + unsigned int wt; + u64 align; + bool ret; + + if (!fc->big_writes) + return true; + + /* these rely on the semantics of their current paths */ + if (iocb->ki_flags & (IOCB_DIRECT | IOCB_APPEND | IOCB_NOWAIT)) + return true; + + wt = READ_ONCE(fc->writethrough_threshold); + if (wt && count >= wt) + return false; + + align = fc->alignment_pages ? + (u64)fc->alignment_pages << PAGE_SHIFT : PAGE_SIZE; + + ret = !IS_ALIGNED(iocb->ki_pos, align) || !IS_ALIGNED((u64)count, align); + return ret; +} + +static ssize_t fuse_direct_write_iter(struct kiocb *iocb, struct iov_iter *from); + static ssize_t fuse_cache_write_iter(struct kiocb *iocb, struct iov_iter *from) { struct file *file = iocb->ki_filp; @@ -1673,9 +1785,20 @@ static ssize_t fuse_cache_write_iter(struct kiocb *iocb, struct iov_iter *from) struct address_space *mapping = file->f_mapping; ssize_t written = 0; struct inode *inode = mapping->host; + struct fuse_inode *fi = get_fuse_inode(inode); ssize_t err, count; struct fuse_conn *fc = get_fuse_conn(inode); bool writeback = false; + bool wb_guard = false; + + /* + * The inode may have been latched into forced direct IO after this + * write was routed here but before it acquired any lock. 
Re-route to + * the direct path (before taking a DLM lock) so we do not repopulate + * the page cache that the latch just dropped. + */ + if (fuse_inode_force_dio(inode)) + return fuse_direct_write_iter(iocb, from); if (fc->writeback_cache) { /* Update size (EOF optimization) and mode (SUID clearing) */ @@ -1684,8 +1807,9 @@ static ssize_t fuse_cache_write_iter(struct kiocb *iocb, struct iov_iter *from) if (err) return err; - if (!fc->handle_killpriv_v2 || - !setattr_should_drop_suidgid(idmap, file_inode(file))) { + if ((!fc->handle_killpriv_v2 || + !setattr_should_drop_suidgid(idmap, file_inode(file))) && + fuse_use_writeback_cache(fc, iocb, from)) { writeback = true; /* @@ -1715,6 +1839,39 @@ static ssize_t fuse_cache_write_iter(struct kiocb *iocb, struct iov_iter *from) inode_lock(inode); + /* + * The forced-direct-IO latch may have been set by a second writer's + * open() while we blocked acquiring the inode lock (fuse_open() holds + * it across the latch + page-cache invalidate). Re-check now that we + * hold the lock and re-route to the direct path rather than repopulate + * the just-dropped cache. The DLM write-lock notification issued above + * is harmless: the direct path performs its own server coordination. + */ + if (fuse_inode_force_dio(inode)) { + inode_unlock(inode); + return fuse_direct_write_iter(iocb, from); + } + + /* + * When force_dio_on_contention is enabled, hold wb_inval_rwsem for + * read across the page-cache dirtying so a concurrent + * NOTIFY_INVAL_INODE -- which latches the inode into direct IO under + * the write side of this lock (via trylock) -- cannot strand the + * folios we are about to write. Re-check the latch under it and + * re-route to the direct path if set. Gated on the knob so writeback + * mounts without the feature never touch the rwsem, and taken before + * task_io_account_write() so a re-route is not double-counted. 
+ */ + wb_guard = READ_ONCE(fc->force_dio_on_contention) >= 0; + if (wb_guard) { + down_read(&fi->wb_inval_rwsem); + if (fuse_inode_force_dio(inode)) { + up_read(&fi->wb_inval_rwsem); + inode_unlock(inode); + return fuse_direct_write_iter(iocb, from); + } + } + err = count = generic_write_checks(iocb, from); if (err <= 0) goto out; @@ -1741,6 +1898,8 @@ static ssize_t fuse_cache_write_iter(struct kiocb *iocb, struct iov_iter *from) written = fuse_perform_write(iocb, from); } out: + if (wb_guard) + up_read(&fi->wb_inval_rwsem); inode_unlock(inode); if (written > 0) written = generic_write_sync(iocb, written); @@ -1975,14 +2134,16 @@ static ssize_t __fuse_direct_read(struct fuse_io_priv *io, return res; } -static ssize_t fuse_direct_IO(struct kiocb *iocb, struct iov_iter *iter); +static ssize_t __fuse_direct_IO(struct kiocb *iocb, struct iov_iter *iter, + bool exclusive); static ssize_t fuse_direct_read_iter(struct kiocb *iocb, struct iov_iter *to) { ssize_t res; if (!is_sync_kiocb(iocb)) { - res = fuse_direct_IO(iocb, to); + /* exclusive is unused on reads; rollback is write-only */ + res = __fuse_direct_IO(iocb, to, true); } else { struct fuse_io_priv io = FUSE_IO_PRIV_SYNC(iocb); @@ -2005,7 +2166,7 @@ static ssize_t fuse_direct_write_iter(struct kiocb *iocb, struct iov_iter *from) if (res > 0) { task_io_account_write(res); if (!is_sync_kiocb(iocb)) { - res = fuse_direct_IO(iocb, from); + res = __fuse_direct_IO(iocb, from, exclusive); } else { struct fuse_io_priv io = FUSE_IO_PRIV_SYNC(iocb); @@ -2042,7 +2203,7 @@ static ssize_t fuse_file_read_iter(struct kiocb *iocb, struct iov_iter *to) return fuse_dax_read_iter(iocb, to); /* FOPEN_DIRECT_IO overrides FOPEN_PASSTHROUGH */ - if (ff->open_flags & FOPEN_DIRECT_IO) + if ((ff->open_flags & FOPEN_DIRECT_IO) || fuse_inode_force_dio(inode)) return fuse_direct_read_iter(iocb, to); else if (fuse_file_passthrough(ff)) return fuse_passthrough_read_iter(iocb, to); @@ -2063,7 +2224,7 @@ static ssize_t 
fuse_file_write_iter(struct kiocb *iocb, struct iov_iter *from) return fuse_dax_write_iter(iocb, from); /* FOPEN_DIRECT_IO overrides FOPEN_PASSTHROUGH */ - if (ff->open_flags & FOPEN_DIRECT_IO) + if ((ff->open_flags & FOPEN_DIRECT_IO) || fuse_inode_force_dio(inode)) return fuse_direct_write_iter(iocb, from); else if (fuse_file_passthrough(ff)) return fuse_passthrough_write_iter(iocb, from); @@ -2114,19 +2275,15 @@ static void fuse_writepage_finish(struct fuse_writepage_args *wpa) struct fuse_args_pages *ap = &wpa->ia.ap; struct inode *inode = wpa->inode; struct fuse_inode *fi = get_fuse_inode(inode); - struct backing_dev_info *bdi = inode_to_bdi(inode); int i; - for (i = 0; i < ap->num_folios; i++) { + for (i = 0; i < ap->num_folios; i++) /* * Benchmarks showed that ending writeback within the * scope of the fi->lock alleviates xarray lock * contention and noticeably improves performance. */ iomap_finish_folio_write(inode, ap->folios[i], 1); - dec_wb_stat(&bdi->wb, WB_WRITEBACK); - wb_writeout_inc(&bdi->wb); - } wake_up(&fi->page_waitq); } @@ -2301,14 +2458,11 @@ static void fuse_writepage_add_to_bucket(struct fuse_conn *fc, static void fuse_writepage_args_page_fill(struct fuse_writepage_args *wpa, struct folio *folio, uint32_t folio_index, loff_t offset, unsigned len) { - struct inode *inode = folio->mapping->host; struct fuse_args_pages *ap = &wpa->ia.ap; ap->folios[folio_index] = folio; ap->descs[folio_index].offset = offset; ap->descs[folio_index].length = len; - - inc_wb_stat(&inode_to_bdi(inode)->wb, WB_WRITEBACK); } static struct fuse_writepage_args *fuse_writepage_args_setup(struct folio *folio, @@ -2691,6 +2845,34 @@ static int fuse_file_mmap(struct file *file, struct vm_area_struct *vma) else if (fuse_inode_backing(get_fuse_inode(inode))) return -ENODEV; + /* + * If the inode was latched into forced direct IO due to multi-opener + * contention, a mapping needs the page cache, so revert to caching + * mode. 
No inode lock is taken here (see below); in-flight parallel + * (shared lock) dio writes drain on their own. Cached opens that + * were frozen while latched are still counted in iocachectr, so restore + * FUSE_I_CACHE_IO_MODE for them. + */ + if (fuse_inode_force_dio(inode)) { + struct fuse_inode *fi = get_fuse_inode(inode); + + /* + * Revert without the inode lock or wb_inval_rwsem: ->mmap runs + * under mmap_lock, and the buffered write path holds both the + * inode lock and wb_inval_rwsem across a fault on the user buffer + * (which takes mmap_lock), so taking either here would invert + * lock order (ABBA). Clearing the latch and dropping the cache + * is sufficient -- writers re-check the latch and route to cached + * IO once it is clear, and in-flight parallel dio drains itself. + */ + spin_lock(&fi->lock); + clear_bit(FUSE_I_FORCE_DIO, &fi->state); + if (fi->iocachectr > 0) + set_bit(FUSE_I_CACHE_IO_MODE, &fi->state); + spin_unlock(&fi->lock); + invalidate_inode_pages2(file->f_mapping); + } + /* * FOPEN_DIRECT_IO handling is special compared to O_DIRECT, * as does not allow MAP_SHARED mmap without FUSE_DIRECT_IO_ALLOW_MMAP. @@ -3119,7 +3301,7 @@ static inline loff_t fuse_round_up(struct fuse_conn *fc, loff_t off) } static ssize_t -fuse_direct_IO(struct kiocb *iocb, struct iov_iter *iter) +__fuse_direct_IO(struct kiocb *iocb, struct iov_iter *iter, bool exclusive) { DECLARE_COMPLETION_ONSTACK(wait); ssize_t ret = 0; @@ -3213,14 +3395,27 @@ fuse_direct_IO(struct kiocb *iocb, struct iov_iter *iter) if (iov_iter_rw(iter) == WRITE) { fuse_write_update_attr(inode, pos, ret); - /* For extending writes we already hold exclusive lock */ - if (ret < 0 && offset + count > i_size) + /* + * Whole-file rollback is only safe under an exclusive lock. + * Parallel writers commit i_size only on success (nothing to + * undo); the server owns failed-extend cleanup.
+ */ + if (exclusive && ret < 0 && offset + count > i_size) fuse_do_truncate(file); } return ret; } +static ssize_t fuse_direct_IO(struct kiocb *iocb, struct iov_iter *iter) +{ + /* + * Only reached via generic_file_direct_write() (caching-mode + * O_DIRECT), which holds the inode lock exclusively. + */ + return __fuse_direct_IO(iocb, iter, true); +} + static int fuse_writeback_range(struct inode *inode, loff_t start, loff_t end) { int err = filemap_write_and_wait_range(inode->i_mapping, start, LLONG_MAX); @@ -3502,10 +3697,25 @@ void fuse_init_file_inode(struct inode *inode, unsigned int flags) fi->iocachectr = 0; init_waitqueue_head(&fi->page_waitq); init_waitqueue_head(&fi->direct_io_waitq); + init_rwsem(&fi->wb_inval_rwsem); if (IS_ENABLED(CONFIG_FUSE_DAX)) fuse_dax_inode_init(inode, flags); - if (enable_large_folios) - mapping_set_large_folios(inode->i_mapping); + if (enable_large_folios) { + /* + * Readahead and writeback batch whole folios into a single + * request, capped at min(fc->max_pages, fc->max_read/PAGE_SIZE) + * pages. The page cache must therefore never build a folio + * larger than that, or fuse_readahead() trips WARN_ON(!pages) + * and then dereferences a NULL ap->folios[0] in + * fuse_send_readpages(). Bound the folio order to the request + * limit instead of MAX_PAGECACHE_ORDER. 
+ */ + unsigned int max_pages = min(fc->max_pages, + fc->max_read >> PAGE_SHIFT); + + mapping_set_folio_order_range(inode->i_mapping, 0, + ilog2(max_pages ?: 1)); + } } diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h index 365c28cb282146..cc85a21fe081be 100644 --- a/fs/fuse/fuse_i.h +++ b/fs/fuse/fuse_i.h @@ -47,7 +47,7 @@ #define FUSE_NAME_MAX (PATH_MAX - 1) /** Number of dentries for each connection in the control filesystem */ -#define FUSE_CTL_NUM_DENTRIES 5 +#define FUSE_CTL_NUM_DENTRIES 7 /* Frequency (in seconds) of request timeout checks, if opted into */ #define FUSE_TIMEOUT_TIMER_FREQ 15 @@ -186,6 +186,23 @@ struct fuse_inode { /* dlm locked areas we have sent lock requests for */ struct fuse_dlm_cache dlm_locked_areas; + + /* + * Serializes buffered-write page-cache dirtying against + * the forced-direct-IO latch transitions that cannot + * take the inode lock -- most importantly + * NOTIFY_INVAL_INODE (fuse_reverse_inval_inode()), which + * may be delivered by the same server thread that still + * owes a reply to an in-flight write holding the inode + * lock. The buffered writer holds this for read around + * the dirtying and re-checks the latch under it; the + * latch set/clear sites hold it for write around their + * page-cache invalidate + latch update. The writer's + * read-side section must stay free of server round-trips + * (under fc->dlm the partial-write RMW read is skipped), + * or the down_write() could wait on the server. + */ + struct rw_semaphore wb_inval_rwsem; }; /* readdir cache (directory only) */ @@ -257,6 +274,13 @@ enum { FUSE_I_BTIME, /* Wants or already has page cache IO */ FUSE_I_CACHE_IO_MODE, + /* + * Latched into direct IO because several entities have the file open + * and were contending on the exclusive inode lock of the cached write + * path. Reads and writes are routed direct (shared-lock parallel dio) + * until the last writer closes. See fuse_open()/fuse_file_io_open().
+ */ + FUSE_I_FORCE_DIO, }; struct fuse_conn; @@ -1045,6 +1069,21 @@ struct fuse_conn { /* The foffset alignment in PAGE */ unsigned int alignment_pages; + /* Buffered writes >= this size bypass the writeback cache (0 = off) */ + unsigned int writethrough_threshold; + + /* + * Latch an inode into direct IO once it is open for writing by enough + * entities to contend on the exclusive inode lock in the cached write + * path. The value is the number of *other* writers that must already + * hold the inode before the latch trips: 0 forces DIO on the first + * writer, 1 on the second (i.e. as soon as a second writer appears -- + * the historic on/off behaviour), N on the (N+1)th. -1 disables the + * feature. Requires the server to tolerate concurrent direct writes + * (as cluster/parallel filesystems do). + */ + int force_dio_on_contention; + /** * XArray tracking tasks that need DLM retry. * Maps task pointer -> struct fuse_dlm_retry. @@ -1600,6 +1639,35 @@ void fuse_inode_uncached_io_end(struct fuse_inode *fi); int fuse_file_io_open(struct file *file, struct inode *inode); void fuse_file_io_release(struct fuse_file *ff, struct inode *inode); +/* Inode latched into forced direct IO due to multi-opener lock contention */ +static inline bool fuse_inode_force_dio(struct inode *inode) +{ + return test_bit(FUSE_I_FORCE_DIO, &get_fuse_inode(inode)->state); +} + +/* + * True when the inode is contended enough to force direct IO: at least + * @threshold writers are already linked on write_files, besides the opener or + * remote modifier that prompted the check. @threshold comes from + * fc->force_dio_on_contention: 0 trips on the first writer, 1 on the second, + * N on the (N+1)th, negative disables. Must be called with fi->lock held.
+ */ +static inline bool fuse_writers_contended(struct fuse_inode *fi, int threshold) +{ + struct fuse_file *ff; + int count = 0; + + if (threshold < 0) + return false; + + list_for_each_entry(ff, &fi->write_files, write_entry) { + if (count >= threshold) + return true; + count++; + } + return count >= threshold; +} + /* file.c */ struct fuse_file *fuse_file_open(struct fuse_mount *fm, u64 nodeid, struct inode *inode, diff --git a/fs/fuse/inode.c b/fs/fuse/inode.c index 6a94de9528210f..037f68c4fb4a0f 100644 --- a/fs/fuse/inode.c +++ b/fs/fuse/inode.c @@ -764,6 +764,7 @@ int fuse_reverse_inval_inode(struct fuse_conn *fc, u64 nodeid, struct inode *inode; pgoff_t pg_start; pgoff_t pg_end; + int dio_threshold; inode = fuse_ilookup(fc, nodeid, NULL); if (!inode) @@ -806,8 +807,49 @@ int fuse_reverse_inval_inode(struct fuse_conn *fc, u64 nodeid, pg_end == -1 ? 0 : (offset + len - 1)); - invalidate_inode_pages2_range(inode->i_mapping, - pg_start, pg_end); + /* + * A data invalidation means another (remote) entity is modifying + * the file. If it is also open for writing here, latch the inode + * into direct IO (when enabled), as for the open-time multi-writer + * case. Take wb_inval_rwsem for write so the buffered write path + * -- which holds it for read across its dirtying and re-checks the + * latch under it -- cannot strand dirty folios after the cache is + * dropped. Use a trylock and never block: this may run on the + * server thread that still owes an in-flight write (holding the + * inode lock) its reply, so blocking on the rwsem (or the inode + * lock) would deadlock. If a writer is active, skip the latch + * this round (best effort); the invalidate still runs. Only + * regular files initialise the rwsem (it shares storage with the + * readdir-cache union arm), so gate on S_ISREG. 
The latch is + * whole-inode, so when set drop the whole mapping rather than just + * the notified range, or dirty folios outside it would be invisible + * to the forced direct reads (stale read / lost write). + */ + dio_threshold = READ_ONCE(fc->force_dio_on_contention); + + if (S_ISREG(inode->i_mode) && dio_threshold >= 0 && + fc->writeback_cache && !FUSE_IS_DAX(inode) && + !fuse_inode_backing(fi) && !mapping_mapped(inode->i_mapping) && + down_write_trylock(&fi->wb_inval_rwsem)) { + bool latched = false; + + spin_lock(&fi->lock); + if (fuse_writers_contended(fi, dio_threshold)) { + set_bit(FUSE_I_FORCE_DIO, &fi->state); + latched = true; + } + spin_unlock(&fi->lock); + + if (latched) + invalidate_inode_pages2(inode->i_mapping); + else + invalidate_inode_pages2_range(inode->i_mapping, + pg_start, pg_end); + up_write(&fi->wb_inval_rwsem); + } else { + invalidate_inode_pages2_range(inode->i_mapping, + pg_start, pg_end); + } } iput(inode); return 0; @@ -1203,6 +1245,7 @@ void fuse_conn_init(struct fuse_conn *fc, struct fuse_mount *fm, atomic_set(&fc->num_waiting, 0); fc->max_background = FUSE_DEFAULT_MAX_BACKGROUND; fc->congestion_threshold = FUSE_DEFAULT_CONGESTION_THRESHOLD; + fc->force_dio_on_contention = -1; /* disabled by default */ atomic64_set(&fc->khctr, 0); fc->polled_files = RB_ROOT; fc->blocked = 0; @@ -1704,6 +1747,7 @@ static void process_init_reply(struct fuse_mount *fm, struct fuse_args *args, else fm->sb->s_bdi->ra_pages = min(fm->sb->s_bdi->ra_pages, ra_pages); + fm->sb->s_bdi->io_pages = fc->max_pages; fc->minor = arg->minor; fc->max_write = arg->minor < 5 ? 
4096 : arg->max_write; fc->max_write = max_t(unsigned, 4096, fc->max_write); @@ -1832,9 +1876,13 @@ static int fuse_bdi_init(struct fuse_conn *fc, struct super_block *sb) if (err) return err; - /* fuse does it's own writeback accounting */ - sb->s_bdi->capabilities &= ~BDI_CAP_WRITEBACK_ACCT; - sb->s_bdi->capabilities |= BDI_CAP_STRICTLIMIT; + /* + * fuse uses the default (core) writeback accounting now that the manual + * WB_WRITEBACK updates are gone, so leave BDI_CAP_WRITEBACK_ACCT set: + * clearing it would zero the per-bdi writeback stats and writeout + * fraction that balance_dirty_pages() throttles on. Keep strictlimit. + */ + sb->s_bdi->capabilities |= BDI_CAP_STRICTLIMIT; /* * For a single fuse filesystem use max 1% of dirty + diff --git a/fs/fuse/iomode.c b/fs/fuse/iomode.c index c99e285f3183ef..66ee36145a60bc 100644 --- a/fs/fuse/iomode.c +++ b/fs/fuse/iomode.c @@ -233,6 +233,16 @@ int fuse_file_io_open(struct file *file, struct inode *inode) !(ff->open_flags & FOPEN_PASSTHROUGH)) return 0; + /* + * The inode was latched into direct IO because several entities have + * it open and were contending on the exclusive inode lock of the + * cached write path. Open this file uncached as well so that its IO + * is routed direct and it does not re-enter caching mode.
+ */ + if (test_bit(FUSE_I_FORCE_DIO, &fi->state) && + !(ff->open_flags & FOPEN_PASSTHROUGH)) + return 0; + if (ff->open_flags & FOPEN_PASSTHROUGH) err = fuse_file_passthrough_open(inode, file); else diff --git a/include/linux/backing-dev.h b/include/linux/backing-dev.h index e721148c95d07d..9a1e895dd5df1b 100644 --- a/include/linux/backing-dev.h +++ b/include/linux/backing-dev.h @@ -66,16 +66,6 @@ static inline void wb_stat_mod(struct bdi_writeback *wb, percpu_counter_add_batch(&wb->stat[item], amount, WB_STAT_BATCH); } -static inline void inc_wb_stat(struct bdi_writeback *wb, enum wb_stat_item item) -{ - wb_stat_mod(wb, item, 1); -} - -static inline void dec_wb_stat(struct bdi_writeback *wb, enum wb_stat_item item) -{ - wb_stat_mod(wb, item, -1); -} - static inline s64 wb_stat(struct bdi_writeback *wb, enum wb_stat_item item) { return percpu_counter_read_positive(&wb->stat[item]); diff --git a/include/uapi/linux/fuse.h b/include/uapi/linux/fuse.h index 23374de18e2fd9..0493b39af4adbb 100644 --- a/include/uapi/linux/fuse.h +++ b/include/uapi/linux/fuse.h @@ -445,8 +445,8 @@ struct fuse_file_lock { * init_out.request_timeout contains the timeout (in secs) * FUSE_INVAL_INODE_ENTRY: invalidate inode aliases when doing inode invalidation * FUSE_EXPIRE_INODE_ENTRY: expire inode aliases when doing inode invalidation - * FUSE_ALIGN_PG_ORDER: page order (power of 2 exponent for number of pages) for - * optimal io-size alignment + * FUSE_ALIGN_PG_ORDER: alignment order (power of 2 exponent of the IO size + * in bytes) for optimal io-size alignment * FUSE_URING_REDUCED_Q: Client (kernel) supports less queues - Server is free * to register between 1 and nr-core io-uring queues */ @@ -947,7 +947,8 @@ struct fuse_init_in { #define FUSE_COMPAT_22_INIT_OUT_SIZE 24 /* - * align_page_order: Number of pages for optimal IO, or a multiple of that + * align_page_order: log2 of the optimal IO size in bytes; IO is optimal + * when sized and aligned to (1 << align_page_order) or a 
multiple of it */ struct fuse_init_out { uint32_t major;