rxrpc: Allow the kernel to mark a call as being non-interruptible

Allow kernel services using AF_RXRPC to indicate that a call should be
non-interruptible.  This allows kafs to make things like lock-extension and
writeback data storage calls non-interruptible.

If this is set, signals will be ignored for operations on that call where
possible - such as waiting to get a call channel on an rxrpc connection.

It doesn't prevent UDP sendmsg from being interrupted, but that will be
handled by packet retransmission.

rxrpc_kernel_recv_data() isn't affected by this since that never waits,
preferring instead to return -EAGAIN and leave the waiting to the caller.

Userspace initiated calls can't be set to be uninterruptible at this time.

Signed-off-by: David Howells <dhowells@redhat.com>
This commit is contained in:
David Howells 2019-05-09 08:21:21 +01:00
parent 0ab4c95948
commit b960a34b73
8 changed files with 28 additions and 4 deletions

View File

@ -796,7 +796,9 @@ The kernel interface functions are as follows:
s64 tx_total_len, s64 tx_total_len,
gfp_t gfp, gfp_t gfp,
rxrpc_notify_rx_t notify_rx, rxrpc_notify_rx_t notify_rx,
bool upgrade); bool upgrade,
bool intr,
unsigned int debug_id);
This allocates the infrastructure to make a new RxRPC call and assigns This allocates the infrastructure to make a new RxRPC call and assigns
call and connection numbers. The call will be made on the UDP port that call and connection numbers. The call will be made on the UDP port that
@ -824,6 +826,13 @@ The kernel interface functions are as follows:
the server upgrade the service to a better one. The resultant service ID the server upgrade the service to a better one. The resultant service ID
is returned by rxrpc_kernel_recv_data(). is returned by rxrpc_kernel_recv_data().
intr should be set to true if the call should be interruptible. If this
is not set, this function may not return until a channel has been
allocated; if it is set, the function may return -ERESTARTSYS.
debug_id is the call debugging ID to be used for tracing. This can be
obtained by atomically incrementing rxrpc_debug_id.
If this function is successful, an opaque reference to the RxRPC call is If this function is successful, an opaque reference to the RxRPC call is
returned. The caller now holds a reference on this and it must be returned. The caller now holds a reference on this and it must be
properly ended. properly ended.

View File

@ -417,6 +417,7 @@ void afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, gfp_t gfp)
afs_wake_up_async_call : afs_wake_up_async_call :
afs_wake_up_call_waiter), afs_wake_up_call_waiter),
call->upgrade, call->upgrade,
true,
call->debug_id); call->debug_id);
if (IS_ERR(rxcall)) { if (IS_ERR(rxcall)) {
ret = PTR_ERR(rxcall); ret = PTR_ERR(rxcall);

View File

@ -45,6 +45,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
gfp_t, gfp_t,
rxrpc_notify_rx_t, rxrpc_notify_rx_t,
bool, bool,
bool,
unsigned int); unsigned int);
int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *, int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
struct msghdr *, size_t, struct msghdr *, size_t,

View File

