Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: no .snap inside of snapped namespace
  libceph: fix msgr standby handling
  libceph: fix msgr keepalive flag
  libceph: fix msgr backoff
  libceph: retry after authorization failure
  libceph: fix handling of short returns from get_user_pages
  ceph: do not clear I_COMPLETE from d_release
  ceph: do not set I_COMPLETE
  Revert "ceph: keep reference to parent inode on ceph_dentry"
This commit is contained in:
Linus Torvalds 2011-03-05 10:43:22 -08:00
commit fb62c00a6d
6 changed files with 72 additions and 50 deletions

View File

@ -60,7 +60,6 @@ int ceph_init_dentry(struct dentry *dentry)
} }
di->dentry = dentry; di->dentry = dentry;
di->lease_session = NULL; di->lease_session = NULL;
di->parent_inode = igrab(dentry->d_parent->d_inode);
dentry->d_fsdata = di; dentry->d_fsdata = di;
dentry->d_time = jiffies; dentry->d_time = jiffies;
ceph_dentry_lru_add(dentry); ceph_dentry_lru_add(dentry);
@ -410,7 +409,7 @@ more:
spin_lock(&inode->i_lock); spin_lock(&inode->i_lock);
if (ci->i_release_count == fi->dir_release_count) { if (ci->i_release_count == fi->dir_release_count) {
dout(" marking %p complete\n", inode); dout(" marking %p complete\n", inode);
ci->i_ceph_flags |= CEPH_I_COMPLETE; /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */
ci->i_max_offset = filp->f_pos; ci->i_max_offset = filp->f_pos;
} }
spin_unlock(&inode->i_lock); spin_unlock(&inode->i_lock);
@ -497,6 +496,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
/* .snap dir? */ /* .snap dir? */
if (err == -ENOENT && if (err == -ENOENT &&
ceph_snap(parent) == CEPH_NOSNAP &&
strcmp(dentry->d_name.name, strcmp(dentry->d_name.name,
fsc->mount_options->snapdir_name) == 0) { fsc->mount_options->snapdir_name) == 0) {
struct inode *inode = ceph_get_snapdir(parent); struct inode *inode = ceph_get_snapdir(parent);
@ -1030,28 +1030,8 @@ out_touch:
static void ceph_dentry_release(struct dentry *dentry) static void ceph_dentry_release(struct dentry *dentry)
{ {
struct ceph_dentry_info *di = ceph_dentry(dentry); struct ceph_dentry_info *di = ceph_dentry(dentry);
struct inode *parent_inode = NULL;
u64 snapid = CEPH_NOSNAP;
if (!IS_ROOT(dentry)) { dout("dentry_release %p\n", dentry);
parent_inode = di->parent_inode;
if (parent_inode)
snapid = ceph_snap(parent_inode);
}
dout("dentry_release %p parent %p\n", dentry, parent_inode);
if (parent_inode && snapid != CEPH_SNAPDIR) {
struct ceph_inode_info *ci = ceph_inode(parent_inode);
spin_lock(&parent_inode->i_lock);
if (ci->i_shared_gen == di->lease_shared_gen ||
snapid <= CEPH_MAXSNAP) {
dout(" clearing %p complete (d_release)\n",
parent_inode);
ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
ci->i_release_count++;
}
spin_unlock(&parent_inode->i_lock);
}
if (di) { if (di) {
ceph_dentry_lru_del(dentry); ceph_dentry_lru_del(dentry);
if (di->lease_session) if (di->lease_session)
@ -1059,8 +1039,6 @@ static void ceph_dentry_release(struct dentry *dentry)
kmem_cache_free(ceph_dentry_cachep, di); kmem_cache_free(ceph_dentry_cachep, di);
dentry->d_fsdata = NULL; dentry->d_fsdata = NULL;
} }
if (parent_inode)
iput(parent_inode);
} }
static int ceph_snapdir_d_revalidate(struct dentry *dentry, static int ceph_snapdir_d_revalidate(struct dentry *dentry,

View File

@ -707,7 +707,7 @@ static int fill_inode(struct inode *inode,
(issued & CEPH_CAP_FILE_EXCL) == 0 && (issued & CEPH_CAP_FILE_EXCL) == 0 &&
(ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) { (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
dout(" marking %p complete (empty)\n", inode); dout(" marking %p complete (empty)\n", inode);
ci->i_ceph_flags |= CEPH_I_COMPLETE; /* ci->i_ceph_flags |= CEPH_I_COMPLETE; */
ci->i_max_offset = 2; ci->i_max_offset = 2;
} }
break; break;

View File

@ -207,7 +207,6 @@ struct ceph_dentry_info {
struct dentry *dentry; struct dentry *dentry;
u64 time; u64 time;
u64 offset; u64 offset;
struct inode *parent_inode;
}; };
struct ceph_inode_xattrs_info { struct ceph_inode_xattrs_info {

View File

@ -123,6 +123,7 @@ struct ceph_msg_pos {
#define SOCK_CLOSED 11 /* socket state changed to closed */ #define SOCK_CLOSED 11 /* socket state changed to closed */
#define OPENING 13 /* open connection w/ (possibly new) peer */ #define OPENING 13 /* open connection w/ (possibly new) peer */
#define DEAD 14 /* dead, about to kfree */ #define DEAD 14 /* dead, about to kfree */
#define BACKOFF 15 /* ceph_fault could not queue delayed work; con_work schedules the delay */
/* /*
* A single connection with another host. * A single connection with another host.
@ -160,7 +161,6 @@ struct ceph_connection {
struct list_head out_queue; struct list_head out_queue;
struct list_head out_sent; /* sending or sent but unacked */ struct list_head out_sent; /* sending or sent but unacked */
u64 out_seq; /* last message queued for send */ u64 out_seq; /* last message queued for send */
bool out_keepalive_pending;
u64 in_seq, in_seq_acked; /* last message received, acked */ u64 in_seq, in_seq_acked; /* last message received, acked */

View File

@ -336,7 +336,6 @@ static void reset_connection(struct ceph_connection *con)
ceph_msg_put(con->out_msg); ceph_msg_put(con->out_msg);
con->out_msg = NULL; con->out_msg = NULL;
} }
con->out_keepalive_pending = false;
con->in_seq = 0; con->in_seq = 0;
con->in_seq_acked = 0; con->in_seq_acked = 0;
} }
@ -1248,8 +1247,6 @@ static int process_connect(struct ceph_connection *con)
con->auth_retry); con->auth_retry);
if (con->auth_retry == 2) { if (con->auth_retry == 2) {
con->error_msg = "connect authorization failure"; con->error_msg = "connect authorization failure";
reset_connection(con);
set_bit(CLOSED, &con->state);
return -1; return -1;
} }
con->auth_retry = 1; con->auth_retry = 1;
@ -1715,14 +1712,6 @@ more:
/* open the socket first? */ /* open the socket first? */
if (con->sock == NULL) { if (con->sock == NULL) {
/*
* if we were STANDBY and are reconnecting _this_
* connection, bump connect_seq now. Always bump
* global_seq.
*/
if (test_and_clear_bit(STANDBY, &con->state))
con->connect_seq++;
prepare_write_banner(msgr, con); prepare_write_banner(msgr, con);
prepare_write_connect(msgr, con, 1); prepare_write_connect(msgr, con, 1);
prepare_read_banner(con); prepare_read_banner(con);
@ -1951,7 +1940,24 @@ static void con_work(struct work_struct *work)
work.work); work.work);
mutex_lock(&con->mutex); mutex_lock(&con->mutex);
if (test_and_clear_bit(BACKOFF, &con->state)) {
dout("con_work %p backing off\n", con);
if (queue_delayed_work(ceph_msgr_wq, &con->work,
round_jiffies_relative(con->delay))) {
dout("con_work %p backoff %lu\n", con, con->delay);
mutex_unlock(&con->mutex);
return;
} else {
con->ops->put(con);
dout("con_work %p FAILED to back off %lu\n", con,
con->delay);
}
}
if (test_bit(STANDBY, &con->state)) {
dout("con_work %p STANDBY\n", con);
goto done;
}
if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
dout("con_work CLOSED\n"); dout("con_work CLOSED\n");
con_close_socket(con); con_close_socket(con);
@ -2008,10 +2014,12 @@ static void ceph_fault(struct ceph_connection *con)
/* Requeue anything that hasn't been acked */ /* Requeue anything that hasn't been acked */
list_splice_init(&con->out_sent, &con->out_queue); list_splice_init(&con->out_sent, &con->out_queue);
/* If there are no messages in the queue, place the connection /* If there are no messages queued or keepalive pending, place
* in a STANDBY state (i.e., don't try to reconnect just yet). */ * the connection in a STANDBY state */
if (list_empty(&con->out_queue) && !con->out_keepalive_pending) { if (list_empty(&con->out_queue) &&
dout("fault setting STANDBY\n"); !test_bit(KEEPALIVE_PENDING, &con->state)) {
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
clear_bit(WRITE_PENDING, &con->state);
set_bit(STANDBY, &con->state); set_bit(STANDBY, &con->state);
} else { } else {
/* retry after a delay. */ /* retry after a delay. */
@ -2019,11 +2027,24 @@ static void ceph_fault(struct ceph_connection *con)
con->delay = BASE_DELAY_INTERVAL; con->delay = BASE_DELAY_INTERVAL;
else if (con->delay < MAX_DELAY_INTERVAL) else if (con->delay < MAX_DELAY_INTERVAL)
con->delay *= 2; con->delay *= 2;
dout("fault queueing %p delay %lu\n", con, con->delay);
con->ops->get(con); con->ops->get(con);
if (queue_delayed_work(ceph_msgr_wq, &con->work, if (queue_delayed_work(ceph_msgr_wq, &con->work,
round_jiffies_relative(con->delay)) == 0) round_jiffies_relative(con->delay))) {
dout("fault queued %p delay %lu\n", con, con->delay);
} else {
con->ops->put(con); con->ops->put(con);
dout("fault failed to queue %p delay %lu, backoff\n",
con, con->delay);
/*
* In many cases we see a socket state change
* while con_work is running and end up
* queuing (non-delayed) work, such that we
* can't backoff with a delay. Set a flag so
* that when con_work restarts we schedule the
* delay then.
*/
set_bit(BACKOFF, &con->state);
}
} }
out_unlock: out_unlock:
@ -2094,6 +2115,19 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
} }
EXPORT_SYMBOL(ceph_messenger_destroy); EXPORT_SYMBOL(ceph_messenger_destroy);
/*
 * Take a connection out of STANDBY, if it is in that state.
 *
 * Called by ceph_con_send() and ceph_con_keepalive() right before they
 * flag new outgoing work.  If the STANDBY bit was set it is cleared and
 * connect_seq is bumped for the reconnect; otherwise this is a no-op.
 *
 * Note that the STANDBY bit is tested and cleared before con->mutex is
 * taken; the mutex only covers the connect_seq update and the sanity
 * checks below.
 */
