From 99c58d6480d937dbdc2b4acfdea1bcf7ab113e5e Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 20 Jul 2023 08:22:39 -0400 Subject: [PATCH 01/17] fs: dlm: remove twice newline This patch removes a newline which log_print() already adds, also removes wrapped string that causes a checkpatch warning. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index 70a4752ed913..a34f605d8505 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -240,8 +240,8 @@ static int dlm_plock_callback(struct plock_op *op) rv = notify(fl, 0); if (rv) { /* XXX: We need to cancel the fs lock here: */ - log_print("dlm_plock_callback: lock granted after lock request " - "failed; dangling lock!\n"); + log_print("%s: lock granted after lock request failed; dangling lock!", + __func__); goto out; } From 568f915655b3b4c40032104e4d0014e5e2c474b9 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 20 Jul 2023 08:22:40 -0400 Subject: [PATCH 02/17] fs: dlm: allow to F_SETLKW getting interrupted This patch implements dlm plock F_SETLKW interruption feature. If a blocking posix lock request got interrupted in user space by a signal a cancellation request for a non granted lock request to the user space lock manager will be send. The user lock manager answers either with zero or a negative errno code. A errno of -ENOENT signals that there is currently no blocking lock request waiting to being granted. In case of -ENOENT it was probably to late to request a cancellation and the pending lock got granted. In any error case we will wait until the lock is being granted as cancellation failed, this causes also that in case of an older user lock manager returning -EINVAL we will wait as cancellation is not supported which should be fine. If a user requires this feature the user should update dlm user space to support lock request cancellation. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 54 ++++++++++++++++++++++------------ include/uapi/linux/dlm_plock.h | 1 + 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index a34f605d8505..a8ffa0760913 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -74,30 +74,26 @@ static void send_op(struct plock_op *op) wake_up(&send_wq); } -/* If a process was killed while waiting for the only plock on a file, - locks_remove_posix will not see any lock on the file so it won't - send an unlock-close to us to pass on to userspace to clean up the - abandoned waiter. So, we have to insert the unlock-close when the - lock call is interrupted. */ - -static void do_unlock_close(const struct dlm_plock_info *info) +static int do_lock_cancel(const struct dlm_plock_info *orig_info) { struct plock_op *op; + int rv; op = kzalloc(sizeof(*op), GFP_NOFS); if (!op) - return; + return -ENOMEM; - op->info.optype = DLM_PLOCK_OP_UNLOCK; - op->info.pid = info->pid; - op->info.fsid = info->fsid; - op->info.number = info->number; - op->info.start = 0; - op->info.end = OFFSET_MAX; - op->info.owner = info->owner; + op->info = *orig_info; + op->info.optype = DLM_PLOCK_OP_CANCEL; + op->info.wait = 0; - op->info.flags |= DLM_PLOCK_FL_CLOSE; send_op(op); + wait_event(recv_wq, (op->done != 0)); + + rv = op->info.rv; + + dlm_release_plock_op(op); + return rv; } int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, @@ -156,7 +152,7 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, send_op(op); if (op->info.wait) { - rv = wait_event_killable(recv_wq, (op->done != 0)); + rv = wait_event_interruptible(recv_wq, (op->done != 0)); if (rv == -ERESTARTSYS) { spin_lock(&ops_lock); /* recheck under ops_lock if we got a done != 0, @@ -166,17 +162,37 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, spin_unlock(&ops_lock); goto do_lock_wait; } - list_del(&op->list); spin_unlock(&ops_lock); + rv = do_lock_cancel(&op->info); + switch (rv) { + case 0: + /* waiter was deleted in user space, answer will never come + * remove original request. The original request must be + * on recv_list because the answer of do_lock_cancel() + * synchronized it. + */ + spin_lock(&ops_lock); + list_del(&op->list); + spin_unlock(&ops_lock); + rv = -EINTR; + break; + case -ENOENT: + /* cancellation wasn't successful but op should be done */ + fallthrough; + default: + /* internal error doing cancel we need to wait */ + goto wait; + } + log_debug(ls, "%s: wait interrupted %x %llx pid %d", __func__, ls->ls_global_id, (unsigned long long)number, op->info.pid); - do_unlock_close(&op->info); dlm_release_plock_op(op); goto out; } } else { +wait: wait_event(recv_wq, (op->done != 0)); } diff --git a/include/uapi/linux/dlm_plock.h b/include/uapi/linux/dlm_plock.h index 63b6c1fd9169..eb66afcac40e 100644 --- a/include/uapi/linux/dlm_plock.h +++ b/include/uapi/linux/dlm_plock.h @@ -22,6 +22,7 @@ enum { DLM_PLOCK_OP_LOCK = 1, DLM_PLOCK_OP_UNLOCK, DLM_PLOCK_OP_GET, + DLM_PLOCK_OP_CANCEL, }; #define DLM_PLOCK_FL_CLOSE 1 From dc52cd2eff4ac924a795efcef27f8fd58a5260bb Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 20 Jul 2023 08:22:41 -0400 Subject: [PATCH 03/17] fs: dlm: fix F_CANCELLK to cancel pending request This patch fixes the current handling of F_CANCELLK by not just doing a unlock as we need to try to cancel a lock at first. A unlock makes sense on a non-blocking lock request but if it's a blocking lock request we need to cancel the request until it's not granted yet. This patch is fixing this behaviour by first try to cancel a lock request and if it's failed it's unlocking the lock which seems to be granted. Note: currently the nfs locking handling was disabled by commit 40595cdc93ed ("nfs: block notification on fs with its own ->lock"). However DLM was never being updated regarding to this change. Future patches will try to fix lockd lock requests for DLM. This patch is currently assuming the upstream DLM lockd handling is correct. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 103 +++++++++++++++++++++++++++++++++----- fs/gfs2/file.c | 9 ++-- fs/ocfs2/stack_user.c | 13 ++--- include/linux/dlm_plock.h | 2 + 4 files changed, 98 insertions(+), 29 deletions(-) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index a8ffa0760913..943d9f8e5564 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -42,6 +42,27 @@ static inline void set_version(struct dlm_plock_info *info) info->version[2] = DLM_PLOCK_VERSION_PATCH; } +static struct plock_op *plock_lookup_waiter(const struct dlm_plock_info *info) +{ + struct plock_op *op = NULL, *iter; + + list_for_each_entry(iter, &recv_list, list) { + if (iter->info.fsid == info->fsid && + iter->info.number == info->number && + iter->info.owner == info->owner && + iter->info.pid == info->pid && + iter->info.start == info->start && + iter->info.end == info->end && + iter->info.ex == info->ex && + iter->info.wait) { + op = iter; + break; + } + } + + return op; +} + static int check_version(struct dlm_plock_info *info) { if ((DLM_PLOCK_VERSION_MAJOR != info->version[0]) || @@ -334,6 +355,74 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, } EXPORT_SYMBOL_GPL(dlm_posix_unlock); +/* + * NOTE: This implementation can only handle async lock requests as nfs + * do it. It cannot handle cancellation of a pending lock request sitting + * in wait_event(), but for now only nfs is the only user local kernel + * user. + */ +int dlm_posix_cancel(dlm_lockspace_t *lockspace, u64 number, struct file *file, + struct file_lock *fl) +{ + struct dlm_plock_info info; + struct plock_op *op; + struct dlm_ls *ls; + int rv; + + /* this only works for async request for now and nfs is the only + * kernel user right now. + */ + if (WARN_ON_ONCE(!fl->fl_lmops || !fl->fl_lmops->lm_grant)) + return -EOPNOTSUPP; + + ls = dlm_find_lockspace_local(lockspace); + if (!ls) + return -EINVAL; + + memset(&info, 0, sizeof(info)); + info.pid = fl->fl_pid; + info.ex = (fl->fl_type == F_WRLCK); + info.fsid = ls->ls_global_id; + dlm_put_lockspace(ls); + info.number = number; + info.start = fl->fl_start; + info.end = fl->fl_end; + info.owner = (__u64)fl->fl_pid; + + rv = do_lock_cancel(&info); + switch (rv) { + case 0: + spin_lock(&ops_lock); + /* lock request to cancel must be on recv_list because + * do_lock_cancel() synchronizes it. + */ + op = plock_lookup_waiter(&info); + if (WARN_ON_ONCE(!op)) { + rv = -ENOLCK; + break; + } + + list_del(&op->list); + spin_unlock(&ops_lock); + WARN_ON(op->info.optype != DLM_PLOCK_OP_LOCK); + op->data->callback(op->data->fl, -EINTR); + dlm_release_plock_op(op); + rv = -EINTR; + break; + case -ENOENT: + /* if cancel wasn't successful we probably were to late + * or it was a non-blocking lock request, so just unlock it. + */ + rv = dlm_posix_unlock(lockspace, number, file, fl); + break; + default: + break; + } + + return rv; +} +EXPORT_SYMBOL_GPL(dlm_posix_cancel); + int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file, struct file_lock *fl) { @@ -457,19 +546,7 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, */ spin_lock(&ops_lock); if (info.wait) { - list_for_each_entry(iter, &recv_list, list) { - if (iter->info.fsid == info.fsid && - iter->info.number == info.number && - iter->info.owner == info.owner && - iter->info.pid == info.pid && - iter->info.start == info.start && - iter->info.end == info.end && - iter->info.ex == info.ex && - iter->info.wait) { - op = iter; - break; - } - } + op = plock_lookup_waiter(&info); } else { list_for_each_entry(iter, &recv_list, list) { if (!iter->info.wait) { diff --git a/fs/gfs2/file.c b/fs/gfs2/file.c index 1bf3c4453516..386eceb2f574 100644 --- a/fs/gfs2/file.c +++ b/fs/gfs2/file.c @@ -1436,17 +1436,14 @@ static int gfs2_lock(struct file *file, int cmd, struct file_lock *fl) if (!(fl->fl_flags & FL_POSIX)) return -ENOLCK; - if (cmd == F_CANCELLK) { - /* Hack: */ - cmd = F_SETLK; - fl->fl_type = F_UNLCK; - } if (unlikely(gfs2_withdrawn(sdp))) { if (fl->fl_type == F_UNLCK) locks_lock_file_wait(file, fl); return -EIO; } - if (IS_GETLK(cmd)) + if (cmd == F_CANCELLK) + return dlm_posix_cancel(ls->ls_dlm, ip->i_no_addr, file, fl); + else if (IS_GETLK(cmd)) return dlm_posix_get(ls->ls_dlm, ip->i_no_addr, file, fl); else if (fl->fl_type == F_UNLCK) return dlm_posix_unlock(ls->ls_dlm, ip->i_no_addr, file, fl); diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c index 05d4414d0c33..9b76ee66aeb2 100644 --- a/fs/ocfs2/stack_user.c +++ b/fs/ocfs2/stack_user.c @@ -738,18 +738,11 @@ static int user_plock(struct ocfs2_cluster_connection *conn, * * Internally, fs/dlm will pass these to a misc device, which * a userspace daemon will read and write to. - * - * For now, cancel requests (which happen internally only), - * are turned into unlocks. Most of this function taken from - * gfs2_lock. */ - if (cmd == F_CANCELLK) { - cmd = F_SETLK; - fl->fl_type = F_UNLCK; - } - - if (IS_GETLK(cmd)) + if (cmd == F_CANCELLK) + return dlm_posix_cancel(conn->cc_lockspace, ino, file, fl); + else if (IS_GETLK(cmd)) return dlm_posix_get(conn->cc_lockspace, ino, file, fl); else if (fl->fl_type == F_UNLCK) return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl); diff --git a/include/linux/dlm_plock.h b/include/linux/dlm_plock.h index e6d76e8715a6..15fc856d198c 100644 --- a/include/linux/dlm_plock.h +++ b/include/linux/dlm_plock.h @@ -11,6 +11,8 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, int cmd, struct file_lock *fl); int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, struct file_lock *fl); +int dlm_posix_cancel(dlm_lockspace_t *lockspace, u64 number, struct file *file, + struct file_lock *fl); int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file, struct file_lock *fl); #endif From e717f2e8e4896f4c59a865b11d5cb957b0bfb0e1 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:38 -0400 Subject: [PATCH 04/17] fs: dlm: add missing spin_unlock This patch fixes commit dc52cd2eff4a ("fs: dlm: fix F_CANCELLK to cancel pending request") that we don't unlock the ops_lock in a rate case when a waiter cannot be found. This case can only happen when cancellation of plock operation was successful but no kernel waiter was being found. Fixes: dc52cd2eff4a ("fs: dlm: fix F_CANCELLK to cancel pending request") Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 1 + 1 file changed, 1 insertion(+) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index 943d9f8e5564..44b3aab5b709 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -398,6 +398,7 @@ int dlm_posix_cancel(dlm_lockspace_t *lockspace, u64 number, struct file *file, */ op = plock_lookup_waiter(&info); if (WARN_ON_ONCE(!op)) { + spin_unlock(&ops_lock); rv = -ENOLCK; break; } From 4b056db81c5dd79d786b44c371f6e0b4371735c3 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:39 -0400 Subject: [PATCH 05/17] fs: dlm: remove unused processed_nodes The variable processed_nodes is not being used by commit 1696c75f1864 ("fs: dlm: add send ack threshold and append acks to msgs"). This patch removes the leftover of this commit. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 1 - 1 file changed, 1 deletion(-) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 9f14ea9f6322..f7bc22e74db2 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -863,7 +863,6 @@ struct dlm_processed_nodes { static void process_dlm_messages(struct work_struct *work) { struct processqueue_entry *pentry; - LIST_HEAD(processed_nodes); spin_lock(&processqueue_lock); pentry = list_first_entry_or_null(&processqueue, From 541adb0d4d10b4daf15f4b6b73c5d6b855d23eb5 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:40 -0400 Subject: [PATCH 06/17] fs: dlm: debugfs for queued callbacks It was useful to debug an issue with the callback queue to check if any callbacks in any lkb are for some reason not processed by the callback workqueue. The mentioned issue was fixed by commit a034c1370ded ("fs: dlm: fix DLM_IFL_CB_PENDING gets overwritten"). If there are similar issue that looks like a ast callback was not processed, we can confirm now that it is not sitting to be processed by the callback workqueue anymore. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/debug_fs.c | 101 +++++++++++++++++++++++++++++++++++++++++- fs/dlm/dlm_internal.h | 1 + 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c index a1aca41c49d0..5aabcb6f0f15 100644 --- a/fs/dlm/debug_fs.c +++ b/fs/dlm/debug_fs.c @@ -18,6 +18,7 @@ #include "dlm_internal.h" #include "midcomms.h" #include "lock.h" +#include "ast.h" #define DLM_DEBUG_BUF_LEN 4096 static char debug_buf[DLM_DEBUG_BUF_LEN]; @@ -365,6 +366,52 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s) unlock_rsb(r); } +static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb) +{ + struct dlm_callback *cb; + + /* lkb_id lkb_flags mode flags sb_status sb_flags */ + + spin_lock(&lkb->lkb_cb_lock); + list_for_each_entry(cb, &lkb->lkb_callbacks, list) { + seq_printf(s, "%x %x %d %x %d %x\n", + lkb->lkb_id, + dlm_iflags_val(lkb), + cb->mode, + cb->flags, + cb->sb_status, + cb->sb_flags); + } + spin_unlock(&lkb->lkb_cb_lock); +} + +static void print_format5(struct dlm_rsb *r, struct seq_file *s) +{ + struct dlm_lkb *lkb; + + lock_rsb(r); + + list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { + print_format5_lock(s, lkb); + if (seq_has_overflowed(s)) + goto out; + } + + list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { + print_format5_lock(s, lkb); + if (seq_has_overflowed(s)) + goto out; + } + + list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) { + print_format5_lock(s, lkb); + if (seq_has_overflowed(s)) + goto out; + } + out: + unlock_rsb(r); +} + struct rsbtbl_iter { struct dlm_rsb *rsb; unsigned bucket; @@ -408,6 +455,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr) } print_format4(ri->rsb, seq); break; + case 5: + if (ri->header) { + seq_puts(seq, "lkb_id lkb_flags mode flags sb_status sb_flags\n"); + ri->header = 0; + } + print_format5(ri->rsb, seq); + break; } return 0; @@ -417,6 +471,7 @@ static const struct seq_operations format1_seq_ops; static const struct seq_operations format2_seq_ops; static const struct seq_operations format3_seq_ops; static const struct seq_operations format4_seq_ops; +static const struct seq_operations format5_seq_ops; static void *table_seq_start(struct seq_file *seq, loff_t *pos) { @@ -448,6 +503,8 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) ri->format = 3; if (seq->op == &format4_seq_ops) ri->format = 4; + if (seq->op == &format5_seq_ops) + ri->format = 5; tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; @@ -602,10 +659,18 @@ static const struct seq_operations format4_seq_ops = { .show = table_seq_show, }; +static const struct seq_operations format5_seq_ops = { + .start = table_seq_start, + .next = table_seq_next, + .stop = table_seq_stop, + .show = table_seq_show, +}; + static const struct file_operations format1_fops; static const struct file_operations format2_fops; static const struct file_operations format3_fops; static const struct file_operations format4_fops; +static const struct file_operations format5_fops; static int table_open1(struct inode *inode, struct file *file) { @@ -683,7 +748,21 @@ static int table_open4(struct inode *inode, struct file *file) struct seq_file *seq; int ret; - ret = seq_open(file, &format4_seq_ops); + ret = seq_open(file, &format5_seq_ops); + if (ret) + return ret; + + seq = file->private_data; + seq->private = inode->i_private; /* the dlm_ls */ + return 0; +} + +static int table_open5(struct inode *inode, struct file *file) +{ + struct seq_file *seq; + int ret; + + ret = seq_open(file, &format5_seq_ops); if (ret) return ret; @@ -725,6 +804,14 @@ static const struct file_operations format4_fops = { .release = seq_release }; +static const struct file_operations format5_fops = { + .owner = THIS_MODULE, + .open = table_open5, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release +}; + /* * dump lkb's on the ls_waiters list */ @@ -793,6 +880,7 @@ void dlm_delete_debug_file(struct dlm_ls *ls) debugfs_remove(ls->ls_debug_locks_dentry); debugfs_remove(ls->ls_debug_all_dentry); debugfs_remove(ls->ls_debug_toss_dentry); + debugfs_remove(ls->ls_debug_queued_asts_dentry); } static int dlm_state_show(struct seq_file *file, void *offset) @@ -936,6 +1024,17 @@ void dlm_create_debug_file(struct dlm_ls *ls) dlm_root, ls, &waiters_fops); + + /* format 5 */ + + memset(name, 0, sizeof(name)); + snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_queued_asts", ls->ls_name); + + ls->ls_debug_queued_asts_dentry = debugfs_create_file(name, + 0644, + dlm_root, + ls, + &format5_fops); } void __init dlm_register_debugfs(void) diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index c8156770205e..dfc444dad329 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -598,6 +598,7 @@ struct dlm_ls { struct dentry *ls_debug_locks_dentry; /* debugfs */ struct dentry *ls_debug_all_dentry; /* debugfs */ struct dentry *ls_debug_toss_dentry; /* debugfs */ + struct dentry *ls_debug_queued_asts_dentry; /* debugfs */ wait_queue_head_t ls_uevent_wait; /* user part of join/leave */ int ls_uevent_result; From 67b5da9a40fc984d25bda90a918e490e8c2555b7 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:41 -0400 Subject: [PATCH 07/17] fs: dlm: check on plock ops when exit dlm To be sure we don't have any issues that there are leftover plock ops in either send_list or recv_list we simple check if either one of the list are empty when we exit the dlm subsystem. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index 44b3aab5b709..5c2cc8d940ef 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -628,5 +628,7 @@ int dlm_plock_init(void) void dlm_plock_exit(void) { misc_deregister(&plock_dev_misc); + WARN_ON(!list_empty(&send_list)); + WARN_ON(!list_empty(&recv_list)); } From 8c95006d55726eeebf3b863335accfba50d4bc8f Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:42 -0400 Subject: [PATCH 08/17] fs: dlm: add plock dev tracepoints I currently debug nfs plock handling and introduce those two tracepoints for getting more information about what is happening there if the user space reads plock operations from kernel and writing the result back. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 6 +++++ include/trace/events/dlm.h | 51 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index 5c2cc8d940ef..00e1d802a81c 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -11,6 +11,8 @@ #include #include +#include + #include "dlm_internal.h" #include "lockspace.h" @@ -509,6 +511,8 @@ static ssize_t dev_read(struct file *file, char __user *u, size_t count, if (!op) return -EAGAIN; + trace_dlm_plock_read(&info); + /* there is no need to get a reply from userspace for unlocks that were generated by the vfs cleaning up for a close (the process did not make an unlock call). */ @@ -536,6 +540,8 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, if (copy_from_user(&info, u, sizeof(info))) return -EFAULT; + trace_dlm_plock_write(&info); + if (check_version(&info)) return -EINVAL; diff --git a/include/trace/events/dlm.h b/include/trace/events/dlm.h index 2b09574e1243..c1a146f9fc91 100644 --- a/include/trace/events/dlm.h +++ b/include/trace/events/dlm.h @@ -7,6 +7,7 @@ #include #include +#include #include #include "../../../fs/dlm/dlm_internal.h" @@ -585,6 +586,56 @@ TRACE_EVENT(dlm_recv_message, ); +DECLARE_EVENT_CLASS(dlm_plock_template, + + TP_PROTO(const struct dlm_plock_info *info), + + TP_ARGS(info), + + TP_STRUCT__entry( + __field(uint8_t, optype) + __field(uint8_t, ex) + __field(uint8_t, wait) + __field(uint8_t, flags) + __field(uint32_t, pid) + __field(int32_t, nodeid) + __field(int32_t, rv) + __field(uint32_t, fsid) + __field(uint64_t, number) + __field(uint64_t, start) + __field(uint64_t, end) + __field(uint64_t, owner) + ), + + TP_fast_assign( + __entry->optype = info->optype; + __entry->ex = info->ex; + __entry->wait = info->wait; + __entry->flags = info->flags; + __entry->pid = info->pid; + __entry->nodeid = info->nodeid; + __entry->rv = info->rv; + __entry->fsid = info->fsid; + __entry->number = info->number; + __entry->start = info->start; + __entry->end = info->end; + __entry->owner = info->owner; + ), + + TP_printk("fsid=%u number=%llx owner=%llx optype=%d ex=%d wait=%d flags=%x pid=%u nodeid=%d rv=%d start=%llx end=%llx", + __entry->fsid, __entry->number, __entry->owner, + __entry->optype, __entry->ex, __entry->wait, + __entry->flags, __entry->pid, __entry->nodeid, + __entry->rv, __entry->start, __entry->end) + +); + +DEFINE_EVENT(dlm_plock_template, dlm_plock_read, + TP_PROTO(const struct dlm_plock_info *info), TP_ARGS(info)); + +DEFINE_EVENT(dlm_plock_template, dlm_plock_write, + TP_PROTO(const struct dlm_plock_info *info), TP_ARGS(info)); + TRACE_EVENT(dlm_send, TP_PROTO(int nodeid, int ret), From c84c47333abbbfd83212fcfe2867be4a47e82056 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:43 -0400 Subject: [PATCH 09/17] fs: dlm: remove clear_members_cb This patch is just a small cleanup to directly call remove_remote_member() instead of going over clear_members_cb() which just calls remove_remote_member(). Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/member.c | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/fs/dlm/member.c b/fs/dlm/member.c index 77d202e4a02a..f303ea8bd256 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -393,14 +393,9 @@ static void remove_remote_member(int nodeid) dlm_midcomms_remove_member(nodeid); } -static void clear_members_cb(int nodeid) -{ - remove_remote_member(nodeid); -} - void dlm_clear_members(struct dlm_ls *ls) { - clear_memb_list(&ls->ls_nodes, clear_members_cb); + clear_memb_list(&ls->ls_nodes, remove_remote_member); ls->ls_num_nodes = 0; } From 643f5cfa610f475c7465e4158b2b1fdd170fac10 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:44 -0400 Subject: [PATCH 10/17] fs: dlm: cleanup lock order This patch cleanups the lock order to hold at first the close_lock and then held the nodes_srcu read lock. Probably it will never be a problem as nodes_srcu is only a read lock preventing the node pointer getting freed. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/midcomms.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index e1a0df67b566..8ebffbfdc00a 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -1489,12 +1489,12 @@ int dlm_midcomms_close(int nodeid) synchronize_srcu(&nodes_srcu); - idx = srcu_read_lock(&nodes_srcu); mutex_lock(&close_lock); + idx = srcu_read_lock(&nodes_srcu); node = nodeid2node(nodeid, 0); if (!node) { - mutex_unlock(&close_lock); srcu_read_unlock(&nodes_srcu, idx); + mutex_unlock(&close_lock); return dlm_lowcomms_close(nodeid); } From c4f4e135c27b503d325d414819831909023b113d Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:45 -0400 Subject: [PATCH 11/17] fs: dlm: get recovery sequence number as parameter This patch removes a read of the ls->ls_recover_seq uint64_t number in _create_rcom(). If the ls->ls_recover_seq is readed the ls_recover_lock need to held. However this number was always readed before when any rcom message is received and it's not necessary to read it again from a per lockspace variable to use it for the replying message. This patch will pass the sequence number as parameter so another read of ls->ls_recover_seq and holding the ls->ls_recover_lock is not required. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dir.c | 4 +-- fs/dlm/dir.h | 2 +- fs/dlm/lock.c | 5 ++-- fs/dlm/lock.h | 3 ++- fs/dlm/member.c | 6 ++--- fs/dlm/rcom.c | 68 ++++++++++++++++++++++++++--------------------- fs/dlm/rcom.h | 10 ++++--- fs/dlm/recover.c | 58 +++++++++++++++++++++------------------- fs/dlm/recover.h | 12 ++++----- fs/dlm/recoverd.c | 16 +++++------ 10 files changed, 99 insertions(+), 85 deletions(-) diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index fb1981654bb2..3bf5bf7a37b4 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -58,7 +58,7 @@ void dlm_recover_dir_nodeid(struct dlm_ls *ls) up_read(&ls->ls_root_sem); } -int dlm_recover_directory(struct dlm_ls *ls) +int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq) { struct dlm_member *memb; char *b, *last_name = NULL; @@ -90,7 +90,7 @@ int dlm_recover_directory(struct dlm_ls *ls) } error = dlm_rcom_names(ls, memb->nodeid, - last_name, last_len); + last_name, last_len, seq); if (error) goto out_free; diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h index 03844d086be2..0635582da003 100644 --- a/fs/dlm/dir.h +++ b/fs/dlm/dir.h @@ -15,7 +15,7 @@ int dlm_dir_nodeid(struct dlm_rsb *rsb); int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash); void dlm_recover_dir_nodeid(struct dlm_ls *ls); -int dlm_recover_directory(struct dlm_ls *ls); +int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq); void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, char *outbuf, int outlen, int nodeid); diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index f511a9d7d416..b489da38e685 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -5464,7 +5464,8 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) } /* needs at least dlm_rcom + rcom_lock */ -int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) +int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc, + uint64_t seq) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct dlm_rsb *r; @@ -5509,7 +5510,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, result); - dlm_send_rcom_lock(r, lkb); + dlm_send_rcom_lock(r, lkb, seq); goto out; case -EEXIST: case 0: diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index aa5ad44d902b..222e682523b9 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -37,7 +37,8 @@ void dlm_recover_grant(struct dlm_ls *ls); int dlm_recover_waiters_post(struct dlm_ls *ls); void dlm_recover_waiters_pre(struct dlm_ls *ls); int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc); -int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc); +int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc, + uint64_t seq); int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode, uint32_t flags, void *name, unsigned int namelen); diff --git a/fs/dlm/member.c b/fs/dlm/member.c index f303ea8bd256..19f3cd96f3c0 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -449,7 +449,7 @@ static void make_member_array(struct dlm_ls *ls) /* send a status request to all members just to establish comms connections */ -static int ping_members(struct dlm_ls *ls) +static int ping_members(struct dlm_ls *ls, uint64_t seq) { struct dlm_member *memb; int error = 0; @@ -459,7 +459,7 @@ static int ping_members(struct dlm_ls *ls) error = -EINTR; break; } - error = dlm_rcom_status(ls, memb->nodeid, 0); + error = dlm_rcom_status(ls, memb->nodeid, 0, seq); if (error) break; } @@ -607,7 +607,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) make_member_array(ls); *neg_out = neg; - error = ping_members(ls); + error = ping_members(ls, rv->seq); log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes); return error; } diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index f4afdf892f78..efe45e68287f 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -28,7 +28,8 @@ static int rcom_response(struct dlm_ls *ls) } static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, - struct dlm_rcom **rc_ret, char *mb, int mb_len) + struct dlm_rcom **rc_ret, char *mb, int mb_len, + uint64_t seq) { struct dlm_rcom *rc; @@ -41,16 +42,14 @@ static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, rc->rc_header.h_cmd = DLM_RCOM; rc->rc_type = cpu_to_le32(type); - - spin_lock(&ls->ls_recover_lock); - rc->rc_seq = cpu_to_le64(ls->ls_recover_seq); - spin_unlock(&ls->ls_recover_lock); + rc->rc_seq = cpu_to_le64(seq); *rc_ret = rc; } static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, - struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret) + struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret, + uint64_t seq) { int mb_len = sizeof(struct dlm_rcom) + len; struct dlm_mhandle *mh; @@ -63,14 +62,14 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, return -ENOBUFS; } - _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len); + _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq); *mh_ret = mh; return 0; } static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, int len, struct dlm_rcom **rc_ret, - struct dlm_msg **msg_ret) + struct dlm_msg **msg_ret, uint64_t seq) { int mb_len = sizeof(struct dlm_rcom) + len; struct dlm_msg *msg; @@ -84,7 +83,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, return -ENOBUFS; } - _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len); + _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq); *msg_ret = msg; return 0; } @@ -170,7 +169,8 @@ static void disallow_sync_reply(struct dlm_ls *ls) * node's rcom_config. */ -int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) +int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags, + uint64_t seq) { struct dlm_rcom *rc; struct dlm_msg *msg; @@ -186,7 +186,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) retry: error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS, - sizeof(struct rcom_status), &rc, &msg); + sizeof(struct rcom_status), &rc, &msg, + seq); if (error) goto out; @@ -220,7 +221,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) return error; } -static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in, + uint64_t seq) { struct dlm_rcom *rc; struct rcom_status *rs; @@ -251,7 +253,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) do_create: error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY, - len, &rc, &msg); + len, &rc, &msg, seq); if (error) return; @@ -302,7 +304,8 @@ static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) spin_unlock(&ls->ls_rcom_spin); } -int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) +int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, + int last_len, uint64_t seq) { struct dlm_rcom *rc; struct dlm_msg *msg; @@ -312,7 +315,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) retry: error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len, - &rc, &msg); + &rc, &msg, seq); if (error) goto out; memcpy(rc->rc_buf, last_name, last_len); @@ -330,7 +333,8 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) return error; } -static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in, + uint64_t seq) { struct dlm_rcom *rc; int error, inlen, outlen, nodeid; @@ -342,7 +346,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom); error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, - &rc, &msg); + &rc, &msg, seq); if (error) return; rc->rc_id = rc_in->rc_id; @@ -353,7 +357,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) send_rcom_stateless(msg, rc); } -int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) +int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq) { struct dlm_rcom *rc; struct dlm_mhandle *mh; @@ -361,7 +365,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) int error; error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length, - &rc, &mh); + &rc, &mh, seq); if (error) goto out; memcpy(rc->rc_buf, r->res_name, r->res_length); @@ -372,7 +376,8 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) return error; } -static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in, + uint64_t seq) { struct dlm_rcom *rc; struct dlm_mhandle *mh; @@ -387,7 +392,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) return; } - error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh); + error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh, + seq); if (error) return; @@ -437,7 +443,7 @@ static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); } -int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) +int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq) { struct dlm_ls *ls = r->res_ls; struct dlm_rcom *rc; @@ -448,7 +454,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) if (lkb->lkb_lvbptr) len += ls->ls_lvblen; - error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh); + error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh, + seq); if (error) goto out; @@ -462,7 +469,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) } /* needs at least dlm_rcom + rcom_lock */ -static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in, + uint64_t seq) { struct dlm_rcom *rc; struct dlm_mhandle *mh; @@ -471,7 +479,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) dlm_recover_master_copy(ls, rc_in); error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY, - sizeof(struct rcom_lock), &rc, &mh); + sizeof(struct rcom_lock), &rc, &mh, seq); if (error) return; @@ -620,21 +628,21 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) switch (rc->rc_type) { case cpu_to_le32(DLM_RCOM_STATUS): - receive_rcom_status(ls, rc); + receive_rcom_status(ls, rc, seq); break; case cpu_to_le32(DLM_RCOM_NAMES): - receive_rcom_names(ls, rc); + receive_rcom_names(ls, rc, seq); break; case cpu_to_le32(DLM_RCOM_LOOKUP): - receive_rcom_lookup(ls, rc); + receive_rcom_lookup(ls, rc, seq); break; case cpu_to_le32(DLM_RCOM_LOCK): if (le16_to_cpu(rc->rc_header.h_length) < lock_size) goto Eshort; - receive_rcom_lock(ls, rc); + receive_rcom_lock(ls, rc, seq); break; case cpu_to_le32(DLM_RCOM_STATUS_REPLY): @@ -652,7 +660,7 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) case cpu_to_le32(DLM_RCOM_LOCK_REPLY): if (le16_to_cpu(rc->rc_header.h_length) < lock_size) goto Eshort; - dlm_recover_process_copy(ls, rc); + dlm_recover_process_copy(ls, rc, seq); break; default: diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h index 454d3c4814ab..9dd06d43ddb4 100644 --- a/fs/dlm/rcom.h +++ b/fs/dlm/rcom.h @@ -12,10 +12,12 @@ #ifndef __RCOM_DOT_H__ #define __RCOM_DOT_H__ -int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags); -int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); -int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); -int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); +int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags, + uint64_t seq); +int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, + int last_len, uint64_t seq); +int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq); +int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq); void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid); int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in); diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index 29d71a5018d4..ddb6b3312cc1 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -93,7 +93,7 @@ void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status) } static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, - int save_slots) + int save_slots, uint64_t seq) { struct dlm_rcom *rc = ls->ls_recover_buf; struct dlm_member *memb; @@ -107,7 +107,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, goto out; } - error = dlm_rcom_status(ls, memb->nodeid, 0); + error = dlm_rcom_status(ls, memb->nodeid, 0, seq); if (error) goto out; @@ -126,7 +126,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, } static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, - uint32_t status_flags) + uint32_t status_flags, uint64_t seq) { struct dlm_rcom *rc = ls->ls_recover_buf; int error = 0, delay = 0, nodeid = ls->ls_low_nodeid; @@ -137,7 +137,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, goto out; } - error = dlm_rcom_status(ls, nodeid, status_flags); + error = dlm_rcom_status(ls, nodeid, status_flags, seq); if (error) break; @@ -151,22 +151,22 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, return error; } -static int wait_status(struct dlm_ls *ls, uint32_t status) +static int wait_status(struct dlm_ls *ls, uint32_t status, uint64_t seq) { uint32_t status_all = status << 1; int error; if (ls->ls_low_nodeid == dlm_our_nodeid()) { - error = wait_status_all(ls, status, 0); + error = wait_status_all(ls, status, 0, seq); if (!error) dlm_set_recover_status(ls, status_all); } else - error = wait_status_low(ls, status_all, 0); + error = wait_status_low(ls, status_all, 0, seq); return error; } -int dlm_recover_members_wait(struct dlm_ls *ls) +int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq) { struct dlm_member *memb; struct dlm_slot *slots; @@ -180,7 +180,7 @@ int dlm_recover_members_wait(struct dlm_ls *ls) } if (ls->ls_low_nodeid == dlm_our_nodeid()) { - error = wait_status_all(ls, DLM_RS_NODES, 1); + error = wait_status_all(ls, DLM_RS_NODES, 1, seq); if (error) goto out; @@ -199,7 +199,8 @@ int dlm_recover_members_wait(struct dlm_ls *ls) dlm_set_recover_status(ls, DLM_RS_NODES_ALL); } } else { - error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS); + error = wait_status_low(ls, DLM_RS_NODES_ALL, + DLM_RSF_NEED_SLOTS, seq); if (error) goto out; @@ -209,19 +210,19 @@ int dlm_recover_members_wait(struct dlm_ls *ls) return error; } -int dlm_recover_directory_wait(struct dlm_ls *ls) +int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq) { - return wait_status(ls, DLM_RS_DIR); + return wait_status(ls, DLM_RS_DIR, seq); } -int dlm_recover_locks_wait(struct dlm_ls *ls) +int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq) { - return wait_status(ls, DLM_RS_LOCKS); + return wait_status(ls, DLM_RS_LOCKS, seq); } -int dlm_recover_done_wait(struct dlm_ls *ls) +int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq) { - return wait_status(ls, DLM_RS_DONE); + return wait_status(ls, DLM_RS_DONE, seq); } /* @@ -441,7 +442,7 @@ static void set_new_master(struct dlm_rsb *r) * equals our_nodeid below). */ -static int recover_master(struct dlm_rsb *r, unsigned int *count) +static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq) { struct dlm_ls *ls = r->res_ls; int our_nodeid, dir_nodeid; @@ -472,7 +473,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count) error = 0; } else { recover_idr_add(r); - error = dlm_send_rcom_lookup(r, dir_nodeid); + error = dlm_send_rcom_lookup(r, dir_nodeid, seq); } (*count)++; @@ -520,7 +521,7 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count) * the correct dir node. */ -int dlm_recover_masters(struct dlm_ls *ls) +int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq) { struct dlm_rsb *r; unsigned int total = 0; @@ -542,7 +543,7 @@ int dlm_recover_masters(struct dlm_ls *ls) if (nodir) error = recover_master_static(r, &count); else - error = recover_master(r, &count); + error = recover_master(r, &count, seq); unlock_rsb(r); cond_resched(); total++; @@ -614,13 +615,14 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) * an equal number of replies then recovery for the rsb is done */ -static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head) +static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head, + uint64_t seq) { struct dlm_lkb *lkb; int error = 0; list_for_each_entry(lkb, head, lkb_statequeue) { - error = dlm_send_rcom_lock(r, lkb); + error = dlm_send_rcom_lock(r, lkb, seq); if (error) break; r->res_recover_locks_count++; @@ -629,7 +631,7 @@ static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head) return error; } -static int recover_locks(struct dlm_rsb *r) +static int recover_locks(struct dlm_rsb *r, uint64_t seq) { int error = 0; @@ -637,13 +639,13 @@ static int recover_locks(struct dlm_rsb *r) DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r);); - error = recover_locks_queue(r, &r->res_grantqueue); + error = recover_locks_queue(r, &r->res_grantqueue, seq); if (error) goto out; - error = recover_locks_queue(r, &r->res_convertqueue); + error = recover_locks_queue(r, &r->res_convertqueue, seq); if (error) goto out; - error = recover_locks_queue(r, &r->res_waitqueue); + error = recover_locks_queue(r, &r->res_waitqueue, seq); if (error) goto out; @@ -656,7 +658,7 @@ static int recover_locks(struct dlm_rsb *r) return error; } -int dlm_recover_locks(struct dlm_ls *ls) +int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq) { struct dlm_rsb *r; int error, count = 0; @@ -677,7 +679,7 @@ int dlm_recover_locks(struct dlm_ls *ls) goto out; } - error = recover_locks(r); + error = recover_locks(r, seq); if (error) { up_read(&ls->ls_root_sem); goto out; diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h index 235e0d25cd48..c5ce2ef13934 100644 --- a/fs/dlm/recover.h +++ b/fs/dlm/recover.h @@ -15,13 +15,13 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls)); uint32_t dlm_recover_status(struct dlm_ls *ls); void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status); -int dlm_recover_members_wait(struct dlm_ls *ls); -int dlm_recover_directory_wait(struct dlm_ls *ls); -int dlm_recover_locks_wait(struct dlm_ls *ls); -int dlm_recover_done_wait(struct dlm_ls *ls); -int dlm_recover_masters(struct dlm_ls *ls); +int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq); int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc); -int dlm_recover_locks(struct dlm_ls *ls); +int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq); void dlm_recovered_lock(struct dlm_rsb *r); int dlm_create_root_list(struct dlm_ls *ls); void dlm_release_root_list(struct dlm_ls *ls); diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 19da816cfb09..4d17491dea2f 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -90,7 +90,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_set_recover_status(ls, DLM_RS_NODES); - error = dlm_recover_members_wait(ls); + error = dlm_recover_members_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_members_wait error %d", error); goto fail; @@ -103,7 +103,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * nodes their master rsb names that hash to us. */ - error = dlm_recover_directory(ls); + error = dlm_recover_directory(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_directory error %d", error); goto fail; @@ -111,7 +111,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_set_recover_status(ls, DLM_RS_DIR); - error = dlm_recover_directory_wait(ls); + error = dlm_recover_directory_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_directory_wait error %d", error); goto fail; @@ -145,7 +145,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * departed nodes. */ - error = dlm_recover_masters(ls); + error = dlm_recover_masters(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_masters error %d", error); goto fail; @@ -155,7 +155,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * Send our locks on remastered rsb's to the new masters. */ - error = dlm_recover_locks(ls); + error = dlm_recover_locks(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_locks error %d", error); goto fail; @@ -163,7 +163,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_set_recover_status(ls, DLM_RS_LOCKS); - error = dlm_recover_locks_wait(ls); + error = dlm_recover_locks_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_locks_wait error %d", error); goto fail; @@ -187,7 +187,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) */ dlm_set_recover_status(ls, DLM_RS_LOCKS); - error = dlm_recover_locks_wait(ls); + error = dlm_recover_locks_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_locks_wait error %d", error); goto fail; @@ -206,7 +206,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_set_recover_status(ls, DLM_RS_DONE); - error = dlm_recover_done_wait(ls); + error = dlm_recover_done_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_done_wait error %d", error); goto fail; From 561c67d8a10142250baa2a2a4e8b5d95e9c97df9 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:46 -0400 Subject: [PATCH 12/17] fs: dlm: drop rxbuf manipulation in dlm_copy_master_names This patch removes the manipulation of the receive buffer in case of an error and be sure the buffer is null terminated before an error messagea is printed out. Instead of manipulate the receive buffer we tell inside the format string the maximum length the string buffer is being read. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dir.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index 3bf5bf7a37b4..768cf8d43b2b 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -245,9 +245,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, if (inlen > 1) { r = find_rsb_root(ls, inbuf, inlen); if (!r) { - inbuf[inlen - 1] = '\0'; - log_error(ls, "copy_master_names from %d start %d %s", - nodeid, inlen, inbuf); + log_error(ls, "copy_master_names from %d start %d %.*s", + nodeid, inlen, inlen, inbuf); goto out; } list = r->res_root_list.next; From b9d2f6ada0083bad46f37a1238fea718b575e0fa Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:47 -0400 Subject: [PATCH 13/17] fs: dlm: drop rxbuf manipulation in dlm_recover_master_copy Currently dlm_recover_master_copy() manipulates the receive buffer of an rcom lock message and modifies it on the fly so a later memcpy() to a new rcom message with the same message has those new values. This patch avoids manipulating the received rcom message by store the values for the new rcom message in paremter assigned with call by reference. Later when dlm_send_rcom_lock() constructs a new message and memcpy() the receive buffer those values will be set on the new constructed message. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lock.c | 10 +++++++--- fs/dlm/lock.h | 3 ++- fs/dlm/rcom.c | 12 ++++++++---- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index b489da38e685..1cf666c7401d 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -5384,7 +5384,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, back the rcom_lock struct we got but with the remid field filled in. */ /* needs at least dlm_rcom + rcom_lock */ -int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) +int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc, + __le32 *rl_remid, __le32 *rl_result) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct dlm_rsb *r; @@ -5393,6 +5394,9 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); int error; + /* init rl_remid with rcom lock rl_remid */ + *rl_remid = rl->rl_remid; + if (rl->rl_parent_lkid) { error = -EOPNOTSUPP; goto out; @@ -5448,7 +5452,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) out_remid: /* this is the new value returned to the lock holder for saving in its process-copy lkb */ - rl->rl_remid = cpu_to_le32(lkb->lkb_id); + *rl_remid = cpu_to_le32(lkb->lkb_id); lkb->lkb_recover_seq = ls->ls_recover_seq; @@ -5459,7 +5463,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) if (error && error != -EEXIST) log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", from_nodeid, remid, error); - rl->rl_result = cpu_to_le32(error); + *rl_result = cpu_to_le32(error); return error; } diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index 222e682523b9..cd67ccfbbf9b 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -36,7 +36,8 @@ void dlm_purge_mstcpy_locks(struct dlm_rsb *r); void dlm_recover_grant(struct dlm_ls *ls); int dlm_recover_waiters_post(struct dlm_ls *ls); void dlm_recover_waiters_pre(struct dlm_ls *ls); -int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc); +int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc, + __le32 *rl_remid, __le32 *rl_result); int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc, uint64_t seq); diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index efe45e68287f..0946431e370a 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -472,21 +472,25 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq) static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in, uint64_t seq) { + __le32 rl_remid, rl_result; + struct rcom_lock *rl; struct dlm_rcom *rc; struct dlm_mhandle *mh; int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); - dlm_recover_master_copy(ls, rc_in); + dlm_recover_master_copy(ls, rc_in, &rl_remid, &rl_result); error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY, sizeof(struct rcom_lock), &rc, &mh, seq); if (error) return; - /* We send back the same rcom_lock struct we received, but - dlm_recover_master_copy() has filled in rl_remid and rl_result */ - memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); + rl = (struct rcom_lock *)rc->rc_buf; + /* set rl_remid and rl_result from dlm_recover_master_copy() */ + rl->rl_remid = rl_remid; + rl->rl_result = rl_result; + rc->rc_id = rc_in->rc_id; rc->rc_seq_reply = rc_in->rc_seq; From 1151935182b40bbe398905850f6f7f4fbb262e06 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:48 -0400 Subject: [PATCH 14/17] fs: dlm: constify receive buffer The dlm receive buffer should be never manipulated as DLM is the last instance of parsing layer. This patch constify the whole receive buffer so we are sure it never gets manipulated when it's being parsed. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/dir.c | 5 +- fs/dlm/dir.h | 4 +- fs/dlm/lock.c | 109 ++++++++++++++++++++++-------------------- fs/dlm/lock.h | 14 +++--- fs/dlm/member.c | 2 +- fs/dlm/member.h | 2 +- fs/dlm/midcomms.c | 16 ++++--- fs/dlm/rcom.c | 20 ++++---- fs/dlm/rcom.h | 5 +- fs/dlm/recover.c | 2 +- fs/dlm/recover.h | 2 +- fs/dlm/requestqueue.c | 3 +- fs/dlm/requestqueue.h | 3 +- 13 files changed, 101 insertions(+), 86 deletions(-) diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index 768cf8d43b2b..f6acba4310a7 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -196,7 +196,8 @@ int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq) return error; } -static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) +static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name, + int len) { struct dlm_rsb *r; uint32_t hash, bucket; @@ -232,7 +233,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) for rsb's we're master of and whose directory node matches the requesting node. inbuf is the rsb name last sent, inlen is the name's length */ -void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, +void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, char *outbuf, int outlen, int nodeid) { struct list_head *list; diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h index 0635582da003..39ecb69d7ef3 100644 --- a/fs/dlm/dir.h +++ b/fs/dlm/dir.h @@ -16,8 +16,8 @@ int dlm_dir_nodeid(struct dlm_rsb *rsb); int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash); void dlm_recover_dir_nodeid(struct dlm_ls *ls); int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq); -void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, - char *outbuf, int outlen, int nodeid); +void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, + char *outbuf, int outlen, int nodeid); #endif /* __DIR_DOT_H__ */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index 1cf666c7401d..652c51fbbf76 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -86,8 +86,8 @@ static int send_remove(struct dlm_rsb *r); static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms, bool local); -static int receive_extralen(struct dlm_message *ms); + const struct dlm_message *ms, bool local); +static int receive_extralen(const struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); static void toss_rsb(struct kref *kref); @@ -984,8 +984,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) */ -int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, - unsigned int flags, int *r_nodeid, int *result) +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) { struct dlm_rsb *r = NULL; uint32_t hash, b; @@ -1106,7 +1106,7 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) } } -void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len) +void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) { struct dlm_rsb *r = NULL; uint32_t hash, b; @@ -1459,7 +1459,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) set RESEND and dlm_recover_waiters_post() */ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, - struct dlm_message *ms) + const struct dlm_message *ms) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int overlap_done = 0; @@ -1557,8 +1557,8 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) /* Handles situations where we might be processing a "fake" or "local" reply in which we can't try to take waiters_mutex again. */ -static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms, - bool local) +static int remove_from_waiters_ms(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; @@ -1800,7 +1800,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) /* lkb is process copy (pc) */ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { int b; @@ -1907,7 +1907,7 @@ static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) } static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { set_lvb_lock_pc(r, lkb, ms); _grant_lock(r, lkb); @@ -1945,7 +1945,7 @@ static void munge_demoted(struct dlm_lkb *lkb) lkb->lkb_grmode = DLM_LOCK_NL; } -static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) +static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms) { if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) && ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) { @@ -3641,8 +3641,9 @@ static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); } -static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, - int ret_nodeid, int rv) +static int send_lookup_reply(struct dlm_ls *ls, + const struct dlm_message *ms_in, int ret_nodeid, + int rv) { struct dlm_rsb *r = &ls->ls_local_rsb; struct dlm_message *ms; @@ -3667,14 +3668,15 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, of message, unlike the send side where we can safely send everything about the lkb for any type of message */ -static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) +static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms) { lkb->lkb_exflags = le32_to_cpu(ms->m_exflags); dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags)); dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); } -static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms, +static void receive_flags_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { if (local) @@ -3684,14 +3686,14 @@ static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms, dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); } -static int receive_extralen(struct dlm_message *ms) +static int receive_extralen(const struct dlm_message *ms) { return (le16_to_cpu(ms->m_header.h_length) - sizeof(struct dlm_message)); } static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { int len; @@ -3719,7 +3721,7 @@ static void fake_astfn(void *astparam) } static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); lkb->lkb_ownpid = le32_to_cpu(ms->m_pid); @@ -3741,7 +3743,7 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, } static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { if (lkb->lkb_status != DLM_LKSTS_GRANTED) return -EBUSY; @@ -3756,7 +3758,7 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, } static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { if (receive_lvb(ls, lkb, ms)) return -ENOMEM; @@ -3766,7 +3768,7 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, /* We fill in the local-lkb fields with the info that send_xxxx_reply() uses to send a reply and that the remote end uses to process the reply. */ -static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms) +static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb = &ls->ls_local_lkb; lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); @@ -3776,7 +3778,7 @@ static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms) /* This is called after the rsb is locked so that we can safely inspect fields in the lkb. */ -static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) +static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms) { int from = le32_to_cpu(ms->m_header.h_nodeid); int error = 0; @@ -3828,7 +3830,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) return error; } -static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -3907,7 +3909,7 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) return error; } -static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -3963,7 +3965,7 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) return error; } -static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4015,7 +4017,7 @@ static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) return error; } -static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4051,7 +4053,7 @@ static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) return error; } -static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4082,7 +4084,7 @@ static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4110,7 +4112,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms) { int len, error, ret_nodeid, from_nodeid, our_nodeid; @@ -4130,7 +4132,7 @@ static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) send_lookup_reply(ls, ms, ret_nodeid, error); } -static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) { char name[DLM_RESNAME_MAXLEN+1]; struct dlm_rsb *r; @@ -4218,12 +4220,13 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) } } -static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) { do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid)); } -static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_request_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4345,7 +4348,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) } static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms, bool local) + const struct dlm_message *ms, bool local) { /* this is the value returned from do_convert() on the master */ switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { @@ -4388,8 +4391,8 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, } } -static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms, - bool local) +static void _receive_convert_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_rsb *r = lkb->lkb_resource; int error; @@ -4412,7 +4415,8 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms, put_rsb(r); } -static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_convert_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; int error; @@ -4426,8 +4430,8 @@ static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms, - bool local) +static void _receive_unlock_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_rsb *r = lkb->lkb_resource; int error; @@ -4463,7 +4467,8 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms, put_rsb(r); } -static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_unlock_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; int error; @@ -4477,8 +4482,8 @@ static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms, - bool local) +static void _receive_cancel_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_rsb *r = lkb->lkb_resource; int error; @@ -4515,7 +4520,8 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms, put_rsb(r); } -static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_cancel_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; int error; @@ -4529,7 +4535,8 @@ static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_lookup_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4608,7 +4615,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) dlm_put_lkb(lkb); } -static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, +static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq) { int error = 0, noent = 0; @@ -4744,7 +4751,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, requestqueue, to processing all the saved messages, to processing new messages as they arrive. */ -static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, +static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms, int nodeid) { if (dlm_locking_stopped(ls)) { @@ -4767,7 +4774,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, /* This is called by dlm_recoverd to process messages that were saved on the requestqueue. */ -void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, +void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq) { _receive_message(ls, ms, saved_seq); @@ -4778,9 +4785,9 @@ void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, standard locking activity) or an RCOM (recovery message sent as part of lockspace recovery). */ -void dlm_receive_buffer(union dlm_packet *p, int nodeid) +void dlm_receive_buffer(const union dlm_packet *p, int nodeid) { - struct dlm_header *hd = &p->header; + const struct dlm_header *hd = &p->header; struct dlm_ls *ls; int type = 0; @@ -5334,7 +5341,7 @@ static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, /* needs at least dlm_rcom + rcom_lock */ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_rsb *r, struct dlm_rcom *rc) + struct dlm_rsb *r, const struct dlm_rcom *rc) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; @@ -5384,7 +5391,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, back the rcom_lock struct we got but with the remid field filled in. */ /* needs at least dlm_rcom + rcom_lock */ -int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc, +int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, __le32 *rl_remid, __le32 *rl_result) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; @@ -5468,7 +5475,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc, } /* needs at least dlm_rcom + rcom_lock */ -int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc, +int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, uint64_t seq) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index cd67ccfbbf9b..b54e2cbbe6e2 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -12,11 +12,11 @@ #define __LOCK_DOT_H__ void dlm_dump_rsb(struct dlm_rsb *r); -void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len); +void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len); void dlm_print_lkb(struct dlm_lkb *lkb); -void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, +void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq); -void dlm_receive_buffer(union dlm_packet *p, int nodeid); +void dlm_receive_buffer(const union dlm_packet *p, int nodeid); int dlm_modes_compat(int mode1, int mode2); void dlm_put_rsb(struct dlm_rsb *r); void dlm_hold_rsb(struct dlm_rsb *r); @@ -25,8 +25,8 @@ void dlm_scan_rsbs(struct dlm_ls *ls); int dlm_lock_recovery_try(struct dlm_ls *ls); void dlm_unlock_recovery(struct dlm_ls *ls); -int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len, - unsigned int flags, int *r_nodeid, int *result); +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result); int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len, struct dlm_rsb **r_ret); @@ -36,9 +36,9 @@ void dlm_purge_mstcpy_locks(struct dlm_rsb *r); void dlm_recover_grant(struct dlm_ls *ls); int dlm_recover_waiters_post(struct dlm_ls *ls); void dlm_recover_waiters_pre(struct dlm_ls *ls); -int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc, +int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, __le32 *rl_remid, __le32 *rl_result); -int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc, +int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, uint64_t seq); int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode, diff --git a/fs/dlm/member.c b/fs/dlm/member.c index 19f3cd96f3c0..be7909ead71b 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -18,7 +18,7 @@ #include "midcomms.h" #include "lowcomms.h" -int dlm_slots_version(struct dlm_header *h) +int dlm_slots_version(const struct dlm_header *h) { if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS) return 0; diff --git a/fs/dlm/member.h b/fs/dlm/member.h index 433b2fac9f4a..f61cfde46314 100644 --- a/fs/dlm/member.h +++ b/fs/dlm/member.h @@ -18,7 +18,7 @@ void dlm_clear_members_gone(struct dlm_ls *ls); int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out); int dlm_is_removed(struct dlm_ls *ls, int nodeid); int dlm_is_member(struct dlm_ls *ls, int nodeid); -int dlm_slots_version(struct dlm_header *h); +int dlm_slots_version(const struct dlm_header *h); void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc, struct dlm_member *memb); void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc); diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index 8ebffbfdc00a..c125496e03bf 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -499,7 +499,8 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node) spin_unlock(&node->state_lock); } -static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p) +static void dlm_receive_buffer_3_2_trace(uint32_t seq, + const union dlm_packet *p) { switch (p->header.h_cmd) { case DLM_MSG: @@ -513,7 +514,7 @@ static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p) } } -static void dlm_midcomms_receive_buffer(union dlm_packet *p, +static void dlm_midcomms_receive_buffer(const union dlm_packet *p, struct midcomms_node *node, uint32_t seq) { @@ -708,7 +709,8 @@ static int dlm_midcomms_version_check_3_2(struct midcomms_node *node) return 0; } -static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid) +static int dlm_opts_check_msglen(const union dlm_packet *p, uint16_t msglen, + int nodeid) { int len = msglen; @@ -757,7 +759,7 @@ static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodei return 0; } -static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid) +static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodeid) { uint16_t msglen = le16_to_cpu(p->header.h_length); struct midcomms_node *node; @@ -878,7 +880,7 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node) return 0; } -static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid) +static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid) { uint16_t msglen = le16_to_cpu(p->header.h_length); struct midcomms_node *node; @@ -977,10 +979,10 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) switch (hd->h_version) { case cpu_to_le32(DLM_VERSION_3_1): - dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid); + dlm_midcomms_receive_buffer_3_1((const union dlm_packet *)ptr, nodeid); break; case cpu_to_le32(DLM_VERSION_3_2): - dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid); + dlm_midcomms_receive_buffer_3_2((const union dlm_packet *)ptr, nodeid); break; default: log_print("received invalid version header: %u from node %d, will skip this message", diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 0946431e370a..6ab029149a1d 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -221,7 +221,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags, return error; } -static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in, +static void receive_rcom_status(struct dlm_ls *ls, + const struct dlm_rcom *rc_in, uint64_t seq) { struct dlm_rcom *rc; @@ -283,7 +284,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in, send_rcom_stateless(msg, rc); } -static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in) { spin_lock(&ls->ls_rcom_spin); if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) || @@ -333,7 +334,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, return error; } -static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in, +static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in, uint64_t seq) { struct dlm_rcom *rc; @@ -376,8 +377,8 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq) return error; } -static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in, - uint64_t seq) +static void receive_rcom_lookup(struct dlm_ls *ls, + const struct dlm_rcom *rc_in, uint64_t seq) { struct dlm_rcom *rc; struct dlm_mhandle *mh; @@ -408,7 +409,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in, send_rcom(mh, rc); } -static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_rcom_lookup_reply(struct dlm_ls *ls, + const struct dlm_rcom *rc_in) { dlm_recover_master_reply(ls, rc_in); } @@ -469,7 +471,7 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq) } /* needs at least dlm_rcom + rcom_lock */ -static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in, +static void receive_rcom_lock(struct dlm_ls *ls, const struct dlm_rcom *rc_in, uint64_t seq) { __le32 rl_remid, rl_result; @@ -500,7 +502,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in, /* If the lockspace doesn't exist then still send a status message back; it's possible that it just doesn't have its global_id yet. */ -int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) +int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in) { struct dlm_rcom *rc; struct rcom_config *rf; @@ -578,7 +580,7 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) /* Called by dlm_recv; corresponds to dlm_receive_message() but special recovery-only comms are sent through here. */ -void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) +void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid) { int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); int stop, reply = 0, names = 0, lookup = 0, lock = 0; diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h index 9dd06d43ddb4..765926ae0020 100644 --- a/fs/dlm/rcom.h +++ b/fs/dlm/rcom.h @@ -18,8 +18,9 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len, uint64_t seq); int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq); int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq); -void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid); -int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in); +void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, + int nodeid); +int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in); #endif diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index ddb6b3312cc1..53917c0aa3c0 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -564,7 +564,7 @@ int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq) return error; } -int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) +int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc) { struct dlm_rsb *r; int ret_nodeid, new_master; diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h index c5ce2ef13934..dbc51013ecad 100644 --- a/fs/dlm/recover.h +++ b/fs/dlm/recover.h @@ -20,7 +20,7 @@ int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq); int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq); int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq); int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq); -int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc); +int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc); int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq); void dlm_recovered_lock(struct dlm_rsb *r); int dlm_create_root_list(struct dlm_ls *ls); diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c index 8be2893ad15b..892d6ca21e74 100644 --- a/fs/dlm/requestqueue.c +++ b/fs/dlm/requestqueue.c @@ -30,7 +30,8 @@ struct rq_entry { * lockspace is enabled on some while still suspended on others. */ -void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms) +void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, + const struct dlm_message *ms) { struct rq_entry *e; int length = le16_to_cpu(ms->m_header.h_length) - diff --git a/fs/dlm/requestqueue.h b/fs/dlm/requestqueue.h index 4e403469a845..42bfe23ceabe 100644 --- a/fs/dlm/requestqueue.h +++ b/fs/dlm/requestqueue.h @@ -11,7 +11,8 @@ #ifndef __REQUESTQUEUE_DOT_H__ #define __REQUESTQUEUE_DOT_H__ -void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms); +void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, + const struct dlm_message *ms); int dlm_process_requestqueue(struct dlm_ls *ls); void dlm_wait_requestqueue(struct dlm_ls *ls); void dlm_purge_requestqueue(struct dlm_ls *ls); From 63e711b081609748d631fc3a08b14ba01c8e4f16 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:49 -0400 Subject: [PATCH 15/17] fs: dlm: create midcomms nodes when configure This patch puts the life of a midcomms node the same as a lowcomms connection. The lowcomms connection lifetime was changed by commit 6f0b0b5d7ae7 ("fs: dlm: remove dlm_node_addrs lookup list"). In the future the midcomms node instances can be merged with lowcomms connection structure as the lifetime is the same and states can be controlled over values or flags. Before midcomms nodes were generated during version detection. This is not necessary anymore when the nodes are created when the cluster manager configures DLM via configfs. When a midcomms node is created over configfs it well set DLM_VERSION_NOT_SET as version. This indicates that the version of the midcomms node is still unknown and need to be probed via certain rcom messages. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/config.c | 2 +- fs/dlm/midcomms.c | 286 +++++++++++++++++----------------------------- fs/dlm/midcomms.h | 1 + 3 files changed, 110 insertions(+), 179 deletions(-) diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 2beceff024e3..e55e0a2cd2e8 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -664,7 +664,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf, memcpy(addr, buf, len); - rv = dlm_lowcomms_addr(cm->nodeid, addr, len); + rv = dlm_midcomms_addr(cm->nodeid, addr, len); if (rv) { kfree(addr); return rv; diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index c125496e03bf..f641b36a36db 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -330,18 +330,23 @@ static void midcomms_node_reset(struct midcomms_node *node) wake_up(&node->shutdown_wait); } -static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc) +static struct midcomms_node *nodeid2node(int nodeid) { - struct midcomms_node *node, *tmp; - int r = nodeid_hash(nodeid); + return __find_node(nodeid, nodeid_hash(nodeid)); +} - node = __find_node(nodeid, r); - if (node || !alloc) - return node; +int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) +{ + int ret, r = nodeid_hash(nodeid); + struct midcomms_node *node; - node = kmalloc(sizeof(*node), alloc); + ret = dlm_lowcomms_addr(nodeid, addr, len); + if (ret) + return ret; + + node = kmalloc(sizeof(*node), GFP_NOFS); if (!node) - return NULL; + return -ENOMEM; node->nodeid = nodeid; spin_lock_init(&node->state_lock); @@ -353,21 +358,11 @@ static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc) midcomms_node_reset(node); spin_lock(&nodes_lock); - /* check again if there was somebody else - * earlier here to add the node - */ - tmp = __find_node(nodeid, r); - if (tmp) { - spin_unlock(&nodes_lock); - kfree(node); - return tmp; - } - hlist_add_head_rcu(&node->hlist, &node_hash[r]); spin_unlock(&nodes_lock); node->debugfs = dlm_create_debug_comms_file(nodeid, node); - return node; + return 0; } static int dlm_send_ack(int nodeid, uint32_t seq) @@ -603,112 +598,6 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p, } } -static struct midcomms_node * -dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p, - uint16_t msglen, int (*cb)(struct midcomms_node *node)) -{ - struct midcomms_node *node = NULL; - gfp_t allocation = 0; - int ret; - - switch (p->header.h_cmd) { - case DLM_RCOM: - if (msglen < sizeof(struct dlm_rcom)) { - log_print("rcom msg too small: %u, will skip this message from node %d", - msglen, nodeid); - return NULL; - } - - switch (p->rcom.rc_type) { - case cpu_to_le32(DLM_RCOM_NAMES): - fallthrough; - case cpu_to_le32(DLM_RCOM_NAMES_REPLY): - fallthrough; - case cpu_to_le32(DLM_RCOM_STATUS): - fallthrough; - case cpu_to_le32(DLM_RCOM_STATUS_REPLY): - node = nodeid2node(nodeid, 0); - if (node) { - spin_lock(&node->state_lock); - if (node->state != DLM_ESTABLISHED) - pr_debug("receive begin RCOM msg from node %d with state %s\n", - node->nodeid, dlm_state_str(node->state)); - - switch (node->state) { - case DLM_CLOSED: - node->state = DLM_ESTABLISHED; - pr_debug("switch node %d to state %s\n", - node->nodeid, dlm_state_str(node->state)); - break; - case DLM_ESTABLISHED: - break; - default: - spin_unlock(&node->state_lock); - return NULL; - } - spin_unlock(&node->state_lock); - } - - allocation = GFP_NOFS; - break; - default: - break; - } - - break; - default: - break; - } - - node = nodeid2node(nodeid, allocation); - if (!node) { - switch (p->header.h_cmd) { - case DLM_OPTS: - if (msglen < sizeof(struct dlm_opts)) { - log_print("opts msg too small: %u, will skip this message from node %d", - msglen, nodeid); - return NULL; - } - - log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence", - p->opts.o_nextcmd, nodeid); - break; - default: - log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence", - p->header.h_cmd, nodeid); - break; - } - - return NULL; - } - - ret = cb(node); - if (ret < 0) - return NULL; - - return node; -} - -static int dlm_midcomms_version_check_3_2(struct midcomms_node *node) -{ - switch (node->version) { - case DLM_VERSION_NOT_SET: - node->version = DLM_VERSION_3_2; - wake_up(&node->shutdown_wait); - log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2, - node->nodeid); - break; - case DLM_VERSION_3_2: - break; - default: - log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", - DLM_VERSION_3_2, node->nodeid, node->version); - return -1; - } - - return 0; -} - static int dlm_opts_check_msglen(const union dlm_packet *p, uint16_t msglen, int nodeid) { @@ -767,11 +656,38 @@ static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodei int ret, idx; idx = srcu_read_lock(&nodes_srcu); - node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen, - dlm_midcomms_version_check_3_2); - if (!node) + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) goto out; + switch (node->version) { + case DLM_VERSION_NOT_SET: + node->version = DLM_VERSION_3_2; + wake_up(&node->shutdown_wait); + log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2, + node->nodeid); + + spin_lock(&node->state_lock); + switch (node->state) { + case DLM_CLOSED: + node->state = DLM_ESTABLISHED; + pr_debug("switch node %d to state %s\n", + node->nodeid, dlm_state_str(node->state)); + break; + default: + break; + } + spin_unlock(&node->state_lock); + + break; + case DLM_VERSION_3_2: + break; + default: + log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", + DLM_VERSION_3_2, node->nodeid, node->version); + goto out; + } + switch (p->header.h_cmd) { case DLM_RCOM: /* these rcom message we use to determine version. @@ -860,8 +776,19 @@ static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodei srcu_read_unlock(&nodes_srcu, idx); } -static int dlm_midcomms_version_check_3_1(struct midcomms_node *node) +static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid) { + uint16_t msglen = le16_to_cpu(p->header.h_length); + struct midcomms_node *node; + int idx; + + idx = srcu_read_lock(&nodes_srcu); + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) { + srcu_read_unlock(&nodes_srcu, idx); + return; + } + switch (node->version) { case DLM_VERSION_NOT_SET: node->version = DLM_VERSION_3_1; @@ -874,22 +801,6 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node) default: log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", DLM_VERSION_3_1, node->nodeid, node->version); - return -1; - } - - return 0; -} - -static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid) -{ - uint16_t msglen = le16_to_cpu(p->header.h_length); - struct midcomms_node *node; - int idx; - - idx = srcu_read_lock(&nodes_srcu); - node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen, - dlm_midcomms_version_check_3_1); - if (!node) { srcu_read_unlock(&nodes_srcu, idx); return; } @@ -1005,8 +916,8 @@ void dlm_midcomms_unack_msg_resend(int nodeid) int idx, ret; idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, 0); - if (!node) { + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) { srcu_read_unlock(&nodes_srcu, idx); return; } @@ -1092,11 +1003,9 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, int idx; idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, 0); - if (!node) { - WARN_ON_ONCE(1); + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) goto err; - } /* this is a bug, however we going on and hope it will be resolved */ WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags)); @@ -1237,8 +1146,34 @@ void dlm_midcomms_init(void) dlm_lowcomms_init(); } +static void midcomms_node_release(struct rcu_head *rcu) +{ + struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu); + + WARN_ON_ONCE(atomic_read(&node->send_queue_cnt)); + dlm_send_queue_flush(node); + kfree(node); +} + void dlm_midcomms_exit(void) { + struct midcomms_node *node; + int i, idx; + + idx = srcu_read_lock(&nodes_srcu); + for (i = 0; i < CONN_HASH_SIZE; i++) { + hlist_for_each_entry_rcu(node, &node_hash[i], hlist) { + dlm_delete_debug_comms_file(node->debugfs); + + spin_lock(&nodes_lock); + hlist_del_rcu(&node->hlist); + spin_unlock(&nodes_lock); + + call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release); + } + } + srcu_read_unlock(&nodes_srcu, idx); + dlm_lowcomms_exit(); } @@ -1279,8 +1214,8 @@ void dlm_midcomms_add_member(int nodeid) int idx; idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, GFP_NOFS); - if (!node) { + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) { srcu_read_unlock(&nodes_srcu, idx); return; } @@ -1324,8 +1259,8 @@ void dlm_midcomms_remove_member(int nodeid) int idx; idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, 0); - if (!node) { + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) { srcu_read_unlock(&nodes_srcu, idx); return; } @@ -1369,15 +1304,6 @@ void dlm_midcomms_remove_member(int nodeid) srcu_read_unlock(&nodes_srcu, idx); } -static void midcomms_node_release(struct rcu_head *rcu) -{ - struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu); - - WARN_ON_ONCE(atomic_read(&node->send_queue_cnt)); - dlm_send_queue_flush(node); - kfree(node); -} - void dlm_midcomms_version_wait(void) { struct midcomms_node *node; @@ -1440,7 +1366,7 @@ static void midcomms_shutdown(struct midcomms_node *node) node->state == DLM_CLOSED || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags), DLM_SHUTDOWN_TIMEOUT); - if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) + if (!ret) pr_debug("active shutdown timed out for node %d with state %s\n", node->nodeid, dlm_state_str(node->state)); else @@ -1458,14 +1384,6 @@ void dlm_midcomms_shutdown(void) for (i = 0; i < CONN_HASH_SIZE; i++) { hlist_for_each_entry_rcu(node, &node_hash[i], hlist) { midcomms_shutdown(node); - - dlm_delete_debug_comms_file(node->debugfs); - - spin_lock(&nodes_lock); - hlist_del_rcu(&node->hlist); - spin_unlock(&nodes_lock); - - call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release); } } srcu_read_unlock(&nodes_srcu, idx); @@ -1481,7 +1399,7 @@ int dlm_midcomms_close(int nodeid) idx = srcu_read_lock(&nodes_srcu); /* Abort pending close/remove operation */ - node = nodeid2node(nodeid, 0); + node = nodeid2node(nodeid); if (node) { /* let shutdown waiters leave */ set_bit(DLM_NODE_FLAG_CLOSE, &node->flags); @@ -1493,7 +1411,7 @@ int dlm_midcomms_close(int nodeid) mutex_lock(&close_lock); idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, 0); + node = nodeid2node(nodeid); if (!node) { srcu_read_unlock(&nodes_srcu, idx); mutex_unlock(&close_lock); @@ -1501,10 +1419,22 @@ int dlm_midcomms_close(int nodeid) } ret = dlm_lowcomms_close(nodeid); - spin_lock(&node->state_lock); - midcomms_node_reset(node); - spin_unlock(&node->state_lock); + dlm_delete_debug_comms_file(node->debugfs); + + spin_lock(&nodes_lock); + hlist_del_rcu(&node->hlist); + spin_unlock(&nodes_lock); srcu_read_unlock(&nodes_srcu, idx); + + /* wait that all readers left until flush send queue */ + synchronize_srcu(&nodes_srcu); + + /* drop all pending dlm messages, this is fine as + * this function get called when the node is fenced + */ + dlm_send_queue_flush(node); + + call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release); mutex_unlock(&close_lock); return ret; diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h index 9f8c9605013d..e7246fb3ef57 100644 --- a/fs/dlm/midcomms.h +++ b/fs/dlm/midcomms.h @@ -20,6 +20,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, gfp_t allocation, char **ppc); void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name, int namelen); +int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); void dlm_midcomms_version_wait(void); int dlm_midcomms_close(int nodeid); int dlm_midcomms_start(void); From a3d85fcf268ea40c024e864b219b72516236d15b Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Tue, 1 Aug 2023 14:09:50 -0400 Subject: [PATCH 16/17] fs: dlm: don't use RCOM_NAMES for version detection Currently RCOM_STATUS and RCOM_NAMES inclusive their replies are being used to determine the DLM version. The RCOM_NAMES messages are triggered in DLM recovery when calling dlm_recover_directory() only. At this time the DLM version need to be determined. I ran some tests and did not expirenced some issues. When the DLM version detection was developed probably I run once in a case of RCOM_NAMES and the version was not detected yet. However it seems to be not necessary. For backwards compatibility we still need to accept RCOM_NAMES messages which are not protected regarding the DLM message reliability layer aka stateless message. This patch changes that RCOM_NAMES we are sending out after this patch are not stateless anymore. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/rcom.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index 6ab029149a1d..3b734aed26b5 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -308,15 +308,15 @@ static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in) int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len, uint64_t seq) { + struct dlm_mhandle *mh; struct dlm_rcom *rc; - struct dlm_msg *msg; int error = 0; ls->ls_recover_nodeid = nodeid; retry: - error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len, - &rc, &msg, seq); + error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, + &rc, &mh, seq); if (error) goto out; memcpy(rc->rc_buf, last_name, last_len); @@ -324,7 +324,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, allow_sync_reply(ls, &rc->rc_id); memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); - send_rcom_stateless(msg, rc); + send_rcom(mh, rc); error = dlm_wait_function(ls, &rcom_response); disallow_sync_reply(ls); @@ -337,17 +337,17 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in, uint64_t seq) { + struct dlm_mhandle *mh; struct dlm_rcom *rc; int error, inlen, outlen, nodeid; - struct dlm_msg *msg; nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); inlen = le16_to_cpu(rc_in->rc_header.h_length) - sizeof(struct dlm_rcom); outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom); - error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, - &rc, &msg, seq); + error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, + &rc, &mh, seq); if (error) return; rc->rc_id = rc_in->rc_id; @@ -355,7 +355,7 @@ static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in, dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, nodeid); - send_rcom_stateless(msg, rc); + send_rcom(mh, rc); } int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq) From 7c53e847ff5e97f033fdd31f71949807633d506b Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 24 Aug 2023 16:51:42 -0400 Subject: [PATCH 17/17] dlm: fix plock lookup when using multiple lockspaces All posix lock ops, for all lockspaces (gfs2 file systems) are sent to userspace (dlm_controld) through a single misc device. The dlm_controld daemon reads the ops from the misc device and sends them to other cluster nodes using separate, per-lockspace cluster api communication channels. The ops for a single lockspace are ordered at this level, so that the results are received in the same sequence that the requests were sent. When the results are sent back to the kernel via the misc device, they are again funneled through the single misc device for all lockspaces. When the dlm code in the kernel processes the results from the misc device, these results will be returned in the same sequence that the requests were sent, on a per-lockspace basis. A recent change in this request/reply matching code missed the "per-lockspace" check (fsid comparison) when matching request and reply, so replies could be incorrectly matched to requests from other lockspaces. Cc: stable@vger.kernel.org Reported-by: Barry Marson Fixes: 57e2c2f2d94c ("fs: dlm: fix mismatch of plock results from userspace") Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/plock.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index 00e1d802a81c..e6b4c1a21446 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -556,7 +556,8 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, op = plock_lookup_waiter(&info); } else { list_for_each_entry(iter, &recv_list, list) { - if (!iter->info.wait) { + if (!iter->info.wait && + iter->info.fsid == info.fsid) { op = iter; break; } @@ -568,8 +569,7 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, if (info.wait) WARN_ON(op->info.optype != DLM_PLOCK_OP_LOCK); else - WARN_ON(op->info.fsid != info.fsid || - op->info.number != info.number || + WARN_ON(op->info.number != info.number || op->info.owner != info.owner || op->info.optype != info.optype);