aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2011-10-28 19:42:18 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2011-10-28 19:42:18 -0400
commit97d2eb13a019ec09cc1a7ea2d3705c0b117b3c0d (patch)
tree86f6382941f8cfc41647d33d87bec7bc1407c18c
parent68d99b2c8efcb6ed3807a55569300c53b5f88be5 (diff)
parent339573406737461cfb17bebabf7ba536a302d841 (diff)
Merge branch 'for-linus' of git://ceph.newdream.net/git/ceph-client
* 'for-linus' of git://ceph.newdream.net/git/ceph-client: libceph: fix double-free of page vector ceph: fix 32-bit ino numbers libceph: force resend of osd requests if we skip an osdmap ceph: use kernel DNS resolver ceph: fix ceph_monc_init memory leak ceph: let the set_layout ioctl set single traits Revert "ceph: don't truncate dirty pages in invalidate work thread" ceph: replace leading spaces with tabs libceph: warn on msg allocation failures libceph: don't complain on msgpool alloc failures libceph: always preallocate mon connection libceph: create messenger with client ceph: document ioctls ceph: implement (optional) max read size ceph: rename rsize -> rasize ceph: make readpages fully async
-rw-r--r--drivers/block/rbd.c2
-rw-r--r--fs/ceph/addr.c193
-rw-r--r--fs/ceph/caps.c2
-rw-r--r--fs/ceph/inode.c46
-rw-r--r--fs/ceph/ioctl.c34
-rw-r--r--fs/ceph/ioctl.h55
-rw-r--r--fs/ceph/mds_client.c11
-rw-r--r--fs/ceph/super.c61
-rw-r--r--fs/ceph/super.h19
-rw-r--r--include/linux/ceph/libceph.h4
-rw-r--r--include/linux/ceph/messenger.h3
-rw-r--r--net/ceph/Kconfig14
-rw-r--r--net/ceph/ceph_common.c47
-rw-r--r--net/ceph/messenger.c130
-rw-r--r--net/ceph/mon_client.c79
-rw-r--r--net/ceph/msgpool.c4
-rw-r--r--net/ceph/osd_client.c34
17 files changed, 483 insertions, 255 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index fe3c3249cec4..65cc424359b0 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -260,7 +260,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
260 kref_init(&rbdc->kref); 260 kref_init(&rbdc->kref);
261 INIT_LIST_HEAD(&rbdc->node); 261 INIT_LIST_HEAD(&rbdc->node);
262 262
263 rbdc->client = ceph_create_client(opt, rbdc); 263 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
264 if (IS_ERR(rbdc->client)) 264 if (IS_ERR(rbdc->client))
265 goto out_rbdc; 265 goto out_rbdc;
266 opt = NULL; /* Now rbdc->client is responsible for opt */ 266 opt = NULL; /* Now rbdc->client is responsible for opt */
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 5a3953db8118..4144caf2f9d3 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -228,102 +228,155 @@ static int ceph_readpage(struct file *filp, struct page *page)
228} 228}
229 229
230/* 230/*
231 * Build a vector of contiguous pages from the provided page list. 231 * Finish an async read(ahead) op.
232 */ 232 */
233static struct page **page_vector_from_list(struct list_head *page_list, 233static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
234 unsigned *nr_pages)
235{ 234{
236 struct page **pages; 235 struct inode *inode = req->r_inode;
237 struct page *page; 236 struct ceph_osd_reply_head *replyhead;
238 int next_index, contig_pages = 0; 237 int rc, bytes;
238 int i;
239 239
240 /* build page vector */ 240 /* parse reply */
241 pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS); 241 replyhead = msg->front.iov_base;
242 if (!pages) 242 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
243 return ERR_PTR(-ENOMEM); 243 rc = le32_to_cpu(replyhead->result);
244 bytes = le32_to_cpu(msg->hdr.data_len);
244 245
245 BUG_ON(list_empty(page_list)); 246 dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
246 next_index = list_entry(page_list->prev, struct page, lru)->index; 247
247 list_for_each_entry_reverse(page, page_list, lru) { 248 /* unlock all pages, zeroing any data we didn't read */
248 if (page->index == next_index) { 249 for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
249 dout("readpages page %d %p\n", contig_pages, page); 250 struct page *page = req->r_pages[i];
250 pages[contig_pages] = page; 251
251 contig_pages++; 252 if (bytes < (int)PAGE_CACHE_SIZE) {
252 next_index++; 253 /* zero (remainder of) page */
253 } else { 254 int s = bytes < 0 ? 0 : bytes;
254 break; 255 zero_user_segment(page, s, PAGE_CACHE_SIZE);
255 } 256 }
257 dout("finish_read %p uptodate %p idx %lu\n", inode, page,
258 page->index);
259 flush_dcache_page(page);
260 SetPageUptodate(page);
261 unlock_page(page);
262 page_cache_release(page);
256 } 263 }
257 *nr_pages = contig_pages; 264 kfree(req->r_pages);
258 return pages;
259} 265}
260 266
261/* 267/*
262 * Read multiple pages. Leave pages we don't read + unlock in page_list; 268 * start an async read(ahead) operation. return nr_pages we submitted
263 * the caller (VM) cleans them up. 269 * a read for on success, or negative error code.
264 */ 270 */
265static int ceph_readpages(struct file *file, struct address_space *mapping, 271static int start_read(struct inode *inode, struct list_head *page_list, int max)
266 struct list_head *page_list, unsigned nr_pages)
267{ 272{
268 struct inode *inode = file->f_dentry->d_inode;
269 struct ceph_inode_info *ci = ceph_inode(inode);
270 struct ceph_osd_client *osdc = 273 struct ceph_osd_client *osdc =
271 &ceph_inode_to_client(inode)->client->osdc; 274 &ceph_inode_to_client(inode)->client->osdc;
272 int rc = 0; 275 struct ceph_inode_info *ci = ceph_inode(inode);
273 struct page **pages; 276 struct page *page = list_entry(page_list->prev, struct page, lru);
274 loff_t offset; 277 struct ceph_osd_request *req;
278 u64 off;
275 u64 len; 279 u64 len;
280 int i;
281 struct page **pages;
282 pgoff_t next_index;
283 int nr_pages = 0;
284 int ret;
276 285
277 dout("readpages %p file %p nr_pages %d\n", 286 off = page->index << PAGE_CACHE_SHIFT;
278 inode, file, nr_pages);
279
280 pages = page_vector_from_list(page_list, &nr_pages);
281 if (IS_ERR(pages))
282 return PTR_ERR(pages);
283 287
284 /* guess read extent */ 288 /* count pages */
285 offset = pages[0]->index << PAGE_CACHE_SHIFT; 289 next_index = page->index;
290 list_for_each_entry_reverse(page, page_list, lru) {
291 if (page->index != next_index)
292 break;
293 nr_pages++;
294 next_index++;
295 if (max && nr_pages == max)
296 break;
297 }
286 len = nr_pages << PAGE_CACHE_SHIFT; 298 len = nr_pages << PAGE_CACHE_SHIFT;
287 rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, 299 dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
288 offset, &len, 300 off, len);
289 ci->i_truncate_seq, ci->i_truncate_size, 301
290 pages, nr_pages, 0); 302 req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
291 if (rc == -ENOENT) 303 off, &len,
292 rc = 0; 304 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
293 if (rc < 0) 305 NULL, 0,
294 goto out; 306 ci->i_truncate_seq, ci->i_truncate_size,
295 307 NULL, false, 1, 0);
296 for (; !list_empty(page_list) && len > 0; 308 if (!req)
297 rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) { 309 return -ENOMEM;
298 struct page *page =
299 list_entry(page_list->prev, struct page, lru);
300 310
311 /* build page vector */
312 nr_pages = len >> PAGE_CACHE_SHIFT;
313 pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
314 ret = -ENOMEM;
315 if (!pages)
316 goto out;
317 for (i = 0; i < nr_pages; ++i) {
318 page = list_entry(page_list->prev, struct page, lru);
319 BUG_ON(PageLocked(page));
301 list_del(&page->lru); 320 list_del(&page->lru);
302 321
303 if (rc < (int)PAGE_CACHE_SIZE) { 322 dout("start_read %p adding %p idx %lu\n", inode, page,
304 /* zero (remainder of) page */ 323 page->index);
305 int s = rc < 0 ? 0 : rc; 324 if (add_to_page_cache_lru(page, &inode->i_data, page->index,
306 zero_user_segment(page, s, PAGE_CACHE_SIZE);
307 }
308
309 if (add_to_page_cache_lru(page, mapping, page->index,
310 GFP_NOFS)) { 325 GFP_NOFS)) {
311 page_cache_release(page); 326 page_cache_release(page);
312 dout("readpages %p add_to_page_cache failed %p\n", 327 dout("start_read %p add_to_page_cache failed %p\n",
313 inode, page); 328 inode, page);
314 continue; 329 nr_pages = i;
330 goto out_pages;
315 } 331 }
316 dout("readpages %p adding %p idx %lu\n", inode, page, 332 pages[i] = page;
317 page->index);
318 flush_dcache_page(page);
319 SetPageUptodate(page);
320 unlock_page(page);
321 page_cache_release(page);
322 } 333 }
323 rc = 0; 334 req->r_pages = pages;
335 req->r_num_pages = nr_pages;
336 req->r_callback = finish_read;
337 req->r_inode = inode;
338
339 dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
340 ret = ceph_osdc_start_request(osdc, req, false);
341 if (ret < 0)
342 goto out_pages;
343 ceph_osdc_put_request(req);
344 return nr_pages;
324 345
346out_pages:
347 ceph_release_page_vector(pages, nr_pages);
348out:
349 ceph_osdc_put_request(req);
350 return ret;
351}
352
353
354/*
355 * Read multiple pages. Leave pages we don't read + unlock in page_list;
356 * the caller (VM) cleans them up.
357 */
358static int ceph_readpages(struct file *file, struct address_space *mapping,
359 struct list_head *page_list, unsigned nr_pages)
360{
361 struct inode *inode = file->f_dentry->d_inode;
362 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
363 int rc = 0;
364 int max = 0;
365
366 if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
367 max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
368 >> PAGE_SHIFT;
369
370 dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages,
371 max);
372 while (!list_empty(page_list)) {
373 rc = start_read(inode, page_list, max);
374 if (rc < 0)
375 goto out;
376 BUG_ON(rc == 0);
377 }
325out: 378out:
326 kfree(pages); 379 dout("readpages %p file %p ret %d\n", inode, file, rc);
327 return rc; 380 return rc;
328} 381}
329 382
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 8d74ad7ba556..b8731bf3ef1f 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -945,7 +945,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
945 seq, issue_seq, mseq, follows, size, max_size, 945 seq, issue_seq, mseq, follows, size, max_size,
946 xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); 946 xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
947 947
948 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS); 948 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
949 if (!msg) 949 if (!msg)
950 return -ENOMEM; 950 return -ENOMEM;
951 951
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 095799ba9dd1..5dde7d51dc11 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -9,7 +9,6 @@
9#include <linux/namei.h> 9#include <linux/namei.h>
10#include <linux/writeback.h> 10#include <linux/writeback.h>
11#include <linux/vmalloc.h> 11#include <linux/vmalloc.h>
12#include <linux/pagevec.h>
13 12
14#include "super.h" 13#include "super.h"
15#include "mds_client.h" 14#include "mds_client.h"
@@ -1364,49 +1363,6 @@ void ceph_queue_invalidate(struct inode *inode)
1364} 1363}
1365 1364
1366/* 1365/*
1367 * invalidate any pages that are not dirty or under writeback. this
1368 * includes pages that are clean and mapped.
1369 */
1370static void ceph_invalidate_nondirty_pages(struct address_space *mapping)
1371{
1372 struct pagevec pvec;
1373 pgoff_t next = 0;
1374 int i;
1375
1376 pagevec_init(&pvec, 0);
1377 while (pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE)) {
1378 for (i = 0; i < pagevec_count(&pvec); i++) {
1379 struct page *page = pvec.pages[i];
1380 pgoff_t index;
1381 int skip_page =
1382 (PageDirty(page) || PageWriteback(page));
1383
1384 if (!skip_page)
1385 skip_page = !trylock_page(page);
1386
1387 /*
1388 * We really shouldn't be looking at the ->index of an
1389 * unlocked page. But we're not allowed to lock these
1390 * pages. So we rely upon nobody altering the ->index
1391 * of this (pinned-by-us) page.
1392 */
1393 index = page->index;
1394 if (index > next)
1395 next = index;
1396 next++;
1397
1398 if (skip_page)
1399 continue;
1400
1401 generic_error_remove_page(mapping, page);
1402 unlock_page(page);
1403 }
1404 pagevec_release(&pvec);
1405 cond_resched();
1406 }
1407}
1408
1409/*
1410 * Invalidate inode pages in a worker thread. (This can't be done 1366 * Invalidate inode pages in a worker thread. (This can't be done
1411 * in the message handler context.) 1367 * in the message handler context.)
1412 */ 1368 */
@@ -1429,7 +1385,7 @@ static void ceph_invalidate_work(struct work_struct *work)
1429 orig_gen = ci->i_rdcache_gen; 1385 orig_gen = ci->i_rdcache_gen;
1430 spin_unlock(&inode->i_lock); 1386 spin_unlock(&inode->i_lock);
1431 1387
1432 ceph_invalidate_nondirty_pages(inode->i_mapping); 1388 truncate_inode_pages(&inode->i_data, 0);
1433 1389
1434 spin_lock(&inode->i_lock); 1390 spin_lock(&inode->i_lock);
1435 if (orig_gen == ci->i_rdcache_gen && 1391 if (orig_gen == ci->i_rdcache_gen &&
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 3b256b50f7d8..5a14c29cbba6 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -42,17 +42,39 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
42 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 42 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
43 struct ceph_mds_request *req; 43 struct ceph_mds_request *req;
44 struct ceph_ioctl_layout l; 44 struct ceph_ioctl_layout l;
45 struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode);
46 struct ceph_ioctl_layout nl;
45 int err, i; 47 int err, i;
46 48
47 /* copy and validate */
48 if (copy_from_user(&l, arg, sizeof(l))) 49 if (copy_from_user(&l, arg, sizeof(l)))
49 return -EFAULT; 50 return -EFAULT;
50 51
51 if ((l.object_size & ~PAGE_MASK) || 52 /* validate changed params against current layout */
52 (l.stripe_unit & ~PAGE_MASK) || 53 err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT);
53 !l.stripe_unit || 54 if (!err) {
54 (l.object_size && 55 nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
55 (unsigned)l.object_size % (unsigned)l.stripe_unit)) 56 nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
57 nl.object_size = ceph_file_layout_object_size(ci->i_layout);
58 nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
59 nl.preferred_osd =
60 (s32)le32_to_cpu(ci->i_layout.fl_pg_preferred);
61 } else
62 return err;
63
64 if (l.stripe_count)
65 nl.stripe_count = l.stripe_count;
66 if (l.stripe_unit)
67 nl.stripe_unit = l.stripe_unit;
68 if (l.object_size)
69 nl.object_size = l.object_size;
70 if (l.data_pool)
71 nl.data_pool = l.data_pool;
72 if (l.preferred_osd)
73 nl.preferred_osd = l.preferred_osd;
74
75 if ((nl.object_size & ~PAGE_MASK) ||
76 (nl.stripe_unit & ~PAGE_MASK) ||
77 ((unsigned)nl.object_size % (unsigned)nl.stripe_unit))
56 return -EINVAL; 78 return -EINVAL;
57 79
58 /* make sure it's a valid data pool */ 80 /* make sure it's a valid data pool */
diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h
index 0c5167e43180..be4a60487333 100644
--- a/fs/ceph/ioctl.h
+++ b/fs/ceph/ioctl.h
@@ -6,7 +6,31 @@
6 6
7#define CEPH_IOCTL_MAGIC 0x97 7#define CEPH_IOCTL_MAGIC 0x97
8 8
9/* just use u64 to align sanely on all archs */ 9/*
10 * CEPH_IOC_GET_LAYOUT - get file layout or dir layout policy
11 * CEPH_IOC_SET_LAYOUT - set file layout
12 * CEPH_IOC_SET_LAYOUT_POLICY - set dir layout policy
13 *
14 * The file layout specifies how file data is striped over objects in
15 * the distributed object store, which object pool they belong to (if
16 * it differs from the default), and an optional 'preferred osd' to
17 * store them on.
18 *
19 * Files get a new layout based on the policy set on the containing
20 * directory or one of its ancestors. The GET_LAYOUT ioctl will let
21 * you examine the layout for a file or the policy on a directory.
22 *
23 * SET_LAYOUT will let you set a layout on a newly created file. This
24 * only works immediately after the file is created and before any
25 * data is written to it.
26 *
27 * SET_LAYOUT_POLICY will let you set a layout policy (default layout)
28 * on a directory that will apply to any new files created in that
29 * directory (or any child directory that doesn't specify a layout of
30 * its own).
31 */
32
33/* use u64 to align sanely on all archs */
10struct ceph_ioctl_layout { 34struct ceph_ioctl_layout {
11 __u64 stripe_unit, stripe_count, object_size; 35 __u64 stripe_unit, stripe_count, object_size;
12 __u64 data_pool; 36 __u64 data_pool;
@@ -21,6 +45,8 @@ struct ceph_ioctl_layout {
21 struct ceph_ioctl_layout) 45 struct ceph_ioctl_layout)
22 46
23/* 47/*
48 * CEPH_IOC_GET_DATALOC - get location of file data in the cluster
49 *
24 * Extract identity, address of the OSD and object storing a given 50 * Extract identity, address of the OSD and object storing a given
25 * file offset. 51 * file offset.
26 */ 52 */
@@ -39,7 +65,34 @@ struct ceph_ioctl_dataloc {
39#define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3, \ 65#define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3, \
40 struct ceph_ioctl_dataloc) 66 struct ceph_ioctl_dataloc)
41 67
68/*
69 * CEPH_IOC_LAZYIO - relax consistency
70 *
71 * Normally Ceph switches to synchronous IO when multiple clients have
72 * the file open (and one or more for write). Reads and writes bypass the 73 * page cache and go directly to the OSD. Setting this flag on a file
73 * page cache and go directly to the OSD. Setting this flag on a file
74 * descriptor will allow buffered IO for this file in cases where the
75 * application knows it won't interfere with other nodes (or doesn't
76 * care).
77 */
42#define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4) 78#define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4)
79
80/*
81 * CEPH_IOC_SYNCIO - force synchronous IO
82 *
83 * This ioctl sets a file flag that forces the synchronous IO that
84 * bypasses the page cache, even if it is not necessary. This is
85 * essentially the opposite behavior of IOC_LAZYIO. This forces the
86 * same read/write path as a file opened by multiple clients when one
87 * or more of those clients is opened for write.
88 *
89 * Note that this type of sync IO takes a different path than a file
90 * opened with O_SYNC/D_SYNC (writes hit the page cache and are
91 * immediately flushed on page boundaries). It is very similar to
92 * O_DIRECT (writes bypass the page cache) except that O_DIRECT writes
93 * are not copied (user page must remain stable) and O_DIRECT writes
94 * have alignment restrictions (on the buffer and file offset).
95 */
43#define CEPH_IOC_SYNCIO _IO(CEPH_IOCTL_MAGIC, 5) 96#define CEPH_IOC_SYNCIO _IO(CEPH_IOCTL_MAGIC, 5)
44 97
45#endif 98#endif
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 86c59e16ba74..1d72f15fe9f4 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -764,7 +764,8 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
764 struct ceph_msg *msg; 764 struct ceph_msg *msg;
765 struct ceph_mds_session_head *h; 765 struct ceph_mds_session_head *h;
766 766
767 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS); 767 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
768 false);
768 if (!msg) { 769 if (!msg) {
769 pr_err("create_session_msg ENOMEM creating msg\n"); 770 pr_err("create_session_msg ENOMEM creating msg\n");
770 return NULL; 771 return NULL;
@@ -1240,7 +1241,7 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1240 while (session->s_num_cap_releases < session->s_nr_caps + extra) { 1241 while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1241 spin_unlock(&session->s_cap_lock); 1242 spin_unlock(&session->s_cap_lock);
1242 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, 1243 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1243 GFP_NOFS); 1244 GFP_NOFS, false);
1244 if (!msg) 1245 if (!msg)
1245 goto out_unlocked; 1246 goto out_unlocked;
1246 dout("add_cap_releases %p msg %p now %d\n", session, msg, 1247 dout("add_cap_releases %p msg %p now %d\n", session, msg,
@@ -1652,7 +1653,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1652 if (req->r_old_dentry_drop) 1653 if (req->r_old_dentry_drop)
1653 len += req->r_old_dentry->d_name.len; 1654 len += req->r_old_dentry->d_name.len;
1654 1655
1655 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS); 1656 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1656 if (!msg) { 1657 if (!msg) {
1657 msg = ERR_PTR(-ENOMEM); 1658 msg = ERR_PTR(-ENOMEM);
1658 goto out_free2; 1659 goto out_free2;
@@ -2518,7 +2519,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2518 goto fail_nopagelist; 2519 goto fail_nopagelist;
2519 ceph_pagelist_init(pagelist); 2520 ceph_pagelist_init(pagelist);
2520 2521
2521 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS); 2522 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2522 if (!reply) 2523 if (!reply)
2523 goto fail_nomsg; 2524 goto fail_nomsg;
2524 2525
@@ -2831,7 +2832,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2831 dnamelen = dentry->d_name.len; 2832 dnamelen = dentry->d_name.len;
2832 len += dnamelen; 2833 len += dnamelen;
2833 2834
2834 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS); 2835 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
2835 if (!msg) 2836 if (!msg)
2836 return; 2837 return;
2837 lease = msg->front.iov_base; 2838 lease = msg->front.iov_base;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 88bacaf385d9..788f5ad8e66d 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -114,6 +114,7 @@ static int ceph_sync_fs(struct super_block *sb, int wait)
114enum { 114enum {
115 Opt_wsize, 115 Opt_wsize,
116 Opt_rsize, 116 Opt_rsize,
117 Opt_rasize,
117 Opt_caps_wanted_delay_min, 118 Opt_caps_wanted_delay_min,
118 Opt_caps_wanted_delay_max, 119 Opt_caps_wanted_delay_max,
119 Opt_cap_release_safety, 120 Opt_cap_release_safety,
@@ -136,6 +137,7 @@ enum {
136static match_table_t fsopt_tokens = { 137static match_table_t fsopt_tokens = {
137 {Opt_wsize, "wsize=%d"}, 138 {Opt_wsize, "wsize=%d"},
138 {Opt_rsize, "rsize=%d"}, 139 {Opt_rsize, "rsize=%d"},
140 {Opt_rasize, "rasize=%d"},
139 {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"}, 141 {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
140 {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"}, 142 {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
141 {Opt_cap_release_safety, "cap_release_safety=%d"}, 143 {Opt_cap_release_safety, "cap_release_safety=%d"},
@@ -196,6 +198,9 @@ static int parse_fsopt_token(char *c, void *private)
196 case Opt_rsize: 198 case Opt_rsize:
197 fsopt->rsize = intval; 199 fsopt->rsize = intval;
198 break; 200 break;
201 case Opt_rasize:
202 fsopt->rasize = intval;
203 break;
199 case Opt_caps_wanted_delay_min: 204 case Opt_caps_wanted_delay_min:
200 fsopt->caps_wanted_delay_min = intval; 205 fsopt->caps_wanted_delay_min = intval;
201 break; 206 break;
@@ -289,28 +294,29 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
289 294
290 dout("parse_mount_options %p, dev_name '%s'\n", fsopt, dev_name); 295 dout("parse_mount_options %p, dev_name '%s'\n", fsopt, dev_name);
291 296
292 fsopt->sb_flags = flags; 297 fsopt->sb_flags = flags;
293 fsopt->flags = CEPH_MOUNT_OPT_DEFAULT; 298 fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
294 299
295 fsopt->rsize = CEPH_RSIZE_DEFAULT; 300 fsopt->rsize = CEPH_RSIZE_DEFAULT;
296 fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL); 301 fsopt->rasize = CEPH_RASIZE_DEFAULT;
302 fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
297 fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT; 303 fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
298 fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT; 304 fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
299 fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT; 305 fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
300 fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT; 306 fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
301 fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT; 307 fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
302 fsopt->congestion_kb = default_congestion_kb(); 308 fsopt->congestion_kb = default_congestion_kb();
303 309
304 /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */ 310 /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
305 err = -EINVAL; 311 err = -EINVAL;
306 if (!dev_name) 312 if (!dev_name)
307 goto out; 313 goto out;
308 *path = strstr(dev_name, ":/"); 314 *path = strstr(dev_name, ":/");
309 if (*path == NULL) { 315 if (*path == NULL) {
310 pr_err("device name is missing path (no :/ in %s)\n", 316 pr_err("device name is missing path (no :/ in %s)\n",
311 dev_name); 317 dev_name);
312 goto out; 318 goto out;
313 } 319 }
314 dev_name_end = *path; 320 dev_name_end = *path;
315 dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name); 321 dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
316 322
@@ -376,6 +382,8 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
376 seq_printf(m, ",wsize=%d", fsopt->wsize); 382 seq_printf(m, ",wsize=%d", fsopt->wsize);
377 if (fsopt->rsize != CEPH_RSIZE_DEFAULT) 383 if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
378 seq_printf(m, ",rsize=%d", fsopt->rsize); 384 seq_printf(m, ",rsize=%d", fsopt->rsize);
385 if (fsopt->rasize != CEPH_RASIZE_DEFAULT)
386 seq_printf(m, ",rasize=%d", fsopt->rsize);
379 if (fsopt->congestion_kb != default_congestion_kb()) 387 if (fsopt->congestion_kb != default_congestion_kb())
380 seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb); 388 seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
381 if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT) 389 if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
@@ -422,20 +430,23 @@ struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
422 struct ceph_options *opt) 430 struct ceph_options *opt)
423{ 431{
424 struct ceph_fs_client *fsc; 432 struct ceph_fs_client *fsc;
433 const unsigned supported_features =
434 CEPH_FEATURE_FLOCK |
435 CEPH_FEATURE_DIRLAYOUTHASH;
436 const unsigned required_features = 0;
425 int err = -ENOMEM; 437 int err = -ENOMEM;
426 438
427 fsc = kzalloc(sizeof(*fsc), GFP_KERNEL); 439 fsc = kzalloc(sizeof(*fsc), GFP_KERNEL);
428 if (!fsc) 440 if (!fsc)
429 return ERR_PTR(-ENOMEM); 441 return ERR_PTR(-ENOMEM);
430 442
431 fsc->client = ceph_create_client(opt, fsc); 443 fsc->client = ceph_create_client(opt, fsc, supported_features,
444 required_features);
432 if (IS_ERR(fsc->client)) { 445 if (IS_ERR(fsc->client)) {
433 err = PTR_ERR(fsc->client); 446 err = PTR_ERR(fsc->client);
434 goto fail; 447 goto fail;
435 } 448 }
436 fsc->client->extra_mon_dispatch = extra_mon_dispatch; 449 fsc->client->extra_mon_dispatch = extra_mon_dispatch;
437 fsc->client->supported_features |= CEPH_FEATURE_FLOCK |
438 CEPH_FEATURE_DIRLAYOUTHASH;
439 fsc->client->monc.want_mdsmap = 1; 450 fsc->client->monc.want_mdsmap = 1;
440 451
441 fsc->mount_options = fsopt; 452 fsc->mount_options = fsopt;
@@ -774,10 +785,10 @@ static int ceph_register_bdi(struct super_block *sb,
774{ 785{
775 int err; 786 int err;
776 787
777 /* set ra_pages based on rsize mount option? */ 788 /* set ra_pages based on rasize mount option? */
778 if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE) 789 if (fsc->mount_options->rasize >= PAGE_CACHE_SIZE)
779 fsc->backing_dev_info.ra_pages = 790 fsc->backing_dev_info.ra_pages =
780 (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) 791 (fsc->mount_options->rasize + PAGE_CACHE_SIZE - 1)
781 >> PAGE_SHIFT; 792 >> PAGE_SHIFT;
782 else 793 else
783 fsc->backing_dev_info.ra_pages = 794 fsc->backing_dev_info.ra_pages =
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index a23eed526f05..b01442aaf278 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -36,7 +36,8 @@
36#define ceph_test_mount_opt(fsc, opt) \ 36#define ceph_test_mount_opt(fsc, opt) \
37 (!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt)) 37 (!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
38 38
39#define CEPH_RSIZE_DEFAULT (512*1024) /* readahead */ 39#define CEPH_RSIZE_DEFAULT 0 /* max read size */
40#define CEPH_RASIZE_DEFAULT (8192*1024) /* readahead */
40#define CEPH_MAX_READDIR_DEFAULT 1024 41#define CEPH_MAX_READDIR_DEFAULT 1024
41#define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024) 42#define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024)
42#define CEPH_SNAPDIRNAME_DEFAULT ".snap" 43#define CEPH_SNAPDIRNAME_DEFAULT ".snap"
@@ -45,8 +46,9 @@ struct ceph_mount_options {
45 int flags; 46 int flags;
46 int sb_flags; 47 int sb_flags;
47 48
48 int wsize; 49 int wsize; /* max write size */
49 int rsize; /* max readahead */ 50 int rsize; /* max read size */
51 int rasize; /* max readahead */
50 int congestion_kb; /* max writeback in flight */ 52 int congestion_kb; /* max writeback in flight */
51 int caps_wanted_delay_min, caps_wanted_delay_max; 53 int caps_wanted_delay_min, caps_wanted_delay_max;
52 int cap_release_safety; 54 int cap_release_safety;
@@ -344,9 +346,10 @@ static inline struct ceph_vino ceph_vino(struct inode *inode)
344 * x86_64+ino32 64 32 346 * x86_64+ino32 64 32
345 * x86_64 64 64 347 * x86_64 64 64
346 */ 348 */
347static inline u32 ceph_ino_to_ino32(ino_t ino) 349static inline u32 ceph_ino_to_ino32(__u64 vino)
348{ 350{
349 ino ^= ino >> (sizeof(ino) * 8 - 32); 351 u32 ino = vino & 0xffffffff;
352 ino ^= vino >> 32;
350 if (!ino) 353 if (!ino)
351 ino = 1; 354 ino = 1;
352 return ino; 355 return ino;
@@ -357,11 +360,11 @@ static inline u32 ceph_ino_to_ino32(ino_t ino)
357 */ 360 */
358static inline ino_t ceph_vino_to_ino(struct ceph_vino vino) 361static inline ino_t ceph_vino_to_ino(struct ceph_vino vino)
359{ 362{
360 ino_t ino = (ino_t)vino.ino; /* ^ (vino.snap << 20); */
361#if BITS_PER_LONG == 32 363#if BITS_PER_LONG == 32
362 ino = ceph_ino_to_ino32(ino); 364 return ceph_ino_to_ino32(vino.ino);
365#else
366 return (ino_t)vino.ino;
363#endif 367#endif
364 return ino;
365} 368}
366 369
367/* 370/*
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 563755181c1e..95bd8502e715 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -215,7 +215,9 @@ extern void ceph_destroy_options(struct ceph_options *opt);
215extern int ceph_compare_options(struct ceph_options *new_opt, 215extern int ceph_compare_options(struct ceph_options *new_opt,
216 struct ceph_client *client); 216 struct ceph_client *client);
217extern struct ceph_client *ceph_create_client(struct ceph_options *opt, 217extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
218 void *private); 218 void *private,
219 unsigned supported_features,
220 unsigned required_features);
219extern u64 ceph_client_id(struct ceph_client *client); 221extern u64 ceph_client_id(struct ceph_client *client);
220extern void ceph_destroy_client(struct ceph_client *client); 222extern void ceph_destroy_client(struct ceph_client *client);
221extern int __ceph_open_session(struct ceph_client *client, 223extern int __ceph_open_session(struct ceph_client *client,
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index ca768ae729b4..ffbeb2c217b4 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -237,7 +237,8 @@ extern void ceph_con_keepalive(struct ceph_connection *con);
237extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); 237extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
238extern void ceph_con_put(struct ceph_connection *con); 238extern void ceph_con_put(struct ceph_connection *con);
239 239
240extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags); 240extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
241 bool can_fail);
241extern void ceph_msg_kfree(struct ceph_msg *m); 242extern void ceph_msg_kfree(struct ceph_msg *m);
242 243
243 244
diff --git a/net/ceph/Kconfig b/net/ceph/Kconfig
index be683f2d401f..cc04dd667a10 100644
--- a/net/ceph/Kconfig
+++ b/net/ceph/Kconfig
@@ -27,3 +27,17 @@ config CEPH_LIB_PRETTYDEBUG
27 27
28 If unsure, say N. 28 If unsure, say N.
29 29
30config CEPH_LIB_USE_DNS_RESOLVER
31 bool "Use in-kernel support for DNS lookup"
32 depends on CEPH_LIB
33 select DNS_RESOLVER
34 default n
35 help
36 If you say Y here, hostnames (e.g. monitor addresses) will
37 be resolved using the CONFIG_DNS_RESOLVER facility.
38
39 For information on how to use CONFIG_DNS_RESOLVER consult
40 Documentation/networking/dns_resolver.txt
41
42 If unsure, say N.
43
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 2883ea01e680..97f70e50ad3b 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -432,9 +432,12 @@ EXPORT_SYMBOL(ceph_client_id);
432/* 432/*
433 * create a fresh client instance 433 * create a fresh client instance
434 */ 434 */
435struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private) 435struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
436 unsigned supported_features,
437 unsigned required_features)
436{ 438{
437 struct ceph_client *client; 439 struct ceph_client *client;
440 struct ceph_entity_addr *myaddr = NULL;
438 int err = -ENOMEM; 441 int err = -ENOMEM;
439 442
440 client = kzalloc(sizeof(*client), GFP_KERNEL); 443 client = kzalloc(sizeof(*client), GFP_KERNEL);
@@ -449,15 +452,27 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
449 client->auth_err = 0; 452 client->auth_err = 0;
450 453
451 client->extra_mon_dispatch = NULL; 454 client->extra_mon_dispatch = NULL;
452 client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT; 455 client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT |
453 client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT; 456 supported_features;
454 457 client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT |
455 client->msgr = NULL; 458 required_features;
459
460 /* msgr */
461 if (ceph_test_opt(client, MYIP))
462 myaddr = &client->options->my_addr;
463 client->msgr = ceph_messenger_create(myaddr,
464 client->supported_features,
465 client->required_features);
466 if (IS_ERR(client->msgr)) {
467 err = PTR_ERR(client->msgr);
468 goto fail;
469 }
470 client->msgr->nocrc = ceph_test_opt(client, NOCRC);
456 471
457 /* subsystems */ 472 /* subsystems */
458 err = ceph_monc_init(&client->monc, client); 473 err = ceph_monc_init(&client->monc, client);
459 if (err < 0) 474 if (err < 0)
460 goto fail; 475 goto fail_msgr;
461 err = ceph_osdc_init(&client->osdc, client); 476 err = ceph_osdc_init(&client->osdc, client);
462 if (err < 0) 477 if (err < 0)
463 goto fail_monc; 478 goto fail_monc;
@@ -466,6 +481,8 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
466 481
467fail_monc: 482fail_monc:
468 ceph_monc_stop(&client->monc); 483 ceph_monc_stop(&client->monc);
484fail_msgr:
485 ceph_messenger_destroy(client->msgr);
469fail: 486fail:
470 kfree(client); 487 kfree(client);
471 return ERR_PTR(err); 488 return ERR_PTR(err);
@@ -490,8 +507,7 @@ void ceph_destroy_client(struct ceph_client *client)
490 507
491 ceph_debugfs_client_cleanup(client); 508 ceph_debugfs_client_cleanup(client);
492 509
493 if (client->msgr) 510 ceph_messenger_destroy(client->msgr);
494 ceph_messenger_destroy(client->msgr);
495 511
496 ceph_destroy_options(client->options); 512 ceph_destroy_options(client->options);
497 513
@@ -514,24 +530,9 @@ static int have_mon_and_osd_map(struct ceph_client *client)
514 */ 530 */
515int __ceph_open_session(struct ceph_client *client, unsigned long started) 531int __ceph_open_session(struct ceph_client *client, unsigned long started)
516{ 532{
517 struct ceph_entity_addr *myaddr = NULL;
518 int err; 533 int err;
519 unsigned long timeout = client->options->mount_timeout * HZ; 534 unsigned long timeout = client->options->mount_timeout * HZ;
520 535
521 /* initialize the messenger */
522 if (client->msgr == NULL) {
523 if (ceph_test_opt(client, MYIP))
524 myaddr = &client->options->my_addr;
525 client->msgr = ceph_messenger_create(myaddr,
526 client->supported_features,
527 client->required_features);
528 if (IS_ERR(client->msgr)) {
529 client->msgr = NULL;
530 return PTR_ERR(client->msgr);
531 }
532 client->msgr->nocrc = ceph_test_opt(client, NOCRC);
533 }
534
535 /* open session, and wait for mon and osd maps */ 536 /* open session, and wait for mon and osd maps */
536 err = ceph_monc_open_session(&client->monc); 537 err = ceph_monc_open_session(&client->monc);
537 if (err < 0) 538 if (err < 0)
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9918e9eb276e..f466930e26fa 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -11,6 +11,7 @@
11#include <linux/string.h> 11#include <linux/string.h>
12#include <linux/bio.h> 12#include <linux/bio.h>
13#include <linux/blkdev.h> 13#include <linux/blkdev.h>
14#include <linux/dns_resolver.h>
14#include <net/tcp.h> 15#include <net/tcp.h>
15 16
16#include <linux/ceph/libceph.h> 17#include <linux/ceph/libceph.h>
@@ -1078,6 +1079,101 @@ static void addr_set_port(struct sockaddr_storage *ss, int p)
1078} 1079}
1079 1080
1080/* 1081/*
1082 * Unlike other *_pton function semantics, zero indicates success.
1083 */
1084static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
1085 char delim, const char **ipend)
1086{
1087 struct sockaddr_in *in4 = (void *)ss;
1088 struct sockaddr_in6 *in6 = (void *)ss;
1089
1090 memset(ss, 0, sizeof(*ss));
1091
1092 if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) {
1093 ss->ss_family = AF_INET;
1094 return 0;
1095 }
1096
1097 if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) {
1098 ss->ss_family = AF_INET6;
1099 return 0;
1100 }
1101
1102 return -EINVAL;
1103}
1104
1105/*
1106 * Extract hostname string and resolve using kernel DNS facility.
1107 */
1108#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
1109static int ceph_dns_resolve_name(const char *name, size_t namelen,
1110 struct sockaddr_storage *ss, char delim, const char **ipend)
1111{
1112 const char *end, *delim_p;
1113 char *colon_p, *ip_addr = NULL;
1114 int ip_len, ret;
1115
1116 /*
1117 * The end of the hostname occurs immediately preceding the delimiter or
1118 * the port marker (':') where the delimiter takes precedence.
1119 */
1120 delim_p = memchr(name, delim, namelen);
1121 colon_p = memchr(name, ':', namelen);
1122
1123 if (delim_p && colon_p)
1124 end = delim_p < colon_p ? delim_p : colon_p;
1125 else if (!delim_p && colon_p)
1126 end = colon_p;
1127 else {
1128 end = delim_p;
1129 if (!end) /* case: hostname:/ */
1130 end = name + namelen;
1131 }
1132
1133 if (end <= name)
1134 return -EINVAL;
1135
1136 /* do dns_resolve upcall */
1137 ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
1138 if (ip_len > 0)
1139 ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL);
1140 else
1141 ret = -ESRCH;
1142
1143 kfree(ip_addr);
1144
1145 *ipend = end;
1146
1147 pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
1148 ret, ret ? "failed" : ceph_pr_addr(ss));
1149
1150 return ret;
1151}
1152#else
1153static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
1154 struct sockaddr_storage *ss, char delim, const char **ipend)
1155{
1156 return -EINVAL;
1157}
1158#endif
1159
1160/*
1161 * Parse a server name (IP or hostname). If a valid IP address is not found
1162 * then try to extract a hostname to resolve using userspace DNS upcall.
1163 */
1164static int ceph_parse_server_name(const char *name, size_t namelen,
1165 struct sockaddr_storage *ss, char delim, const char **ipend)
1166{
1167 int ret;
1168
1169 ret = ceph_pton(name, namelen, ss, delim, ipend);
1170 if (ret)
1171 ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
1172
1173 return ret;
1174}
1175
1176/*
1081 * Parse an ip[:port] list into an addr array. Use the default 1177 * Parse an ip[:port] list into an addr array. Use the default
1082 * monitor port if a port isn't specified. 1178 * monitor port if a port isn't specified.
1083 */ 1179 */
@@ -1085,15 +1181,13 @@ int ceph_parse_ips(const char *c, const char *end,
1085 struct ceph_entity_addr *addr, 1181 struct ceph_entity_addr *addr,
1086 int max_count, int *count) 1182 int max_count, int *count)
1087{ 1183{
1088 int i; 1184 int i, ret = -EINVAL;
1089 const char *p = c; 1185 const char *p = c;
1090 1186
1091 dout("parse_ips on '%.*s'\n", (int)(end-c), c); 1187 dout("parse_ips on '%.*s'\n", (int)(end-c), c);
1092 for (i = 0; i < max_count; i++) { 1188 for (i = 0; i < max_count; i++) {
1093 const char *ipend; 1189 const char *ipend;
1094 struct sockaddr_storage *ss = &addr[i].in_addr; 1190 struct sockaddr_storage *ss = &addr[i].in_addr;
1095 struct sockaddr_in *in4 = (void *)ss;
1096 struct sockaddr_in6 *in6 = (void *)ss;
1097 int port; 1191 int port;
1098 char delim = ','; 1192 char delim = ',';
1099 1193
@@ -1102,15 +1196,11 @@ int ceph_parse_ips(const char *c, const char *end,
1102 p++; 1196 p++;
1103 } 1197 }
1104 1198
1105 memset(ss, 0, sizeof(*ss)); 1199 ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
1106 if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr, 1200 if (ret)
1107 delim, &ipend))
1108 ss->ss_family = AF_INET;
1109 else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
1110 delim, &ipend))
1111 ss->ss_family = AF_INET6;
1112 else
1113 goto bad; 1201 goto bad;
1202 ret = -EINVAL;
1203
1114 p = ipend; 1204 p = ipend;
1115 1205
1116 if (delim == ']') { 1206 if (delim == ']') {
@@ -1155,7 +1245,7 @@ int ceph_parse_ips(const char *c, const char *end,
1155 1245
1156bad: 1246bad:
1157 pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); 1247 pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
1158 return -EINVAL; 1248 return ret;
1159} 1249}
1160EXPORT_SYMBOL(ceph_parse_ips); 1250EXPORT_SYMBOL(ceph_parse_ips);
1161 1251
@@ -2281,7 +2371,8 @@ EXPORT_SYMBOL(ceph_con_keepalive);
2281 * construct a new message with given type, size 2371 * construct a new message with given type, size
2282 * the new msg has a ref count of 1. 2372 * the new msg has a ref count of 1.
2283 */ 2373 */
2284struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) 2374struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
2375 bool can_fail)
2285{ 2376{
2286 struct ceph_msg *m; 2377 struct ceph_msg *m;
2287 2378
@@ -2333,7 +2424,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
2333 m->front.iov_base = kmalloc(front_len, flags); 2424 m->front.iov_base = kmalloc(front_len, flags);
2334 } 2425 }
2335 if (m->front.iov_base == NULL) { 2426 if (m->front.iov_base == NULL) {
2336 pr_err("msg_new can't allocate %d bytes\n", 2427 dout("ceph_msg_new can't allocate %d bytes\n",
2337 front_len); 2428 front_len);
2338 goto out2; 2429 goto out2;
2339 } 2430 }
@@ -2348,7 +2439,14 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
2348out2: 2439out2:
2349 ceph_msg_put(m); 2440 ceph_msg_put(m);
2350out: 2441out:
2351 pr_err("msg_new can't create type %d front %d\n", type, front_len); 2442 if (!can_fail) {
2443 pr_err("msg_new can't create type %d front %d\n", type,
2444 front_len);
2445 WARN_ON(1);
2446 } else {
2447 dout("msg_new can't create type %d front %d\n", type,
2448 front_len);
2449 }
2352 return NULL; 2450 return NULL;
2353} 2451}
2354EXPORT_SYMBOL(ceph_msg_new); 2452EXPORT_SYMBOL(ceph_msg_new);
@@ -2398,7 +2496,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
2398 } 2496 }
2399 if (!msg) { 2497 if (!msg) {
2400 *skip = 0; 2498 *skip = 0;
2401 msg = ceph_msg_new(type, front_len, GFP_NOFS); 2499 msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
2402 if (!msg) { 2500 if (!msg) {
2403 pr_err("unable to allocate msg type %d len %d\n", 2501 pr_err("unable to allocate msg type %d len %d\n",
2404 type, front_len); 2502 type, front_len);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index cbe31fa45508..0b62deae42bd 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -116,14 +116,12 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
116 */ 116 */
117static void __close_session(struct ceph_mon_client *monc) 117static void __close_session(struct ceph_mon_client *monc)
118{ 118{
119 if (monc->con) { 119 dout("__close_session closing mon%d\n", monc->cur_mon);
120 dout("__close_session closing mon%d\n", monc->cur_mon); 120 ceph_con_revoke(monc->con, monc->m_auth);
121 ceph_con_revoke(monc->con, monc->m_auth); 121 ceph_con_close(monc->con);
122 ceph_con_close(monc->con); 122 monc->cur_mon = -1;
123 monc->cur_mon = -1; 123 monc->pending_auth = 0;
124 monc->pending_auth = 0; 124 ceph_auth_reset(monc->auth);
125 ceph_auth_reset(monc->auth);
126 }
127} 125}
128 126
129/* 127/*
@@ -302,15 +300,6 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
302 */ 300 */
303int ceph_monc_open_session(struct ceph_mon_client *monc) 301int ceph_monc_open_session(struct ceph_mon_client *monc)
304{ 302{
305 if (!monc->con) {
306 monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
307 if (!monc->con)
308 return -ENOMEM;
309 ceph_con_init(monc->client->msgr, monc->con);
310 monc->con->private = monc;
311 monc->con->ops = &mon_con_ops;
312 }
313
314 mutex_lock(&monc->mutex); 303 mutex_lock(&monc->mutex);
315 __open_session(monc); 304 __open_session(monc);
316 __schedule_delayed(monc); 305 __schedule_delayed(monc);
@@ -528,10 +517,12 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
528 init_completion(&req->completion); 517 init_completion(&req->completion);
529 518
530 err = -ENOMEM; 519 err = -ENOMEM;
531 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS); 520 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
521 true);
532 if (!req->request) 522 if (!req->request)
533 goto out; 523 goto out;
534 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS); 524 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
525 true);
535 if (!req->reply) 526 if (!req->reply)
536 goto out; 527 goto out;
537 528
@@ -626,10 +617,12 @@ int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
626 init_completion(&req->completion); 617 init_completion(&req->completion);
627 618
628 err = -ENOMEM; 619 err = -ENOMEM;
629 req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS); 620 req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
621 true);
630 if (!req->request) 622 if (!req->request)
631 goto out; 623 goto out;
632 req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS); 624 req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
625 true);
633 if (!req->reply) 626 if (!req->reply)
634 goto out; 627 goto out;
635 628
@@ -755,13 +748,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
755 if (err) 748 if (err)
756 goto out; 749 goto out;
757 750
758 monc->con = NULL; 751 /* connection */
752 monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
753 if (!monc->con)
754 goto out_monmap;
755 ceph_con_init(monc->client->msgr, monc->con);
756 monc->con->private = monc;
757 monc->con->ops = &mon_con_ops;
759 758
760 /* authentication */ 759 /* authentication */
761 monc->auth = ceph_auth_init(cl->options->name, 760 monc->auth = ceph_auth_init(cl->options->name,
762 cl->options->key); 761 cl->options->key);
763 if (IS_ERR(monc->auth)) 762 if (IS_ERR(monc->auth)) {
764 return PTR_ERR(monc->auth); 763 err = PTR_ERR(monc->auth);
764 goto out_con;
765 }
765 monc->auth->want_keys = 766 monc->auth->want_keys =
766 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 767 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
767 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 768 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
@@ -770,19 +771,21 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
770 err = -ENOMEM; 771 err = -ENOMEM;
771 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, 772 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
772 sizeof(struct ceph_mon_subscribe_ack), 773 sizeof(struct ceph_mon_subscribe_ack),
773 GFP_NOFS); 774 GFP_NOFS, true);
774 if (!monc->m_subscribe_ack) 775 if (!monc->m_subscribe_ack)
775 goto out_monmap; 776 goto out_auth;
776 777
777 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS); 778 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
779 true);
778 if (!monc->m_subscribe) 780 if (!monc->m_subscribe)
779 goto out_subscribe_ack; 781 goto out_subscribe_ack;
780 782
781 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS); 783 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
784 true);
782 if (!monc->m_auth_reply) 785 if (!monc->m_auth_reply)
783 goto out_subscribe; 786 goto out_subscribe;
784 787
785 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS); 788 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
786 monc->pending_auth = 0; 789 monc->pending_auth = 0;
787 if (!monc->m_auth) 790 if (!monc->m_auth)
788 goto out_auth_reply; 791 goto out_auth_reply;
@@ -808,6 +811,10 @@ out_subscribe:
808 ceph_msg_put(monc->m_subscribe); 811 ceph_msg_put(monc->m_subscribe);
809out_subscribe_ack: 812out_subscribe_ack:
810 ceph_msg_put(monc->m_subscribe_ack); 813 ceph_msg_put(monc->m_subscribe_ack);
814out_auth:
815 ceph_auth_destroy(monc->auth);
816out_con:
817 monc->con->ops->put(monc->con);
811out_monmap: 818out_monmap:
812 kfree(monc->monmap); 819 kfree(monc->monmap);
813out: 820out:
@@ -822,11 +829,11 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
822 829
823 mutex_lock(&monc->mutex); 830 mutex_lock(&monc->mutex);
824 __close_session(monc); 831 __close_session(monc);
825 if (monc->con) { 832
826 monc->con->private = NULL; 833 monc->con->private = NULL;
827 monc->con->ops->put(monc->con); 834 monc->con->ops->put(monc->con);
828 monc->con = NULL; 835 monc->con = NULL;
829 } 836
830 mutex_unlock(&monc->mutex); 837 mutex_unlock(&monc->mutex);
831 838
832 ceph_auth_destroy(monc->auth); 839 ceph_auth_destroy(monc->auth);
@@ -973,7 +980,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
973 case CEPH_MSG_MON_MAP: 980 case CEPH_MSG_MON_MAP:
974 case CEPH_MSG_MDS_MAP: 981 case CEPH_MSG_MDS_MAP:
975 case CEPH_MSG_OSD_MAP: 982 case CEPH_MSG_OSD_MAP:
976 m = ceph_msg_new(type, front_len, GFP_NOFS); 983 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
977 break; 984 break;
978 } 985 }
979 986
@@ -1000,7 +1007,7 @@ static void mon_fault(struct ceph_connection *con)
1000 if (!con->private) 1007 if (!con->private)
1001 goto out; 1008 goto out;
1002 1009
1003 if (monc->con && !monc->hunting) 1010 if (!monc->hunting)
1004 pr_info("mon%d %s session lost, " 1011 pr_info("mon%d %s session lost, "
1005 "hunting for new mon\n", monc->cur_mon, 1012 "hunting for new mon\n", monc->cur_mon,
1006 ceph_pr_addr(&monc->con->peer_addr.in_addr)); 1013 ceph_pr_addr(&monc->con->peer_addr.in_addr));
diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c
index 1f4cb30a42c5..11d5f4196a73 100644
--- a/net/ceph/msgpool.c
+++ b/net/ceph/msgpool.c
@@ -12,7 +12,7 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
12 struct ceph_msgpool *pool = arg; 12 struct ceph_msgpool *pool = arg;
13 struct ceph_msg *msg; 13 struct ceph_msg *msg;
14 14
15 msg = ceph_msg_new(0, pool->front_len, gfp_mask); 15 msg = ceph_msg_new(0, pool->front_len, gfp_mask, true);
16 if (!msg) { 16 if (!msg) {
17 dout("msgpool_alloc %s failed\n", pool->name); 17 dout("msgpool_alloc %s failed\n", pool->name);
18 } else { 18 } else {
@@ -61,7 +61,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
61 WARN_ON(1); 61 WARN_ON(1);
62 62
63 /* try to alloc a fresh message */ 63 /* try to alloc a fresh message */
64 return ceph_msg_new(0, front_len, GFP_NOFS); 64 return ceph_msg_new(0, front_len, GFP_NOFS, false);
65 } 65 }
66 66
67 msg = mempool_alloc(pool->pool, GFP_NOFS); 67 msg = mempool_alloc(pool->pool, GFP_NOFS);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 88ad8a2501b5..733e46008b89 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -227,7 +227,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
227 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 227 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
228 else 228 else
229 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 229 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
230 OSD_OPREPLY_FRONT_LEN, gfp_flags); 230 OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
231 if (!msg) { 231 if (!msg) {
232 ceph_osdc_put_request(req); 232 ceph_osdc_put_request(req);
233 return NULL; 233 return NULL;
@@ -250,7 +250,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
250 if (use_mempool) 250 if (use_mempool)
251 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 251 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
252 else 252 else
253 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags); 253 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
254 if (!msg) { 254 if (!msg) {
255 ceph_osdc_put_request(req); 255 ceph_osdc_put_request(req);
256 return NULL; 256 return NULL;
@@ -943,7 +943,7 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger);
943 * Caller should hold map_sem for read and request_mutex. 943 * Caller should hold map_sem for read and request_mutex.
944 */ 944 */
945static int __map_request(struct ceph_osd_client *osdc, 945static int __map_request(struct ceph_osd_client *osdc,
946 struct ceph_osd_request *req) 946 struct ceph_osd_request *req, int force_resend)
947{ 947{
948 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 948 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
949 struct ceph_pg pgid; 949 struct ceph_pg pgid;
@@ -967,7 +967,8 @@ static int __map_request(struct ceph_osd_client *osdc,
967 num = err; 967 num = err;
968 } 968 }
969 969
970 if ((req->r_osd && req->r_osd->o_osd == o && 970 if ((!force_resend &&
971 req->r_osd && req->r_osd->o_osd == o &&
971 req->r_sent >= req->r_osd->o_incarnation && 972 req->r_sent >= req->r_osd->o_incarnation &&
972 req->r_num_pg_osds == num && 973 req->r_num_pg_osds == num &&
973 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 974 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
@@ -1289,18 +1290,18 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
1289 * 1290 *
1290 * Caller should hold map_sem for read and request_mutex. 1291 * Caller should hold map_sem for read and request_mutex.
1291 */ 1292 */
1292static void kick_requests(struct ceph_osd_client *osdc) 1293static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1293{ 1294{
1294 struct ceph_osd_request *req, *nreq; 1295 struct ceph_osd_request *req, *nreq;
1295 struct rb_node *p; 1296 struct rb_node *p;
1296 int needmap = 0; 1297 int needmap = 0;
1297 int err; 1298 int err;
1298 1299
1299 dout("kick_requests\n"); 1300 dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
1300 mutex_lock(&osdc->request_mutex); 1301 mutex_lock(&osdc->request_mutex);
1301 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 1302 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1302 req = rb_entry(p, struct ceph_osd_request, r_node); 1303 req = rb_entry(p, struct ceph_osd_request, r_node);
1303 err = __map_request(osdc, req); 1304 err = __map_request(osdc, req, force_resend);
1304 if (err < 0) 1305 if (err < 0)
1305 continue; /* error */ 1306 continue; /* error */
1306 if (req->r_osd == NULL) { 1307 if (req->r_osd == NULL) {
@@ -1318,7 +1319,7 @@ static void kick_requests(struct ceph_osd_client *osdc)
1318 r_linger_item) { 1319 r_linger_item) {
1319 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1320 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1320 1321
1321 err = __map_request(osdc, req); 1322 err = __map_request(osdc, req, force_resend);
1322 if (err == 0) 1323 if (err == 0)
1323 continue; /* no change and no osd was specified */ 1324 continue; /* no change and no osd was specified */
1324 if (err < 0) 1325 if (err < 0)
@@ -1395,7 +1396,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1395 ceph_osdmap_destroy(osdc->osdmap); 1396 ceph_osdmap_destroy(osdc->osdmap);
1396 osdc->osdmap = newmap; 1397 osdc->osdmap = newmap;
1397 } 1398 }
1398 kick_requests(osdc); 1399 kick_requests(osdc, 0);
1399 reset_changed_osds(osdc); 1400 reset_changed_osds(osdc);
1400 } else { 1401 } else {
1401 dout("ignoring incremental map %u len %d\n", 1402 dout("ignoring incremental map %u len %d\n",
@@ -1423,6 +1424,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1423 "older than our %u\n", epoch, maplen, 1424 "older than our %u\n", epoch, maplen,
1424 osdc->osdmap->epoch); 1425 osdc->osdmap->epoch);
1425 } else { 1426 } else {
1427 int skipped_map = 0;
1428
1426 dout("taking full map %u len %d\n", epoch, maplen); 1429 dout("taking full map %u len %d\n", epoch, maplen);
1427 newmap = osdmap_decode(&p, p+maplen); 1430 newmap = osdmap_decode(&p, p+maplen);
1428 if (IS_ERR(newmap)) { 1431 if (IS_ERR(newmap)) {
@@ -1432,9 +1435,12 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1432 BUG_ON(!newmap); 1435 BUG_ON(!newmap);
1433 oldmap = osdc->osdmap; 1436 oldmap = osdc->osdmap;
1434 osdc->osdmap = newmap; 1437 osdc->osdmap = newmap;
1435 if (oldmap) 1438 if (oldmap) {
1439 if (oldmap->epoch + 1 < newmap->epoch)
1440 skipped_map = 1;
1436 ceph_osdmap_destroy(oldmap); 1441 ceph_osdmap_destroy(oldmap);
1437 kick_requests(osdc); 1442 }
1443 kick_requests(osdc, skipped_map);
1438 } 1444 }
1439 p += maplen; 1445 p += maplen;
1440 nr_maps--; 1446 nr_maps--;
@@ -1707,7 +1713,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1707 * the request still han't been touched yet. 1713 * the request still han't been touched yet.
1708 */ 1714 */
1709 if (req->r_sent == 0) { 1715 if (req->r_sent == 0) {
1710 rc = __map_request(osdc, req); 1716 rc = __map_request(osdc, req, 0);
1711 if (rc < 0) { 1717 if (rc < 0) {
1712 if (nofail) { 1718 if (nofail) {
1713 dout("osdc_start_request failed map, " 1719 dout("osdc_start_request failed map, "
@@ -2032,7 +2038,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2032 if (front > req->r_reply->front.iov_len) { 2038 if (front > req->r_reply->front.iov_len) {
2033 pr_warning("get_reply front %d > preallocated %d\n", 2039 pr_warning("get_reply front %d > preallocated %d\n",
2034 front, (int)req->r_reply->front.iov_len); 2040 front, (int)req->r_reply->front.iov_len);
2035 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS); 2041 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
2036 if (!m) 2042 if (!m)
2037 goto out; 2043 goto out;
2038 ceph_msg_put(req->r_reply); 2044 ceph_msg_put(req->r_reply);
@@ -2080,7 +2086,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2080 switch (type) { 2086 switch (type) {
2081 case CEPH_MSG_OSD_MAP: 2087 case CEPH_MSG_OSD_MAP:
2082 case CEPH_MSG_WATCH_NOTIFY: 2088 case CEPH_MSG_WATCH_NOTIFY:
2083 return ceph_msg_new(type, front, GFP_NOFS); 2089 return ceph_msg_new(type, front, GFP_NOFS, false);
2084 case CEPH_MSG_OSD_OPREPLY: 2090 case CEPH_MSG_OSD_OPREPLY:
2085 return get_reply(con, hdr, skip); 2091 return get_reply(con, hdr, skip);
2086 default: 2092 default: