diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 5a3953db8118..5bb39a50f904 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -228,36 +228,128 @@ static int ceph_readpage(struct file *filp, struct page *page) } /* - * Build a vector of contiguous pages from the provided page list. + * Finish an async read(ahead) op. */ -static struct page **page_vector_from_list(struct list_head *page_list, - unsigned *nr_pages) +static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) { + struct inode *inode = req->r_inode; + struct ceph_osd_reply_head *replyhead; + int rc, bytes; + int i; + + /* parse reply */ + replyhead = msg->front.iov_base; + WARN_ON(le32_to_cpu(replyhead->num_ops) == 0); + rc = le32_to_cpu(replyhead->result); + bytes = le32_to_cpu(msg->hdr.data_len); + + dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); + + /* unlock all pages, zeroing any data we didn't read */ + for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) { + struct page *page = req->r_pages[i]; + + if (bytes < (int)PAGE_CACHE_SIZE) { + /* zero (remainder of) page */ + int s = bytes < 0 ? 0 : bytes; + zero_user_segment(page, s, PAGE_CACHE_SIZE); + } + dout("finish_read %p uptodate %p idx %lu\n", inode, page, + page->index); + flush_dcache_page(page); + SetPageUptodate(page); + unlock_page(page); + page_cache_release(page); + } + kfree(req->r_pages); +} + +/* + * start an async read(ahead) operation. return nr_pages we submitted + * a read for on success, or negative error code. + */ +static int start_read(struct inode *inode, struct list_head *page_list) +{ + struct ceph_osd_client *osdc = + &ceph_inode_to_client(inode)->client->osdc; + struct ceph_inode_info *ci = ceph_inode(inode); + struct page *page = list_entry(page_list->prev, struct page, lru); + struct ceph_osd_request *req; + u64 off; + u64 len; + int i; struct page **pages; - struct page *page; - int next_index, contig_pages = 0; + pgoff_t next_index; + int nr_pages = 0; + int ret; + + off = page->index << PAGE_CACHE_SHIFT; + + /* count pages */ + next_index = page->index; + list_for_each_entry_reverse(page, page_list, lru) { + if (page->index != next_index) + break; + nr_pages++; + next_index++; + } + len = nr_pages << PAGE_CACHE_SHIFT; + dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages, + off, len); + + req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), + off, &len, + CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, + NULL, 0, + ci->i_truncate_seq, ci->i_truncate_size, + NULL, false, 1, 0); + if (!req) + return -ENOMEM; /* build page vector */ - pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS); + nr_pages = len >> PAGE_CACHE_SHIFT; + pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS); + ret = -ENOMEM; if (!pages) - return ERR_PTR(-ENOMEM); - - BUG_ON(list_empty(page_list)); - next_index = list_entry(page_list->prev, struct page, lru)->index; - list_for_each_entry_reverse(page, page_list, lru) { - if (page->index == next_index) { - dout("readpages page %d %p\n", contig_pages, page); - pages[contig_pages] = page; - contig_pages++; - next_index++; - } else { - break; + goto out; + for (i = 0; i < nr_pages; ++i) { + page = list_entry(page_list->prev, struct page, lru); + BUG_ON(PageLocked(page)); + list_del(&page->lru); + + dout("start_read %p adding %p idx %lu\n", inode, page, + page->index); + if (add_to_page_cache_lru(page, &inode->i_data, page->index, + GFP_NOFS)) { + page_cache_release(page); + dout("start_read %p add_to_page_cache failed %p\n", + inode, page); + nr_pages = i; + goto out_pages; } + pages[i] = page; } - *nr_pages = contig_pages; - return pages; + req->r_pages = pages; + req->r_num_pages = nr_pages; + req->r_callback = finish_read; + req->r_inode = inode; + + dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len); + ret = ceph_osdc_start_request(osdc, req, false); + if (ret < 0) + goto out_pages; + ceph_osdc_put_request(req); + return nr_pages; + +out_pages: + ceph_release_page_vector(pages, nr_pages); + kfree(pages); +out: + ceph_osdc_put_request(req); + return ret; } + /* * Read multiple pages. Leave pages we don't read + unlock in page_list; * the caller (VM) cleans them up. @@ -266,64 +358,17 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, struct list_head *page_list, unsigned nr_pages) { struct inode *inode = file->f_dentry->d_inode; - struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_osd_client *osdc = - &ceph_inode_to_client(inode)->client->osdc; int rc = 0; - struct page **pages; - loff_t offset; - u64 len; - dout("readpages %p file %p nr_pages %d\n", - inode, file, nr_pages); - - pages = page_vector_from_list(page_list, &nr_pages); - if (IS_ERR(pages)) - return PTR_ERR(pages); - - /* guess read extent */ - offset = pages[0]->index << PAGE_CACHE_SHIFT; - len = nr_pages << PAGE_CACHE_SHIFT; - rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, - offset, &len, - ci->i_truncate_seq, ci->i_truncate_size, - pages, nr_pages, 0); - if (rc == -ENOENT) - rc = 0; - if (rc < 0) - goto out; - - for (; !list_empty(page_list) && len > 0; - rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) { - struct page *page = - list_entry(page_list->prev, struct page, lru); - - list_del(&page->lru); - - if (rc < (int)PAGE_CACHE_SIZE) { - /* zero (remainder of) page */ - int s = rc < 0 ? 0 : rc; - zero_user_segment(page, s, PAGE_CACHE_SIZE); - } - - if (add_to_page_cache_lru(page, mapping, page->index, - GFP_NOFS)) { - page_cache_release(page); - dout("readpages %p add_to_page_cache failed %p\n", - inode, page); - continue; - } - dout("readpages %p adding %p idx %lu\n", inode, page, - page->index); - flush_dcache_page(page); - SetPageUptodate(page); - unlock_page(page); - page_cache_release(page); + dout("readpages %p file %p nr_pages %d\n", inode, file, nr_pages); + while (!list_empty(page_list)) { + rc = start_read(inode, page_list); + if (rc < 0) + goto out; + BUG_ON(rc == 0); } - rc = 0; - out: - kfree(pages); + dout("readpages %p file %p ret %d\n", inode, file, rc); return rc; }