ceph: prevent a client from exceeding the MDS maximum xattr size

The MDS tries to enforce a limit on the total key/values in extended
attributes.  However, this limit is enforced only if doing a synchronous
operation (MDS_OP_SETXATTR) -- if we're buffering the xattrs, the MDS
doesn't have a chance to enforce these limits.

This patch adds support for decoding the xattrs maximum size setting that is
distributed in the mdsmap.  Then, when setting an xattr, the kernel client
will revert to do a synchronous operation if that maximum size is exceeded.

While there, fix a dout() that would trigger a printk warning:

[   98.718078] ------------[ cut here ]------------
[   98.719012] precision 65536 too large
[   98.719039] WARNING: CPU: 1 PID: 3755 at lib/vsprintf.c:2703 vsnprintf+0x5e3/0x600
...

Link: https://tracker.ceph.com/issues/55725
Signed-off-by: Luís Henriques <lhenriques@suse.de>
Reviewed-by: Xiubo Li <xiubli@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
This commit is contained in:
Luís Henriques 2022-06-03 14:29:09 +01:00 committed by Ilya Dryomov
parent 8266c4d7a7
commit d93231a6bc
3 changed files with 27 additions and 8 deletions

View File

@ -352,12 +352,10 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end, bool msgr2)
__decode_and_drop_type(p, end, u8, bad_ext);
}
if (mdsmap_ev >= 8) {
u32 name_len;
/* enabled */
ceph_decode_8_safe(p, end, m->m_enabled, bad_ext);
ceph_decode_32_safe(p, end, name_len, bad_ext);
ceph_decode_need(p, end, name_len, bad_ext);
*p += name_len;
/* fs_name */
ceph_decode_skip_string(p, end, bad_ext);
}
/* damaged */
if (mdsmap_ev >= 9) {
@ -370,6 +368,22 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end, bool msgr2)
} else {
m->m_damaged = false;
}
if (mdsmap_ev >= 17) {
/* balancer */
ceph_decode_skip_string(p, end, bad_ext);
/* standby_count_wanted */
ceph_decode_skip_32(p, end, bad_ext);
/* old_max_mds */
ceph_decode_skip_32(p, end, bad_ext);
/* min_compat_client */
ceph_decode_skip_8(p, end, bad_ext);
/* required_client_features */
ceph_decode_skip_set(p, end, 64, bad_ext);
ceph_decode_64_safe(p, end, m->m_max_xattr_size, bad_ext);
} else {
/* This forces the usage of the (sync) SETXATTR Op */
m->m_max_xattr_size = 0;
}
bad_ext:
dout("mdsmap_decode m_enabled: %d, m_damaged: %d, m_num_laggy: %d\n",
!!m->m_enabled, !!m->m_damaged, m->m_num_laggy);

View File

@ -1086,7 +1086,7 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
flags |= CEPH_XATTR_REMOVE;
}
dout("setxattr value=%.*s\n", (int)size, value);
dout("setxattr value size: %zu\n", size);
/* do request */
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
@ -1184,8 +1184,14 @@ int __ceph_setxattr(struct inode *inode, const char *name,
spin_lock(&ci->i_ceph_lock);
retry:
issued = __ceph_caps_issued(ci, NULL);
if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
required_blob_size = __get_required_blob_size(ci, name_len, val_len);
if ((ci->i_xattrs.version == 0) || !(issued & CEPH_CAP_XATTR_EXCL) ||
(required_blob_size > mdsc->mdsmap->m_max_xattr_size)) {
dout("%s do sync setxattr: version: %llu size: %d max: %llu\n",
__func__, ci->i_xattrs.version, required_blob_size,
mdsc->mdsmap->m_max_xattr_size);
goto do_sync;
}
if (!lock_snap_rwsem && !ci->i_head_snapc) {
lock_snap_rwsem = true;
@ -1201,8 +1207,6 @@ int __ceph_setxattr(struct inode *inode, const char *name,
ceph_cap_string(issued));
__build_xattrs(inode);
required_blob_size = __get_required_blob_size(ci, name_len, val_len);
if (!ci->i_xattrs.prealloc_blob ||
required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
struct ceph_buffer *blob;

View File

@ -25,6 +25,7 @@ struct ceph_mdsmap {
u32 m_session_timeout; /* seconds */
u32 m_session_autoclose; /* seconds */
u64 m_max_file_size;
u64 m_max_xattr_size; /* maximum size for xattrs blob */
u32 m_max_mds; /* expected up:active mds number */
u32 m_num_active_mds; /* actual up:active mds number */
u32 possible_max_rank; /* possible max rank index */