@ -270,6 +270,7 @@ static int rxrpc_listen(struct socket *sock, int backlog)
* @gfp: The allocation constraints * @gfp: The allocation constraints
* @notify_rx: Where to send notifications instead of socket queue * @notify_rx: Where to send notifications instead of socket queue
* @upgrade: Request service upgrade for call * @upgrade: Request service upgrade for call
* @intr: The call is interruptible
* @debug_id: The debug ID for tracing to be assigned to the call * @debug_id: The debug ID for tracing to be assigned to the call
* *
* Allow a kernel service to begin a call on the nominated socket. This just * Allow a kernel service to begin a call on the nominated socket. This just
@ -287,6 +288,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
gfp_t gfp, gfp_t gfp,
rxrpc_notify_rx_t notify_rx, rxrpc_notify_rx_t notify_rx,
bool upgrade, bool upgrade,
bool intr,
unsigned int debug_id) unsigned int debug_id)
{ {
struct rxrpc_conn_parameters cp; struct rxrpc_conn_parameters cp;
@ -311,6 +313,7 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
memset(&p, 0, sizeof(p)); memset(&p, 0, sizeof(p));
p.user_call_ID = user_call_ID; p.user_call_ID = user_call_ID;
p.tx_total_len = tx_total_len; p.tx_total_len = tx_total_len;
p.intr = intr;
memset(&cp, 0, sizeof(cp)); memset(&cp, 0, sizeof(cp));
cp.local = rx->local; cp.local = rx->local;

View File

@ -482,6 +482,7 @@ enum rxrpc_call_flag {
RXRPC_CALL_BEGAN_RX_TIMER, /* We began the expect_rx_by timer */ RXRPC_CALL_BEGAN_RX_TIMER, /* We began the expect_rx_by timer */
RXRPC_CALL_RX_HEARD, /* The peer responded at least once to this call */ RXRPC_CALL_RX_HEARD, /* The peer responded at least once to this call */
RXRPC_CALL_RX_UNDERRUN, /* Got data underrun */ RXRPC_CALL_RX_UNDERRUN, /* Got data underrun */
RXRPC_CALL_IS_INTR, /* The call is interruptible */
}; };
/* /*
@ -711,6 +712,7 @@ struct rxrpc_call_params {
u32 normal; /* Max time since last call packet (msec) */ u32 normal; /* Max time since last call packet (msec) */
} timeouts; } timeouts;
u8 nr_timeouts; /* Number of timeouts specified */ u8 nr_timeouts; /* Number of timeouts specified */
bool intr; /* The call is interruptible */
}; };
struct rxrpc_send_params { struct rxrpc_send_params {

View File

@ -241,6 +241,8 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
return call; return call;
} }
if (p->intr)
__set_bit(RXRPC_CALL_IS_INTR, &call->flags);
call->tx_total_len = p->tx_total_len; call->tx_total_len = p->tx_total_len;
trace_rxrpc_call(call, rxrpc_call_new_client, atomic_read(&call->usage), trace_rxrpc_call(call, rxrpc_call_new_client, atomic_read(&call->usage),
here, (const void *)p->user_call_ID); here, (const void *)p->user_call_ID);

View File

@ -656,10 +656,14 @@ static int rxrpc_wait_for_channel(struct rxrpc_call *call, gfp_t gfp)
add_wait_queue_exclusive(&call->waitq, &myself); add_wait_queue_exclusive(&call->waitq, &myself);
for (;;) { for (;;) {
set_current_state(TASK_INTERRUPTIBLE); if (test_bit(RXRPC_CALL_IS_INTR, &call->flags))
set_current_state(TASK_INTERRUPTIBLE);
else
set_current_state(TASK_UNINTERRUPTIBLE);
if (call->call_id) if (call->call_id)
break; break;
if (signal_pending(current)) { if (test_bit(RXRPC_CALL_IS_INTR, &call->flags) &&
signal_pending(current)) {
ret = -ERESTARTSYS; ret = -ERESTARTSYS;
break; break;
} }

View File

@ -80,7 +80,8 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
if (call->state >= RXRPC_CALL_COMPLETE) if (call->state >= RXRPC_CALL_COMPLETE)
return call->error; return call->error;
if (timeout == 0 && if (test_bit(RXRPC_CALL_IS_INTR, &call->flags) &&
timeout == 0 &&
tx_win == tx_start && signal_pending(current)) tx_win == tx_start && signal_pending(current))
return -EINTR; return -EINTR;
@ -620,6 +621,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
.call.tx_total_len = -1, .call.tx_total_len = -1,
.call.user_call_ID = 0, .call.user_call_ID = 0,
.call.nr_timeouts = 0, .call.nr_timeouts = 0,
.call.intr = true,
.abort_code = 0, .abort_code = 0,
.command = RXRPC_CMD_SEND_DATA, .command = RXRPC_CMD_SEND_DATA,
.exclusive = false, .exclusive = false,