linux-stable/net/sunrpc/xprtrdma/svc_rdma_pcl.c
Chuck Lever 78147ca8b4 svcrdma: Add a "parsed chunk list" data structure
This simple data structure binds the location of each data payload
inside of an RPC message to the chunk that will be used to push it
to or pull it from the client.

There are several benefits to this small additional overhead:

 * It enables support for more than one chunk in incoming Read and
   Write lists.

 * It translates the version-specific on-the-wire format into a
   generic in-memory structure, enabling support for multiple
   versions of the RPC/RDMA transport protocol.

 * It enables the server to re-organize a chunk list if it needs to
   adjust where Read chunk data lands in server memory without
   altering the contents of the XDR-encoded Receive buffer.

Construction of these lists is done while sanity checking each
incoming RPC/RDMA header. Subsequent patches will make use of the
generated data structures.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
2020-11-30 13:00:22 -05:00

307 lines
7.7 KiB
C

// SPDX-License-Identifier: GPL-2.0
/*
* Copyright (c) 2020 Oracle. All rights reserved.
*/
#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/rpc_rdma.h>
#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>
/**
* pcl_free - Release all memory associated with a parsed chunk list
* @pcl: parsed chunk list
*
*/
void pcl_free(struct svc_rdma_pcl *pcl)
{
while (!list_empty(&pcl->cl_chunks)) {
struct svc_rdma_chunk *chunk;
chunk = pcl_first_chunk(pcl);
list_del(&chunk->ch_list);
kfree(chunk);
}
}
static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position)
{
struct svc_rdma_chunk *chunk;
chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL);
if (!chunk)
return NULL;
chunk->ch_position = position;
chunk->ch_length = 0;
chunk->ch_payload_length = 0;
chunk->ch_segcount = 0;
return chunk;
}
static struct svc_rdma_chunk *
pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position)
{
struct svc_rdma_chunk *pos;
pcl_for_each_chunk(pos, pcl) {
if (pos->ch_position == position)
return pos;
}
return NULL;
}
static void pcl_insert_position(struct svc_rdma_pcl *pcl,
struct svc_rdma_chunk *chunk)
{
struct svc_rdma_chunk *pos;
pcl_for_each_chunk(pos, pcl) {
if (pos->ch_position > chunk->ch_position)
break;
}
__list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list);
pcl->cl_count++;
}
static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt,
struct svc_rdma_chunk *chunk,
u32 handle, u32 length, u64 offset)
{
struct svc_rdma_segment *segment;
segment = &chunk->ch_segments[chunk->ch_segcount];
segment->rs_handle = handle;
segment->rs_length = length;
segment->rs_offset = offset;
trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment);
chunk->ch_length += length;
chunk->ch_segcount++;
}
/**
* pcl_alloc_call - Construct a parsed chunk list for the Call body
* @rctxt: Ingress receive context
* @p: Start of an un-decoded Read list
*
* Assumptions:
* - The incoming Read list has already been sanity checked.
* - cl_count is already set to the number of segments in
* the un-decoded list.
* - The list might not be in order by position.
*
* Return values:
* %true: Parsed chunk list was successfully constructed, and
* cl_count is updated to be the number of chunks (ie.
* unique positions) in the Read list.
* %false: Memory allocation failed.
*/
bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl;
unsigned int i, segcount = pcl->cl_count;
pcl->cl_count = 0;
for (i = 0; i < segcount; i++) {
struct svc_rdma_chunk *chunk;
u32 position, handle, length;
u64 offset;
p++; /* skip the list discriminator */
p = xdr_decode_read_segment(p, &position, &handle,
&length, &offset);
if (position != 0)
continue;
if (pcl_is_empty(pcl)) {
chunk = pcl_alloc_chunk(segcount, position);
if (!chunk)
return false;
pcl_insert_position(pcl, chunk);
} else {
chunk = list_first_entry(&pcl->cl_chunks,
struct svc_rdma_chunk,
ch_list);
}
pcl_set_read_segment(rctxt, chunk, handle, length, offset);
}
return true;
}
/**
* pcl_alloc_read - Construct a parsed chunk list for normal Read chunks
* @rctxt: Ingress receive context
* @p: Start of an un-decoded Read list
*
* Assumptions:
* - The incoming Read list has already been sanity checked.
* - cl_count is already set to the number of segments in
* the un-decoded list.
* - The list might not be in order by position.
*
* Return values:
* %true: Parsed chunk list was successfully constructed, and
* cl_count is updated to be the number of chunks (ie.
* unique position values) in the Read list.
* %false: Memory allocation failed.
*
* TODO:
* - Check for chunk range overlaps
*/
bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl;
unsigned int i, segcount = pcl->cl_count;
pcl->cl_count = 0;
for (i = 0; i < segcount; i++) {
struct svc_rdma_chunk *chunk;
u32 position, handle, length;
u64 offset;
p++; /* skip the list discriminator */
p = xdr_decode_read_segment(p, &position, &handle,
&length, &offset);
if (position == 0)
continue;
chunk = pcl_lookup_position(pcl, position);
if (!chunk) {
chunk = pcl_alloc_chunk(segcount, position);
if (!chunk)
return false;
pcl_insert_position(pcl, chunk);
}
pcl_set_read_segment(rctxt, chunk, handle, length, offset);
}
return true;
}
/**
* pcl_alloc_write - Construct a parsed chunk list from a Write list
* @rctxt: Ingress receive context
* @pcl: Parsed chunk list to populate
* @p: Start of an un-decoded Write list
*
* Assumptions:
* - The incoming Write list has already been sanity checked, and
* - cl_count is set to the number of chunks in the un-decoded list.
*
* Return values:
* %true: Parsed chunk list was successfully constructed.
* %false: Memory allocation failed.
*/
bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
struct svc_rdma_pcl *pcl, __be32 *p)
{
struct svc_rdma_segment *segment;
struct svc_rdma_chunk *chunk;
unsigned int i, j;
u32 segcount;
for (i = 0; i < pcl->cl_count; i++) {
p++; /* skip the list discriminator */
segcount = be32_to_cpup(p++);
chunk = pcl_alloc_chunk(segcount, 0);
if (!chunk)
return false;
list_add_tail(&chunk->ch_list, &pcl->cl_chunks);
for (j = 0; j < segcount; j++) {
segment = &chunk->ch_segments[j];
p = xdr_decode_rdma_segment(p, &segment->rs_handle,
&segment->rs_length,
&segment->rs_offset);
trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j);
chunk->ch_length += segment->rs_length;
chunk->ch_segcount++;
}
}
return true;
}
static int pcl_process_region(const struct xdr_buf *xdr,
unsigned int offset, unsigned int length,
int (*actor)(const struct xdr_buf *, void *),
void *data)
{
struct xdr_buf subbuf;
if (!length)
return 0;
if (xdr_buf_subsegment(xdr, &subbuf, offset, length))
return -EMSGSIZE;
return actor(&subbuf, data);
}
/**
* pcl_process_nonpayloads - Process non-payload regions inside @xdr
* @pcl: Chunk list to process
* @xdr: xdr_buf to process
* @actor: Function to invoke on each non-payload region
* @data: Arguments for @actor
*
* This mechanism must ignore not only result payloads that were already
* sent via RDMA Write, but also XDR padding for those payloads that
* the upper layer has added.
*
* Assumptions:
* The xdr->len and ch_position fields are aligned to 4-byte multiples.
*
* Returns:
* On success, zero,
* %-EMSGSIZE on XDR buffer overflow, or
* The return value of @actor
*/
int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
const struct xdr_buf *xdr,
int (*actor)(const struct xdr_buf *, void *),
void *data)
{
struct svc_rdma_chunk *chunk, *next;
unsigned int start;
int ret;
chunk = pcl_first_chunk(pcl);
/* No result payloads were generated */
if (!chunk || !chunk->ch_payload_length)
return actor(xdr, data);
/* Process the region before the first result payload */
ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data);
if (ret < 0)
return ret;
/* Process the regions between each middle result payload */
while ((next = pcl_next_chunk(pcl, chunk))) {
if (!next->ch_payload_length)
break;
start = pcl_chunk_end_offset(chunk);
ret = pcl_process_region(xdr, start, next->ch_position - start,
actor, data);
if (ret < 0)
return ret;
chunk = next;
}
/* Process the region after the last result payload */
start = pcl_chunk_end_offset(chunk);
ret = pcl_process_region(xdr, start, xdr->len - start, actor, data);
if (ret < 0)
return ret;
return 0;
}