bcachefs: Assorted journal refactoring

Improved the way we track various state by adding j->err_seq, which
records the first journal sequence number that encountered an error
being written, and j->last_empty_seq, which records the most recent
journal entry that was completely empty.

Also, use the low bits of the journal sequence number to index the
corresponding journal_buf.

Signed-off-by: Kent Overstreet <kent.overstreet@gmail.com>
Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
Kent Overstreet 2020-11-14 16:04:30 -05:00 committed by Kent Overstreet
parent 1676a398d3
commit 158eecb88e
4 changed files with 67 additions and 68 deletions

View File

@ -17,7 +17,19 @@
#include "super-io.h"
#include "trace.h"
static inline struct journal_buf *journal_seq_to_buf(struct journal *, u64);
static u64 last_unwritten_seq(struct journal *j)
{
union journal_res_state s = READ_ONCE(j->reservations);
lockdep_assert_held(&j->lock);
return journal_cur_seq(j) - s.prev_buf_unwritten;
}
static inline bool journal_seq_unwritten(struct journal *j, u64 seq)
{
return seq >= last_unwritten_seq(j);
}
static bool __journal_entry_is_open(union journal_res_state state)
{
@ -29,6 +41,22 @@ static bool journal_entry_is_open(struct journal *j)
return __journal_entry_is_open(j->reservations);
}
static inline struct journal_buf *
journal_seq_to_buf(struct journal *j, u64 seq)
{
struct journal_buf *buf = NULL;
EBUG_ON(seq > journal_cur_seq(j));
EBUG_ON(seq == journal_cur_seq(j) &&
j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL);
if (journal_seq_unwritten(j, seq)) {
buf = j->buf + (seq & 1);
EBUG_ON(le64_to_cpu(buf->data->seq) != seq);
}
return buf;
}
static void journal_pin_new_entry(struct journal *j, int count)
{
struct journal_entry_pin_list *p;
@ -50,6 +78,8 @@ static void bch2_journal_buf_init(struct journal *j)
{
struct journal_buf *buf = journal_cur_buf(j);
bkey_extent_init(&buf->key);
memset(buf->has_inode, 0, sizeof(buf->has_inode));
memset(buf->data, 0, sizeof(*buf->data));
@ -71,6 +101,7 @@ void bch2_journal_halt(struct journal *j)
} while ((v = atomic64_cmpxchg(&j->reservations.counter,
old.v, new.v)) != old.v);
j->err_seq = journal_cur_seq(j);
journal_wake(j);
closure_wake_up(&journal_cur_buf(j)->wait);
}
@ -138,8 +169,6 @@ static bool __journal_entry_close(struct journal *j)
BUG_ON(sectors > buf->sectors);
buf->sectors = sectors;
bkey_extent_init(&buf->key);
/*
* We have to set last_seq here, _before_ opening a new journal entry:
*
@ -161,11 +190,6 @@ static bool __journal_entry_close(struct journal *j)
*/
buf->data->last_seq = cpu_to_le64(journal_last_seq(j));
if (journal_entry_empty(buf->data))
clear_bit(JOURNAL_NOT_EMPTY, &j->flags);
else
set_bit(JOURNAL_NOT_EMPTY, &j->flags);
journal_pin_new_entry(j, 1);
bch2_journal_buf_init(j);
@ -502,49 +526,28 @@ void bch2_journal_entry_res_resize(struct journal *j,
/* journal flushing: */
static int journal_seq_error(struct journal *j, u64 seq)
{
union journal_res_state state = READ_ONCE(j->reservations);
if (seq == journal_cur_seq(j))
return bch2_journal_error(j);
if (seq + 1 == journal_cur_seq(j) &&
!state.prev_buf_unwritten &&
seq > j->seq_ondisk)
return -EIO;
return 0;
}
static inline struct journal_buf *
journal_seq_to_buf(struct journal *j, u64 seq)
{
/* seq should be for a journal entry that has been opened: */
BUG_ON(seq > journal_cur_seq(j));
BUG_ON(seq == journal_cur_seq(j) &&
j->reservations.cur_entry_offset == JOURNAL_ENTRY_CLOSED_VAL);
if (seq == journal_cur_seq(j))
return journal_cur_buf(j);
if (seq + 1 == journal_cur_seq(j) &&
j->reservations.prev_buf_unwritten)
return journal_prev_buf(j);
return NULL;
}
/**
* bch2_journal_flush_seq_async - wait for a journal entry to be written
*
* like bch2_journal_wait_on_seq, except that it triggers a write immediately if
* necessary
*/
void bch2_journal_flush_seq_async(struct journal *j, u64 seq,
int bch2_journal_flush_seq_async(struct journal *j, u64 seq,
struct closure *parent)
{
struct journal_buf *buf;
int ret = 0;
spin_lock(&j->lock);
if (seq <= j->err_seq) {
ret = -EIO;
goto out;
}
if (seq <= j->seq_ondisk) {
ret = 1;
goto out;
}
if (parent &&
(buf = journal_seq_to_buf(j, seq)))
@ -553,20 +556,8 @@ void bch2_journal_flush_seq_async(struct journal *j, u64 seq,
if (seq == journal_cur_seq(j))
__journal_entry_close(j);
out:
spin_unlock(&j->lock);
}
static int journal_seq_flushed(struct journal *j, u64 seq)
{
int ret;
spin_lock(&j->lock);
ret = seq <= j->seq_ondisk ? 1 : journal_seq_error(j, seq);
if (seq == journal_cur_seq(j))
__journal_entry_close(j);
spin_unlock(&j->lock);
return ret;
}
@ -575,7 +566,7 @@ int bch2_journal_flush_seq(struct journal *j, u64 seq)
u64 start_time = local_clock();
int ret, ret2;
ret = wait_event_killable(j->wait, (ret2 = journal_seq_flushed(j, seq)));
ret = wait_event_killable(j->wait, (ret2 = bch2_journal_flush_seq_async(j, seq, NULL)));
bch2_time_stats_update(j->flush_seq_time, start_time);
@ -876,7 +867,8 @@ void bch2_fs_journal_stop(struct journal *j)
journal_quiesce(j);
BUG_ON(!bch2_journal_error(j) &&
test_bit(JOURNAL_NOT_EMPTY, &j->flags));
(journal_entry_is_open(j) ||
j->last_empty_seq + 1 != journal_cur_seq(j)));
cancel_delayed_work_sync(&j->write_work);
cancel_delayed_work_sync(&j->reclaim_work);
@ -934,6 +926,9 @@ int bch2_fs_journal_start(struct journal *j, u64 cur_seq,
set_bit(JOURNAL_STARTED, &j->flags);
journal_pin_new_entry(j, 1);
j->reservations.idx = journal_cur_seq(j);
bch2_journal_buf_init(j);
c->last_bucket_seq_cleanup = journal_cur_seq(j);

View File

@ -466,7 +466,7 @@ void bch2_journal_entry_res_resize(struct journal *,
struct journal_entry_res *,
unsigned);
void bch2_journal_flush_seq_async(struct journal *, u64, struct closure *);
int bch2_journal_flush_seq_async(struct journal *, u64, struct closure *);
void bch2_journal_flush_async(struct journal *, struct closure *);
int bch2_journal_flush_seq(struct journal *, u64);

View File

@ -944,24 +944,29 @@ static void journal_write_done(struct closure *cl)
struct bch_replicas_padded replicas;
u64 seq = le64_to_cpu(w->data->seq);
u64 last_seq = le64_to_cpu(w->data->last_seq);
int err = 0;
bch2_time_stats_update(j->write_time, j->write_start_time);
if (!devs.nr) {
bch_err(c, "unable to write journal to sufficient devices");
goto err;
err = -EIO;
} else {
bch2_devlist_to_replicas(&replicas.e, BCH_DATA_journal, devs);
if (bch2_mark_replicas(c, &replicas.e))
err = -EIO;
}
bch2_devlist_to_replicas(&replicas.e, BCH_DATA_journal, devs);
if (bch2_mark_replicas(c, &replicas.e))
goto err;
if (err)
bch2_fatal_error(c);
spin_lock(&j->lock);
if (seq >= j->pin.front)
journal_seq_pin(j, seq)->devs = devs;
j->seq_ondisk = seq;
if (err && (!j->err_seq || seq < j->err_seq))
j->err_seq = seq;
j->last_seq_ondisk = last_seq;
bch2_journal_space_available(j);
@ -973,7 +978,7 @@ static void journal_write_done(struct closure *cl)
* bch2_fs_journal_stop():
*/
mod_delayed_work(c->journal_reclaim_wq, &j->reclaim_work, 0);
out:
/* also must come before signalling write completion: */
closure_debug_destroy(cl);
@ -987,11 +992,6 @@ static void journal_write_done(struct closure *cl)
if (test_bit(JOURNAL_NEED_WRITE, &j->flags))
mod_delayed_work(system_freezable_wq, &j->write_work, 0);
spin_unlock(&j->lock);
return;
err:
bch2_fatal_error(c);
spin_lock(&j->lock);
goto out;
}
static void journal_write_endio(struct bio *bio)
@ -1072,6 +1072,9 @@ void bch2_journal_write(struct closure *cl)
SET_JSET_BIG_ENDIAN(jset, CPU_BIG_ENDIAN);
SET_JSET_CSUM_TYPE(jset, bch2_meta_checksum_type(c));
if (journal_entry_empty(jset))
j->last_empty_seq = le64_to_cpu(jset->seq);
if (bch2_csum_type_is_encryption(JSET_CSUM_TYPE(jset)))
validate_before_checksum = true;

View File

@ -127,7 +127,6 @@ enum {
JOURNAL_STARTED,
JOURNAL_RECLAIM_STARTED,
JOURNAL_NEED_WRITE,
JOURNAL_NOT_EMPTY,
JOURNAL_MAY_GET_UNRESERVED,
};
@ -181,6 +180,8 @@ struct journal {
/* seq, last_seq from the most recent journal entry successfully written */
u64 seq_ondisk;
u64 last_seq_ondisk;
u64 err_seq;
u64 last_empty_seq;
/*
* FIFO of journal entries whose btree updates have not yet been