diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 2beceff024e3..e55e0a2cd2e8 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -664,7 +664,7 @@ static ssize_t comm_addr_store(struct config_item *item, const char *buf, memcpy(addr, buf, len); - rv = dlm_lowcomms_addr(cm->nodeid, addr, len); + rv = dlm_midcomms_addr(cm->nodeid, addr, len); if (rv) { kfree(addr); return rv; diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c index a1aca41c49d0..5aabcb6f0f15 100644 --- a/fs/dlm/debug_fs.c +++ b/fs/dlm/debug_fs.c @@ -18,6 +18,7 @@ #include "dlm_internal.h" #include "midcomms.h" #include "lock.h" +#include "ast.h" #define DLM_DEBUG_BUF_LEN 4096 static char debug_buf[DLM_DEBUG_BUF_LEN]; @@ -365,6 +366,52 @@ static void print_format4(struct dlm_rsb *r, struct seq_file *s) unlock_rsb(r); } +static void print_format5_lock(struct seq_file *s, struct dlm_lkb *lkb) +{ + struct dlm_callback *cb; + + /* lkb_id lkb_flags mode flags sb_status sb_flags */ + + spin_lock(&lkb->lkb_cb_lock); + list_for_each_entry(cb, &lkb->lkb_callbacks, list) { + seq_printf(s, "%x %x %d %x %d %x\n", + lkb->lkb_id, + dlm_iflags_val(lkb), + cb->mode, + cb->flags, + cb->sb_status, + cb->sb_flags); + } + spin_unlock(&lkb->lkb_cb_lock); +} + +static void print_format5(struct dlm_rsb *r, struct seq_file *s) +{ + struct dlm_lkb *lkb; + + lock_rsb(r); + + list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) { + print_format5_lock(s, lkb); + if (seq_has_overflowed(s)) + goto out; + } + + list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) { + print_format5_lock(s, lkb); + if (seq_has_overflowed(s)) + goto out; + } + + list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) { + print_format5_lock(s, lkb); + if (seq_has_overflowed(s)) + goto out; + } + out: + unlock_rsb(r); +} + struct rsbtbl_iter { struct dlm_rsb *rsb; unsigned bucket; @@ -408,6 +455,13 @@ static int table_seq_show(struct seq_file *seq, void *iter_ptr) } print_format4(ri->rsb, seq); break; + case 5: + if (ri->header) { + seq_puts(seq, "lkb_id lkb_flags mode flags sb_status sb_flags\n"); + ri->header = 0; + } + print_format5(ri->rsb, seq); + break; } return 0; @@ -417,6 +471,7 @@ static const struct seq_operations format1_seq_ops; static const struct seq_operations format2_seq_ops; static const struct seq_operations format3_seq_ops; static const struct seq_operations format4_seq_ops; +static const struct seq_operations format5_seq_ops; static void *table_seq_start(struct seq_file *seq, loff_t *pos) { @@ -448,6 +503,8 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) ri->format = 3; if (seq->op == &format4_seq_ops) ri->format = 4; + if (seq->op == &format5_seq_ops) + ri->format = 5; tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep; @@ -602,10 +659,18 @@ static const struct seq_operations format4_seq_ops = { .show = table_seq_show, }; +static const struct seq_operations format5_seq_ops = { + .start = table_seq_start, + .next = table_seq_next, + .stop = table_seq_stop, + .show = table_seq_show, +}; + static const struct file_operations format1_fops; static const struct file_operations format2_fops; static const struct file_operations format3_fops; static const struct file_operations format4_fops; +static const struct file_operations format5_fops; static int table_open1(struct inode *inode, struct file *file) { @@ -683,7 +748,21 @@ static int table_open4(struct inode *inode, struct file *file) struct seq_file *seq; int ret; - ret = seq_open(file, &format4_seq_ops); + ret = seq_open(file, &format5_seq_ops); + if (ret) + return ret; + + seq = file->private_data; + seq->private = inode->i_private; /* the dlm_ls */ + return 0; +} + +static int table_open5(struct inode *inode, struct file *file) +{ + struct seq_file *seq; + int ret; + + ret = seq_open(file, &format5_seq_ops); if (ret) return ret; @@ -725,6 +804,14 @@ static const struct file_operations format4_fops = { .release = seq_release }; +static const struct file_operations format5_fops = { + .owner = THIS_MODULE, + .open = table_open5, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release +}; + /* * dump lkb's on the ls_waiters list */ @@ -793,6 +880,7 @@ void dlm_delete_debug_file(struct dlm_ls *ls) debugfs_remove(ls->ls_debug_locks_dentry); debugfs_remove(ls->ls_debug_all_dentry); debugfs_remove(ls->ls_debug_toss_dentry); + debugfs_remove(ls->ls_debug_queued_asts_dentry); } static int dlm_state_show(struct seq_file *file, void *offset) @@ -936,6 +1024,17 @@ void dlm_create_debug_file(struct dlm_ls *ls) dlm_root, ls, &waiters_fops); + + /* format 5 */ + + memset(name, 0, sizeof(name)); + snprintf(name, DLM_LOCKSPACE_LEN + 8, "%s_queued_asts", ls->ls_name); + + ls->ls_debug_queued_asts_dentry = debugfs_create_file(name, + 0644, + dlm_root, + ls, + &format5_fops); } void __init dlm_register_debugfs(void) diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index fb1981654bb2..f6acba4310a7 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -58,7 +58,7 @@ void dlm_recover_dir_nodeid(struct dlm_ls *ls) up_read(&ls->ls_root_sem); } -int dlm_recover_directory(struct dlm_ls *ls) +int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq) { struct dlm_member *memb; char *b, *last_name = NULL; @@ -90,7 +90,7 @@ int dlm_recover_directory(struct dlm_ls *ls) } error = dlm_rcom_names(ls, memb->nodeid, - last_name, last_len); + last_name, last_len, seq); if (error) goto out_free; @@ -196,7 +196,8 @@ int dlm_recover_directory(struct dlm_ls *ls) return error; } -static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) +static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name, + int len) { struct dlm_rsb *r; uint32_t hash, bucket; @@ -232,7 +233,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) for rsb's we're master of and whose directory node matches the requesting node. inbuf is the rsb name last sent, inlen is the name's length */ -void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, +void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, char *outbuf, int outlen, int nodeid) { struct list_head *list; @@ -245,9 +246,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, if (inlen > 1) { r = find_rsb_root(ls, inbuf, inlen); if (!r) { - inbuf[inlen - 1] = '\0'; - log_error(ls, "copy_master_names from %d start %d %s", - nodeid, inlen, inbuf); + log_error(ls, "copy_master_names from %d start %d %.*s", + nodeid, inlen, inlen, inbuf); goto out; } list = r->res_root_list.next; diff --git a/fs/dlm/dir.h b/fs/dlm/dir.h index 03844d086be2..39ecb69d7ef3 100644 --- a/fs/dlm/dir.h +++ b/fs/dlm/dir.h @@ -15,9 +15,9 @@ int dlm_dir_nodeid(struct dlm_rsb *rsb); int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash); void dlm_recover_dir_nodeid(struct dlm_ls *ls); -int dlm_recover_directory(struct dlm_ls *ls); -void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, - char *outbuf, int outlen, int nodeid); +int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq); +void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, + char *outbuf, int outlen, int nodeid); #endif /* __DIR_DOT_H__ */ diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index c8156770205e..dfc444dad329 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -598,6 +598,7 @@ struct dlm_ls { struct dentry *ls_debug_locks_dentry; /* debugfs */ struct dentry *ls_debug_all_dentry; /* debugfs */ struct dentry *ls_debug_toss_dentry; /* debugfs */ + struct dentry *ls_debug_queued_asts_dentry; /* debugfs */ wait_queue_head_t ls_uevent_wait; /* user part of join/leave */ int ls_uevent_result; diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index f511a9d7d416..652c51fbbf76 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -86,8 +86,8 @@ static int send_remove(struct dlm_rsb *r); static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms, bool local); -static int receive_extralen(struct dlm_message *ms); + const struct dlm_message *ms, bool local); +static int receive_extralen(const struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); static void toss_rsb(struct kref *kref); @@ -984,8 +984,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) */ -int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, - unsigned int flags, int *r_nodeid, int *result) +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result) { struct dlm_rsb *r = NULL; uint32_t hash, b; @@ -1106,7 +1106,7 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) } } -void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len) +void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) { struct dlm_rsb *r = NULL; uint32_t hash, b; @@ -1459,7 +1459,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) set RESEND and dlm_recover_waiters_post() */ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, - struct dlm_message *ms) + const struct dlm_message *ms) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int overlap_done = 0; @@ -1557,8 +1557,8 @@ static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) /* Handles situations where we might be processing a "fake" or "local" reply in which we can't try to take waiters_mutex again. */ -static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms, - bool local) +static int remove_from_waiters_ms(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; @@ -1800,7 +1800,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) /* lkb is process copy (pc) */ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { int b; @@ -1907,7 +1907,7 @@ static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) } static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { set_lvb_lock_pc(r, lkb, ms); _grant_lock(r, lkb); @@ -1945,7 +1945,7 @@ static void munge_demoted(struct dlm_lkb *lkb) lkb->lkb_grmode = DLM_LOCK_NL; } -static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) +static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms) { if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) && ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) { @@ -3641,8 +3641,9 @@ static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); } -static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, - int ret_nodeid, int rv) +static int send_lookup_reply(struct dlm_ls *ls, + const struct dlm_message *ms_in, int ret_nodeid, + int rv) { struct dlm_rsb *r = &ls->ls_local_rsb; struct dlm_message *ms; @@ -3667,14 +3668,15 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, of message, unlike the send side where we can safely send everything about the lkb for any type of message */ -static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) +static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms) { lkb->lkb_exflags = le32_to_cpu(ms->m_exflags); dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags)); dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); } -static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms, +static void receive_flags_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { if (local) @@ -3684,14 +3686,14 @@ static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms, dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); } -static int receive_extralen(struct dlm_message *ms) +static int receive_extralen(const struct dlm_message *ms) { return (le16_to_cpu(ms->m_header.h_length) - sizeof(struct dlm_message)); } static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { int len; @@ -3719,7 +3721,7 @@ static void fake_astfn(void *astparam) } static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); lkb->lkb_ownpid = le32_to_cpu(ms->m_pid); @@ -3741,7 +3743,7 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, } static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { if (lkb->lkb_status != DLM_LKSTS_GRANTED) return -EBUSY; @@ -3756,7 +3758,7 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, } static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_message *ms) + const struct dlm_message *ms) { if (receive_lvb(ls, lkb, ms)) return -ENOMEM; @@ -3766,7 +3768,7 @@ static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, /* We fill in the local-lkb fields with the info that send_xxxx_reply() uses to send a reply and that the remote end uses to process the reply. */ -static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms) +static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb = &ls->ls_local_lkb; lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); @@ -3776,7 +3778,7 @@ static void setup_local_lkb(struct dlm_ls *ls, struct dlm_message *ms) /* This is called after the rsb is locked so that we can safely inspect fields in the lkb. */ -static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) +static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms) { int from = le32_to_cpu(ms->m_header.h_nodeid); int error = 0; @@ -3828,7 +3830,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) return error; } -static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -3907,7 +3909,7 @@ static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) return error; } -static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -3963,7 +3965,7 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) return error; } -static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4015,7 +4017,7 @@ static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) return error; } -static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4051,7 +4053,7 @@ static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) return error; } -static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4082,7 +4084,7 @@ static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4110,7 +4112,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms) { int len, error, ret_nodeid, from_nodeid, our_nodeid; @@ -4130,7 +4132,7 @@ static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) send_lookup_reply(ls, ms, ret_nodeid, error); } -static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) { char name[DLM_RESNAME_MAXLEN+1]; struct dlm_rsb *r; @@ -4218,12 +4220,13 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) } } -static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) { do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid)); } -static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_request_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4345,7 +4348,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) } static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, - struct dlm_message *ms, bool local) + const struct dlm_message *ms, bool local) { /* this is the value returned from do_convert() on the master */ switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { @@ -4388,8 +4391,8 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, } } -static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms, - bool local) +static void _receive_convert_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_rsb *r = lkb->lkb_resource; int error; @@ -4412,7 +4415,8 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms, put_rsb(r); } -static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_convert_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; int error; @@ -4426,8 +4430,8 @@ static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms, - bool local) +static void _receive_unlock_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_rsb *r = lkb->lkb_resource; int error; @@ -4463,7 +4467,8 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms, put_rsb(r); } -static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_unlock_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; int error; @@ -4477,8 +4482,8 @@ static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms, - bool local) +static void _receive_cancel_reply(struct dlm_lkb *lkb, + const struct dlm_message *ms, bool local) { struct dlm_rsb *r = lkb->lkb_resource; int error; @@ -4515,7 +4520,8 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms, put_rsb(r); } -static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) +static int receive_cancel_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; int error; @@ -4529,7 +4535,8 @@ static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) return 0; } -static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) +static void receive_lookup_reply(struct dlm_ls *ls, + const struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; @@ -4608,7 +4615,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) dlm_put_lkb(lkb); } -static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, +static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq) { int error = 0, noent = 0; @@ -4744,7 +4751,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, requestqueue, to processing all the saved messages, to processing new messages as they arrive. */ -static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, +static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms, int nodeid) { if (dlm_locking_stopped(ls)) { @@ -4767,7 +4774,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, /* This is called by dlm_recoverd to process messages that were saved on the requestqueue. */ -void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, +void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq) { _receive_message(ls, ms, saved_seq); @@ -4778,9 +4785,9 @@ void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, standard locking activity) or an RCOM (recovery message sent as part of lockspace recovery). */ -void dlm_receive_buffer(union dlm_packet *p, int nodeid) +void dlm_receive_buffer(const union dlm_packet *p, int nodeid) { - struct dlm_header *hd = &p->header; + const struct dlm_header *hd = &p->header; struct dlm_ls *ls; int type = 0; @@ -5334,7 +5341,7 @@ static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, /* needs at least dlm_rcom + rcom_lock */ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, - struct dlm_rsb *r, struct dlm_rcom *rc) + struct dlm_rsb *r, const struct dlm_rcom *rc) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; @@ -5384,7 +5391,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, back the rcom_lock struct we got but with the remid field filled in. */ /* needs at least dlm_rcom + rcom_lock */ -int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) +int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, + __le32 *rl_remid, __le32 *rl_result) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct dlm_rsb *r; @@ -5393,6 +5401,9 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); int error; + /* init rl_remid with rcom lock rl_remid */ + *rl_remid = rl->rl_remid; + if (rl->rl_parent_lkid) { error = -EOPNOTSUPP; goto out; @@ -5448,7 +5459,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) out_remid: /* this is the new value returned to the lock holder for saving in its process-copy lkb */ - rl->rl_remid = cpu_to_le32(lkb->lkb_id); + *rl_remid = cpu_to_le32(lkb->lkb_id); lkb->lkb_recover_seq = ls->ls_recover_seq; @@ -5459,12 +5470,13 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) if (error && error != -EEXIST) log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", from_nodeid, remid, error); - rl->rl_result = cpu_to_le32(error); + *rl_result = cpu_to_le32(error); return error; } /* needs at least dlm_rcom + rcom_lock */ -int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) +int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, + uint64_t seq) { struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; struct dlm_rsb *r; @@ -5509,7 +5521,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, result); - dlm_send_rcom_lock(r, lkb); + dlm_send_rcom_lock(r, lkb, seq); goto out; case -EEXIST: case 0: diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index aa5ad44d902b..b54e2cbbe6e2 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -12,11 +12,11 @@ #define __LOCK_DOT_H__ void dlm_dump_rsb(struct dlm_rsb *r); -void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len); +void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len); void dlm_print_lkb(struct dlm_lkb *lkb); -void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, +void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq); -void dlm_receive_buffer(union dlm_packet *p, int nodeid); +void dlm_receive_buffer(const union dlm_packet *p, int nodeid); int dlm_modes_compat(int mode1, int mode2); void dlm_put_rsb(struct dlm_rsb *r); void dlm_hold_rsb(struct dlm_rsb *r); @@ -25,8 +25,8 @@ void dlm_scan_rsbs(struct dlm_ls *ls); int dlm_lock_recovery_try(struct dlm_ls *ls); void dlm_unlock_recovery(struct dlm_ls *ls); -int dlm_master_lookup(struct dlm_ls *ls, int nodeid, char *name, int len, - unsigned int flags, int *r_nodeid, int *result); +int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, + int len, unsigned int flags, int *r_nodeid, int *result); int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len, struct dlm_rsb **r_ret); @@ -36,8 +36,10 @@ void dlm_purge_mstcpy_locks(struct dlm_rsb *r); void dlm_recover_grant(struct dlm_ls *ls); int dlm_recover_waiters_post(struct dlm_ls *ls); void dlm_recover_waiters_pre(struct dlm_ls *ls); -int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc); -int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc); +int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, + __le32 *rl_remid, __le32 *rl_result); +int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, + uint64_t seq); int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode, uint32_t flags, void *name, unsigned int namelen); diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 9f14ea9f6322..f7bc22e74db2 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -863,7 +863,6 @@ struct dlm_processed_nodes { static void process_dlm_messages(struct work_struct *work) { struct processqueue_entry *pentry; - LIST_HEAD(processed_nodes); spin_lock(&processqueue_lock); pentry = list_first_entry_or_null(&processqueue, diff --git a/fs/dlm/member.c b/fs/dlm/member.c index 77d202e4a02a..be7909ead71b 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -18,7 +18,7 @@ #include "midcomms.h" #include "lowcomms.h" -int dlm_slots_version(struct dlm_header *h) +int dlm_slots_version(const struct dlm_header *h) { if ((le32_to_cpu(h->h_version) & 0x0000FFFF) < DLM_HEADER_SLOTS) return 0; @@ -393,14 +393,9 @@ static void remove_remote_member(int nodeid) dlm_midcomms_remove_member(nodeid); } -static void clear_members_cb(int nodeid) -{ - remove_remote_member(nodeid); -} - void dlm_clear_members(struct dlm_ls *ls) { - clear_memb_list(&ls->ls_nodes, clear_members_cb); + clear_memb_list(&ls->ls_nodes, remove_remote_member); ls->ls_num_nodes = 0; } @@ -454,7 +449,7 @@ static void make_member_array(struct dlm_ls *ls) /* send a status request to all members just to establish comms connections */ -static int ping_members(struct dlm_ls *ls) +static int ping_members(struct dlm_ls *ls, uint64_t seq) { struct dlm_member *memb; int error = 0; @@ -464,7 +459,7 @@ static int ping_members(struct dlm_ls *ls) error = -EINTR; break; } - error = dlm_rcom_status(ls, memb->nodeid, 0); + error = dlm_rcom_status(ls, memb->nodeid, 0, seq); if (error) break; } @@ -612,7 +607,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) make_member_array(ls); *neg_out = neg; - error = ping_members(ls); + error = ping_members(ls, rv->seq); log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes); return error; } diff --git a/fs/dlm/member.h b/fs/dlm/member.h index 433b2fac9f4a..f61cfde46314 100644 --- a/fs/dlm/member.h +++ b/fs/dlm/member.h @@ -18,7 +18,7 @@ void dlm_clear_members_gone(struct dlm_ls *ls); int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out); int dlm_is_removed(struct dlm_ls *ls, int nodeid); int dlm_is_member(struct dlm_ls *ls, int nodeid); -int dlm_slots_version(struct dlm_header *h); +int dlm_slots_version(const struct dlm_header *h); void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc, struct dlm_member *memb); void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc); diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index e1a0df67b566..f641b36a36db 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -330,18 +330,23 @@ static void midcomms_node_reset(struct midcomms_node *node) wake_up(&node->shutdown_wait); } -static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc) +static struct midcomms_node *nodeid2node(int nodeid) { - struct midcomms_node *node, *tmp; - int r = nodeid_hash(nodeid); + return __find_node(nodeid, nodeid_hash(nodeid)); +} - node = __find_node(nodeid, r); - if (node || !alloc) - return node; +int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) +{ + int ret, r = nodeid_hash(nodeid); + struct midcomms_node *node; - node = kmalloc(sizeof(*node), alloc); + ret = dlm_lowcomms_addr(nodeid, addr, len); + if (ret) + return ret; + + node = kmalloc(sizeof(*node), GFP_NOFS); if (!node) - return NULL; + return -ENOMEM; node->nodeid = nodeid; spin_lock_init(&node->state_lock); @@ -353,21 +358,11 @@ static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc) midcomms_node_reset(node); spin_lock(&nodes_lock); - /* check again if there was somebody else - * earlier here to add the node - */ - tmp = __find_node(nodeid, r); - if (tmp) { - spin_unlock(&nodes_lock); - kfree(node); - return tmp; - } - hlist_add_head_rcu(&node->hlist, &node_hash[r]); spin_unlock(&nodes_lock); node->debugfs = dlm_create_debug_comms_file(nodeid, node); - return node; + return 0; } static int dlm_send_ack(int nodeid, uint32_t seq) @@ -499,7 +494,8 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node) spin_unlock(&node->state_lock); } -static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p) +static void dlm_receive_buffer_3_2_trace(uint32_t seq, + const union dlm_packet *p) { switch (p->header.h_cmd) { case DLM_MSG: @@ -513,7 +509,7 @@ static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p) } } -static void dlm_midcomms_receive_buffer(union dlm_packet *p, +static void dlm_midcomms_receive_buffer(const union dlm_packet *p, struct midcomms_node *node, uint32_t seq) { @@ -602,113 +598,8 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p, } } -static struct midcomms_node * -dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p, - uint16_t msglen, int (*cb)(struct midcomms_node *node)) -{ - struct midcomms_node *node = NULL; - gfp_t allocation = 0; - int ret; - - switch (p->header.h_cmd) { - case DLM_RCOM: - if (msglen < sizeof(struct dlm_rcom)) { - log_print("rcom msg too small: %u, will skip this message from node %d", - msglen, nodeid); - return NULL; - } - - switch (p->rcom.rc_type) { - case cpu_to_le32(DLM_RCOM_NAMES): - fallthrough; - case cpu_to_le32(DLM_RCOM_NAMES_REPLY): - fallthrough; - case cpu_to_le32(DLM_RCOM_STATUS): - fallthrough; - case cpu_to_le32(DLM_RCOM_STATUS_REPLY): - node = nodeid2node(nodeid, 0); - if (node) { - spin_lock(&node->state_lock); - if (node->state != DLM_ESTABLISHED) - pr_debug("receive begin RCOM msg from node %d with state %s\n", - node->nodeid, dlm_state_str(node->state)); - - switch (node->state) { - case DLM_CLOSED: - node->state = DLM_ESTABLISHED; - pr_debug("switch node %d to state %s\n", - node->nodeid, dlm_state_str(node->state)); - break; - case DLM_ESTABLISHED: - break; - default: - spin_unlock(&node->state_lock); - return NULL; - } - spin_unlock(&node->state_lock); - } - - allocation = GFP_NOFS; - break; - default: - break; - } - - break; - default: - break; - } - - node = nodeid2node(nodeid, allocation); - if (!node) { - switch (p->header.h_cmd) { - case DLM_OPTS: - if (msglen < sizeof(struct dlm_opts)) { - log_print("opts msg too small: %u, will skip this message from node %d", - msglen, nodeid); - return NULL; - } - - log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence", - p->opts.o_nextcmd, nodeid); - break; - default: - log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence", - p->header.h_cmd, nodeid); - break; - } - - return NULL; - } - - ret = cb(node); - if (ret < 0) - return NULL; - - return node; -} - -static int dlm_midcomms_version_check_3_2(struct midcomms_node *node) -{ - switch (node->version) { - case DLM_VERSION_NOT_SET: - node->version = DLM_VERSION_3_2; - wake_up(&node->shutdown_wait); - log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2, - node->nodeid); - break; - case DLM_VERSION_3_2: - break; - default: - log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", - DLM_VERSION_3_2, node->nodeid, node->version); - return -1; - } - - return 0; -} - -static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid) +static int dlm_opts_check_msglen(const union dlm_packet *p, uint16_t msglen, + int nodeid) { int len = msglen; @@ -757,7 +648,7 @@ static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodei return 0; } -static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid) +static void dlm_midcomms_receive_buffer_3_2(const union dlm_packet *p, int nodeid) { uint16_t msglen = le16_to_cpu(p->header.h_length); struct midcomms_node *node; @@ -765,11 +656,38 @@ static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid) int ret, idx; idx = srcu_read_lock(&nodes_srcu); - node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen, - dlm_midcomms_version_check_3_2); - if (!node) + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) goto out; + switch (node->version) { + case DLM_VERSION_NOT_SET: + node->version = DLM_VERSION_3_2; + wake_up(&node->shutdown_wait); + log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2, + node->nodeid); + + spin_lock(&node->state_lock); + switch (node->state) { + case DLM_CLOSED: + node->state = DLM_ESTABLISHED; + pr_debug("switch node %d to state %s\n", + node->nodeid, dlm_state_str(node->state)); + break; + default: + break; + } + spin_unlock(&node->state_lock); + + break; + case DLM_VERSION_3_2: + break; + default: + log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", + DLM_VERSION_3_2, node->nodeid, node->version); + goto out; + } + switch (p->header.h_cmd) { case DLM_RCOM: /* these rcom message we use to determine version. @@ -858,8 +776,19 @@ static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid) srcu_read_unlock(&nodes_srcu, idx); } -static int dlm_midcomms_version_check_3_1(struct midcomms_node *node) +static void dlm_midcomms_receive_buffer_3_1(const union dlm_packet *p, int nodeid) { + uint16_t msglen = le16_to_cpu(p->header.h_length); + struct midcomms_node *node; + int idx; + + idx = srcu_read_lock(&nodes_srcu); + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) { + srcu_read_unlock(&nodes_srcu, idx); + return; + } + switch (node->version) { case DLM_VERSION_NOT_SET: node->version = DLM_VERSION_3_1; @@ -872,22 +801,6 @@ static int dlm_midcomms_version_check_3_1(struct midcomms_node *node) default: log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", DLM_VERSION_3_1, node->nodeid, node->version); - return -1; - } - - return 0; -} - -static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid) -{ - uint16_t msglen = le16_to_cpu(p->header.h_length); - struct midcomms_node *node; - int idx; - - idx = srcu_read_lock(&nodes_srcu); - node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen, - dlm_midcomms_version_check_3_1); - if (!node) { srcu_read_unlock(&nodes_srcu, idx); return; } @@ -977,10 +890,10 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) switch (hd->h_version) { case cpu_to_le32(DLM_VERSION_3_1): - dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid); + dlm_midcomms_receive_buffer_3_1((const union dlm_packet *)ptr, nodeid); break; case cpu_to_le32(DLM_VERSION_3_2): - dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid); + dlm_midcomms_receive_buffer_3_2((const union dlm_packet *)ptr, nodeid); break; default: log_print("received invalid version header: %u from node %d, will skip this message", @@ -1003,8 +916,8 @@ void dlm_midcomms_unack_msg_resend(int nodeid) int idx, ret; idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, 0); - if (!node) { + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) { srcu_read_unlock(&nodes_srcu, idx); return; } @@ -1090,11 +1003,9 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, int idx; idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, 0); - if (!node) { - WARN_ON_ONCE(1); + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) goto err; - } /* this is a bug, however we going on and hope it will be resolved */ WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags)); @@ -1235,8 +1146,34 @@ void dlm_midcomms_init(void) dlm_lowcomms_init(); } +static void midcomms_node_release(struct rcu_head *rcu) +{ + struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu); + + WARN_ON_ONCE(atomic_read(&node->send_queue_cnt)); + dlm_send_queue_flush(node); + kfree(node); +} + void dlm_midcomms_exit(void) { + struct midcomms_node *node; + int i, idx; + + idx = srcu_read_lock(&nodes_srcu); + for (i = 0; i < CONN_HASH_SIZE; i++) { + hlist_for_each_entry_rcu(node, &node_hash[i], hlist) { + dlm_delete_debug_comms_file(node->debugfs); + + spin_lock(&nodes_lock); + hlist_del_rcu(&node->hlist); + spin_unlock(&nodes_lock); + + call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release); + } + } + srcu_read_unlock(&nodes_srcu, idx); + dlm_lowcomms_exit(); } @@ -1277,8 +1214,8 @@ void dlm_midcomms_add_member(int nodeid) int idx; idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, GFP_NOFS); - if (!node) { + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) { srcu_read_unlock(&nodes_srcu, idx); return; } @@ -1322,8 +1259,8 @@ void dlm_midcomms_remove_member(int nodeid) int idx; idx = srcu_read_lock(&nodes_srcu); - node = nodeid2node(nodeid, 0); - if (!node) { + node = nodeid2node(nodeid); + if (WARN_ON_ONCE(!node)) { srcu_read_unlock(&nodes_srcu, idx); return; } @@ -1367,15 +1304,6 @@ void dlm_midcomms_remove_member(int nodeid) srcu_read_unlock(&nodes_srcu, idx); } -static void midcomms_node_release(struct rcu_head *rcu) -{ - struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu); - - WARN_ON_ONCE(atomic_read(&node->send_queue_cnt)); - dlm_send_queue_flush(node); - kfree(node); -} - void dlm_midcomms_version_wait(void) { struct midcomms_node *node; @@ -1438,7 +1366,7 @@ static void midcomms_shutdown(struct midcomms_node *node) node->state == DLM_CLOSED || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags), DLM_SHUTDOWN_TIMEOUT); - if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) + if (!ret) pr_debug("active shutdown timed out for node %d with state %s\n", node->nodeid, dlm_state_str(node->state)); else @@ -1456,14 +1384,6 @@ void dlm_midcomms_shutdown(void) for (i = 0; i < CONN_HASH_SIZE; i++) { hlist_for_each_entry_rcu(node, &node_hash[i], hlist) { midcomms_shutdown(node); - - dlm_delete_debug_comms_file(node->debugfs); - - spin_lock(&nodes_lock); - hlist_del_rcu(&node->hlist); - spin_unlock(&nodes_lock); - - call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release); } } srcu_read_unlock(&nodes_srcu, idx); @@ -1479,7 +1399,7 @@ int dlm_midcomms_close(int nodeid) idx = srcu_read_lock(&nodes_srcu); /* Abort pending close/remove operation */ - node = nodeid2node(nodeid, 0); + node = nodeid2node(nodeid); if (node) { /* let shutdown waiters leave */ set_bit(DLM_NODE_FLAG_CLOSE, &node->flags); @@ -1489,20 +1409,32 @@ int dlm_midcomms_close(int nodeid) synchronize_srcu(&nodes_srcu); - idx = srcu_read_lock(&nodes_srcu); mutex_lock(&close_lock); - node = nodeid2node(nodeid, 0); + idx = srcu_read_lock(&nodes_srcu); + node = nodeid2node(nodeid); if (!node) { - mutex_unlock(&close_lock); srcu_read_unlock(&nodes_srcu, idx); + mutex_unlock(&close_lock); return dlm_lowcomms_close(nodeid); } ret = dlm_lowcomms_close(nodeid); - spin_lock(&node->state_lock); - midcomms_node_reset(node); - spin_unlock(&node->state_lock); + dlm_delete_debug_comms_file(node->debugfs); + + spin_lock(&nodes_lock); + hlist_del_rcu(&node->hlist); + spin_unlock(&nodes_lock); srcu_read_unlock(&nodes_srcu, idx); + + /* wait that all readers left until flush send queue */ + synchronize_srcu(&nodes_srcu); + + /* drop all pending dlm messages, this is fine as + * this function get called when the node is fenced + */ + dlm_send_queue_flush(node); + + call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release); mutex_unlock(&close_lock); return ret; diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h index 9f8c9605013d..e7246fb3ef57 100644 --- a/fs/dlm/midcomms.h +++ b/fs/dlm/midcomms.h @@ -20,6 +20,7 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, gfp_t allocation, char **ppc); void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh, const void *name, int namelen); +int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len); void dlm_midcomms_version_wait(void); int dlm_midcomms_close(int nodeid); int dlm_midcomms_start(void); diff --git a/fs/dlm/plock.c b/fs/dlm/plock.c index 70a4752ed913..e6b4c1a21446 100644 --- a/fs/dlm/plock.c +++ b/fs/dlm/plock.c @@ -11,6 +11,8 @@ #include #include +#include + #include "dlm_internal.h" #include "lockspace.h" @@ -42,6 +44,27 @@ static inline void set_version(struct dlm_plock_info *info) info->version[2] = DLM_PLOCK_VERSION_PATCH; } +static struct plock_op *plock_lookup_waiter(const struct dlm_plock_info *info) +{ + struct plock_op *op = NULL, *iter; + + list_for_each_entry(iter, &recv_list, list) { + if (iter->info.fsid == info->fsid && + iter->info.number == info->number && + iter->info.owner == info->owner && + iter->info.pid == info->pid && + iter->info.start == info->start && + iter->info.end == info->end && + iter->info.ex == info->ex && + iter->info.wait) { + op = iter; + break; + } + } + + return op; +} + static int check_version(struct dlm_plock_info *info) { if ((DLM_PLOCK_VERSION_MAJOR != info->version[0]) || @@ -74,30 +97,26 @@ static void send_op(struct plock_op *op) wake_up(&send_wq); } -/* If a process was killed while waiting for the only plock on a file, - locks_remove_posix will not see any lock on the file so it won't - send an unlock-close to us to pass on to userspace to clean up the - abandoned waiter. So, we have to insert the unlock-close when the - lock call is interrupted. */ - -static void do_unlock_close(const struct dlm_plock_info *info) +static int do_lock_cancel(const struct dlm_plock_info *orig_info) { struct plock_op *op; + int rv; op = kzalloc(sizeof(*op), GFP_NOFS); if (!op) - return; + return -ENOMEM; - op->info.optype = DLM_PLOCK_OP_UNLOCK; - op->info.pid = info->pid; - op->info.fsid = info->fsid; - op->info.number = info->number; - op->info.start = 0; - op->info.end = OFFSET_MAX; - op->info.owner = info->owner; + op->info = *orig_info; + op->info.optype = DLM_PLOCK_OP_CANCEL; + op->info.wait = 0; - op->info.flags |= DLM_PLOCK_FL_CLOSE; send_op(op); + wait_event(recv_wq, (op->done != 0)); + + rv = op->info.rv; + + dlm_release_plock_op(op); + return rv; } int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, @@ -156,7 +175,7 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, send_op(op); if (op->info.wait) { - rv = wait_event_killable(recv_wq, (op->done != 0)); + rv = wait_event_interruptible(recv_wq, (op->done != 0)); if (rv == -ERESTARTSYS) { spin_lock(&ops_lock); /* recheck under ops_lock if we got a done != 0, @@ -166,17 +185,37 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, spin_unlock(&ops_lock); goto do_lock_wait; } - list_del(&op->list); spin_unlock(&ops_lock); + rv = do_lock_cancel(&op->info); + switch (rv) { + case 0: + /* waiter was deleted in user space, answer will never come + * remove original request. The original request must be + * on recv_list because the answer of do_lock_cancel() + * synchronized it. + */ + spin_lock(&ops_lock); + list_del(&op->list); + spin_unlock(&ops_lock); + rv = -EINTR; + break; + case -ENOENT: + /* cancellation wasn't successful but op should be done */ + fallthrough; + default: + /* internal error doing cancel we need to wait */ + goto wait; + } + log_debug(ls, "%s: wait interrupted %x %llx pid %d", __func__, ls->ls_global_id, (unsigned long long)number, op->info.pid); - do_unlock_close(&op->info); dlm_release_plock_op(op); goto out; } } else { +wait: wait_event(recv_wq, (op->done != 0)); } @@ -240,8 +279,8 @@ static int dlm_plock_callback(struct plock_op *op) rv = notify(fl, 0); if (rv) { /* XXX: We need to cancel the fs lock here: */ - log_print("dlm_plock_callback: lock granted after lock request " - "failed; dangling lock!\n"); + log_print("%s: lock granted after lock request failed; dangling lock!", + __func__); goto out; } @@ -318,6 +357,75 @@ int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, } EXPORT_SYMBOL_GPL(dlm_posix_unlock); +/* + * NOTE: This implementation can only handle async lock requests as nfs + * do it. It cannot handle cancellation of a pending lock request sitting + * in wait_event(), but for now only nfs is the only user local kernel + * user. + */ +int dlm_posix_cancel(dlm_lockspace_t *lockspace, u64 number, struct file *file, + struct file_lock *fl) +{ + struct dlm_plock_info info; + struct plock_op *op; + struct dlm_ls *ls; + int rv; + + /* this only works for async request for now and nfs is the only + * kernel user right now. + */ + if (WARN_ON_ONCE(!fl->fl_lmops || !fl->fl_lmops->lm_grant)) + return -EOPNOTSUPP; + + ls = dlm_find_lockspace_local(lockspace); + if (!ls) + return -EINVAL; + + memset(&info, 0, sizeof(info)); + info.pid = fl->fl_pid; + info.ex = (fl->fl_type == F_WRLCK); + info.fsid = ls->ls_global_id; + dlm_put_lockspace(ls); + info.number = number; + info.start = fl->fl_start; + info.end = fl->fl_end; + info.owner = (__u64)fl->fl_pid; + + rv = do_lock_cancel(&info); + switch (rv) { + case 0: + spin_lock(&ops_lock); + /* lock request to cancel must be on recv_list because + * do_lock_cancel() synchronizes it. + */ + op = plock_lookup_waiter(&info); + if (WARN_ON_ONCE(!op)) { + spin_unlock(&ops_lock); + rv = -ENOLCK; + break; + } + + list_del(&op->list); + spin_unlock(&ops_lock); + WARN_ON(op->info.optype != DLM_PLOCK_OP_LOCK); + op->data->callback(op->data->fl, -EINTR); + dlm_release_plock_op(op); + rv = -EINTR; + break; + case -ENOENT: + /* if cancel wasn't successful we probably were to late + * or it was a non-blocking lock request, so just unlock it. + */ + rv = dlm_posix_unlock(lockspace, number, file, fl); + break; + default: + break; + } + + return rv; +} +EXPORT_SYMBOL_GPL(dlm_posix_cancel); + int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file, struct file_lock *fl) { @@ -403,6 +511,8 @@ static ssize_t dev_read(struct file *file, char __user *u, size_t count, if (!op) return -EAGAIN; + trace_dlm_plock_read(&info); + /* there is no need to get a reply from userspace for unlocks that were generated by the vfs cleaning up for a close (the process did not make an unlock call). */ @@ -430,6 +540,8 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, if (copy_from_user(&info, u, sizeof(info))) return -EFAULT; + trace_dlm_plock_write(&info); + if (check_version(&info)) return -EINVAL; @@ -441,22 +553,11 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, */ spin_lock(&ops_lock); if (info.wait) { - list_for_each_entry(iter, &recv_list, list) { - if (iter->info.fsid == info.fsid && - iter->info.number == info.number && - iter->info.owner == info.owner && - iter->info.pid == info.pid && - iter->info.start == info.start && - iter->info.end == info.end && - iter->info.ex == info.ex && - iter->info.wait) { - op = iter; - break; - } - } + op = plock_lookup_waiter(&info); } else { list_for_each_entry(iter, &recv_list, list) { - if (!iter->info.wait) { + if (!iter->info.wait && + iter->info.fsid == info.fsid) { op = iter; break; } @@ -468,8 +569,7 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count, if (info.wait) WARN_ON(op->info.optype != DLM_PLOCK_OP_LOCK); else - WARN_ON(op->info.fsid != info.fsid || - op->info.number != info.number || + WARN_ON(op->info.number != info.number || op->info.owner != info.owner || op->info.optype != info.optype); @@ -534,5 +634,7 @@ int dlm_plock_init(void) void dlm_plock_exit(void) { misc_deregister(&plock_dev_misc); + WARN_ON(!list_empty(&send_list)); + WARN_ON(!list_empty(&recv_list)); } diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c index f4afdf892f78..3b734aed26b5 100644 --- a/fs/dlm/rcom.c +++ b/fs/dlm/rcom.c @@ -28,7 +28,8 @@ static int rcom_response(struct dlm_ls *ls) } static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, - struct dlm_rcom **rc_ret, char *mb, int mb_len) + struct dlm_rcom **rc_ret, char *mb, int mb_len, + uint64_t seq) { struct dlm_rcom *rc; @@ -41,16 +42,14 @@ static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, rc->rc_header.h_cmd = DLM_RCOM; rc->rc_type = cpu_to_le32(type); - - spin_lock(&ls->ls_recover_lock); - rc->rc_seq = cpu_to_le64(ls->ls_recover_seq); - spin_unlock(&ls->ls_recover_lock); + rc->rc_seq = cpu_to_le64(seq); *rc_ret = rc; } static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, - struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret) + struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret, + uint64_t seq) { int mb_len = sizeof(struct dlm_rcom) + len; struct dlm_mhandle *mh; @@ -63,14 +62,14 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, return -ENOBUFS; } - _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len); + _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq); *mh_ret = mh; return 0; } static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, int len, struct dlm_rcom **rc_ret, - struct dlm_msg **msg_ret) + struct dlm_msg **msg_ret, uint64_t seq) { int mb_len = sizeof(struct dlm_rcom) + len; struct dlm_msg *msg; @@ -84,7 +83,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, return -ENOBUFS; } - _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len); + _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq); *msg_ret = msg; return 0; } @@ -170,7 +169,8 @@ static void disallow_sync_reply(struct dlm_ls *ls) * node's rcom_config. */ -int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) +int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags, + uint64_t seq) { struct dlm_rcom *rc; struct dlm_msg *msg; @@ -186,7 +186,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) retry: error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS, - sizeof(struct rcom_status), &rc, &msg); + sizeof(struct rcom_status), &rc, &msg, + seq); if (error) goto out; @@ -220,7 +221,9 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) return error; } -static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_rcom_status(struct dlm_ls *ls, + const struct dlm_rcom *rc_in, + uint64_t seq) { struct dlm_rcom *rc; struct rcom_status *rs; @@ -251,7 +254,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) do_create: error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY, - len, &rc, &msg); + len, &rc, &msg, seq); if (error) return; @@ -281,7 +284,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) send_rcom_stateless(msg, rc); } -static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in) { spin_lock(&ls->ls_rcom_spin); if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) || @@ -302,17 +305,18 @@ static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) spin_unlock(&ls->ls_rcom_spin); } -int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) +int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, + int last_len, uint64_t seq) { + struct dlm_mhandle *mh; struct dlm_rcom *rc; - struct dlm_msg *msg; int error = 0; ls->ls_recover_nodeid = nodeid; retry: - error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len, - &rc, &msg); + error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, + &rc, &mh, seq); if (error) goto out; memcpy(rc->rc_buf, last_name, last_len); @@ -320,7 +324,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) allow_sync_reply(ls, &rc->rc_id); memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); - send_rcom_stateless(msg, rc); + send_rcom(mh, rc); error = dlm_wait_function(ls, &rcom_response); disallow_sync_reply(ls); @@ -330,19 +334,20 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) return error; } -static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in, + uint64_t seq) { + struct dlm_mhandle *mh; struct dlm_rcom *rc; int error, inlen, outlen, nodeid; - struct dlm_msg *msg; nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); inlen = le16_to_cpu(rc_in->rc_header.h_length) - sizeof(struct dlm_rcom); outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom); - error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, - &rc, &msg); + error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, + &rc, &mh, seq); if (error) return; rc->rc_id = rc_in->rc_id; @@ -350,10 +355,10 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, nodeid); - send_rcom_stateless(msg, rc); + send_rcom(mh, rc); } -int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) +int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq) { struct dlm_rcom *rc; struct dlm_mhandle *mh; @@ -361,7 +366,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) int error; error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length, - &rc, &mh); + &rc, &mh, seq); if (error) goto out; memcpy(rc->rc_buf, r->res_name, r->res_length); @@ -372,7 +377,8 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) return error; } -static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_rcom_lookup(struct dlm_ls *ls, + const struct dlm_rcom *rc_in, uint64_t seq) { struct dlm_rcom *rc; struct dlm_mhandle *mh; @@ -387,7 +393,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) return; } - error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh); + error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh, + seq); if (error) return; @@ -402,7 +409,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) send_rcom(mh, rc); } -static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_rcom_lookup_reply(struct dlm_ls *ls, + const struct dlm_rcom *rc_in) { dlm_recover_master_reply(ls, rc_in); } @@ -437,7 +445,7 @@ static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); } -int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) +int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq) { struct dlm_ls *ls = r->res_ls; struct dlm_rcom *rc; @@ -448,7 +456,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) if (lkb->lkb_lvbptr) len += ls->ls_lvblen; - error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh); + error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh, + seq); if (error) goto out; @@ -462,23 +471,28 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) } /* needs at least dlm_rcom + rcom_lock */ -static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) +static void receive_rcom_lock(struct dlm_ls *ls, const struct dlm_rcom *rc_in, + uint64_t seq) { + __le32 rl_remid, rl_result; + struct rcom_lock *rl; struct dlm_rcom *rc; struct dlm_mhandle *mh; int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); - dlm_recover_master_copy(ls, rc_in); + dlm_recover_master_copy(ls, rc_in, &rl_remid, &rl_result); error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY, - sizeof(struct rcom_lock), &rc, &mh); + sizeof(struct rcom_lock), &rc, &mh, seq); if (error) return; - /* We send back the same rcom_lock struct we received, but - dlm_recover_master_copy() has filled in rl_remid and rl_result */ - memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); + rl = (struct rcom_lock *)rc->rc_buf; + /* set rl_remid and rl_result from dlm_recover_master_copy() */ + rl->rl_remid = rl_remid; + rl->rl_result = rl_result; + rc->rc_id = rc_in->rc_id; rc->rc_seq_reply = rc_in->rc_seq; @@ -488,7 +502,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) /* If the lockspace doesn't exist then still send a status message back; it's possible that it just doesn't have its global_id yet. */ -int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) +int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in) { struct dlm_rcom *rc; struct rcom_config *rf; @@ -566,7 +580,7 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) /* Called by dlm_recv; corresponds to dlm_receive_message() but special recovery-only comms are sent through here. */ -void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) +void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid) { int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); int stop, reply = 0, names = 0, lookup = 0, lock = 0; @@ -620,21 +634,21 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) switch (rc->rc_type) { case cpu_to_le32(DLM_RCOM_STATUS): - receive_rcom_status(ls, rc); + receive_rcom_status(ls, rc, seq); break; case cpu_to_le32(DLM_RCOM_NAMES): - receive_rcom_names(ls, rc); + receive_rcom_names(ls, rc, seq); break; case cpu_to_le32(DLM_RCOM_LOOKUP): - receive_rcom_lookup(ls, rc); + receive_rcom_lookup(ls, rc, seq); break; case cpu_to_le32(DLM_RCOM_LOCK): if (le16_to_cpu(rc->rc_header.h_length) < lock_size) goto Eshort; - receive_rcom_lock(ls, rc); + receive_rcom_lock(ls, rc, seq); break; case cpu_to_le32(DLM_RCOM_STATUS_REPLY): @@ -652,7 +666,7 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) case cpu_to_le32(DLM_RCOM_LOCK_REPLY): if (le16_to_cpu(rc->rc_header.h_length) < lock_size) goto Eshort; - dlm_recover_process_copy(ls, rc); + dlm_recover_process_copy(ls, rc, seq); break; default: diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h index 454d3c4814ab..765926ae0020 100644 --- a/fs/dlm/rcom.h +++ b/fs/dlm/rcom.h @@ -12,12 +12,15 @@ #ifndef __RCOM_DOT_H__ #define __RCOM_DOT_H__ -int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags); -int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len); -int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid); -int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); -void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid); -int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in); +int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags, + uint64_t seq); +int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, + int last_len, uint64_t seq); +int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq); +int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq); +void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, + int nodeid); +int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in); #endif diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index 29d71a5018d4..53917c0aa3c0 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -93,7 +93,7 @@ void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status) } static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, - int save_slots) + int save_slots, uint64_t seq) { struct dlm_rcom *rc = ls->ls_recover_buf; struct dlm_member *memb; @@ -107,7 +107,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, goto out; } - error = dlm_rcom_status(ls, memb->nodeid, 0); + error = dlm_rcom_status(ls, memb->nodeid, 0, seq); if (error) goto out; @@ -126,7 +126,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status, } static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, - uint32_t status_flags) + uint32_t status_flags, uint64_t seq) { struct dlm_rcom *rc = ls->ls_recover_buf; int error = 0, delay = 0, nodeid = ls->ls_low_nodeid; @@ -137,7 +137,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, goto out; } - error = dlm_rcom_status(ls, nodeid, status_flags); + error = dlm_rcom_status(ls, nodeid, status_flags, seq); if (error) break; @@ -151,22 +151,22 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status, return error; } -static int wait_status(struct dlm_ls *ls, uint32_t status) +static int wait_status(struct dlm_ls *ls, uint32_t status, uint64_t seq) { uint32_t status_all = status << 1; int error; if (ls->ls_low_nodeid == dlm_our_nodeid()) { - error = wait_status_all(ls, status, 0); + error = wait_status_all(ls, status, 0, seq); if (!error) dlm_set_recover_status(ls, status_all); } else - error = wait_status_low(ls, status_all, 0); + error = wait_status_low(ls, status_all, 0, seq); return error; } -int dlm_recover_members_wait(struct dlm_ls *ls) +int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq) { struct dlm_member *memb; struct dlm_slot *slots; @@ -180,7 +180,7 @@ int dlm_recover_members_wait(struct dlm_ls *ls) } if (ls->ls_low_nodeid == dlm_our_nodeid()) { - error = wait_status_all(ls, DLM_RS_NODES, 1); + error = wait_status_all(ls, DLM_RS_NODES, 1, seq); if (error) goto out; @@ -199,7 +199,8 @@ int dlm_recover_members_wait(struct dlm_ls *ls) dlm_set_recover_status(ls, DLM_RS_NODES_ALL); } } else { - error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS); + error = wait_status_low(ls, DLM_RS_NODES_ALL, + DLM_RSF_NEED_SLOTS, seq); if (error) goto out; @@ -209,19 +210,19 @@ int dlm_recover_members_wait(struct dlm_ls *ls) return error; } -int dlm_recover_directory_wait(struct dlm_ls *ls) +int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq) { - return wait_status(ls, DLM_RS_DIR); + return wait_status(ls, DLM_RS_DIR, seq); } -int dlm_recover_locks_wait(struct dlm_ls *ls) +int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq) { - return wait_status(ls, DLM_RS_LOCKS); + return wait_status(ls, DLM_RS_LOCKS, seq); } -int dlm_recover_done_wait(struct dlm_ls *ls) +int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq) { - return wait_status(ls, DLM_RS_DONE); + return wait_status(ls, DLM_RS_DONE, seq); } /* @@ -441,7 +442,7 @@ static void set_new_master(struct dlm_rsb *r) * equals our_nodeid below). */ -static int recover_master(struct dlm_rsb *r, unsigned int *count) +static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq) { struct dlm_ls *ls = r->res_ls; int our_nodeid, dir_nodeid; @@ -472,7 +473,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count) error = 0; } else { recover_idr_add(r); - error = dlm_send_rcom_lookup(r, dir_nodeid); + error = dlm_send_rcom_lookup(r, dir_nodeid, seq); } (*count)++; @@ -520,7 +521,7 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count) * the correct dir node. */ -int dlm_recover_masters(struct dlm_ls *ls) +int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq) { struct dlm_rsb *r; unsigned int total = 0; @@ -542,7 +543,7 @@ int dlm_recover_masters(struct dlm_ls *ls) if (nodir) error = recover_master_static(r, &count); else - error = recover_master(r, &count); + error = recover_master(r, &count, seq); unlock_rsb(r); cond_resched(); total++; @@ -563,7 +564,7 @@ int dlm_recover_masters(struct dlm_ls *ls) return error; } -int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) +int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc) { struct dlm_rsb *r; int ret_nodeid, new_master; @@ -614,13 +615,14 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) * an equal number of replies then recovery for the rsb is done */ -static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head) +static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head, + uint64_t seq) { struct dlm_lkb *lkb; int error = 0; list_for_each_entry(lkb, head, lkb_statequeue) { - error = dlm_send_rcom_lock(r, lkb); + error = dlm_send_rcom_lock(r, lkb, seq); if (error) break; r->res_recover_locks_count++; @@ -629,7 +631,7 @@ static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head) return error; } -static int recover_locks(struct dlm_rsb *r) +static int recover_locks(struct dlm_rsb *r, uint64_t seq) { int error = 0; @@ -637,13 +639,13 @@ static int recover_locks(struct dlm_rsb *r) DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r);); - error = recover_locks_queue(r, &r->res_grantqueue); + error = recover_locks_queue(r, &r->res_grantqueue, seq); if (error) goto out; - error = recover_locks_queue(r, &r->res_convertqueue); + error = recover_locks_queue(r, &r->res_convertqueue, seq); if (error) goto out; - error = recover_locks_queue(r, &r->res_waitqueue); + error = recover_locks_queue(r, &r->res_waitqueue, seq); if (error) goto out; @@ -656,7 +658,7 @@ static int recover_locks(struct dlm_rsb *r) return error; } -int dlm_recover_locks(struct dlm_ls *ls) +int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq) { struct dlm_rsb *r; int error, count = 0; @@ -677,7 +679,7 @@ int dlm_recover_locks(struct dlm_ls *ls) goto out; } - error = recover_locks(r); + error = recover_locks(r, seq); if (error) { up_read(&ls->ls_root_sem); goto out; diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h index 235e0d25cd48..dbc51013ecad 100644 --- a/fs/dlm/recover.h +++ b/fs/dlm/recover.h @@ -15,13 +15,13 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls)); uint32_t dlm_recover_status(struct dlm_ls *ls); void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status); -int dlm_recover_members_wait(struct dlm_ls *ls); -int dlm_recover_directory_wait(struct dlm_ls *ls); -int dlm_recover_locks_wait(struct dlm_ls *ls); -int dlm_recover_done_wait(struct dlm_ls *ls); -int dlm_recover_masters(struct dlm_ls *ls); -int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc); -int dlm_recover_locks(struct dlm_ls *ls); +int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq); +int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc); +int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq); void dlm_recovered_lock(struct dlm_rsb *r); int dlm_create_root_list(struct dlm_ls *ls); void dlm_release_root_list(struct dlm_ls *ls); diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 19da816cfb09..4d17491dea2f 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -90,7 +90,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_set_recover_status(ls, DLM_RS_NODES); - error = dlm_recover_members_wait(ls); + error = dlm_recover_members_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_members_wait error %d", error); goto fail; @@ -103,7 +103,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * nodes their master rsb names that hash to us. */ - error = dlm_recover_directory(ls); + error = dlm_recover_directory(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_directory error %d", error); goto fail; @@ -111,7 +111,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_set_recover_status(ls, DLM_RS_DIR); - error = dlm_recover_directory_wait(ls); + error = dlm_recover_directory_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_directory_wait error %d", error); goto fail; @@ -145,7 +145,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * departed nodes. */ - error = dlm_recover_masters(ls); + error = dlm_recover_masters(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_masters error %d", error); goto fail; @@ -155,7 +155,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) * Send our locks on remastered rsb's to the new masters. */ - error = dlm_recover_locks(ls); + error = dlm_recover_locks(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_locks error %d", error); goto fail; @@ -163,7 +163,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_set_recover_status(ls, DLM_RS_LOCKS); - error = dlm_recover_locks_wait(ls); + error = dlm_recover_locks_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_locks_wait error %d", error); goto fail; @@ -187,7 +187,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) */ dlm_set_recover_status(ls, DLM_RS_LOCKS); - error = dlm_recover_locks_wait(ls); + error = dlm_recover_locks_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_locks_wait error %d", error); goto fail; @@ -206,7 +206,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_set_recover_status(ls, DLM_RS_DONE); - error = dlm_recover_done_wait(ls); + error = dlm_recover_done_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_done_wait error %d", error); goto fail; diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c index 8be2893ad15b..892d6ca21e74 100644 --- a/fs/dlm/requestqueue.c +++ b/fs/dlm/requestqueue.c @@ -30,7 +30,8 @@ struct rq_entry { * lockspace is enabled on some while still suspended on others. */ -void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms) +void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, + const struct dlm_message *ms) { struct rq_entry *e; int length = le16_to_cpu(ms->m_header.h_length) - diff --git a/fs/dlm/requestqueue.h b/fs/dlm/requestqueue.h index 4e403469a845..42bfe23ceabe 100644 --- a/fs/dlm/requestqueue.h +++ b/fs/dlm/requestqueue.h @@ -11,7 +11,8 @@ #ifndef __REQUESTQUEUE_DOT_H__ #define __REQUESTQUEUE_DOT_H__ -void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms); +void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, + const struct dlm_message *ms); int dlm_process_requestqueue(struct dlm_ls *ls); void dlm_wait_requestqueue(struct dlm_ls *ls); void dlm_purge_requestqueue(struct dlm_ls *ls); diff --git a/fs/gfs2/file.c b/fs/gfs2/file.c index 34cd57c6b68d..f2700477a300 100644 --- a/fs/gfs2/file.c +++ b/fs/gfs2/file.c @@ -1436,17 +1436,14 @@ static int gfs2_lock(struct file *file, int cmd, struct file_lock *fl) if (!(fl->fl_flags & FL_POSIX)) return -ENOLCK; - if (cmd == F_CANCELLK) { - /* Hack: */ - cmd = F_SETLK; - fl->fl_type = F_UNLCK; - } if (unlikely(gfs2_withdrawn(sdp))) { if (fl->fl_type == F_UNLCK) locks_lock_file_wait(file, fl); return -EIO; } - if (IS_GETLK(cmd)) + if (cmd == F_CANCELLK) + return dlm_posix_cancel(ls->ls_dlm, ip->i_no_addr, file, fl); + else if (IS_GETLK(cmd)) return dlm_posix_get(ls->ls_dlm, ip->i_no_addr, file, fl); else if (fl->fl_type == F_UNLCK) return dlm_posix_unlock(ls->ls_dlm, ip->i_no_addr, file, fl); diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c index 05d4414d0c33..9b76ee66aeb2 100644 --- a/fs/ocfs2/stack_user.c +++ b/fs/ocfs2/stack_user.c @@ -738,18 +738,11 @@ static int user_plock(struct ocfs2_cluster_connection *conn, * * Internally, fs/dlm will pass these to a misc device, which * a userspace daemon will read and write to. - * - * For now, cancel requests (which happen internally only), - * are turned into unlocks. Most of this function taken from - * gfs2_lock. */ - if (cmd == F_CANCELLK) { - cmd = F_SETLK; - fl->fl_type = F_UNLCK; - } - - if (IS_GETLK(cmd)) + if (cmd == F_CANCELLK) + return dlm_posix_cancel(conn->cc_lockspace, ino, file, fl); + else if (IS_GETLK(cmd)) return dlm_posix_get(conn->cc_lockspace, ino, file, fl); else if (fl->fl_type == F_UNLCK) return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl); diff --git a/include/linux/dlm_plock.h b/include/linux/dlm_plock.h index e6d76e8715a6..15fc856d198c 100644 --- a/include/linux/dlm_plock.h +++ b/include/linux/dlm_plock.h @@ -11,6 +11,8 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file, int cmd, struct file_lock *fl); int dlm_posix_unlock(dlm_lockspace_t *lockspace, u64 number, struct file *file, struct file_lock *fl); +int dlm_posix_cancel(dlm_lockspace_t *lockspace, u64 number, struct file *file, + struct file_lock *fl); int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file, struct file_lock *fl); #endif diff --git a/include/trace/events/dlm.h b/include/trace/events/dlm.h index 2b09574e1243..c1a146f9fc91 100644 --- a/include/trace/events/dlm.h +++ b/include/trace/events/dlm.h @@ -7,6 +7,7 @@ #include #include +#include #include #include "../../../fs/dlm/dlm_internal.h" @@ -585,6 +586,56 @@ TRACE_EVENT(dlm_recv_message, ); +DECLARE_EVENT_CLASS(dlm_plock_template, + + TP_PROTO(const struct dlm_plock_info *info), + + TP_ARGS(info), + + TP_STRUCT__entry( + __field(uint8_t, optype) + __field(uint8_t, ex) + __field(uint8_t, wait) + __field(uint8_t, flags) + __field(uint32_t, pid) + __field(int32_t, nodeid) + __field(int32_t, rv) + __field(uint32_t, fsid) + __field(uint64_t, number) + __field(uint64_t, start) + __field(uint64_t, end) + __field(uint64_t, owner) + ), + + TP_fast_assign( + __entry->optype = info->optype; + __entry->ex = info->ex; + __entry->wait = info->wait; + __entry->flags = info->flags; + __entry->pid = info->pid; + __entry->nodeid = info->nodeid; + __entry->rv = info->rv; + __entry->fsid = info->fsid; + __entry->number = info->number; + __entry->start = info->start; + __entry->end = info->end; + __entry->owner = info->owner; + ), + + TP_printk("fsid=%u number=%llx owner=%llx optype=%d ex=%d wait=%d flags=%x pid=%u nodeid=%d rv=%d start=%llx end=%llx", + __entry->fsid, __entry->number, __entry->owner, + __entry->optype, __entry->ex, __entry->wait, + __entry->flags, __entry->pid, __entry->nodeid, + __entry->rv, __entry->start, __entry->end) + +); + +DEFINE_EVENT(dlm_plock_template, dlm_plock_read, + TP_PROTO(const struct dlm_plock_info *info), TP_ARGS(info)); + +DEFINE_EVENT(dlm_plock_template, dlm_plock_write, + TP_PROTO(const struct dlm_plock_info *info), TP_ARGS(info)); + TRACE_EVENT(dlm_send, TP_PROTO(int nodeid, int ret), diff --git a/include/uapi/linux/dlm_plock.h b/include/uapi/linux/dlm_plock.h index 63b6c1fd9169..eb66afcac40e 100644 --- a/include/uapi/linux/dlm_plock.h +++ b/include/uapi/linux/dlm_plock.h @@ -22,6 +22,7 @@ enum { DLM_PLOCK_OP_LOCK = 1, DLM_PLOCK_OP_UNLOCK, DLM_PLOCK_OP_GET, + DLM_PLOCK_OP_CANCEL, }; #define DLM_PLOCK_FL_CLOSE 1