diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index f7126108a811..ab751a150f70 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -77,7 +77,7 @@ static int rds_release(struct socket *sock) rds_send_drop_to(rs, NULL); rds_rdma_drop_keys(rs); rds_notify_queue_get(rs, NULL); - __skb_queue_purge(&rs->rs_zcookie_queue); + rds_notify_msg_zcopy_purge(&rs->rs_zcookie_queue); spin_lock_bh(&rds_sock_lock); list_del_init(&rs->rs_item); @@ -180,7 +180,7 @@ static __poll_t rds_poll(struct file *file, struct socket *sock, } if (!list_empty(&rs->rs_recv_queue) || !list_empty(&rs->rs_notify_queue) || - !skb_queue_empty(&rs->rs_zcookie_queue)) + !list_empty(&rs->rs_zcookie_queue.zcookie_head)) mask |= (EPOLLIN | EPOLLRDNORM); if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) mask |= (EPOLLOUT | EPOLLWRNORM); @@ -515,7 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol) INIT_LIST_HEAD(&rs->rs_recv_queue); INIT_LIST_HEAD(&rs->rs_notify_queue); INIT_LIST_HEAD(&rs->rs_cong_list); - skb_queue_head_init(&rs->rs_zcookie_queue); + rds_message_zcopy_queue_init(&rs->rs_zcookie_queue); spin_lock_init(&rs->rs_rdma_lock); rs->rs_rdma_keys = RB_ROOT; rs->rs_rx_traces = 0; diff --git a/net/rds/message.c b/net/rds/message.c index c36edbb92c34..90dcdcfe9f62 100644 --- a/net/rds/message.c +++ b/net/rds/message.c @@ -48,7 +48,6 @@ static unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = { [RDS_EXTHDR_GEN_NUM] = sizeof(u32), }; - void rds_message_addref(struct rds_message *rm) { rdsdebug("addref rm %p ref %d\n", rm, refcount_read(&rm->m_refcount)); @@ -56,9 +55,9 @@ void rds_message_addref(struct rds_message *rm) } EXPORT_SYMBOL_GPL(rds_message_addref); -static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie) +static inline bool rds_zcookie_add(struct rds_msg_zcopy_info *info, u32 cookie) { - struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb; + struct rds_zcopy_cookies *ck = &info->zcookies; int ncookies = ck->num; if (ncookies == RDS_MAX_ZCOOKIES) @@ -68,38 +67,61 @@ static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie) return true; } +struct rds_msg_zcopy_info *rds_info_from_znotifier(struct rds_znotifier *znotif) +{ + return container_of(znotif, struct rds_msg_zcopy_info, znotif); +} + +void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *q) +{ + unsigned long flags; + LIST_HEAD(copy); + struct rds_msg_zcopy_info *info, *tmp; + + spin_lock_irqsave(&q->lock, flags); + list_splice(&q->zcookie_head, ©); + INIT_LIST_HEAD(&q->zcookie_head); + spin_unlock_irqrestore(&q->lock, flags); + + list_for_each_entry_safe(info, tmp, ©, rs_zcookie_next) { + list_del(&info->rs_zcookie_next); + kfree(info); + } +} + static void rds_rm_zerocopy_callback(struct rds_sock *rs, struct rds_znotifier *znotif) { - struct sk_buff *skb, *tail; - unsigned long flags; - struct sk_buff_head *q; + struct rds_msg_zcopy_info *info; + struct rds_msg_zcopy_queue *q; u32 cookie = znotif->z_cookie; struct rds_zcopy_cookies *ck; + struct list_head *head; + unsigned long flags; + mm_unaccount_pinned_pages(&znotif->z_mmp); q = &rs->rs_zcookie_queue; spin_lock_irqsave(&q->lock, flags); - tail = skb_peek_tail(q); - - if (tail && skb_zcookie_add(tail, cookie)) { - spin_unlock_irqrestore(&q->lock, flags); - mm_unaccount_pinned_pages(&znotif->z_mmp); - consume_skb(rds_skb_from_znotifier(znotif)); - /* caller invokes rds_wake_sk_sleep() */ - return; + head = &q->zcookie_head; + if (!list_empty(head)) { + info = list_entry(head, struct rds_msg_zcopy_info, + rs_zcookie_next); + if (info && rds_zcookie_add(info, cookie)) { + spin_unlock_irqrestore(&q->lock, flags); + kfree(rds_info_from_znotifier(znotif)); + /* caller invokes rds_wake_sk_sleep() */ + return; + } } - skb = rds_skb_from_znotifier(znotif); - ck = (struct rds_zcopy_cookies *)skb->cb; + info = rds_info_from_znotifier(znotif); + ck = &info->zcookies; memset(ck, 0, sizeof(*ck)); - WARN_ON(!skb_zcookie_add(skb, cookie)); - - __skb_queue_tail(q, skb); + WARN_ON(!rds_zcookie_add(info, cookie)); + list_add_tail(&q->zcookie_head, &info->rs_zcookie_next); spin_unlock_irqrestore(&q->lock, flags); /* caller invokes rds_wake_sk_sleep() */ - - mm_unaccount_pinned_pages(&znotif->z_mmp); } /* @@ -340,7 +362,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from) int ret = 0; int length = iov_iter_count(from); int total_copied = 0; - struct sk_buff *skb; + struct rds_msg_zcopy_info *info; rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from)); @@ -350,12 +372,11 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from) sg = rm->data.op_sg; sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */ - skb = alloc_skb(0, GFP_KERNEL); - if (!skb) + info = kzalloc(sizeof(*info), GFP_KERNEL); + if (!info) return -ENOMEM; - BUILD_BUG_ON(sizeof(skb->cb) < max_t(int, sizeof(struct rds_znotifier), - sizeof(struct rds_zcopy_cookies))); - rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb); + INIT_LIST_HEAD(&info->rs_zcookie_next); + rm->data.op_mmp_znotifier = &info->znotif; if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp, length)) { ret = -ENOMEM; @@ -389,7 +410,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from) WARN_ON_ONCE(length != 0); return ret; err: - consume_skb(skb); + kfree(info); rm->data.op_mmp_znotifier = NULL; return ret; } diff --git a/net/rds/rds.h b/net/rds/rds.h index 33b16353d8f3..74cd27c661de 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -357,16 +357,27 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie) #define RDS_MSG_FLUSH 8 struct rds_znotifier { - struct list_head z_list; struct mmpin z_mmp; u32 z_cookie; }; -#define RDS_ZCOPY_SKB(__skb) ((struct rds_znotifier *)&((__skb)->cb[0])) +struct rds_msg_zcopy_info { + struct list_head rs_zcookie_next; + union { + struct rds_znotifier znotif; + struct rds_zcopy_cookies zcookies; + }; +}; -static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z) +struct rds_msg_zcopy_queue { + struct list_head zcookie_head; + spinlock_t lock; /* protects zcookie_head queue */ +}; + +static inline void rds_message_zcopy_queue_init(struct rds_msg_zcopy_queue *q) { - return container_of((void *)z, struct sk_buff, cb); + spin_lock_init(&q->lock); + INIT_LIST_HEAD(&q->zcookie_head); } struct rds_message { @@ -603,8 +614,7 @@ struct rds_sock { /* Socket receive path trace points*/ u8 rs_rx_traces; u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX]; - - struct sk_buff_head rs_zcookie_queue; + struct rds_msg_zcopy_queue rs_zcookie_queue; }; static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk) @@ -803,6 +813,7 @@ void rds_message_addref(struct rds_message *rm); void rds_message_put(struct rds_message *rm); void rds_message_wait(struct rds_message *rm); void rds_message_unmapped(struct rds_message *rm); +void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *info); static inline void rds_message_make_checksum(struct rds_header *hdr) { diff --git a/net/rds/recv.c b/net/rds/recv.c index d50747725221..de50e2126e40 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -579,9 +579,10 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg, static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg) { - struct sk_buff *skb; - struct sk_buff_head *q = &rs->rs_zcookie_queue; + struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue; + struct rds_msg_zcopy_info *info = NULL; struct rds_zcopy_cookies *done; + unsigned long flags; if (!msg->msg_control) return false; @@ -590,16 +591,24 @@ static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg) msg->msg_controllen < CMSG_SPACE(sizeof(*done))) return false; - skb = skb_dequeue(q); - if (!skb) + spin_lock_irqsave(&q->lock, flags); + if (!list_empty(&q->zcookie_head)) { + info = list_entry(q->zcookie_head.next, + struct rds_msg_zcopy_info, rs_zcookie_next); + list_del(&info->rs_zcookie_next); + } + spin_unlock_irqrestore(&q->lock, flags); + if (!info) return false; - done = (struct rds_zcopy_cookies *)skb->cb; + done = &info->zcookies; if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done), done)) { - skb_queue_head(q, skb); + spin_lock_irqsave(&q->lock, flags); + list_add(&info->rs_zcookie_next, &q->zcookie_head); + spin_unlock_irqrestore(&q->lock, flags); return false; } - consume_skb(skb); + kfree(info); return true; }