// SPDX-License-Identifier: GPL-2.0-or-later /* Rolling buffer helpers * * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved. * Written by David Howells (dhowells@redhat.com) */ #include #include #include #include #include "internal.h" static atomic_t debug_ids; /** * netfs_folioq_alloc - Allocate a folio_queue struct * @rreq_id: Associated debugging ID for tracing purposes * @gfp: Allocation constraints * @trace: Trace tag to indicate the purpose of the allocation * * Allocate, initialise and account the folio_queue struct and log a trace line * to mark the allocation. */ struct folio_queue *netfs_folioq_alloc(unsigned int rreq_id, gfp_t gfp, unsigned int /*enum netfs_folioq_trace*/ trace) { struct folio_queue *fq; fq = kmalloc(sizeof(*fq), gfp); if (fq) { netfs_stat(&netfs_n_folioq); folioq_init(fq, rreq_id); fq->debug_id = atomic_inc_return(&debug_ids); trace_netfs_folioq(fq, trace); } return fq; } EXPORT_SYMBOL(netfs_folioq_alloc); /** * netfs_folioq_free - Free a folio_queue struct * @folioq: The object to free * @trace: Trace tag to indicate which free * * Free and unaccount the folio_queue struct. */ void netfs_folioq_free(struct folio_queue *folioq, unsigned int /*enum netfs_trace_folioq*/ trace) { trace_netfs_folioq(folioq, trace); netfs_stat_d(&netfs_n_folioq); kfree(folioq); } EXPORT_SYMBOL(netfs_folioq_free); /* * Initialise a rolling buffer. We allocate an empty folio queue struct to so * that the pointers can be independently driven by the producer and the * consumer. */ int rolling_buffer_init(struct rolling_buffer *roll, unsigned int rreq_id, unsigned int direction) { struct folio_queue *fq; fq = netfs_folioq_alloc(rreq_id, GFP_NOFS, netfs_trace_folioq_rollbuf_init); if (!fq) return -ENOMEM; roll->head = fq; roll->tail = fq; iov_iter_folio_queue(&roll->iter, direction, fq, 0, 0, 0); return 0; } /* * Add another folio_queue to a rolling buffer if there's no space left. */ int rolling_buffer_make_space(struct rolling_buffer *roll) { struct folio_queue *fq, *head = roll->head; if (!folioq_full(head)) return 0; fq = netfs_folioq_alloc(head->rreq_id, GFP_NOFS, netfs_trace_folioq_make_space); if (!fq) return -ENOMEM; fq->prev = head; roll->head = fq; if (folioq_full(head)) { /* Make sure we don't leave the master iterator pointing to a * block that might get immediately consumed. */ if (roll->iter.folioq == head && roll->iter.folioq_slot == folioq_nr_slots(head)) { roll->iter.folioq = fq; roll->iter.folioq_slot = 0; } } /* Make sure the initialisation is stored before the next pointer. * * [!] NOTE: After we set head->next, the consumer is at liberty to * immediately delete the old head. */ smp_store_release(&head->next, fq); return 0; } /* * Decant the list of folios to read into a rolling buffer. */ ssize_t rolling_buffer_load_from_ra(struct rolling_buffer *roll, struct readahead_control *ractl, struct folio_batch *put_batch) { struct folio_queue *fq; struct page **vec; int nr, ix, to; ssize_t size = 0; if (rolling_buffer_make_space(roll) < 0) return -ENOMEM; fq = roll->head; vec = (struct page **)fq->vec.folios; nr = __readahead_batch(ractl, vec + folio_batch_count(&fq->vec), folio_batch_space(&fq->vec)); ix = fq->vec.nr; to = ix + nr; fq->vec.nr = to; for (; ix < to; ix++) { struct folio *folio = folioq_folio(fq, ix); unsigned int order = folio_order(folio); fq->orders[ix] = order; size += PAGE_SIZE << order; trace_netfs_folio(folio, netfs_folio_trace_read); if (!folio_batch_add(put_batch, folio)) folio_batch_release(put_batch); } WRITE_ONCE(roll->iter.count, roll->iter.count + size); /* Store the counter after setting the slot. */ smp_store_release(&roll->next_head_slot, to); for (; ix < folioq_nr_slots(fq); ix++) folioq_clear(fq, ix); return size; } /* * Append a folio to the rolling buffer. */ ssize_t rolling_buffer_append(struct rolling_buffer *roll, struct folio *folio, unsigned int flags) { ssize_t size = folio_size(folio); int slot; if (rolling_buffer_make_space(roll) < 0) return -ENOMEM; slot = folioq_append(roll->head, folio); if (flags & ROLLBUF_MARK_1) folioq_mark(roll->head, slot); if (flags & ROLLBUF_MARK_2) folioq_mark2(roll->head, slot); WRITE_ONCE(roll->iter.count, roll->iter.count + size); /* Store the counter after setting the slot. */ smp_store_release(&roll->next_head_slot, slot); return size; } /* * Delete a spent buffer from a rolling queue and return the next in line. We * don't return the last buffer to keep the pointers independent, but return * NULL instead. */ struct folio_queue *rolling_buffer_delete_spent(struct rolling_buffer *roll) { struct folio_queue *spent = roll->tail, *next = READ_ONCE(spent->next); if (!next) return NULL; next->prev = NULL; netfs_folioq_free(spent, netfs_trace_folioq_delete); roll->tail = next; return next; } /* * Clear out a rolling queue. Folios that have mark 1 set are put. */ void rolling_buffer_clear(struct rolling_buffer *roll) { struct folio_batch fbatch; struct folio_queue *p; folio_batch_init(&fbatch); while ((p = roll->tail)) { roll->tail = p->next; for (int slot = 0; slot < folioq_count(p); slot++) { struct folio *folio = folioq_folio(p, slot); if (!folio) continue; if (folioq_is_marked(p, slot)) { trace_netfs_folio(folio, netfs_folio_trace_put); if (!folio_batch_add(&fbatch, folio)) folio_batch_release(&fbatch); } } netfs_folioq_free(p, netfs_trace_folioq_clear); } folio_batch_release(&fbatch); }