diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index f6acba4310a7..10753486049a 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -216,16 +216,13 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name, if (!rv) return r; - down_read(&ls->ls_root_sem); - list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + list_for_each_entry(r, &ls->ls_masters_list, res_masters_list) { if (len == r->res_length && !memcmp(name, r->res_name, len)) { - up_read(&ls->ls_root_sem); log_debug(ls, "find_rsb_root revert to root_list %s", r->res_name); return r; } } - up_read(&ls->ls_root_sem); return NULL; } @@ -241,7 +238,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, int offset = 0, dir_nodeid; __be16 be_namelen; - down_read(&ls->ls_root_sem); + read_lock(&ls->ls_masters_lock); if (inlen > 1) { r = find_rsb_root(ls, inbuf, inlen); @@ -250,16 +247,13 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, nodeid, inlen, inlen, inbuf); goto out; } - list = r->res_root_list.next; + list = r->res_masters_list.next; } else { - list = ls->ls_root_list.next; + list = ls->ls_masters_list.next; } - for (offset = 0; list != &ls->ls_root_list; list = list->next) { - r = list_entry(list, struct dlm_rsb, res_root_list); - if (r->res_nodeid) - continue; - + for (offset = 0; list != &ls->ls_masters_list; list = list->next) { + r = list_entry(list, struct dlm_rsb, res_masters_list); dir_nodeid = dlm_dir_nodeid(r); if (dir_nodeid != nodeid) continue; @@ -294,7 +288,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, * terminating record. */ - if ((list == &ls->ls_root_list) && + if ((list == &ls->ls_masters_list) && (offset + sizeof(uint16_t) <= outlen)) { be_namelen = cpu_to_be16(0xFFFF); memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); @@ -302,6 +296,6 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, ls->ls_recover_dir_sent_msg++; } out: - up_read(&ls->ls_root_sem); + read_unlock(&ls->ls_masters_lock); } diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 1d2ee5c2d23d..3524f2b33f2c 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -342,6 +342,7 @@ struct dlm_rsb { struct list_head res_waitqueue; struct list_head res_root_list; /* used for recovery */ + struct list_head res_masters_list; /* used for recovery */ struct list_head res_recover_list; /* used for recovery */ int res_recover_locks_count; @@ -675,6 +676,8 @@ struct dlm_ls { struct list_head ls_root_list; /* root resources */ struct rw_semaphore ls_root_sem; /* protect root_list */ + struct list_head ls_masters_list; /* root resources */ + rwlock_t ls_masters_lock; /* protect root_list */ const struct dlm_lockspace_ops *ls_ops; void *ls_ops_arg; diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index d87464614bc5..e0ab7432ca4d 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -423,6 +423,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, INIT_LIST_HEAD(&r->res_waitqueue); INIT_LIST_HEAD(&r->res_root_list); INIT_LIST_HEAD(&r->res_recover_list); + INIT_LIST_HEAD(&r->res_masters_list); *r_ret = r; return 0; @@ -1168,6 +1169,7 @@ static void kill_rsb(struct kref *kref) DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); } /* Attaching/detaching lkb's from rsb's is for rsb reference counting. diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 0455dddb0797..c427c76b5f07 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -582,6 +582,8 @@ static int new_lockspace(const char *name, const char *cluster, init_waitqueue_head(&ls->ls_wait_general); INIT_LIST_HEAD(&ls->ls_root_list); init_rwsem(&ls->ls_root_sem); + INIT_LIST_HEAD(&ls->ls_masters_list); + rwlock_init(&ls->ls_masters_lock); spin_lock(&lslist_lock); ls->ls_create_count = 1; diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 8eb42554ccb0..dfce8fc6a783 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -20,6 +20,48 @@ #include "requestqueue.h" #include "recoverd.h" +static int dlm_create_masters_list(struct dlm_ls *ls) +{ + struct rb_node *n; + struct dlm_rsb *r; + int i, error = 0; + + write_lock(&ls->ls_masters_lock); + if (!list_empty(&ls->ls_masters_list)) { + log_error(ls, "root list not empty"); + error = -EINVAL; + goto out; + } + + for (i = 0; i < ls->ls_rsbtbl_size; i++) { + spin_lock_bh(&ls->ls_rsbtbl[i].lock); + for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { + r = rb_entry(n, struct dlm_rsb, res_hashnode); + if (r->res_nodeid) + continue; + + list_add(&r->res_masters_list, &ls->ls_masters_list); + dlm_hold_rsb(r); + } + spin_unlock_bh(&ls->ls_rsbtbl[i].lock); + } + out: + write_unlock(&ls->ls_masters_lock); + return error; +} + +static void dlm_release_masters_list(struct dlm_ls *ls) +{ + struct dlm_rsb *r, *safe; + + write_lock(&ls->ls_masters_lock); + list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) { + list_del_init(&r->res_masters_list); + dlm_put_rsb(r); + } + write_unlock(&ls->ls_masters_lock); +} + static void dlm_create_root_list(struct dlm_ls *ls) { struct rb_node *n; @@ -123,6 +165,23 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_recover_dir_nodeid(ls); + /* Create a snapshot of all active rsbs were we are the master of. + * During the barrier between dlm_recover_members_wait() and + * dlm_recover_directory() other nodes can dump their necessary + * directory dlm_rsb (r->res_dir_nodeid == nodeid) in rcom + * communication dlm_copy_master_names() handling. + * + * TODO We should create a per lockspace list that contains rsbs + * that we are the master of. Instead of creating this list while + * recovery we keep track of those rsbs while locking handling and + * recovery can use it when necessary. + */ + error = dlm_create_masters_list(ls); + if (error) { + log_rinfo(ls, "dlm_create_masters_list error %d", error); + goto fail; + } + ls->ls_recover_dir_sent_res = 0; ls->ls_recover_dir_sent_msg = 0; ls->ls_recover_locks_in = 0; @@ -132,6 +191,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_members_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_members_wait error %d", error); + dlm_release_masters_list(ls); goto fail; } @@ -145,6 +205,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_directory(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_directory error %d", error); + dlm_release_masters_list(ls); goto fail; } @@ -153,9 +214,12 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) error = dlm_recover_directory_wait(ls, rv->seq); if (error) { log_rinfo(ls, "dlm_recover_directory_wait error %d", error); + dlm_release_masters_list(ls); goto fail; } + dlm_release_masters_list(ls); + log_rinfo(ls, "dlm_recover_directory %u out %u messages", ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);