aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
author Linus Torvalds <torvalds@linux-foundation.org> 2014-10-15 00:46:01 -0400
committer Linus Torvalds <torvalds@linux-foundation.org> 2014-10-15 00:46:01 -0400
commit 6b0490816671b2f4126a99998c9bf3c8c0472de2 (patch)
tree 016543455c2bdbe47b422fed6a3b4ffb991c97d6 /net/ceph
parent ce9d7f7b45930ed16c512aabcfe651d44f1c8619 (diff)
parent 0bc62284ee3f2a228c64902ed818b6ba8e04159b (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "There is the long-awaited discard support for RBD (Guangliang Zhao,
  Josh Durgin), a pile of RBD bug fixes that didn't belong in late -rc's
  (Ilya Dryomov, Li RongQing), a pile of fs/ceph bug fixes and
  performance and debugging improvements (Yan, Zheng, John Spray), and a
  smattering of cleanups (Chao Yu, Fabian Frederick, Joe Perches)"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits)
  ceph: fix divide-by-zero in __validate_layout()
  rbd: rbd workqueues need a resque worker
  libceph: ceph-msgr workqueue needs a resque worker
  ceph: fix bool assignments
  libceph: separate multiple ops with commas in debugfs output
  libceph: sync osd op definitions in rados.h
  libceph: remove redundant declaration
  ceph: additional debugfs output
  ceph: export ceph_session_state_name function
  ceph: include the initial ACL in create/mkdir/mknod MDS requests
  ceph: use pagelist to present MDS request data
  libceph: reference counting pagelist
  ceph: fix llistxattr on symlink
  ceph: send client metadata to MDS
  ceph: remove redundant code for max file size verification
  ceph: remove redundant io_iter_advance()
  ceph: move ceph_find_inode() outside the s_mutex
  ceph: request xattrs if xattr_version is zero
  rbd: set the remaining discard properties to enable support
  rbd: use helpers to handle discard for layered images correctly
  ...