static void clear_standby(struct ceph_connection *con)
{
/* come back from STANDBY? */
if (test_and_clear_bit(STANDBY, &con->state)) {
mutex_lock(&con->mutex);
dout("clear_standby %p and ++connect_seq\n", con);
con->connect_seq++;
/*
 * ceph_fault() clears WRITE_PENDING when it sets STANDBY, and only
 * sets STANDBY when KEEPALIVE_PENDING is not set, so neither flag
 * is expected to be set here.
 */
WARN_ON(test_bit(WRITE_PENDING, &con->state));
WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
mutex_unlock(&con->mutex);
}
}
/* /*
* Queue up an outgoing message on the given connection. * Queue up an outgoing message on the given connection.
*/ */
@ -2126,6 +2160,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
/* if there wasn't anything waiting to send before, queue /* if there wasn't anything waiting to send before, queue
* new work */ * new work */
clear_standby(con);
if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
queue_con(con); queue_con(con);
} }
@ -2191,6 +2226,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
*/ */
void ceph_con_keepalive(struct ceph_connection *con) void ceph_con_keepalive(struct ceph_connection *con)
{ {
dout("con_keepalive %p\n", con);
clear_standby(con);
if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
test_and_set_bit(WRITE_PENDING, &con->state) == 0) test_and_set_bit(WRITE_PENDING, &con->state) == 0)
queue_con(con); queue_con(con);

View File

@ -16,22 +16,30 @@ struct page **ceph_get_direct_page_vector(const char __user *data,
int num_pages, bool write_page) int num_pages, bool write_page)
{ {
struct page **pages; struct page **pages;
int rc; int got = 0;
int rc = 0;
pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS); pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS);
if (!pages) if (!pages)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
down_read(&current->mm->mmap_sem); down_read(&current->mm->mmap_sem);
rc = get_user_pages(current, current->mm, (unsigned long)data, while (got < num_pages) {
num_pages, write_page, 0, pages, NULL); rc = get_user_pages(current, current->mm,
(unsigned long)data + ((unsigned long)got * PAGE_SIZE),
num_pages - got, write_page, 0, pages + got, NULL);
if (rc < 0)
break;
BUG_ON(rc == 0);
got += rc;
}
up_read(&current->mm->mmap_sem); up_read(&current->mm->mmap_sem);
if (rc < num_pages) if (rc < 0)
goto fail; goto fail;
return pages; return pages;
fail: fail:
ceph_put_page_vector(pages, rc > 0 ? rc : 0, false); ceph_put_page_vector(pages, got, false);
return ERR_PTR(rc); return ERR_PTR(rc);
} }
EXPORT_SYMBOL(ceph_get_direct_page_vector); EXPORT_SYMBOL(ceph_get_direct_page_vector);