diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c index 6ab3ed4074c6..7112958c2e5b 100644 --- a/fs/dlm/debug_fs.c +++ b/fs/dlm/debug_fs.c @@ -380,7 +380,7 @@ static const struct seq_operations format4_seq_ops; static int table_seq_show(struct seq_file *seq, void *iter_ptr) { - struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_rsbs_list); + struct dlm_rsb *rsb = list_entry(iter_ptr, struct dlm_rsb, res_slow_list); if (seq->op == &format1_seq_ops) print_format1(rsb, seq); @@ -409,9 +409,9 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos) } if (seq->op == &format4_seq_ops) - list = &ls->ls_toss; + list = &ls->ls_slow_inactive; else - list = &ls->ls_keep; + list = &ls->ls_slow_active; read_lock_bh(&ls->ls_rsbtbl_lock); return seq_list_start(list, *pos); @@ -423,9 +423,9 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos) struct list_head *list; if (seq->op == &format4_seq_ops) - list = &ls->ls_toss; + list = &ls->ls_slow_inactive; else - list = &ls->ls_keep; + list = &ls->ls_slow_active; return seq_list_next(iter_ptr, list, pos); } diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 9e68e68bf0cf..818484315906 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -327,11 +327,11 @@ struct dlm_rsb { struct list_head res_convertqueue; struct list_head res_waitqueue; - struct list_head res_rsbs_list; + struct list_head res_slow_list; /* ls_slow_* */ + struct list_head res_scan_list; struct list_head res_root_list; /* used for recovery */ struct list_head res_masters_list; /* used for recovery */ struct list_head res_recover_list; /* used for recovery */ - struct list_head res_toss_q_list; int res_recover_locks_count; char *res_lvbptr; @@ -365,7 +365,7 @@ enum rsb_flags { RSB_RECOVER_CONVERT, RSB_RECOVER_GRANT, RSB_RECOVER_LVB_INVAL, - RSB_TOSS, + RSB_INACTIVE, }; static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) @@ -572,20 +572,16 @@ struct dlm_ls { struct xarray ls_lkbxa; rwlock_t ls_lkbxa_lock; + /* an rsb is on rsbtl for primary locking functions, + and on a slow list for recovery/dump iteration */ struct rhashtable ls_rsbtbl; - rwlock_t ls_rsbtbl_lock; + rwlock_t ls_rsbtbl_lock; /* for ls_rsbtbl and ls_slow */ + struct list_head ls_slow_inactive; /* to iterate rsbtbl */ + struct list_head ls_slow_active; /* to iterate rsbtbl */ - struct list_head ls_toss; - struct list_head ls_keep; - - struct timer_list ls_timer; - /* this queue is ordered according the - * absolute res_toss_time jiffies time - * to mod_timer() with the first element - * if necessary. - */ - struct list_head ls_toss_q; - spinlock_t ls_toss_q_lock; + struct timer_list ls_scan_timer; /* based on first scan_list rsb toss_time */ + struct list_head ls_scan_list; /* rsbs ordered by res_toss_time */ + spinlock_t ls_scan_lock; spinlock_t ls_waiters_lock; struct list_head ls_waiters; /* lkbs needing a reply */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index a29de48849ef..f5f2ceab5a04 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -89,7 +89,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, const struct dlm_message *ms, bool local); static int receive_extralen(const struct dlm_message *ms); static void do_purge(struct dlm_ls *ls, int nodeid, int pid); -static void toss_rsb(struct kref *kref); +static void deactivate_rsb(struct kref *kref); /* * Lock compatibilty matrix - thanks Steve @@ -330,8 +330,8 @@ static inline unsigned long rsb_toss_jiffies(void) static inline void hold_rsb(struct dlm_rsb *r) { - /* rsbs in toss state never get referenced */ - WARN_ON(rsb_flag(r, RSB_TOSS)); + /* inactive rsbs are not ref counted */ + WARN_ON(rsb_flag(r, RSB_INACTIVE)); kref_get(&r->res_ref); } @@ -370,15 +370,12 @@ static inline int dlm_kref_put_write_lock_bh(struct kref *kref, return 0; } -/* When all references to the rsb are gone it's transferred to - the tossed list for later disposal. */ - static void put_rsb(struct dlm_rsb *r) { struct dlm_ls *ls = r->res_ls; int rv; - rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb, + rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb, &ls->ls_rsbtbl_lock); if (rv) write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -391,48 +388,49 @@ void dlm_put_rsb(struct dlm_rsb *r) /* connected with timer_delete_sync() in dlm_ls_stop() to stop * new timers when recovery is triggered and don't run them - * again until a dlm_timer_resume() tries it again. + * again until a resume_scan_timer() tries it again. */ -static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies) +static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies) { if (!dlm_locking_stopped(ls)) - mod_timer(&ls->ls_timer, jiffies); + mod_timer(&ls->ls_scan_timer, jiffies); } /* This function tries to resume the timer callback if a rsb - * is on the toss list and no timer is pending. It might that + * is on the scan list and no timer is pending. It might that * the first entry is on currently executed as timer callback * but we don't care if a timer queued up again and does * nothing. Should be a rare case. */ -void dlm_timer_resume(struct dlm_ls *ls) +void resume_scan_timer(struct dlm_ls *ls) { struct dlm_rsb *r; - spin_lock_bh(&ls->ls_toss_q_lock); - r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - if (r && !timer_pending(&ls->ls_timer)) - __rsb_mod_timer(ls, r->res_toss_time); - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_lock_bh(&ls->ls_scan_lock); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + if (r && !timer_pending(&ls->ls_scan_timer)) + enable_scan_timer(ls, r->res_toss_time); + spin_unlock_bh(&ls->ls_scan_lock); } -/* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */ -static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) +/* ls_rsbtbl_lock must be held */ + +static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r) { struct dlm_rsb *first; - spin_lock_bh(&ls->ls_toss_q_lock); + spin_lock_bh(&ls->ls_scan_lock); r->res_toss_time = 0; /* if the rsb is not queued do nothing */ - if (list_empty(&r->res_toss_q_list)) + if (list_empty(&r->res_scan_list)) goto out; /* get the first element before delete */ - first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - list_del_init(&r->res_toss_q_list); + first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_del_init(&r->res_scan_list); /* check if the first element was the rsb we deleted */ if (first == r) { /* try to get the new first element, if the list @@ -442,23 +440,19 @@ static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) * if the list isn't empty and a new first element got * in place, set the new timer expire time. */ - first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); if (!first) - timer_delete(&ls->ls_timer); + timer_delete(&ls->ls_scan_timer); else - __rsb_mod_timer(ls, first->res_toss_time); + enable_scan_timer(ls, first->res_toss_time); } out: - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_unlock_bh(&ls->ls_scan_lock); } -/* Caller must held ls_rsbtbl_lock and need to be called every time - * when either the rsb enters toss state or the toss state changes - * the dir/master nodeid. - */ -static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) +static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r) { int our_nodeid = dlm_our_nodeid(); struct dlm_rsb *first; @@ -471,25 +465,25 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) if (!dlm_no_directory(ls) && (r->res_master_nodeid != our_nodeid) && (dlm_dir_nodeid(r) == our_nodeid)) { - rsb_delete_toss_timer(ls, r); + del_scan(ls, r); return; } - spin_lock_bh(&ls->ls_toss_q_lock); + spin_lock_bh(&ls->ls_scan_lock); /* set the new rsb absolute expire time in the rsb */ r->res_toss_time = rsb_toss_jiffies(); - if (list_empty(&ls->ls_toss_q)) { + if (list_empty(&ls->ls_scan_list)) { /* if the queue is empty add the element and it's * our new expire time */ - list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); - __rsb_mod_timer(ls, r->res_toss_time); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); + enable_scan_timer(ls, r->res_toss_time); } else { /* check if the rsb was already queued, if so delete * it from the toss queue */ - if (!list_empty(&r->res_toss_q_list)) - list_del(&r->res_toss_q_list); + if (!list_empty(&r->res_scan_list)) + list_del(&r->res_scan_list); /* try to get the maybe new first element and then add * to this rsb with the oldest expire time to the end @@ -497,15 +491,15 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) * rsb expire time is our next expiration if it wasn't * the now new first elemet is our new expiration time */ - first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); - list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); + first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); + list_add_tail(&r->res_scan_list, &ls->ls_scan_list); if (!first) - __rsb_mod_timer(ls, r->res_toss_time); + enable_scan_timer(ls, r->res_toss_time); else - __rsb_mod_timer(ls, first->res_toss_time); + enable_scan_timer(ls, first->res_toss_time); } - spin_unlock_bh(&ls->ls_toss_q_lock); + spin_unlock_bh(&ls->ls_scan_lock); } /* if we hit contention we do in 250 ms a retry to trylock. @@ -515,9 +509,11 @@ static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) */ #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) -void dlm_rsb_toss_timer(struct timer_list *timer) +/* Called by lockspace scan_timer to free unused rsb's. */ + +void dlm_rsb_scan(struct timer_list *timer) { - struct dlm_ls *ls = from_timer(ls, timer, ls_timer); + struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer); int our_nodeid = dlm_our_nodeid(); struct dlm_rsb *r; int rv; @@ -525,76 +521,62 @@ void dlm_rsb_toss_timer(struct timer_list *timer) while (1) { /* interrupting point to leave iteration when * recovery waits for timer_delete_sync(), recovery - * will take care to delete everything in toss queue. + * will take care to delete everything in scan list. */ if (dlm_locking_stopped(ls)) break; - rv = spin_trylock(&ls->ls_toss_q_lock); + rv = spin_trylock(&ls->ls_scan_lock); if (!rv) { /* rearm again try timer */ - __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); break; } - r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, - res_toss_q_list); + r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb, + res_scan_list); if (!r) { - /* nothing to do anymore next rsb queue will - * set next mod_timer() expire. - */ - spin_unlock(&ls->ls_toss_q_lock); + /* the next add_scan will enable the timer again */ + spin_unlock(&ls->ls_scan_lock); break; } - /* test if the first rsb isn't expired yet, if - * so we stop freeing rsb from toss queue as - * the order in queue is ascending to the - * absolute res_toss_time jiffies + /* + * If the first rsb is not yet expired, then stop because the + * list is sorted with nearest expiration first. */ if (time_before(jiffies, r->res_toss_time)) { /* rearm with the next rsb to expire in the future */ - __rsb_mod_timer(ls, r->res_toss_time); - spin_unlock(&ls->ls_toss_q_lock); + enable_scan_timer(ls, r->res_toss_time); + spin_unlock(&ls->ls_scan_lock); break; } /* in find_rsb_dir/nodir there is a reverse order of this * lock, however this is only a trylock if we hit some * possible contention we try it again. - * - * This lock synchronized while holding ls_toss_q_lock - * synchronize everything that rsb_delete_toss_timer() - * or rsb_mod_timer() can't run after this timer callback - * deletes the rsb from the ls_toss_q. Whereas the other - * holders have always a priority to run as this is only - * a caching handling and the other holders might to put - * this rsb out of the toss state. */ rv = write_trylock(&ls->ls_rsbtbl_lock); if (!rv) { - spin_unlock(&ls->ls_toss_q_lock); + spin_unlock(&ls->ls_scan_lock); /* rearm again try timer */ - __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); + enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); break; } - list_del(&r->res_rsbs_list); + list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); - /* not necessary to held the ls_rsbtbl_lock when - * calling send_remove() - */ + /* ls_rsbtbl_lock is not needed when calling send_remove() */ write_unlock(&ls->ls_rsbtbl_lock); - /* remove the rsb out of the toss queue its gone - * drom DLM now - */ - list_del_init(&r->res_toss_q_list); - spin_unlock(&ls->ls_toss_q_lock); + list_del_init(&r->res_scan_list); + spin_unlock(&ls->ls_scan_lock); - /* no rsb in this state should ever run a timer */ + /* An rsb that is a dir record for a remote master rsb + * cannot be removed, and should not have a timer enabled. + */ WARN_ON(!dlm_no_directory(ls) && (r->res_master_nodeid != our_nodeid) && (dlm_dir_nodeid(r) == our_nodeid)); @@ -608,7 +590,7 @@ void dlm_rsb_toss_timer(struct timer_list *timer) (dlm_dir_nodeid(r) != our_nodeid)) send_remove(r); - free_toss_rsb(r); + free_inactive_rsb(r); } } @@ -635,7 +617,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, INIT_LIST_HEAD(&r->res_convertqueue); INIT_LIST_HEAD(&r->res_waitqueue); INIT_LIST_HEAD(&r->res_root_list); - INIT_LIST_HEAD(&r->res_toss_q_list); + INIT_LIST_HEAD(&r->res_scan_list); INIT_LIST_HEAD(&r->res_recover_list); INIT_LIST_HEAD(&r->res_masters_list); @@ -689,7 +671,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) * So, if the given rsb is on the toss list, it is moved to the keep list * before being returned. * - * toss_rsb() happens when all local usage of the rsb is done, i.e. no + * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no * more refcounts exist, so the rsb is moved from the keep list to the * toss list. * @@ -737,9 +719,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, * * If someone sends us a request, we are the dir node, and we do * not find the rsb anywhere, then recreate it. This happens if - * someone sends us a request after we have removed/freed an rsb - * from our toss list. (They sent a request instead of lookup - * because they are using an rsb from their toss list.) + * someone sends us a request after we have removed/freed an rsb. + * (They sent a request instead of lookup because they are using + * an rsb taken from their scan list.) */ if (from_local || from_dir || @@ -749,7 +731,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, retry: - /* check if the rsb is in keep state under read lock - likely path */ + /* check if the rsb is active under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (error) { @@ -761,9 +743,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, * rsb is active, so we can't check master_nodeid without lock_rsb. */ - if (rsb_flag(r, RSB_TOSS)) { + if (rsb_flag(r, RSB_INACTIVE)) { read_unlock_bh(&ls->ls_rsbtbl_lock); - goto do_toss; + goto do_inactive; } kref_get(&r->res_ref); @@ -771,15 +753,15 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, goto out; - do_toss: + do_inactive: write_lock_bh(&ls->ls_rsbtbl_lock); - /* retry lookup under write lock to see if its still in toss state - * if not it's in keep state and we relookup - unlikely path. + /* retry lookup under write lock to see if its still in inactive state + * if not it's in active state and we relookup - unlikely path. */ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) { - if (!rsb_flag(r, RSB_TOSS)) { + if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); goto retry; } @@ -791,14 +773,14 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, /* * rsb found inactive (master_nodeid may be out of date unless * we are the dir_nodeid or were the master) No other thread - * is using this rsb because it's on the toss list, so we can + * is using this rsb because it's inactive, so we can * look at or update res_master_nodeid without lock_rsb. */ if ((r->res_master_nodeid != our_nodeid) && from_other) { /* our rsb was not master, and another node (not the dir node) has sent us a request */ - log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", + log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s", from_nodeid, r->res_master_nodeid, dir_nodeid, r->res_name); write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -808,7 +790,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, if ((r->res_master_nodeid != our_nodeid) && from_dir) { /* don't think this should ever happen */ - log_error(ls, "find_rsb toss from_dir %d master %d", + log_error(ls, "find_rsb inactive from_dir %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); /* fix it and go on */ @@ -825,14 +807,10 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, r->res_first_lkid = 0; } - list_move(&r->res_rsbs_list, &ls->ls_keep); - rsb_clear_flag(r, RSB_TOSS); - /* rsb got out of toss state, it becomes alive again - * and we reinit the reference counter that is only - * valid for keep state rsbs - */ + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); kref_init(&r->res_ref); - rsb_delete_toss_timer(ls, r); + del_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); goto out; @@ -901,7 +879,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, dlm_free_rsb(r); goto retry; } else if (!error) { - list_add(&r->res_rsbs_list, &ls->ls_keep); + list_add(&r->res_slow_list, &ls->ls_slow_active); } write_unlock_bh(&ls->ls_rsbtbl_lock); out: @@ -924,7 +902,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, retry: - /* check if the rsb is in keep state under read lock - likely path */ + /* check if the rsb is in active state under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (error) { @@ -932,9 +910,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, goto do_new; } - if (rsb_flag(r, RSB_TOSS)) { + if (rsb_flag(r, RSB_INACTIVE)) { read_unlock_bh(&ls->ls_rsbtbl_lock); - goto do_toss; + goto do_inactive; } /* @@ -947,15 +925,15 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, goto out; - do_toss: + do_inactive: write_lock_bh(&ls->ls_rsbtbl_lock); - /* retry lookup under write lock to see if its still in toss state - * if not it's in keep state and we relookup - unlikely path. + /* retry lookup under write lock to see if its still inactive. + * if it's active, repeat lookup - unlikely path. */ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) { - if (!rsb_flag(r, RSB_TOSS)) { + if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); goto retry; } @@ -967,14 +945,14 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, /* * rsb found inactive. No other thread is using this rsb because - * it's on the toss list, so we can look at or update - * res_master_nodeid without lock_rsb. + * it's inactive, so we can look at or update res_master_nodeid + * without lock_rsb. */ if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { /* our rsb is not master, and another node has sent us a request; this should never happen */ - log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", + log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d", from_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -986,21 +964,17 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, (dir_nodeid == our_nodeid)) { /* our rsb is not master, and we are dir; may as well fix it; this should never happen */ - log_error(ls, "find_rsb toss our %d master %d dir %d", + log_error(ls, "find_rsb inactive our %d master %d dir %d", our_nodeid, r->res_master_nodeid, dir_nodeid); dlm_print_rsb(r); r->res_master_nodeid = our_nodeid; r->res_nodeid = 0; } - list_move(&r->res_rsbs_list, &ls->ls_keep); - rsb_clear_flag(r, RSB_TOSS); - /* rsb got out of toss state, it becomes alive again - * and we reinit the reference counter that is only - * valid for keep state rsbs - */ + list_move(&r->res_slow_list, &ls->ls_slow_active); + rsb_clear_flag(r, RSB_INACTIVE); kref_init(&r->res_ref); - rsb_delete_toss_timer(ls, r); + del_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); goto out; @@ -1031,7 +1005,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, dlm_free_rsb(r); goto retry; } else if (!error) { - list_add(&r->res_rsbs_list, &ls->ls_keep); + list_add(&r->res_slow_list, &ls->ls_slow_active); } write_unlock_bh(&ls->ls_rsbtbl_lock); @@ -1105,7 +1079,7 @@ static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, } static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, - int from_nodeid, bool toss_list, unsigned int flags, + int from_nodeid, bool is_inactive, unsigned int flags, int *r_nodeid, int *result) { int fix_master = (flags & DLM_LU_RECOVER_MASTER); @@ -1129,9 +1103,9 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no r->res_nodeid = from_nodeid; rsb_set_flag(r, RSB_NEW_MASTER); - if (toss_list) { - /* I don't think we should ever find it on toss list. */ - log_error(ls, "%s fix_master on toss", __func__); + if (is_inactive) { + /* I don't think we should ever find it inactive. */ + log_error(ls, "%s fix_master inactive", __func__); dlm_dump_rsb(r); } } @@ -1171,7 +1145,7 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no if (!from_master && !fix_master && (r->res_master_nodeid == from_nodeid)) { /* this can happen when the master sends remove, the dir node - * finds the rsb on the keep list and ignores the remove, + * finds the rsb on the active list and ignores the remove, * and the former master sends a lookup */ @@ -1244,13 +1218,13 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, retry: - /* check if the rsb is in keep state under read lock - likely path */ + /* check if the rsb is active under read lock - likely path */ read_lock_bh(&ls->ls_rsbtbl_lock); error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) { - if (rsb_flag(r, RSB_TOSS)) { + if (rsb_flag(r, RSB_INACTIVE)) { read_unlock_bh(&ls->ls_rsbtbl_lock); - goto do_toss; + goto do_inactive; } /* because the rsb is active, we need to lock_rsb before @@ -1274,16 +1248,13 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, goto not_found; } - do_toss: + do_inactive: /* unlikely path - relookup under write */ write_lock_bh(&ls->ls_rsbtbl_lock); - /* rsb_mod_timer() requires to held ls_rsbtbl_lock in write lock - * check if the rsb is still in toss state, if not relookup - */ error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); if (!error) { - if (!rsb_flag(r, RSB_TOSS)) { + if (!rsb_flag(r, RSB_INACTIVE)) { write_unlock_bh(&ls->ls_rsbtbl_lock); /* something as changed, very unlikely but * try again @@ -1295,15 +1266,13 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, goto not_found; } - /* because the rsb is inactive (on toss list), it's not refcounted - * and lock_rsb is not used, but is protected by the rsbtbl lock - */ + /* because the rsb is inactive, it's not refcounted and lock_rsb + is not used, but is protected by the rsbtbl lock */ __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, r_nodeid, result); - rsb_mod_timer(ls, r); - /* the rsb was inactive (on toss list) */ + add_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); return 0; @@ -1317,7 +1286,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, r->res_dir_nodeid = our_nodeid; r->res_master_nodeid = from_nodeid; r->res_nodeid = from_nodeid; - rsb_set_flag(r, RSB_TOSS); + rsb_set_flag(r, RSB_INACTIVE); write_lock_bh(&ls->ls_rsbtbl_lock); error = rsb_insert(r, &ls->ls_rsbtbl); @@ -1335,8 +1304,8 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, goto retry; } - list_add(&r->res_rsbs_list, &ls->ls_toss); - rsb_mod_timer(ls, r); + list_add(&r->res_slow_list, &ls->ls_slow_inactive); + add_scan(ls, r); write_unlock_bh(&ls->ls_rsbtbl_lock); if (result) @@ -1351,7 +1320,7 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) struct dlm_rsb *r; read_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { if (r->res_hash == hash) dlm_dump_rsb(r); } @@ -1373,15 +1342,15 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) read_unlock_bh(&ls->ls_rsbtbl_lock); } -static void toss_rsb(struct kref *kref) +static void deactivate_rsb(struct kref *kref) { struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); struct dlm_ls *ls = r->res_ls; DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); - rsb_set_flag(r, RSB_TOSS); - list_move(&r->res_rsbs_list, &ls->ls_toss); - rsb_mod_timer(ls, r); + rsb_set_flag(r, RSB_INACTIVE); + list_move(&r->res_slow_list, &ls->ls_slow_inactive); + add_scan(ls, r); if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); @@ -1395,22 +1364,22 @@ static void unhold_rsb(struct dlm_rsb *r) { int rv; - /* rsbs in toss state never get referenced */ - WARN_ON(rsb_flag(r, RSB_TOSS)); - rv = kref_put(&r->res_ref, toss_rsb); + /* inactive rsbs are not ref counted */ + WARN_ON(rsb_flag(r, RSB_INACTIVE)); + rv = kref_put(&r->res_ref, deactivate_rsb); DLM_ASSERT(!rv, dlm_dump_rsb(r);); } -void free_toss_rsb(struct dlm_rsb *r) +void free_inactive_rsb(struct dlm_rsb *r) { - WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS)); + WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE)); DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); - DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); @@ -4256,8 +4225,9 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) return; } - /* Look for name in rsb toss state, if it's there, kill it. - * If it's in non toss state, it's being used, and we should ignore this + /* + * Look for inactive rsb, if it's there, free it. + * If the rsb is active, it's being used, and we should ignore this * message. This is an expected race between the dir node sending a * request to the master node at the same time as the master node sends * a remove to the dir node. The resolution to that race is for the @@ -4280,16 +4250,18 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) return; } - if (!rsb_flag(r, RSB_TOSS)) { + if (!rsb_flag(r, RSB_INACTIVE)) { if (r->res_master_nodeid != from_nodeid) { /* should not happen */ - log_error(ls, "receive_remove keep from %d master %d", + log_error(ls, "receive_remove on active rsb from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); write_unlock_bh(&ls->ls_rsbtbl_lock); return; } + /* Ignore the remove message, see race comment above. */ + log_debug(ls, "receive_remove from %d master %d first %x %s", from_nodeid, r->res_master_nodeid, r->res_first_lkid, name); @@ -4298,19 +4270,19 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) } if (r->res_master_nodeid != from_nodeid) { - log_error(ls, "receive_remove toss from %d master %d", + log_error(ls, "receive_remove inactive from %d master %d", from_nodeid, r->res_master_nodeid); dlm_print_rsb(r); write_unlock_bh(&ls->ls_rsbtbl_lock); return; } - list_del(&r->res_rsbs_list); + list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); write_unlock_bh(&ls->ls_rsbtbl_lock); - free_toss_rsb(r); + free_inactive_rsb(r); } static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) @@ -5377,7 +5349,7 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls) struct dlm_rsb *r; read_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { if (!rsb_flag(r, RSB_RECOVER_GRANT)) continue; if (!is_master(r)) { diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index 8de9dee4c058..4ed8d36f9c6d 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -11,7 +11,6 @@ #ifndef __LOCK_DOT_H__ #define __LOCK_DOT_H__ -void dlm_rsb_toss_timer(struct timer_list *timer); void dlm_dump_rsb(struct dlm_rsb *r); void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len); void dlm_print_lkb(struct dlm_lkb *lkb); @@ -19,15 +18,15 @@ void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms, uint32_t saved_seq); void dlm_receive_buffer(const union dlm_packet *p, int nodeid); int dlm_modes_compat(int mode1, int mode2); -void free_toss_rsb(struct dlm_rsb *r); +void free_inactive_rsb(struct dlm_rsb *r); void dlm_put_rsb(struct dlm_rsb *r); void dlm_hold_rsb(struct dlm_rsb *r); int dlm_put_lkb(struct dlm_lkb *lkb); -void dlm_scan_rsbs(struct dlm_ls *ls); int dlm_lock_recovery_try(struct dlm_ls *ls); void dlm_lock_recovery(struct dlm_ls *ls); void dlm_unlock_recovery(struct dlm_ls *ls); -void dlm_timer_resume(struct dlm_ls *ls); +void dlm_rsb_scan(struct timer_list *timer); +void resume_scan_timer(struct dlm_ls *ls); int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, int len, unsigned int flags, int *r_nodeid, int *result); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 6f1078a1c715..3990880faea7 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -412,8 +412,8 @@ static int new_lockspace(const char *name, const char *cluster, */ ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL)); - INIT_LIST_HEAD(&ls->ls_toss); - INIT_LIST_HEAD(&ls->ls_keep); + INIT_LIST_HEAD(&ls->ls_slow_inactive); + INIT_LIST_HEAD(&ls->ls_slow_active); rwlock_init(&ls->ls_rsbtbl_lock); error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params); @@ -490,10 +490,9 @@ static int new_lockspace(const char *name, const char *cluster, INIT_LIST_HEAD(&ls->ls_dir_dump_list); rwlock_init(&ls->ls_dir_dump_lock); - INIT_LIST_HEAD(&ls->ls_toss_q); - spin_lock_init(&ls->ls_toss_q_lock); - timer_setup(&ls->ls_timer, dlm_rsb_toss_timer, - TIMER_DEFERRABLE); + INIT_LIST_HEAD(&ls->ls_scan_list); + spin_lock_init(&ls->ls_scan_lock); + timer_setup(&ls->ls_scan_timer, dlm_rsb_scan, TIMER_DEFERRABLE); spin_lock_bh(&lslist_lock); ls->ls_create_count = 1; @@ -723,7 +722,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) * time_shutdown_sync(), we don't care anymore */ clear_bit(LSFL_RUNNING, &ls->ls_flags); - timer_shutdown_sync(&ls->ls_timer); + timer_shutdown_sync(&ls->ls_scan_timer); if (ls_count == 1) { dlm_clear_members(ls); diff --git a/fs/dlm/member.c b/fs/dlm/member.c index c46e306f2e5c..a7ee7fd2b9d3 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -642,7 +642,7 @@ int dlm_ls_stop(struct dlm_ls *ls) set_bit(LSFL_RECOVER_STOP, &ls->ls_flags); new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); if (new) - timer_delete_sync(&ls->ls_timer); + timer_delete_sync(&ls->ls_scan_timer); ls->ls_recover_seq++; /* activate requestqueue and stop processing */ diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index d156196b9e69..c7afb428a2b4 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -882,29 +882,26 @@ void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list) log_rinfo(ls, "dlm_recover_rsbs %d done", count); } -/* Create a single list of all root rsb's to be used during recovery */ - -void dlm_clear_toss(struct dlm_ls *ls) +void dlm_clear_inactive(struct dlm_ls *ls) { struct dlm_rsb *r, *safe; unsigned int count = 0; write_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) { - list_del(&r->res_rsbs_list); + list_for_each_entry_safe(r, safe, &ls->ls_slow_inactive, res_slow_list) { + list_del(&r->res_slow_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); - /* remove it from the toss queue if its part of it */ - if (!list_empty(&r->res_toss_q_list)) - list_del_init(&r->res_toss_q_list); + if (!list_empty(&r->res_scan_list)) + list_del_init(&r->res_scan_list); - free_toss_rsb(r); + free_inactive_rsb(r); count++; } write_unlock_bh(&ls->ls_rsbtbl_lock); if (count) - log_rinfo(ls, "dlm_clear_toss %u done", count); + log_rinfo(ls, "dlm_clear_inactive %u done", count); } diff --git a/fs/dlm/recover.h b/fs/dlm/recover.h index efc79a6e577d..ec69896462fb 100644 --- a/fs/dlm/recover.h +++ b/fs/dlm/recover.h @@ -25,7 +25,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, const struct dlm_rcom *rc); int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq, const struct list_head *root_list); void dlm_recovered_lock(struct dlm_rsb *r); -void dlm_clear_toss(struct dlm_ls *ls); +void dlm_clear_inactive(struct dlm_ls *ls); void dlm_recover_rsbs(struct dlm_ls *ls, const struct list_head *root_list); #endif /* __RECOVER_DOT_H__ */ diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 17a40d1e6036..34f4f9f49a6c 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -33,7 +33,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls) } read_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { if (r->res_nodeid) continue; @@ -63,12 +63,12 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list) struct dlm_rsb *r; read_lock_bh(&ls->ls_rsbtbl_lock); - list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { + list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) { list_add(&r->res_root_list, root_list); dlm_hold_rsb(r); } - WARN_ON_ONCE(!list_empty(&ls->ls_toss)); + WARN_ON_ONCE(!list_empty(&ls->ls_slow_inactive)); read_unlock_bh(&ls->ls_rsbtbl_lock); } @@ -98,16 +98,16 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq) spin_lock_bh(&ls->ls_recover_lock); if (ls->ls_recover_seq == seq) { set_bit(LSFL_RUNNING, &ls->ls_flags); - /* Schedule next timer if recovery put something on toss. + /* Schedule next timer if recovery put something on inactive. * * The rsbs that was queued while recovery on toss hasn't * started yet because LSFL_RUNNING was set everything * else recovery hasn't started as well because ls_in_recovery * is still hold. So we should not run into the case that - * dlm_timer_resume() queues a timer that can occur in + * resume_scan_timer() queues a timer that can occur in * a no op. */ - dlm_timer_resume(ls); + resume_scan_timer(ls); /* unblocks processes waiting to enter the dlm */ up_write(&ls->ls_in_recovery); clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); @@ -131,7 +131,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) dlm_callback_suspend(ls); - dlm_clear_toss(ls); + dlm_clear_inactive(ls); /* * This list of root rsb's will be the basis of most of the recovery