Diffstat (limited to 'net/ceph')
-rw-r--r--  net/ceph/Kconfig          1
-rw-r--r--  net/ceph/ceph_common.c   15
-rw-r--r--  net/ceph/ceph_strings.c  75
-rw-r--r--  net/ceph/debugfs.c        3
-rw-r--r--  net/ceph/messenger.c     28
-rw-r--r--  net/ceph/mon_client.c     8
-rw-r--r--  net/ceph/osd_client.c   192
-rw-r--r--  net/ceph/osdmap.c        52
-rw-r--r--  net/ceph/pagelist.c       7
9 files changed, 153 insertions, 228 deletions
diff --git a/net/ceph/Kconfig b/net/ceph/Kconfig
index e50cc69ae8ca..f8cceb99e732 100644
--- a/net/ceph/Kconfig
+++ b/net/ceph/Kconfig
@@ -3,6 +3,7 @@ config CEPH_LIB
3 depends on INET 3 depends on INET
4 select LIBCRC32C 4 select LIBCRC32C
5 select CRYPTO_AES 5 select CRYPTO_AES
6 select CRYPTO_CBC
6 select CRYPTO 7 select CRYPTO
7 select KEYS 8 select KEYS
8 default n 9 default n
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 1675021d8c12..58fbfe134f93 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -293,17 +293,20 @@ static int get_secret(struct ceph_crypto_key *dst, const char *name) {
293 key_err = PTR_ERR(ukey); 293 key_err = PTR_ERR(ukey);
294 switch (key_err) { 294 switch (key_err) {
295 case -ENOKEY: 295 case -ENOKEY:
296 pr_warning("ceph: Mount failed due to key not found: %s\n", name); 296 pr_warn("ceph: Mount failed due to key not found: %s\n",
297 name);
297 break; 298 break;
298 case -EKEYEXPIRED: 299 case -EKEYEXPIRED:
299 pr_warning("ceph: Mount failed due to expired key: %s\n", name); 300 pr_warn("ceph: Mount failed due to expired key: %s\n",
301 name);
300 break; 302 break;
301 case -EKEYREVOKED: 303 case -EKEYREVOKED:
302 pr_warning("ceph: Mount failed due to revoked key: %s\n", name); 304 pr_warn("ceph: Mount failed due to revoked key: %s\n",
305 name);
303 break; 306 break;
304 default: 307 default:
305 pr_warning("ceph: Mount failed due to unknown key error" 308 pr_warn("ceph: Mount failed due to unknown key error %d: %s\n",
306 " %d: %s\n", key_err, name); 309 key_err, name);
307 } 310 }
308 err = -EPERM; 311 err = -EPERM;
309 goto out; 312 goto out;
@@ -433,7 +436,7 @@ ceph_parse_options(char *options, const char *dev_name,
433 436
434 /* misc */ 437 /* misc */
435 case Opt_osdtimeout: 438 case Opt_osdtimeout:
436 pr_warning("ignoring deprecated osdtimeout option\n"); 439 pr_warn("ignoring deprecated osdtimeout option\n");
437 break; 440 break;
438 case Opt_osdkeepalivetimeout: 441 case Opt_osdkeepalivetimeout:
439 opt->osd_keepalive_timeout = intval; 442 opt->osd_keepalive_timeout = intval;
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 1348df96fe15..30560202f57b 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -19,77 +19,12 @@ const char *ceph_entity_type_name(int type)
19const char *ceph_osd_op_name(int op) 19const char *ceph_osd_op_name(int op)
20{ 20{
21 switch (op) { 21 switch (op) {
22 case CEPH_OSD_OP_READ: return "read"; 22#define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return (str);
23 case CEPH_OSD_OP_STAT: return "stat"; 23__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
24 case CEPH_OSD_OP_MAPEXT: return "mapext"; 24#undef GENERATE_CASE
25 case CEPH_OSD_OP_SPARSE_READ: return "sparse-read"; 25 default:
26 case CEPH_OSD_OP_NOTIFY: return "notify"; 26 return "???";
27 case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack";
28 case CEPH_OSD_OP_ASSERT_VER: return "assert-version";
29
30 case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
31
32 case CEPH_OSD_OP_CREATE: return "create";
33 case CEPH_OSD_OP_WRITE: return "write";
34 case CEPH_OSD_OP_DELETE: return "delete";
35 case CEPH_OSD_OP_TRUNCATE: return "truncate";
36 case CEPH_OSD_OP_ZERO: return "zero";
37 case CEPH_OSD_OP_WRITEFULL: return "writefull";
38 case CEPH_OSD_OP_ROLLBACK: return "rollback";
39
40 case CEPH_OSD_OP_APPEND: return "append";
41 case CEPH_OSD_OP_STARTSYNC: return "startsync";
42 case CEPH_OSD_OP_SETTRUNC: return "settrunc";
43 case CEPH_OSD_OP_TRIMTRUNC: return "trimtrunc";
44
45 case CEPH_OSD_OP_TMAPUP: return "tmapup";
46 case CEPH_OSD_OP_TMAPGET: return "tmapget";
47 case CEPH_OSD_OP_TMAPPUT: return "tmapput";
48 case CEPH_OSD_OP_WATCH: return "watch";
49
50 case CEPH_OSD_OP_CLONERANGE: return "clonerange";
51 case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version";
52 case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr";
53
54 case CEPH_OSD_OP_GETXATTR: return "getxattr";
55 case CEPH_OSD_OP_GETXATTRS: return "getxattrs";
56 case CEPH_OSD_OP_SETXATTR: return "setxattr";
57 case CEPH_OSD_OP_SETXATTRS: return "setxattrs";
58 case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs";
59 case CEPH_OSD_OP_RMXATTR: return "rmxattr";
60 case CEPH_OSD_OP_CMPXATTR: return "cmpxattr";
61
62 case CEPH_OSD_OP_PULL: return "pull";
63 case CEPH_OSD_OP_PUSH: return "push";
64 case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
65 case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
66 case CEPH_OSD_OP_SCRUB: return "scrub";
67 case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve";
68 case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve";
69 case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop";
70 case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map";
71
72 case CEPH_OSD_OP_WRLOCK: return "wrlock";
73 case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
74 case CEPH_OSD_OP_RDLOCK: return "rdlock";
75 case CEPH_OSD_OP_RDUNLOCK: return "rdunlock";
76 case CEPH_OSD_OP_UPLOCK: return "uplock";
77 case CEPH_OSD_OP_DNLOCK: return "dnlock";
78
79 case CEPH_OSD_OP_CALL: return "call";
80
81 case CEPH_OSD_OP_PGLS: return "pgls";
82 case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
83 case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
84 case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
85 case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
86 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys";
87 case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals";
88 case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header";
89 case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear";
90 case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys";
91 } 27 }
92 return "???";
93} 28}
94 29
95const char *ceph_osd_state_name(int s) 30const char *ceph_osd_state_name(int s)
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index d1a62c69a9f4..d2d525529f87 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -169,7 +169,8 @@ static int osdc_show(struct seq_file *s, void *pp)
169 169
170 for (i = 0; i < req->r_num_ops; i++) { 170 for (i = 0; i < req->r_num_ops; i++) {
171 opcode = req->r_ops[i].op; 171 opcode = req->r_ops[i].op;
172 seq_printf(s, "\t%s", ceph_osd_op_name(opcode)); 172 seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
173 ceph_osd_op_name(opcode));
173 } 174 }
174 175
175 seq_printf(s, "\n"); 176 seq_printf(s, "\n");
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index b2f571dd933d..559c9f619c20 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -292,7 +292,11 @@ int ceph_msgr_init(void)
292 if (ceph_msgr_slab_init()) 292 if (ceph_msgr_slab_init())
293 return -ENOMEM; 293 return -ENOMEM;
294 294
295 ceph_msgr_wq = alloc_workqueue("ceph-msgr", 0, 0); 295 /*
296 * The number of active work items is limited by the number of
297 * connections, so leave @max_active at default.
298 */
299 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
296 if (ceph_msgr_wq) 300 if (ceph_msgr_wq)
297 return 0; 301 return 0;
298 302
@@ -1937,11 +1941,11 @@ static int process_banner(struct ceph_connection *con)
1937 sizeof(con->peer_addr)) != 0 && 1941 sizeof(con->peer_addr)) != 0 &&
1938 !(addr_is_blank(&con->actual_peer_addr.in_addr) && 1942 !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
1939 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 1943 con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
1940 pr_warning("wrong peer, want %s/%d, got %s/%d\n", 1944 pr_warn("wrong peer, want %s/%d, got %s/%d\n",
1941 ceph_pr_addr(&con->peer_addr.in_addr), 1945 ceph_pr_addr(&con->peer_addr.in_addr),
1942 (int)le32_to_cpu(con->peer_addr.nonce), 1946 (int)le32_to_cpu(con->peer_addr.nonce),
1943 ceph_pr_addr(&con->actual_peer_addr.in_addr), 1947 ceph_pr_addr(&con->actual_peer_addr.in_addr),
1944 (int)le32_to_cpu(con->actual_peer_addr.nonce)); 1948 (int)le32_to_cpu(con->actual_peer_addr.nonce));
1945 con->error_msg = "wrong peer at address"; 1949 con->error_msg = "wrong peer at address";
1946 return -1; 1950 return -1;
1947 } 1951 }
@@ -2302,7 +2306,7 @@ static int read_partial_message(struct ceph_connection *con)
2302 2306
2303 BUG_ON(!con->in_msg ^ skip); 2307 BUG_ON(!con->in_msg ^ skip);
2304 if (con->in_msg && data_len > con->in_msg->data_length) { 2308 if (con->in_msg && data_len > con->in_msg->data_length) {
2305 pr_warning("%s skipping long message (%u > %zd)\n", 2309 pr_warn("%s skipping long message (%u > %zd)\n",
2306 __func__, data_len, con->in_msg->data_length); 2310 __func__, data_len, con->in_msg->data_length);
2307 ceph_msg_put(con->in_msg); 2311 ceph_msg_put(con->in_msg);
2308 con->in_msg = NULL; 2312 con->in_msg = NULL;
@@ -2712,7 +2716,7 @@ static bool con_sock_closed(struct ceph_connection *con)
2712 CASE(OPEN); 2716 CASE(OPEN);
2713 CASE(STANDBY); 2717 CASE(STANDBY);
2714 default: 2718 default:
2715 pr_warning("%s con %p unrecognized state %lu\n", 2719 pr_warn("%s con %p unrecognized state %lu\n",
2716 __func__, con, con->state); 2720 __func__, con, con->state);
2717 con->error_msg = "unrecognized con state"; 2721 con->error_msg = "unrecognized con state";
2718 BUG(); 2722 BUG();
@@ -2828,8 +2832,8 @@ static void con_work(struct work_struct *work)
2828 */ 2832 */
2829static void con_fault(struct ceph_connection *con) 2833static void con_fault(struct ceph_connection *con)
2830{ 2834{
2831 pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2835 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2832 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2836 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
2833 dout("fault %p state %lu to peer %s\n", 2837 dout("fault %p state %lu to peer %s\n",
2834 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2838 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
2835 2839
@@ -3071,10 +3075,8 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
3071 return; 3075 return;
3072 3076
3073 WARN_ON(!list_empty(&data->links)); 3077 WARN_ON(!list_empty(&data->links));
3074 if (data->type == CEPH_MSG_DATA_PAGELIST) { 3078 if (data->type == CEPH_MSG_DATA_PAGELIST)
3075 ceph_pagelist_release(data->pagelist); 3079 ceph_pagelist_release(data->pagelist);
3076 kfree(data->pagelist);
3077 }
3078 kmem_cache_free(ceph_msg_data_cache, data); 3080 kmem_cache_free(ceph_msg_data_cache, data);
3079} 3081}
3080 3082
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 61fcfc304f68..a83062ceeec9 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -1182,10 +1182,10 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1182 pr_info("alloc_msg unknown type %d\n", type); 1182 pr_info("alloc_msg unknown type %d\n", type);
1183 *skip = 1; 1183 *skip = 1;
1184 } else if (front_len > m->front_alloc_len) { 1184 } else if (front_len > m->front_alloc_len) {
1185 pr_warning("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n", 1185 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1186 front_len, m->front_alloc_len, 1186 front_len, m->front_alloc_len,
1187 (unsigned int)con->peer_name.type, 1187 (unsigned int)con->peer_name.type,
1188 le64_to_cpu(con->peer_name.num)); 1188 le64_to_cpu(con->peer_name.num));
1189 ceph_msg_put(m); 1189 ceph_msg_put(m);
1190 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1190 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1191 } 1191 }
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 30f6faf3584f..f3fc54eac09d 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -30,8 +30,11 @@ static void __send_queued(struct ceph_osd_client *osdc);
30static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 30static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
31static void __register_request(struct ceph_osd_client *osdc, 31static void __register_request(struct ceph_osd_client *osdc,
32 struct ceph_osd_request *req); 32 struct ceph_osd_request *req);
33static void __unregister_request(struct ceph_osd_client *osdc,
34 struct ceph_osd_request *req);
33static void __unregister_linger_request(struct ceph_osd_client *osdc, 35static void __unregister_linger_request(struct ceph_osd_client *osdc,
34 struct ceph_osd_request *req); 36 struct ceph_osd_request *req);
37static void __enqueue_request(struct ceph_osd_request *req);
35static void __send_request(struct ceph_osd_client *osdc, 38static void __send_request(struct ceph_osd_client *osdc,
36 struct ceph_osd_request *req); 39 struct ceph_osd_request *req);
37 40
@@ -428,68 +431,9 @@ EXPORT_SYMBOL(ceph_osdc_alloc_request);
428static bool osd_req_opcode_valid(u16 opcode) 431static bool osd_req_opcode_valid(u16 opcode)
429{ 432{
430 switch (opcode) { 433 switch (opcode) {
431 case CEPH_OSD_OP_READ: 434#define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true;
432 case CEPH_OSD_OP_STAT: 435__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
433 case CEPH_OSD_OP_MAPEXT: 436#undef GENERATE_CASE
434 case CEPH_OSD_OP_MASKTRUNC:
435 case CEPH_OSD_OP_SPARSE_READ:
436 case CEPH_OSD_OP_NOTIFY:
437 case CEPH_OSD_OP_NOTIFY_ACK:
438 case CEPH_OSD_OP_ASSERT_VER:
439 case CEPH_OSD_OP_WRITE:
440 case CEPH_OSD_OP_WRITEFULL:
441 case CEPH_OSD_OP_TRUNCATE:
442 case CEPH_OSD_OP_ZERO:
443 case CEPH_OSD_OP_DELETE:
444 case CEPH_OSD_OP_APPEND:
445 case CEPH_OSD_OP_STARTSYNC:
446 case CEPH_OSD_OP_SETTRUNC:
447 case CEPH_OSD_OP_TRIMTRUNC:
448 case CEPH_OSD_OP_TMAPUP:
449 case CEPH_OSD_OP_TMAPPUT:
450 case CEPH_OSD_OP_TMAPGET:
451 case CEPH_OSD_OP_CREATE:
452 case CEPH_OSD_OP_ROLLBACK:
453 case CEPH_OSD_OP_WATCH:
454 case CEPH_OSD_OP_OMAPGETKEYS:
455 case CEPH_OSD_OP_OMAPGETVALS:
456 case CEPH_OSD_OP_OMAPGETHEADER:
457 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
458 case CEPH_OSD_OP_OMAPSETVALS:
459 case CEPH_OSD_OP_OMAPSETHEADER:
460 case CEPH_OSD_OP_OMAPCLEAR:
461 case CEPH_OSD_OP_OMAPRMKEYS:
462 case CEPH_OSD_OP_OMAP_CMP:
463 case CEPH_OSD_OP_SETALLOCHINT:
464 case CEPH_OSD_OP_CLONERANGE:
465 case CEPH_OSD_OP_ASSERT_SRC_VERSION:
466 case CEPH_OSD_OP_SRC_CMPXATTR:
467 case CEPH_OSD_OP_GETXATTR:
468 case CEPH_OSD_OP_GETXATTRS:
469 case CEPH_OSD_OP_CMPXATTR:
470 case CEPH_OSD_OP_SETXATTR:
471 case CEPH_OSD_OP_SETXATTRS:
472 case CEPH_OSD_OP_RESETXATTRS:
473 case CEPH_OSD_OP_RMXATTR:
474 case CEPH_OSD_OP_PULL:
475 case CEPH_OSD_OP_PUSH:
476 case CEPH_OSD_OP_BALANCEREADS:
477 case CEPH_OSD_OP_UNBALANCEREADS:
478 case CEPH_OSD_OP_SCRUB:
479 case CEPH_OSD_OP_SCRUB_RESERVE:
480 case CEPH_OSD_OP_SCRUB_UNRESERVE:
481 case CEPH_OSD_OP_SCRUB_STOP:
482 case CEPH_OSD_OP_SCRUB_MAP:
483 case CEPH_OSD_OP_WRLOCK:
484 case CEPH_OSD_OP_WRUNLOCK:
485 case CEPH_OSD_OP_RDLOCK:
486 case CEPH_OSD_OP_RDUNLOCK:
487 case CEPH_OSD_OP_UPLOCK:
488 case CEPH_OSD_OP_DNLOCK:
489 case CEPH_OSD_OP_CALL:
490 case CEPH_OSD_OP_PGLS:
491 case CEPH_OSD_OP_PGLS_FILTER:
492 return true;
493 default: 437 default:
494 return false; 438 return false;
495 } 439 }
@@ -892,6 +836,37 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
892 return NULL; 836 return NULL;
893} 837}
894 838
839static void __kick_linger_request(struct ceph_osd_request *req)
840{
841 struct ceph_osd_client *osdc = req->r_osdc;
842 struct ceph_osd *osd = req->r_osd;
843
844 /*
845 * Linger requests need to be resent with a new tid to avoid
846 * the dup op detection logic on the OSDs. Achieve this with
847 * a re-register dance instead of open-coding.
848 */
849 ceph_osdc_get_request(req);
850 if (!list_empty(&req->r_linger_item))
851 __unregister_linger_request(osdc, req);
852 else
853 __unregister_request(osdc, req);
854 __register_request(osdc, req);
855 ceph_osdc_put_request(req);
856
857 /*
858 * Unless request has been registered as both normal and
859 * lingering, __unregister{,_linger}_request clears r_osd.
860 * However, here we need to preserve r_osd to make sure we
861 * requeue on the same OSD.
862 */
863 WARN_ON(req->r_osd || !osd);
864 req->r_osd = osd;
865
866 dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
867 __enqueue_request(req);
868}
869
895/* 870/*
896 * Resubmit requests pending on the given osd. 871 * Resubmit requests pending on the given osd.
897 */ 872 */
@@ -900,12 +875,14 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
900{ 875{
901 struct ceph_osd_request *req, *nreq; 876 struct ceph_osd_request *req, *nreq;
902 LIST_HEAD(resend); 877 LIST_HEAD(resend);
878 LIST_HEAD(resend_linger);
903 int err; 879 int err;
904 880
905 dout("__kick_osd_requests osd%d\n", osd->o_osd); 881 dout("%s osd%d\n", __func__, osd->o_osd);
906 err = __reset_osd(osdc, osd); 882 err = __reset_osd(osdc, osd);
907 if (err) 883 if (err)
908 return; 884 return;
885
909 /* 886 /*
910 * Build up a list of requests to resend by traversing the 887 * Build up a list of requests to resend by traversing the
911 * osd's list of requests. Requests for a given object are 888 * osd's list of requests. Requests for a given object are
@@ -926,33 +903,32 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
926 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 903 list_for_each_entry(req, &osd->o_requests, r_osd_item) {
927 if (!req->r_sent) 904 if (!req->r_sent)
928 break; 905 break;
929 list_move_tail(&req->r_req_lru_item, &resend); 906
930 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid, 907 if (!req->r_linger) {
931 osd->o_osd); 908 dout("%s requeueing %p tid %llu\n", __func__, req,
932 if (!req->r_linger) 909 req->r_tid);
910 list_move_tail(&req->r_req_lru_item, &resend);
933 req->r_flags |= CEPH_OSD_FLAG_RETRY; 911 req->r_flags |= CEPH_OSD_FLAG_RETRY;
912 } else {
913 list_move_tail(&req->r_req_lru_item, &resend_linger);
914 }
934 } 915 }
935 list_splice(&resend, &osdc->req_unsent); 916 list_splice(&resend, &osdc->req_unsent);
936 917
937 /* 918 /*
938 * Linger requests are re-registered before sending, which 919 * Both registered and not yet registered linger requests are
939 * sets up a new tid for each. We add them to the unsent 920 * enqueued with a new tid on the same OSD. We add/move them
940 * list at the end to keep things in tid order. 921 * to req_unsent/o_requests at the end to keep things in tid
922 * order.
941 */ 923 */
942 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 924 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
943 r_linger_osd_item) { 925 r_linger_osd_item) {
944 /* 926 WARN_ON(!list_empty(&req->r_req_lru_item));
945 * reregister request prior to unregistering linger so 927 __kick_linger_request(req);
946 * that r_osd is preserved.
947 */
948 BUG_ON(!list_empty(&req->r_req_lru_item));
949 __register_request(osdc, req);
950 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
951 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
952 __unregister_linger_request(osdc, req);
953 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
954 osd->o_osd);
955 } 928 }
929
930 list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
931 __kick_linger_request(req);
956} 932}
957 933
958/* 934/*
@@ -1346,6 +1322,22 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
1346 &req->r_target_oid, pg_out); 1322 &req->r_target_oid, pg_out);
1347} 1323}
1348 1324
1325static void __enqueue_request(struct ceph_osd_request *req)
1326{
1327 struct ceph_osd_client *osdc = req->r_osdc;
1328
1329 dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
1330 req->r_osd ? req->r_osd->o_osd : -1);
1331
1332 if (req->r_osd) {
1333 __remove_osd_from_lru(req->r_osd);
1334 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
1335 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
1336 } else {
1337 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
1338 }
1339}
1340
1349/* 1341/*
1350 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 1342 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
1351 * (as needed), and set the request r_osd appropriately. If there is 1343 * (as needed), and set the request r_osd appropriately. If there is
@@ -1423,13 +1415,7 @@ static int __map_request(struct ceph_osd_client *osdc,
1423 &osdc->osdmap->osd_addr[o]); 1415 &osdc->osdmap->osd_addr[o]);
1424 } 1416 }
1425 1417
1426 if (req->r_osd) { 1418 __enqueue_request(req);
1427 __remove_osd_from_lru(req->r_osd);
1428 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
1429 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
1430 } else {
1431 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
1432 }
1433 err = 1; /* osd or pg changed */ 1419 err = 1; /* osd or pg changed */
1434 1420
1435out: 1421out:
@@ -1774,8 +1760,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1774 } 1760 }
1775 bytes = le32_to_cpu(msg->hdr.data_len); 1761 bytes = le32_to_cpu(msg->hdr.data_len);
1776 if (payload_len != bytes) { 1762 if (payload_len != bytes) {
1777 pr_warning("sum of op payload lens %d != data_len %d", 1763 pr_warn("sum of op payload lens %d != data_len %d\n",
1778 payload_len, bytes); 1764 payload_len, bytes);
1779 goto bad_put; 1765 goto bad_put;
1780 } 1766 }
1781 1767
@@ -2313,24 +2299,19 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
2313 if (event) { 2299 if (event) {
2314 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 2300 event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
2315 if (!event_work) { 2301 if (!event_work) {
2316 dout("ERROR: could not allocate event_work\n"); 2302 pr_err("couldn't allocate event_work\n");
2317 goto done_err; 2303 ceph_osdc_put_event(event);
2304 return;
2318 } 2305 }
2319 INIT_WORK(&event_work->work, do_event_work); 2306 INIT_WORK(&event_work->work, do_event_work);
2320 event_work->event = event; 2307 event_work->event = event;
2321 event_work->ver = ver; 2308 event_work->ver = ver;
2322 event_work->notify_id = notify_id; 2309 event_work->notify_id = notify_id;
2323 event_work->opcode = opcode; 2310 event_work->opcode = opcode;
2324 if (!queue_work(osdc->notify_wq, &event_work->work)) {
2325 dout("WARNING: failed to queue notify event work\n");
2326 goto done_err;
2327 }
2328 }
2329 2311
2330 return; 2312 queue_work(osdc->notify_wq, &event_work->work);
2313 }
2331 2314
2332done_err:
2333 ceph_osdc_put_event(event);
2334 return; 2315 return;
2335 2316
2336bad: 2317bad:
@@ -2797,10 +2778,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2797 ceph_msg_revoke_incoming(req->r_reply); 2778 ceph_msg_revoke_incoming(req->r_reply);
2798 2779
2799 if (front_len > req->r_reply->front_alloc_len) { 2780 if (front_len > req->r_reply->front_alloc_len) {
2800 pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", 2781 pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n",
2801 front_len, req->r_reply->front_alloc_len, 2782 front_len, req->r_reply->front_alloc_len,
2802 (unsigned int)con->peer_name.type, 2783 (unsigned int)con->peer_name.type,
2803 le64_to_cpu(con->peer_name.num)); 2784 le64_to_cpu(con->peer_name.num));
2804 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, 2785 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
2805 false); 2786 false);
2806 if (!m) 2787 if (!m)
@@ -2823,8 +2804,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2823 if (osd_data->pages && 2804 if (osd_data->pages &&
2824 unlikely(osd_data->length < data_len)) { 2805 unlikely(osd_data->length < data_len)) {
2825 2806
2826 pr_warning("tid %lld reply has %d bytes " 2807 pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n",
2827 "we had only %llu bytes ready\n",
2828 tid, data_len, osd_data->length); 2808 tid, data_len, osd_data->length);
2829 *skip = 1; 2809 *skip = 1;
2830 ceph_msg_put(m); 2810 ceph_msg_put(m);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index c547e46084d3..b8c3fde5b04f 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -521,11 +521,11 @@ static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
521 ev = ceph_decode_8(p); /* encoding version */ 521 ev = ceph_decode_8(p); /* encoding version */
522 cv = ceph_decode_8(p); /* compat version */ 522 cv = ceph_decode_8(p); /* compat version */
523 if (ev < 5) { 523 if (ev < 5) {
524 pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); 524 pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
525 return -EINVAL; 525 return -EINVAL;
526 } 526 }
527 if (cv > 9) { 527 if (cv > 9) {
528 pr_warning("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv); 528 pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
529 return -EINVAL; 529 return -EINVAL;
530 } 530 }
531 len = ceph_decode_32(p); 531 len = ceph_decode_32(p);
@@ -671,26 +671,26 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
671 int i; 671 int i;
672 672
673 state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS); 673 state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
674 if (!state)
675 return -ENOMEM;
676 map->osd_state = state;
677
674 weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS); 678 weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
675 addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS); 679 if (!weight)
676 if (!state || !weight || !addr) { 680 return -ENOMEM;
677 kfree(state); 681 map->osd_weight = weight;
678 kfree(weight);
679 kfree(addr);
680 682
683 addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
684 if (!addr)
681 return -ENOMEM; 685 return -ENOMEM;
682 } 686 map->osd_addr = addr;
683 687
684 for (i = map->max_osd; i < max; i++) { 688 for (i = map->max_osd; i < max; i++) {
685 state[i] = 0; 689 map->osd_state[i] = 0;
686 weight[i] = CEPH_OSD_OUT; 690 map->osd_weight[i] = CEPH_OSD_OUT;
687 memset(addr + i, 0, sizeof(*addr)); 691 memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
688 } 692 }
689 693
690 map->osd_state = state;
691 map->osd_weight = weight;
692 map->osd_addr = addr;
693
694 if (map->osd_primary_affinity) { 694 if (map->osd_primary_affinity) {
695 u32 *affinity; 695 u32 *affinity;
696 696
@@ -698,11 +698,11 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
698 max*sizeof(*affinity), GFP_NOFS); 698 max*sizeof(*affinity), GFP_NOFS);
699 if (!affinity) 699 if (!affinity)
700 return -ENOMEM; 700 return -ENOMEM;
701 map->osd_primary_affinity = affinity;
701 702
702 for (i = map->max_osd; i < max; i++) 703 for (i = map->max_osd; i < max; i++)
703 affinity[i] = CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; 704 map->osd_primary_affinity[i] =
704 705 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
705 map->osd_primary_affinity = affinity;
706 } 706 }
707 707
708 map->max_osd = max; 708 map->max_osd = max;
@@ -729,9 +729,9 @@ static int get_osdmap_client_data_v(void **p, void *end,
729 729
730 ceph_decode_8_safe(p, end, struct_compat, e_inval); 730 ceph_decode_8_safe(p, end, struct_compat, e_inval);
731 if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) { 731 if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
732 pr_warning("got v %d cv %d > %d of %s ceph_osdmap\n", 732 pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
733 struct_v, struct_compat, 733 struct_v, struct_compat,
734 OSDMAP_WRAPPER_COMPAT_VER, prefix); 734 OSDMAP_WRAPPER_COMPAT_VER, prefix);
735 return -EINVAL; 735 return -EINVAL;
736 } 736 }
737 *p += 4; /* ignore wrapper struct_len */ 737 *p += 4; /* ignore wrapper struct_len */
@@ -739,9 +739,9 @@ static int get_osdmap_client_data_v(void **p, void *end,
739 ceph_decode_8_safe(p, end, struct_v, e_inval); 739 ceph_decode_8_safe(p, end, struct_v, e_inval);
740 ceph_decode_8_safe(p, end, struct_compat, e_inval); 740 ceph_decode_8_safe(p, end, struct_compat, e_inval);
741 if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) { 741 if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
742 pr_warning("got v %d cv %d > %d of %s ceph_osdmap client data\n", 742 pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
743 struct_v, struct_compat, 743 struct_v, struct_compat,
744 OSDMAP_CLIENT_DATA_COMPAT_VER, prefix); 744 OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
745 return -EINVAL; 745 return -EINVAL;
746 } 746 }
747 *p += 4; /* ignore client data struct_len */ 747 *p += 4; /* ignore client data struct_len */
@@ -751,8 +751,8 @@ static int get_osdmap_client_data_v(void **p, void *end,
751 *p -= 1; 751 *p -= 1;
752 ceph_decode_16_safe(p, end, version, e_inval); 752 ceph_decode_16_safe(p, end, version, e_inval);
753 if (version < 6) { 753 if (version < 6) {
754 pr_warning("got v %d < 6 of %s ceph_osdmap\n", version, 754 pr_warn("got v %d < 6 of %s ceph_osdmap\n",
755 prefix); 755 version, prefix);
756 return -EINVAL; 756 return -EINVAL;
757 } 757 }
758 758
diff --git a/net/ceph/pagelist.c b/net/ceph/pagelist.c
index 92866bebb65f..c7c220a736e5 100644
--- a/net/ceph/pagelist.c
+++ b/net/ceph/pagelist.c
@@ -1,5 +1,6 @@
1#include <linux/module.h> 1#include <linux/module.h>
2#include <linux/gfp.h> 2#include <linux/gfp.h>
3#include <linux/slab.h>
3#include <linux/pagemap.h> 4#include <linux/pagemap.h>
4#include <linux/highmem.h> 5#include <linux/highmem.h>
5#include <linux/ceph/pagelist.h> 6#include <linux/ceph/pagelist.h>
@@ -13,8 +14,10 @@ static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
13 } 14 }
14} 15}
15 16
16int ceph_pagelist_release(struct ceph_pagelist *pl) 17void ceph_pagelist_release(struct ceph_pagelist *pl)
17{ 18{
19 if (!atomic_dec_and_test(&pl->refcnt))
20 return;
18 ceph_pagelist_unmap_tail(pl); 21 ceph_pagelist_unmap_tail(pl);
19 while (!list_empty(&pl->head)) { 22 while (!list_empty(&pl->head)) {
20 struct page *page = list_first_entry(&pl->head, struct page, 23 struct page *page = list_first_entry(&pl->head, struct page,
@@ -23,7 +26,7 @@ int ceph_pagelist_release(struct ceph_pagelist *pl)
23 __free_page(page); 26 __free_page(page);
24 } 27 }
25 ceph_pagelist_free_reserve(pl); 28 ceph_pagelist_free_reserve(pl);
26 return 0; 29 kfree(pl);
27} 30}
28EXPORT_SYMBOL(ceph_pagelist_release); 31EXPORT_SYMBOL(ceph_pagelist_release);
29 32