diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2012-07-31 17:35:28 -0400 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2012-07-31 17:35:28 -0400 |
commit | cc8362b1f6d724e46f515121d442779924b19fec (patch) | |
tree | 86fb5c3767e538ec9ded57dd7b3ce5d69dcde691 /net/ceph | |
parent | 2e3ee613480563a6d5c01b57d342e65cc58c06df (diff) | |
parent | 1fe5e9932156f6122c3b1ff6ba7541c27c86718c (diff) |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Sage Weil:
"Lots of stuff this time around:
- lots of cleanup and refactoring in the libceph messenger code, and
many hard to hit races and bugs closed as a result.
- lots of cleanup and refactoring in the rbd code from Alex Elder,
mostly in preparation for the layering functionality that will be
coming in 3.7.
- some misc rbd cleanups from Josh Durgin that are finally going
upstream
- support for CRUSH tunables (used by newer clusters to improve the
data placement)
- some cleanup in our use of d_parent that Al brought up a while back
- a random collection of fixes across the tree
There is another patch coming that fixes up our ->atomic_open()
behavior, but I'm going to hammer on it a bit more before sending it."
Fix up conflicts due to commits that were already committed earlier in
drivers/block/rbd.c, net/ceph/{messenger.c, osd_client.c}
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (132 commits)
rbd: create rbd_refresh_helper()
rbd: return obj version in __rbd_refresh_header()
rbd: fixes in rbd_header_from_disk()
rbd: always pass ops array to rbd_req_sync_op()
rbd: pass null version pointer in add_snap()
rbd: make rbd_create_rw_ops() return a pointer
rbd: have __rbd_add_snap_dev() return a pointer
libceph: recheck con state after allocating incoming message
libceph: change ceph_con_in_msg_alloc convention to be less weird
libceph: avoid dropping con mutex before fault
libceph: verify state after retaking con lock after dispatch
libceph: revoke mon_client messages on session restart
libceph: fix handling of immediate socket connect failure
ceph: update MAINTAINERS file
libceph: be less chatty about stray replies
libceph: clear all flags on con_close
libceph: clean up con flags
libceph: replace connection state bits with states
libceph: drop unnecessary CLOSED check in socket state change callback
libceph: close socket directly from ceph_con_close()
...
Diffstat (limited to 'net/ceph')
-rw-r--r-- | net/ceph/ceph_common.c | 25 | ||||
-rw-r--r-- | net/ceph/crush/mapper.c | 13 | ||||
-rw-r--r-- | net/ceph/messenger.c | 925 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 76 | ||||
-rw-r--r-- | net/ceph/msgpool.c | 7 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 77 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 59 |
7 files changed, 737 insertions, 445 deletions
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index ba4323bce0e..69e38db28e5 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c | |||
@@ -17,6 +17,7 @@ | |||
17 | #include <linux/string.h> | 17 | #include <linux/string.h> |
18 | 18 | ||
19 | 19 | ||
20 | #include <linux/ceph/ceph_features.h> | ||
20 | #include <linux/ceph/libceph.h> | 21 | #include <linux/ceph/libceph.h> |
21 | #include <linux/ceph/debugfs.h> | 22 | #include <linux/ceph/debugfs.h> |
22 | #include <linux/ceph/decode.h> | 23 | #include <linux/ceph/decode.h> |
@@ -460,27 +461,23 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, | |||
460 | client->auth_err = 0; | 461 | client->auth_err = 0; |
461 | 462 | ||
462 | client->extra_mon_dispatch = NULL; | 463 | client->extra_mon_dispatch = NULL; |
463 | client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT | | 464 | client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT | |
464 | supported_features; | 465 | supported_features; |
465 | client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT | | 466 | client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT | |
466 | required_features; | 467 | required_features; |
467 | 468 | ||
468 | /* msgr */ | 469 | /* msgr */ |
469 | if (ceph_test_opt(client, MYIP)) | 470 | if (ceph_test_opt(client, MYIP)) |
470 | myaddr = &client->options->my_addr; | 471 | myaddr = &client->options->my_addr; |
471 | client->msgr = ceph_messenger_create(myaddr, | 472 | ceph_messenger_init(&client->msgr, myaddr, |
472 | client->supported_features, | 473 | client->supported_features, |
473 | client->required_features); | 474 | client->required_features, |
474 | if (IS_ERR(client->msgr)) { | 475 | ceph_test_opt(client, NOCRC)); |
475 | err = PTR_ERR(client->msgr); | ||
476 | goto fail; | ||
477 | } | ||
478 | client->msgr->nocrc = ceph_test_opt(client, NOCRC); | ||
479 | 476 | ||
480 | /* subsystems */ | 477 | /* subsystems */ |
481 | err = ceph_monc_init(&client->monc, client); | 478 | err = ceph_monc_init(&client->monc, client); |
482 | if (err < 0) | 479 | if (err < 0) |
483 | goto fail_msgr; | 480 | goto fail; |
484 | err = ceph_osdc_init(&client->osdc, client); | 481 | err = ceph_osdc_init(&client->osdc, client); |
485 | if (err < 0) | 482 | if (err < 0) |
486 | goto fail_monc; | 483 | goto fail_monc; |
@@ -489,8 +486,6 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, | |||
489 | 486 | ||
490 | fail_monc: | 487 | fail_monc: |
491 | ceph_monc_stop(&client->monc); | 488 | ceph_monc_stop(&client->monc); |
492 | fail_msgr: | ||
493 | ceph_messenger_destroy(client->msgr); | ||
494 | fail: | 489 | fail: |
495 | kfree(client); | 490 | kfree(client); |
496 | return ERR_PTR(err); | 491 | return ERR_PTR(err); |
@@ -501,6 +496,8 @@ void ceph_destroy_client(struct ceph_client *client) | |||
501 | { | 496 | { |
502 | dout("destroy_client %p\n", client); | 497 | dout("destroy_client %p\n", client); |
503 | 498 | ||
499 | atomic_set(&client->msgr.stopping, 1); | ||
500 | |||
504 | /* unmount */ | 501 | /* unmount */ |
505 | ceph_osdc_stop(&client->osdc); | 502 | ceph_osdc_stop(&client->osdc); |
506 | 503 | ||
@@ -508,8 +505,6 @@ void ceph_destroy_client(struct ceph_client *client) | |||
508 | 505 | ||
509 | ceph_debugfs_client_cleanup(client); | 506 | ceph_debugfs_client_cleanup(client); |
510 | 507 | ||
511 | ceph_messenger_destroy(client->msgr); | ||
512 | |||
513 | ceph_destroy_options(client->options); | 508 | ceph_destroy_options(client->options); |
514 | 509 | ||
515 | kfree(client); | 510 | kfree(client); |
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index d7edc24333b..35fce755ce1 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c | |||
@@ -306,7 +306,6 @@ static int crush_choose(const struct crush_map *map, | |||
306 | int item = 0; | 306 | int item = 0; |
307 | int itemtype; | 307 | int itemtype; |
308 | int collide, reject; | 308 | int collide, reject; |
309 | const unsigned int orig_tries = 5; /* attempts before we fall back to search */ | ||
310 | 309 | ||
311 | dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", | 310 | dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", |
312 | bucket->id, x, outpos, numrep); | 311 | bucket->id, x, outpos, numrep); |
@@ -351,8 +350,9 @@ static int crush_choose(const struct crush_map *map, | |||
351 | reject = 1; | 350 | reject = 1; |
352 | goto reject; | 351 | goto reject; |
353 | } | 352 | } |
354 | if (flocal >= (in->size>>1) && | 353 | if (map->choose_local_fallback_tries > 0 && |
355 | flocal > orig_tries) | 354 | flocal >= (in->size>>1) && |
355 | flocal > map->choose_local_fallback_tries) | ||
356 | item = bucket_perm_choose(in, x, r); | 356 | item = bucket_perm_choose(in, x, r); |
357 | else | 357 | else |
358 | item = crush_bucket_choose(in, x, r); | 358 | item = crush_bucket_choose(in, x, r); |
@@ -422,13 +422,14 @@ reject: | |||
422 | ftotal++; | 422 | ftotal++; |
423 | flocal++; | 423 | flocal++; |
424 | 424 | ||
425 | if (collide && flocal < 3) | 425 | if (collide && flocal <= map->choose_local_tries) |
426 | /* retry locally a few times */ | 426 | /* retry locally a few times */ |
427 | retry_bucket = 1; | 427 | retry_bucket = 1; |
428 | else if (flocal <= in->size + orig_tries) | 428 | else if (map->choose_local_fallback_tries > 0 && |
429 | flocal <= in->size + map->choose_local_fallback_tries) | ||
429 | /* exhaustive bucket search */ | 430 | /* exhaustive bucket search */ |
430 | retry_bucket = 1; | 431 | retry_bucket = 1; |
431 | else if (ftotal < 20) | 432 | else if (ftotal <= map->choose_total_tries) |
432 | /* then retry descent */ | 433 | /* then retry descent */ |
433 | retry_descent = 1; | 434 | retry_descent = 1; |
434 | else | 435 | else |
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 10255e81be7..b9796750034 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c | |||
@@ -29,6 +29,74 @@ | |||
29 | * the sender. | 29 | * the sender. |
30 | */ | 30 | */ |
31 | 31 | ||
32 | /* | ||
33 | * We track the state of the socket on a given connection using | ||
34 | * values defined below. The transition to a new socket state is | ||
35 | * handled by a function which verifies we aren't coming from an | ||
36 | * unexpected state. | ||
37 | * | ||
38 | * -------- | ||
39 | * | NEW* | transient initial state | ||
40 | * -------- | ||
41 | * | con_sock_state_init() | ||
42 | * v | ||
43 | * ---------- | ||
44 | * | CLOSED | initialized, but no socket (and no | ||
45 | * ---------- TCP connection) | ||
46 | * ^ \ | ||
47 | * | \ con_sock_state_connecting() | ||
48 | * | ---------------------- | ||
49 | * | \ | ||
50 | * + con_sock_state_closed() \ | ||
51 | * |+--------------------------- \ | ||
52 | * | \ \ \ | ||
53 | * | ----------- \ \ | ||
54 | * | | CLOSING | socket event; \ \ | ||
55 | * | ----------- await close \ \ | ||
56 | * | ^ \ | | ||
57 | * | | \ | | ||
58 | * | + con_sock_state_closing() \ | | ||
59 | * | / \ | | | ||
60 | * | / --------------- | | | ||
61 | * | / \ v v | ||
62 | * | / -------------- | ||
63 | * | / -----------------| CONNECTING | socket created, TCP | ||
64 | * | | / -------------- connect initiated | ||
65 | * | | | con_sock_state_connected() | ||
66 | * | | v | ||
67 | * ------------- | ||
68 | * | CONNECTED | TCP connection established | ||
69 | * ------------- | ||
70 | * | ||
71 | * State values for ceph_connection->sock_state; NEW is assumed to be 0. | ||
72 | */ | ||
73 | |||
74 | #define CON_SOCK_STATE_NEW 0 /* -> CLOSED */ | ||
75 | #define CON_SOCK_STATE_CLOSED 1 /* -> CONNECTING */ | ||
76 | #define CON_SOCK_STATE_CONNECTING 2 /* -> CONNECTED or -> CLOSING */ | ||
77 | #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ | ||
78 | #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ | ||
79 | |||
80 | /* | ||
81 | * connection states | ||
82 | */ | ||
83 | #define CON_STATE_CLOSED 1 /* -> PREOPEN */ | ||
84 | #define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */ | ||
85 | #define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */ | ||
86 | #define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */ | ||
87 | #define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */ | ||
88 | #define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */ | ||
89 | |||
90 | /* | ||
91 | * ceph_connection flag bits | ||
92 | */ | ||
93 | #define CON_FLAG_LOSSYTX 0 /* we can close channel or drop | ||
94 | * messages on errors */ | ||
95 | #define CON_FLAG_KEEPALIVE_PENDING 1 /* we need to send a keepalive */ | ||
96 | #define CON_FLAG_WRITE_PENDING 2 /* we have data ready to send */ | ||
97 | #define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */ | ||
98 | #define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */ | ||
99 | |||
32 | /* static tag bytes (protocol control messages) */ | 100 | /* static tag bytes (protocol control messages) */ |
33 | static char tag_msg = CEPH_MSGR_TAG_MSG; | 101 | static char tag_msg = CEPH_MSGR_TAG_MSG; |
34 | static char tag_ack = CEPH_MSGR_TAG_ACK; | 102 | static char tag_ack = CEPH_MSGR_TAG_ACK; |
@@ -147,72 +215,130 @@ void ceph_msgr_flush(void) | |||
147 | } | 215 | } |
148 | EXPORT_SYMBOL(ceph_msgr_flush); | 216 | EXPORT_SYMBOL(ceph_msgr_flush); |
149 | 217 | ||
218 | /* Connection socket state transition functions */ | ||
219 | |||
220 | static void con_sock_state_init(struct ceph_connection *con) | ||
221 | { | ||
222 | int old_state; | ||
223 | |||
224 | old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); | ||
225 | if (WARN_ON(old_state != CON_SOCK_STATE_NEW)) | ||
226 | printk("%s: unexpected old state %d\n", __func__, old_state); | ||
227 | dout("%s con %p sock %d -> %d\n", __func__, con, old_state, | ||
228 | CON_SOCK_STATE_CLOSED); | ||
229 | } | ||
230 | |||
231 | static void con_sock_state_connecting(struct ceph_connection *con) | ||
232 | { | ||
233 | int old_state; | ||
234 | |||
235 | old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); | ||
236 | if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED)) | ||
237 | printk("%s: unexpected old state %d\n", __func__, old_state); | ||
238 | dout("%s con %p sock %d -> %d\n", __func__, con, old_state, | ||
239 | CON_SOCK_STATE_CONNECTING); | ||
240 | } | ||
241 | |||
242 | static void con_sock_state_connected(struct ceph_connection *con) | ||
243 | { | ||
244 | int old_state; | ||
245 | |||
246 | old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); | ||
247 | if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING)) | ||
248 | printk("%s: unexpected old state %d\n", __func__, old_state); | ||
249 | dout("%s con %p sock %d -> %d\n", __func__, con, old_state, | ||
250 | CON_SOCK_STATE_CONNECTED); | ||
251 | } | ||
252 | |||
253 | static void con_sock_state_closing(struct ceph_connection *con) | ||
254 | { | ||
255 | int old_state; | ||
256 | |||
257 | old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); | ||
258 | if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING && | ||
259 | old_state != CON_SOCK_STATE_CONNECTED && | ||
260 | old_state != CON_SOCK_STATE_CLOSING)) | ||
261 | printk("%s: unexpected old state %d\n", __func__, old_state); | ||
262 | dout("%s con %p sock %d -> %d\n", __func__, con, old_state, | ||
263 | CON_SOCK_STATE_CLOSING); | ||
264 | } | ||
265 | |||
266 | static void con_sock_state_closed(struct ceph_connection *con) | ||
267 | { | ||
268 | int old_state; | ||
269 | |||
270 | old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); | ||
271 | if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED && | ||
272 | old_state != CON_SOCK_STATE_CLOSING && | ||
273 | old_state != CON_SOCK_STATE_CONNECTING && | ||
274 | old_state != CON_SOCK_STATE_CLOSED)) | ||
275 | printk("%s: unexpected old state %d\n", __func__, old_state); | ||
276 | dout("%s con %p sock %d -> %d\n", __func__, con, old_state, | ||
277 | CON_SOCK_STATE_CLOSED); | ||
278 | } | ||
150 | 279 | ||
151 | /* | 280 | /* |
152 | * socket callback functions | 281 | * socket callback functions |
153 | */ | 282 | */ |
154 | 283 | ||
155 | /* data available on socket, or listen socket received a connect */ | 284 | /* data available on socket, or listen socket received a connect */ |
156 | static void ceph_data_ready(struct sock *sk, int count_unused) | 285 | static void ceph_sock_data_ready(struct sock *sk, int count_unused) |
157 | { | 286 | { |
158 | struct ceph_connection *con = sk->sk_user_data; | 287 | struct ceph_connection *con = sk->sk_user_data; |
288 | if (atomic_read(&con->msgr->stopping)) { | ||
289 | return; | ||
290 | } | ||
159 | 291 | ||
160 | if (sk->sk_state != TCP_CLOSE_WAIT) { | 292 | if (sk->sk_state != TCP_CLOSE_WAIT) { |
161 | dout("ceph_data_ready on %p state = %lu, queueing work\n", | 293 | dout("%s on %p state = %lu, queueing work\n", __func__, |
162 | con, con->state); | 294 | con, con->state); |
163 | queue_con(con); | 295 | queue_con(con); |
164 | } | 296 | } |
165 | } | 297 | } |
166 | 298 | ||
167 | /* socket has buffer space for writing */ | 299 | /* socket has buffer space for writing */ |
168 | static void ceph_write_space(struct sock *sk) | 300 | static void ceph_sock_write_space(struct sock *sk) |
169 | { | 301 | { |
170 | struct ceph_connection *con = sk->sk_user_data; | 302 | struct ceph_connection *con = sk->sk_user_data; |
171 | 303 | ||
172 | /* only queue to workqueue if there is data we want to write, | 304 | /* only queue to workqueue if there is data we want to write, |
173 | * and there is sufficient space in the socket buffer to accept | 305 | * and there is sufficient space in the socket buffer to accept |
174 | * more data. clear SOCK_NOSPACE so that ceph_write_space() | 306 | * more data. clear SOCK_NOSPACE so that ceph_sock_write_space() |
175 | * doesn't get called again until try_write() fills the socket | 307 | * doesn't get called again until try_write() fills the socket |
176 | * buffer. See net/ipv4/tcp_input.c:tcp_check_space() | 308 | * buffer. See net/ipv4/tcp_input.c:tcp_check_space() |
177 | * and net/core/stream.c:sk_stream_write_space(). | 309 | * and net/core/stream.c:sk_stream_write_space(). |
178 | */ | 310 | */ |
179 | if (test_bit(WRITE_PENDING, &con->state)) { | 311 | if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) { |
180 | if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { | 312 | if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { |
181 | dout("ceph_write_space %p queueing write work\n", con); | 313 | dout("%s %p queueing write work\n", __func__, con); |
182 | clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); | 314 | clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); |
183 | queue_con(con); | 315 | queue_con(con); |
184 | } | 316 | } |
185 | } else { | 317 | } else { |
186 | dout("ceph_write_space %p nothing to write\n", con); | 318 | dout("%s %p nothing to write\n", __func__, con); |
187 | } | 319 | } |
188 | } | 320 | } |
189 | 321 | ||
190 | /* socket's state has changed */ | 322 | /* socket's state has changed */ |
191 | static void ceph_state_change(struct sock *sk) | 323 | static void ceph_sock_state_change(struct sock *sk) |
192 | { | 324 | { |
193 | struct ceph_connection *con = sk->sk_user_data; | 325 | struct ceph_connection *con = sk->sk_user_data; |
194 | 326 | ||
195 | dout("ceph_state_change %p state = %lu sk_state = %u\n", | 327 | dout("%s %p state = %lu sk_state = %u\n", __func__, |
196 | con, con->state, sk->sk_state); | 328 | con, con->state, sk->sk_state); |
197 | 329 | ||
198 | if (test_bit(CLOSED, &con->state)) | ||
199 | return; | ||
200 | |||
201 | switch (sk->sk_state) { | 330 | switch (sk->sk_state) { |
202 | case TCP_CLOSE: | 331 | case TCP_CLOSE: |
203 | dout("ceph_state_change TCP_CLOSE\n"); | 332 | dout("%s TCP_CLOSE\n", __func__); |
204 | case TCP_CLOSE_WAIT: | 333 | case TCP_CLOSE_WAIT: |
205 | dout("ceph_state_change TCP_CLOSE_WAIT\n"); | 334 | dout("%s TCP_CLOSE_WAIT\n", __func__); |
206 | if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { | 335 | con_sock_state_closing(con); |
207 | if (test_bit(CONNECTING, &con->state)) | 336 | set_bit(CON_FLAG_SOCK_CLOSED, &con->flags); |
208 | con->error_msg = "connection failed"; | 337 | queue_con(con); |
209 | else | ||
210 | con->error_msg = "socket closed"; | ||
211 | queue_con(con); | ||
212 | } | ||
213 | break; | 338 | break; |
214 | case TCP_ESTABLISHED: | 339 | case TCP_ESTABLISHED: |
215 | dout("ceph_state_change TCP_ESTABLISHED\n"); | 340 | dout("%s TCP_ESTABLISHED\n", __func__); |
341 | con_sock_state_connected(con); | ||
216 | queue_con(con); | 342 | queue_con(con); |
217 | break; | 343 | break; |
218 | default: /* Everything else is uninteresting */ | 344 | default: /* Everything else is uninteresting */ |
@@ -228,9 +354,9 @@ static void set_sock_callbacks(struct socket *sock, | |||
228 | { | 354 | { |
229 | struct sock *sk = sock->sk; | 355 | struct sock *sk = sock->sk; |
230 | sk->sk_user_data = con; | 356 | sk->sk_user_data = con; |
231 | sk->sk_data_ready = ceph_data_ready; | 357 | sk->sk_data_ready = ceph_sock_data_ready; |
232 | sk->sk_write_space = ceph_write_space; | 358 | sk->sk_write_space = ceph_sock_write_space; |
233 | sk->sk_state_change = ceph_state_change; | 359 | sk->sk_state_change = ceph_sock_state_change; |
234 | } | 360 | } |
235 | 361 | ||
236 | 362 | ||
@@ -262,6 +388,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) | |||
262 | 388 | ||
263 | dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); | 389 | dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); |
264 | 390 | ||
391 | con_sock_state_connecting(con); | ||
265 | ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), | 392 | ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), |
266 | O_NONBLOCK); | 393 | O_NONBLOCK); |
267 | if (ret == -EINPROGRESS) { | 394 | if (ret == -EINPROGRESS) { |
@@ -277,7 +404,6 @@ static int ceph_tcp_connect(struct ceph_connection *con) | |||
277 | return ret; | 404 | return ret; |
278 | } | 405 | } |
279 | con->sock = sock; | 406 | con->sock = sock; |
280 | |||
281 | return 0; | 407 | return 0; |
282 | } | 408 | } |
283 | 409 | ||
@@ -333,16 +459,24 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page, | |||
333 | */ | 459 | */ |
334 | static int con_close_socket(struct ceph_connection *con) | 460 | static int con_close_socket(struct ceph_connection *con) |
335 | { | 461 | { |
336 | int rc; | 462 | int rc = 0; |
337 | 463 | ||
338 | dout("con_close_socket on %p sock %p\n", con, con->sock); | 464 | dout("con_close_socket on %p sock %p\n", con, con->sock); |
339 | if (!con->sock) | 465 | if (con->sock) { |
340 | return 0; | 466 | rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); |
341 | set_bit(SOCK_CLOSED, &con->state); | 467 | sock_release(con->sock); |
342 | rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); | 468 | con->sock = NULL; |
343 | sock_release(con->sock); | 469 | } |
344 | con->sock = NULL; | 470 | |
345 | clear_bit(SOCK_CLOSED, &con->state); | 471 | /* |
472 | * Forcibly clear the SOCK_CLOSED flag. It gets set | ||
473 | * independent of the connection mutex, and we could have | ||
474 | * received a socket close event before we had the chance to | ||
475 | * shut the socket down. | ||
476 | */ | ||
477 | clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags); | ||
478 | |||
479 | con_sock_state_closed(con); | ||
346 | return rc; | 480 | return rc; |
347 | } | 481 | } |
348 | 482 | ||
@@ -353,6 +487,10 @@ static int con_close_socket(struct ceph_connection *con) | |||
353 | static void ceph_msg_remove(struct ceph_msg *msg) | 487 | static void ceph_msg_remove(struct ceph_msg *msg) |
354 | { | 488 | { |
355 | list_del_init(&msg->list_head); | 489 | list_del_init(&msg->list_head); |
490 | BUG_ON(msg->con == NULL); | ||
491 | msg->con->ops->put(msg->con); | ||
492 | msg->con = NULL; | ||
493 | |||
356 | ceph_msg_put(msg); | 494 | ceph_msg_put(msg); |
357 | } | 495 | } |
358 | static void ceph_msg_remove_list(struct list_head *head) | 496 | static void ceph_msg_remove_list(struct list_head *head) |
@@ -372,8 +510,11 @@ static void reset_connection(struct ceph_connection *con) | |||
372 | ceph_msg_remove_list(&con->out_sent); | 510 | ceph_msg_remove_list(&con->out_sent); |
373 | 511 | ||
374 | if (con->in_msg) { | 512 | if (con->in_msg) { |
513 | BUG_ON(con->in_msg->con != con); | ||
514 | con->in_msg->con = NULL; | ||
375 | ceph_msg_put(con->in_msg); | 515 | ceph_msg_put(con->in_msg); |
376 | con->in_msg = NULL; | 516 | con->in_msg = NULL; |
517 | con->ops->put(con); | ||
377 | } | 518 | } |
378 | 519 | ||
379 | con->connect_seq = 0; | 520 | con->connect_seq = 0; |
@@ -391,32 +532,44 @@ static void reset_connection(struct ceph_connection *con) | |||
391 | */ | 532 | */ |
392 | void ceph_con_close(struct ceph_connection *con) | 533 | void ceph_con_close(struct ceph_connection *con) |
393 | { | 534 | { |
535 | mutex_lock(&con->mutex); | ||
394 | dout("con_close %p peer %s\n", con, | 536 | dout("con_close %p peer %s\n", con, |
395 | ceph_pr_addr(&con->peer_addr.in_addr)); | 537 | ceph_pr_addr(&con->peer_addr.in_addr)); |
396 | set_bit(CLOSED, &con->state); /* in case there's queued work */ | 538 | con->state = CON_STATE_CLOSED; |
397 | clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ | 539 | |
398 | clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ | 540 | clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */ |
399 | clear_bit(KEEPALIVE_PENDING, &con->state); | 541 | clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); |
400 | clear_bit(WRITE_PENDING, &con->state); | 542 | clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
401 | mutex_lock(&con->mutex); | 543 | clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); |
544 | clear_bit(CON_FLAG_BACKOFF, &con->flags); | ||
545 | |||
402 | reset_connection(con); | 546 | reset_connection(con); |
403 | con->peer_global_seq = 0; | 547 | con->peer_global_seq = 0; |
404 | cancel_delayed_work(&con->work); | 548 | cancel_delayed_work(&con->work); |
549 | con_close_socket(con); | ||
405 | mutex_unlock(&con->mutex); | 550 | mutex_unlock(&con->mutex); |
406 | queue_con(con); | ||
407 | } | 551 | } |
408 | EXPORT_SYMBOL(ceph_con_close); | 552 | EXPORT_SYMBOL(ceph_con_close); |
409 | 553 | ||
410 | /* | 554 | /* |
411 | * Reopen a closed connection, with a new peer address. | 555 | * Reopen a closed connection, with a new peer address. |
412 | */ | 556 | */ |
413 | void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) | 557 | void ceph_con_open(struct ceph_connection *con, |
558 | __u8 entity_type, __u64 entity_num, | ||
559 | struct ceph_entity_addr *addr) | ||
414 | { | 560 | { |
561 | mutex_lock(&con->mutex); | ||
415 | dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); | 562 | dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); |
416 | set_bit(OPENING, &con->state); | 563 | |
417 | clear_bit(CLOSED, &con->state); | 564 | BUG_ON(con->state != CON_STATE_CLOSED); |
565 | con->state = CON_STATE_PREOPEN; | ||
566 | |||
567 | con->peer_name.type = (__u8) entity_type; | ||
568 | con->peer_name.num = cpu_to_le64(entity_num); | ||
569 | |||
418 | memcpy(&con->peer_addr, addr, sizeof(*addr)); | 570 | memcpy(&con->peer_addr, addr, sizeof(*addr)); |
419 | con->delay = 0; /* reset backoff memory */ | 571 | con->delay = 0; /* reset backoff memory */ |
572 | mutex_unlock(&con->mutex); | ||
420 | queue_con(con); | 573 | queue_con(con); |
421 | } | 574 | } |
422 | EXPORT_SYMBOL(ceph_con_open); | 575 | EXPORT_SYMBOL(ceph_con_open); |
@@ -430,42 +583,26 @@ bool ceph_con_opened(struct ceph_connection *con) | |||
430 | } | 583 | } |
431 | 584 | ||
432 | /* | 585 | /* |
433 | * generic get/put | ||
434 | */ | ||
435 | struct ceph_connection *ceph_con_get(struct ceph_connection *con) | ||
436 | { | ||
437 | int nref = __atomic_add_unless(&con->nref, 1, 0); | ||
438 | |||
439 | dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1); | ||
440 | |||
441 | return nref ? con : NULL; | ||
442 | } | ||
443 | |||
444 | void ceph_con_put(struct ceph_connection *con) | ||
445 | { | ||
446 | int nref = atomic_dec_return(&con->nref); | ||
447 | |||
448 | BUG_ON(nref < 0); | ||
449 | if (nref == 0) { | ||
450 | BUG_ON(con->sock); | ||
451 | kfree(con); | ||
452 | } | ||
453 | dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref); | ||
454 | } | ||
455 | |||
456 | /* | ||
457 | * initialize a new connection. | 586 | * initialize a new connection. |
458 | */ | 587 | */ |
459 | void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) | 588 | void ceph_con_init(struct ceph_connection *con, void *private, |
589 | const struct ceph_connection_operations *ops, | ||
590 | struct ceph_messenger *msgr) | ||
460 | { | 591 | { |
461 | dout("con_init %p\n", con); | 592 | dout("con_init %p\n", con); |
462 | memset(con, 0, sizeof(*con)); | 593 | memset(con, 0, sizeof(*con)); |
463 | atomic_set(&con->nref, 1); | 594 | con->private = private; |
595 | con->ops = ops; | ||
464 | con->msgr = msgr; | 596 | con->msgr = msgr; |
597 | |||
598 | con_sock_state_init(con); | ||
599 | |||
465 | mutex_init(&con->mutex); | 600 | mutex_init(&con->mutex); |
466 | INIT_LIST_HEAD(&con->out_queue); | 601 | INIT_LIST_HEAD(&con->out_queue); |
467 | INIT_LIST_HEAD(&con->out_sent); | 602 | INIT_LIST_HEAD(&con->out_sent); |
468 | INIT_DELAYED_WORK(&con->work, con_work); | 603 | INIT_DELAYED_WORK(&con->work, con_work); |
604 | |||
605 | con->state = CON_STATE_CLOSED; | ||
469 | } | 606 | } |
470 | EXPORT_SYMBOL(ceph_con_init); | 607 | EXPORT_SYMBOL(ceph_con_init); |
471 | 608 | ||
@@ -486,14 +623,14 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) | |||
486 | return ret; | 623 | return ret; |
487 | } | 624 | } |
488 | 625 | ||
489 | static void ceph_con_out_kvec_reset(struct ceph_connection *con) | 626 | static void con_out_kvec_reset(struct ceph_connection *con) |
490 | { | 627 | { |
491 | con->out_kvec_left = 0; | 628 | con->out_kvec_left = 0; |
492 | con->out_kvec_bytes = 0; | 629 | con->out_kvec_bytes = 0; |
493 | con->out_kvec_cur = &con->out_kvec[0]; | 630 | con->out_kvec_cur = &con->out_kvec[0]; |
494 | } | 631 | } |
495 | 632 | ||
496 | static void ceph_con_out_kvec_add(struct ceph_connection *con, | 633 | static void con_out_kvec_add(struct ceph_connection *con, |
497 | size_t size, void *data) | 634 | size_t size, void *data) |
498 | { | 635 | { |
499 | int index; | 636 | int index; |
@@ -507,6 +644,53 @@ static void ceph_con_out_kvec_add(struct ceph_connection *con, | |||
507 | con->out_kvec_bytes += size; | 644 | con->out_kvec_bytes += size; |
508 | } | 645 | } |
509 | 646 | ||
647 | #ifdef CONFIG_BLOCK | ||
648 | static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) | ||
649 | { | ||
650 | if (!bio) { | ||
651 | *iter = NULL; | ||
652 | *seg = 0; | ||
653 | return; | ||
654 | } | ||
655 | *iter = bio; | ||
656 | *seg = bio->bi_idx; | ||
657 | } | ||
658 | |||
659 | static void iter_bio_next(struct bio **bio_iter, int *seg) | ||
660 | { | ||
661 | if (*bio_iter == NULL) | ||
662 | return; | ||
663 | |||
664 | BUG_ON(*seg >= (*bio_iter)->bi_vcnt); | ||
665 | |||
666 | (*seg)++; | ||
667 | if (*seg == (*bio_iter)->bi_vcnt) | ||
668 | init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); | ||
669 | } | ||
670 | #endif | ||
671 | |||
672 | static void prepare_write_message_data(struct ceph_connection *con) | ||
673 | { | ||
674 | struct ceph_msg *msg = con->out_msg; | ||
675 | |||
676 | BUG_ON(!msg); | ||
677 | BUG_ON(!msg->hdr.data_len); | ||
678 | |||
679 | /* initialize page iterator */ | ||
680 | con->out_msg_pos.page = 0; | ||
681 | if (msg->pages) | ||
682 | con->out_msg_pos.page_pos = msg->page_alignment; | ||
683 | else | ||
684 | con->out_msg_pos.page_pos = 0; | ||
685 | #ifdef CONFIG_BLOCK | ||
686 | if (msg->bio) | ||
687 | init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); | ||
688 | #endif | ||
689 | con->out_msg_pos.data_pos = 0; | ||
690 | con->out_msg_pos.did_page_crc = false; | ||
691 | con->out_more = 1; /* data + footer will follow */ | ||
692 | } | ||
693 | |||
510 | /* | 694 | /* |
511 | * Prepare footer for currently outgoing message, and finish things | 695 | * Prepare footer for currently outgoing message, and finish things |
512 | * off. Assumes out_kvec* are already valid.. we just add on to the end. | 696 | * off. Assumes out_kvec* are already valid.. we just add on to the end. |
@@ -516,6 +700,8 @@ static void prepare_write_message_footer(struct ceph_connection *con) | |||
516 | struct ceph_msg *m = con->out_msg; | 700 | struct ceph_msg *m = con->out_msg; |
517 | int v = con->out_kvec_left; | 701 | int v = con->out_kvec_left; |
518 | 702 | ||
703 | m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; | ||
704 | |||
519 | dout("prepare_write_message_footer %p\n", con); | 705 | dout("prepare_write_message_footer %p\n", con); |
520 | con->out_kvec_is_msg = true; | 706 | con->out_kvec_is_msg = true; |
521 | con->out_kvec[v].iov_base = &m->footer; | 707 | con->out_kvec[v].iov_base = &m->footer; |
@@ -534,7 +720,7 @@ static void prepare_write_message(struct ceph_connection *con) | |||
534 | struct ceph_msg *m; | 720 | struct ceph_msg *m; |
535 | u32 crc; | 721 | u32 crc; |
536 | 722 | ||
537 | ceph_con_out_kvec_reset(con); | 723 | con_out_kvec_reset(con); |
538 | con->out_kvec_is_msg = true; | 724 | con->out_kvec_is_msg = true; |
539 | con->out_msg_done = false; | 725 | con->out_msg_done = false; |
540 | 726 | ||
@@ -542,14 +728,16 @@ static void prepare_write_message(struct ceph_connection *con) | |||
542 | * TCP packet that's a good thing. */ | 728 | * TCP packet that's a good thing. */ |
543 | if (con->in_seq > con->in_seq_acked) { | 729 | if (con->in_seq > con->in_seq_acked) { |
544 | con->in_seq_acked = con->in_seq; | 730 | con->in_seq_acked = con->in_seq; |
545 | ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); | 731 | con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); |
546 | con->out_temp_ack = cpu_to_le64(con->in_seq_acked); | 732 | con->out_temp_ack = cpu_to_le64(con->in_seq_acked); |
547 | ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), | 733 | con_out_kvec_add(con, sizeof (con->out_temp_ack), |
548 | &con->out_temp_ack); | 734 | &con->out_temp_ack); |
549 | } | 735 | } |
550 | 736 | ||
737 | BUG_ON(list_empty(&con->out_queue)); | ||
551 | m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); | 738 | m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); |
552 | con->out_msg = m; | 739 | con->out_msg = m; |
740 | BUG_ON(m->con != con); | ||
553 | 741 | ||
554 | /* put message on sent list */ | 742 | /* put message on sent list */ |
555 | ceph_msg_get(m); | 743 | ceph_msg_get(m); |
@@ -576,18 +764,18 @@ static void prepare_write_message(struct ceph_connection *con) | |||
576 | BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); | 764 | BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); |
577 | 765 | ||
578 | /* tag + hdr + front + middle */ | 766 | /* tag + hdr + front + middle */ |
579 | ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); | 767 | con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); |
580 | ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); | 768 | con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); |
581 | ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); | 769 | con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); |
582 | 770 | ||
583 | if (m->middle) | 771 | if (m->middle) |
584 | ceph_con_out_kvec_add(con, m->middle->vec.iov_len, | 772 | con_out_kvec_add(con, m->middle->vec.iov_len, |
585 | m->middle->vec.iov_base); | 773 | m->middle->vec.iov_base); |
586 | 774 | ||
587 | /* fill in crc (except data pages), footer */ | 775 | /* fill in crc (except data pages), footer */ |
588 | crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); | 776 | crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); |
589 | con->out_msg->hdr.crc = cpu_to_le32(crc); | 777 | con->out_msg->hdr.crc = cpu_to_le32(crc); |
590 | con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; | 778 | con->out_msg->footer.flags = 0; |
591 | 779 | ||
592 | crc = crc32c(0, m->front.iov_base, m->front.iov_len); | 780 | crc = crc32c(0, m->front.iov_base, m->front.iov_len); |
593 | con->out_msg->footer.front_crc = cpu_to_le32(crc); | 781 | con->out_msg->footer.front_crc = cpu_to_le32(crc); |
@@ -597,28 +785,19 @@ static void prepare_write_message(struct ceph_connection *con) | |||
597 | con->out_msg->footer.middle_crc = cpu_to_le32(crc); | 785 | con->out_msg->footer.middle_crc = cpu_to_le32(crc); |
598 | } else | 786 | } else |
599 | con->out_msg->footer.middle_crc = 0; | 787 | con->out_msg->footer.middle_crc = 0; |
600 | con->out_msg->footer.data_crc = 0; | 788 | dout("%s front_crc %u middle_crc %u\n", __func__, |
601 | dout("prepare_write_message front_crc %u data_crc %u\n", | ||
602 | le32_to_cpu(con->out_msg->footer.front_crc), | 789 | le32_to_cpu(con->out_msg->footer.front_crc), |
603 | le32_to_cpu(con->out_msg->footer.middle_crc)); | 790 | le32_to_cpu(con->out_msg->footer.middle_crc)); |
604 | 791 | ||
605 | /* is there a data payload? */ | 792 | /* is there a data payload? */ |
606 | if (le32_to_cpu(m->hdr.data_len) > 0) { | 793 | con->out_msg->footer.data_crc = 0; |
607 | /* initialize page iterator */ | 794 | if (m->hdr.data_len) |
608 | con->out_msg_pos.page = 0; | 795 | prepare_write_message_data(con); |
609 | if (m->pages) | 796 | else |
610 | con->out_msg_pos.page_pos = m->page_alignment; | ||
611 | else | ||
612 | con->out_msg_pos.page_pos = 0; | ||
613 | con->out_msg_pos.data_pos = 0; | ||
614 | con->out_msg_pos.did_page_crc = false; | ||
615 | con->out_more = 1; /* data + footer will follow */ | ||
616 | } else { | ||
617 | /* no, queue up footer too and be done */ | 797 | /* no, queue up footer too and be done */ |
618 | prepare_write_message_footer(con); | 798 | prepare_write_message_footer(con); |
619 | } | ||
620 | 799 | ||
621 | set_bit(WRITE_PENDING, &con->state); | 800 | set_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
622 | } | 801 | } |
623 | 802 | ||
624 | /* | 803 | /* |
@@ -630,16 +809,16 @@ static void prepare_write_ack(struct ceph_connection *con) | |||
630 | con->in_seq_acked, con->in_seq); | 809 | con->in_seq_acked, con->in_seq); |
631 | con->in_seq_acked = con->in_seq; | 810 | con->in_seq_acked = con->in_seq; |
632 | 811 | ||
633 | ceph_con_out_kvec_reset(con); | 812 | con_out_kvec_reset(con); |
634 | 813 | ||
635 | ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); | 814 | con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); |
636 | 815 | ||
637 | con->out_temp_ack = cpu_to_le64(con->in_seq_acked); | 816 | con->out_temp_ack = cpu_to_le64(con->in_seq_acked); |
638 | ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), | 817 | con_out_kvec_add(con, sizeof (con->out_temp_ack), |
639 | &con->out_temp_ack); | 818 | &con->out_temp_ack); |
640 | 819 | ||
641 | con->out_more = 1; /* more will follow.. eventually.. */ | 820 | con->out_more = 1; /* more will follow.. eventually.. */ |
642 | set_bit(WRITE_PENDING, &con->state); | 821 | set_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
643 | } | 822 | } |
644 | 823 | ||
645 | /* | 824 | /* |
@@ -648,9 +827,9 @@ static void prepare_write_ack(struct ceph_connection *con) | |||
648 | static void prepare_write_keepalive(struct ceph_connection *con) | 827 | static void prepare_write_keepalive(struct ceph_connection *con) |
649 | { | 828 | { |
650 | dout("prepare_write_keepalive %p\n", con); | 829 | dout("prepare_write_keepalive %p\n", con); |
651 | ceph_con_out_kvec_reset(con); | 830 | con_out_kvec_reset(con); |
652 | ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); | 831 | con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); |
653 | set_bit(WRITE_PENDING, &con->state); | 832 | set_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
654 | } | 833 | } |
655 | 834 | ||
656 | /* | 835 | /* |
@@ -665,27 +844,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection | |||
665 | if (!con->ops->get_authorizer) { | 844 | if (!con->ops->get_authorizer) { |
666 | con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; | 845 | con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; |
667 | con->out_connect.authorizer_len = 0; | 846 | con->out_connect.authorizer_len = 0; |
668 | |||
669 | return NULL; | 847 | return NULL; |
670 | } | 848 | } |
671 | 849 | ||
672 | /* Can't hold the mutex while getting authorizer */ | 850 | /* Can't hold the mutex while getting authorizer */ |
673 | |||
674 | mutex_unlock(&con->mutex); | 851 | mutex_unlock(&con->mutex); |
675 | |||
676 | auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); | 852 | auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); |
677 | |||
678 | mutex_lock(&con->mutex); | 853 | mutex_lock(&con->mutex); |
679 | 854 | ||
680 | if (IS_ERR(auth)) | 855 | if (IS_ERR(auth)) |
681 | return auth; | 856 | return auth; |
682 | if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) | 857 | if (con->state != CON_STATE_NEGOTIATING) |
683 | return ERR_PTR(-EAGAIN); | 858 | return ERR_PTR(-EAGAIN); |
684 | 859 | ||
685 | con->auth_reply_buf = auth->authorizer_reply_buf; | 860 | con->auth_reply_buf = auth->authorizer_reply_buf; |
686 | con->auth_reply_buf_len = auth->authorizer_reply_buf_len; | 861 | con->auth_reply_buf_len = auth->authorizer_reply_buf_len; |
687 | |||
688 | |||
689 | return auth; | 862 | return auth; |
690 | } | 863 | } |
691 | 864 | ||
@@ -694,12 +867,12 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection | |||
694 | */ | 867 | */ |
695 | static void prepare_write_banner(struct ceph_connection *con) | 868 | static void prepare_write_banner(struct ceph_connection *con) |
696 | { | 869 | { |
697 | ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); | 870 | con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); |
698 | ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), | 871 | con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), |
699 | &con->msgr->my_enc_addr); | 872 | &con->msgr->my_enc_addr); |
700 | 873 | ||
701 | con->out_more = 0; | 874 | con->out_more = 0; |
702 | set_bit(WRITE_PENDING, &con->state); | 875 | set_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
703 | } | 876 | } |
704 | 877 | ||
705 | static int prepare_write_connect(struct ceph_connection *con) | 878 | static int prepare_write_connect(struct ceph_connection *con) |
@@ -742,14 +915,15 @@ static int prepare_write_connect(struct ceph_connection *con) | |||
742 | con->out_connect.authorizer_len = auth ? | 915 | con->out_connect.authorizer_len = auth ? |
743 | cpu_to_le32(auth->authorizer_buf_len) : 0; | 916 | cpu_to_le32(auth->authorizer_buf_len) : 0; |
744 | 917 | ||
745 | ceph_con_out_kvec_add(con, sizeof (con->out_connect), | 918 | con_out_kvec_reset(con); |
919 | con_out_kvec_add(con, sizeof (con->out_connect), | ||
746 | &con->out_connect); | 920 | &con->out_connect); |
747 | if (auth && auth->authorizer_buf_len) | 921 | if (auth && auth->authorizer_buf_len) |
748 | ceph_con_out_kvec_add(con, auth->authorizer_buf_len, | 922 | con_out_kvec_add(con, auth->authorizer_buf_len, |
749 | auth->authorizer_buf); | 923 | auth->authorizer_buf); |
750 | 924 | ||
751 | con->out_more = 0; | 925 | con->out_more = 0; |
752 | set_bit(WRITE_PENDING, &con->state); | 926 | set_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
753 | 927 | ||
754 | return 0; | 928 | return 0; |
755 | } | 929 | } |
@@ -797,30 +971,34 @@ out: | |||
797 | return ret; /* done! */ | 971 | return ret; /* done! */ |
798 | } | 972 | } |
799 | 973 | ||
800 | #ifdef CONFIG_BLOCK | 974 | static void out_msg_pos_next(struct ceph_connection *con, struct page *page, |
801 | static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) | 975 | size_t len, size_t sent, bool in_trail) |
802 | { | 976 | { |
803 | if (!bio) { | 977 | struct ceph_msg *msg = con->out_msg; |
804 | *iter = NULL; | ||
805 | *seg = 0; | ||
806 | return; | ||
807 | } | ||
808 | *iter = bio; | ||
809 | *seg = bio->bi_idx; | ||
810 | } | ||
811 | 978 | ||
812 | static void iter_bio_next(struct bio **bio_iter, int *seg) | 979 | BUG_ON(!msg); |
813 | { | 980 | BUG_ON(!sent); |
814 | if (*bio_iter == NULL) | ||
815 | return; | ||
816 | 981 | ||
817 | BUG_ON(*seg >= (*bio_iter)->bi_vcnt); | 982 | con->out_msg_pos.data_pos += sent; |
983 | con->out_msg_pos.page_pos += sent; | ||
984 | if (sent < len) | ||
985 | return; | ||
818 | 986 | ||
819 | (*seg)++; | 987 | BUG_ON(sent != len); |
820 | if (*seg == (*bio_iter)->bi_vcnt) | 988 | con->out_msg_pos.page_pos = 0; |
821 | init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); | 989 | con->out_msg_pos.page++; |
822 | } | 990 | con->out_msg_pos.did_page_crc = false; |
991 | if (in_trail) | ||
992 | list_move_tail(&page->lru, | ||
993 | &msg->trail->head); | ||
994 | else if (msg->pagelist) | ||
995 | list_move_tail(&page->lru, | ||
996 | &msg->pagelist->head); | ||
997 | #ifdef CONFIG_BLOCK | ||
998 | else if (msg->bio) | ||
999 | iter_bio_next(&msg->bio_iter, &msg->bio_seg); | ||
823 | #endif | 1000 | #endif |
1001 | } | ||
824 | 1002 | ||
825 | /* | 1003 | /* |
826 | * Write as much message data payload as we can. If we finish, queue | 1004 | * Write as much message data payload as we can. If we finish, queue |
@@ -837,41 +1015,36 @@ static int write_partial_msg_pages(struct ceph_connection *con) | |||
837 | bool do_datacrc = !con->msgr->nocrc; | 1015 | bool do_datacrc = !con->msgr->nocrc; |
838 | int ret; | 1016 | int ret; |
839 | int total_max_write; | 1017 | int total_max_write; |
840 | int in_trail = 0; | 1018 | bool in_trail = false; |
841 | size_t trail_len = (msg->trail ? msg->trail->length : 0); | 1019 | const size_t trail_len = (msg->trail ? msg->trail->length : 0); |
1020 | const size_t trail_off = data_len - trail_len; | ||
842 | 1021 | ||
843 | dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", | 1022 | dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", |
844 | con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, | 1023 | con, msg, con->out_msg_pos.page, msg->nr_pages, |
845 | con->out_msg_pos.page_pos); | 1024 | con->out_msg_pos.page_pos); |
846 | 1025 | ||
847 | #ifdef CONFIG_BLOCK | 1026 | /* |
848 | if (msg->bio && !msg->bio_iter) | 1027 | * Iterate through each page that contains data to be |
849 | init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); | 1028 | * written, and send as much as possible for each. |
850 | #endif | 1029 | * |
851 | 1030 | * If we are calculating the data crc (the default), we will | |
1031 | * need to map the page. If we have no pages, they have | ||
1032 | * been revoked, so use the zero page. | ||
1033 | */ | ||
852 | while (data_len > con->out_msg_pos.data_pos) { | 1034 | while (data_len > con->out_msg_pos.data_pos) { |
853 | struct page *page = NULL; | 1035 | struct page *page = NULL; |
854 | int max_write = PAGE_SIZE; | 1036 | int max_write = PAGE_SIZE; |
855 | int bio_offset = 0; | 1037 | int bio_offset = 0; |
856 | 1038 | ||
857 | total_max_write = data_len - trail_len - | 1039 | in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off; |
858 | con->out_msg_pos.data_pos; | 1040 | if (!in_trail) |
859 | 1041 | total_max_write = trail_off - con->out_msg_pos.data_pos; | |
860 | /* | ||
861 | * if we are calculating the data crc (the default), we need | ||
862 | * to map the page. if our pages[] has been revoked, use the | ||
863 | * zero page. | ||
864 | */ | ||
865 | |||
866 | /* have we reached the trail part of the data? */ | ||
867 | if (con->out_msg_pos.data_pos >= data_len - trail_len) { | ||
868 | in_trail = 1; | ||
869 | 1042 | ||
1043 | if (in_trail) { | ||
870 | total_max_write = data_len - con->out_msg_pos.data_pos; | 1044 | total_max_write = data_len - con->out_msg_pos.data_pos; |
871 | 1045 | ||
872 | page = list_first_entry(&msg->trail->head, | 1046 | page = list_first_entry(&msg->trail->head, |
873 | struct page, lru); | 1047 | struct page, lru); |
874 | max_write = PAGE_SIZE; | ||
875 | } else if (msg->pages) { | 1048 | } else if (msg->pages) { |
876 | page = msg->pages[con->out_msg_pos.page]; | 1049 | page = msg->pages[con->out_msg_pos.page]; |
877 | } else if (msg->pagelist) { | 1050 | } else if (msg->pagelist) { |
@@ -894,15 +1067,14 @@ static int write_partial_msg_pages(struct ceph_connection *con) | |||
894 | 1067 | ||
895 | if (do_datacrc && !con->out_msg_pos.did_page_crc) { | 1068 | if (do_datacrc && !con->out_msg_pos.did_page_crc) { |
896 | void *base; | 1069 | void *base; |
897 | u32 crc; | 1070 | u32 crc = le32_to_cpu(msg->footer.data_crc); |
898 | u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); | ||
899 | char *kaddr; | 1071 | char *kaddr; |
900 | 1072 | ||
901 | kaddr = kmap(page); | 1073 | kaddr = kmap(page); |
902 | BUG_ON(kaddr == NULL); | 1074 | BUG_ON(kaddr == NULL); |
903 | base = kaddr + con->out_msg_pos.page_pos + bio_offset; | 1075 | base = kaddr + con->out_msg_pos.page_pos + bio_offset; |
904 | crc = crc32c(tmpcrc, base, len); | 1076 | crc = crc32c(crc, base, len); |
905 | con->out_msg->footer.data_crc = cpu_to_le32(crc); | 1077 | msg->footer.data_crc = cpu_to_le32(crc); |
906 | con->out_msg_pos.did_page_crc = true; | 1078 | con->out_msg_pos.did_page_crc = true; |
907 | } | 1079 | } |
908 | ret = ceph_tcp_sendpage(con->sock, page, | 1080 | ret = ceph_tcp_sendpage(con->sock, page, |
@@ -915,31 +1087,15 @@ static int write_partial_msg_pages(struct ceph_connection *con) | |||
915 | if (ret <= 0) | 1087 | if (ret <= 0) |
916 | goto out; | 1088 | goto out; |
917 | 1089 | ||
918 | con->out_msg_pos.data_pos += ret; | 1090 | out_msg_pos_next(con, page, len, (size_t) ret, in_trail); |
919 | con->out_msg_pos.page_pos += ret; | ||
920 | if (ret == len) { | ||
921 | con->out_msg_pos.page_pos = 0; | ||
922 | con->out_msg_pos.page++; | ||
923 | con->out_msg_pos.did_page_crc = false; | ||
924 | if (in_trail) | ||
925 | list_move_tail(&page->lru, | ||
926 | &msg->trail->head); | ||
927 | else if (msg->pagelist) | ||
928 | list_move_tail(&page->lru, | ||
929 | &msg->pagelist->head); | ||
930 | #ifdef CONFIG_BLOCK | ||
931 | else if (msg->bio) | ||
932 | iter_bio_next(&msg->bio_iter, &msg->bio_seg); | ||
933 | #endif | ||
934 | } | ||
935 | } | 1091 | } |
936 | 1092 | ||
937 | dout("write_partial_msg_pages %p msg %p done\n", con, msg); | 1093 | dout("write_partial_msg_pages %p msg %p done\n", con, msg); |
938 | 1094 | ||
939 | /* prepare and queue up footer, too */ | 1095 | /* prepare and queue up footer, too */ |
940 | if (!do_datacrc) | 1096 | if (!do_datacrc) |
941 | con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; | 1097 | msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; |
942 | ceph_con_out_kvec_reset(con); | 1098 | con_out_kvec_reset(con); |
943 | prepare_write_message_footer(con); | 1099 | prepare_write_message_footer(con); |
944 | ret = 1; | 1100 | ret = 1; |
945 | out: | 1101 | out: |
@@ -1351,20 +1507,14 @@ static int process_banner(struct ceph_connection *con) | |||
1351 | ceph_pr_addr(&con->msgr->inst.addr.in_addr)); | 1507 | ceph_pr_addr(&con->msgr->inst.addr.in_addr)); |
1352 | } | 1508 | } |
1353 | 1509 | ||
1354 | set_bit(NEGOTIATING, &con->state); | ||
1355 | prepare_read_connect(con); | ||
1356 | return 0; | 1510 | return 0; |
1357 | } | 1511 | } |
1358 | 1512 | ||
1359 | static void fail_protocol(struct ceph_connection *con) | 1513 | static void fail_protocol(struct ceph_connection *con) |
1360 | { | 1514 | { |
1361 | reset_connection(con); | 1515 | reset_connection(con); |
1362 | set_bit(CLOSED, &con->state); /* in case there's queued work */ | 1516 | BUG_ON(con->state != CON_STATE_NEGOTIATING); |
1363 | 1517 | con->state = CON_STATE_CLOSED; | |
1364 | mutex_unlock(&con->mutex); | ||
1365 | if (con->ops->bad_proto) | ||
1366 | con->ops->bad_proto(con); | ||
1367 | mutex_lock(&con->mutex); | ||
1368 | } | 1518 | } |
1369 | 1519 | ||
1370 | static int process_connect(struct ceph_connection *con) | 1520 | static int process_connect(struct ceph_connection *con) |
@@ -1407,7 +1557,6 @@ static int process_connect(struct ceph_connection *con) | |||
1407 | return -1; | 1557 | return -1; |
1408 | } | 1558 | } |
1409 | con->auth_retry = 1; | 1559 | con->auth_retry = 1; |
1410 | ceph_con_out_kvec_reset(con); | ||
1411 | ret = prepare_write_connect(con); | 1560 | ret = prepare_write_connect(con); |
1412 | if (ret < 0) | 1561 | if (ret < 0) |
1413 | return ret; | 1562 | return ret; |
@@ -1428,7 +1577,6 @@ static int process_connect(struct ceph_connection *con) | |||
1428 | ENTITY_NAME(con->peer_name), | 1577 | ENTITY_NAME(con->peer_name), |
1429 | ceph_pr_addr(&con->peer_addr.in_addr)); | 1578 | ceph_pr_addr(&con->peer_addr.in_addr)); |
1430 | reset_connection(con); | 1579 | reset_connection(con); |
1431 | ceph_con_out_kvec_reset(con); | ||
1432 | ret = prepare_write_connect(con); | 1580 | ret = prepare_write_connect(con); |
1433 | if (ret < 0) | 1581 | if (ret < 0) |
1434 | return ret; | 1582 | return ret; |
@@ -1440,8 +1588,7 @@ static int process_connect(struct ceph_connection *con) | |||
1440 | if (con->ops->peer_reset) | 1588 | if (con->ops->peer_reset) |
1441 | con->ops->peer_reset(con); | 1589 | con->ops->peer_reset(con); |
1442 | mutex_lock(&con->mutex); | 1590 | mutex_lock(&con->mutex); |
1443 | if (test_bit(CLOSED, &con->state) || | 1591 | if (con->state != CON_STATE_NEGOTIATING) |
1444 | test_bit(OPENING, &con->state)) | ||
1445 | return -EAGAIN; | 1592 | return -EAGAIN; |
1446 | break; | 1593 | break; |
1447 | 1594 | ||
@@ -1454,7 +1601,6 @@ static int process_connect(struct ceph_connection *con) | |||
1454 | le32_to_cpu(con->out_connect.connect_seq), | 1601 | le32_to_cpu(con->out_connect.connect_seq), |
1455 | le32_to_cpu(con->in_reply.connect_seq)); | 1602 | le32_to_cpu(con->in_reply.connect_seq)); |
1456 | con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); | 1603 | con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); |
1457 | ceph_con_out_kvec_reset(con); | ||
1458 | ret = prepare_write_connect(con); | 1604 | ret = prepare_write_connect(con); |
1459 | if (ret < 0) | 1605 | if (ret < 0) |
1460 | return ret; | 1606 | return ret; |
@@ -1471,7 +1617,6 @@ static int process_connect(struct ceph_connection *con) | |||
1471 | le32_to_cpu(con->in_reply.global_seq)); | 1617 | le32_to_cpu(con->in_reply.global_seq)); |
1472 | get_global_seq(con->msgr, | 1618 | get_global_seq(con->msgr, |
1473 | le32_to_cpu(con->in_reply.global_seq)); | 1619 | le32_to_cpu(con->in_reply.global_seq)); |
1474 | ceph_con_out_kvec_reset(con); | ||
1475 | ret = prepare_write_connect(con); | 1620 | ret = prepare_write_connect(con); |
1476 | if (ret < 0) | 1621 | if (ret < 0) |
1477 | return ret; | 1622 | return ret; |
@@ -1489,7 +1634,10 @@ static int process_connect(struct ceph_connection *con) | |||
1489 | fail_protocol(con); | 1634 | fail_protocol(con); |
1490 | return -1; | 1635 | return -1; |
1491 | } | 1636 | } |
1492 | clear_bit(CONNECTING, &con->state); | 1637 | |
1638 | BUG_ON(con->state != CON_STATE_NEGOTIATING); | ||
1639 | con->state = CON_STATE_OPEN; | ||
1640 | |||
1493 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); | 1641 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); |
1494 | con->connect_seq++; | 1642 | con->connect_seq++; |
1495 | con->peer_features = server_feat; | 1643 | con->peer_features = server_feat; |
@@ -1501,7 +1649,9 @@ static int process_connect(struct ceph_connection *con) | |||
1501 | le32_to_cpu(con->in_reply.connect_seq)); | 1649 | le32_to_cpu(con->in_reply.connect_seq)); |
1502 | 1650 | ||
1503 | if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) | 1651 | if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) |
1504 | set_bit(LOSSYTX, &con->state); | 1652 | set_bit(CON_FLAG_LOSSYTX, &con->flags); |
1653 | |||
1654 | con->delay = 0; /* reset backoff memory */ | ||
1505 | 1655 | ||
1506 | prepare_read_tag(con); | 1656 | prepare_read_tag(con); |
1507 | break; | 1657 | break; |
@@ -1587,10 +1737,7 @@ static int read_partial_message_section(struct ceph_connection *con, | |||
1587 | return 1; | 1737 | return 1; |
1588 | } | 1738 | } |
1589 | 1739 | ||
1590 | static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | 1740 | static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip); |
1591 | struct ceph_msg_header *hdr, | ||
1592 | int *skip); | ||
1593 | |||
1594 | 1741 | ||
1595 | static int read_partial_message_pages(struct ceph_connection *con, | 1742 | static int read_partial_message_pages(struct ceph_connection *con, |
1596 | struct page **pages, | 1743 | struct page **pages, |
@@ -1633,9 +1780,6 @@ static int read_partial_message_bio(struct ceph_connection *con, | |||
1633 | void *p; | 1780 | void *p; |
1634 | int ret, left; | 1781 | int ret, left; |
1635 | 1782 | ||
1636 | if (IS_ERR(bv)) | ||
1637 | return PTR_ERR(bv); | ||
1638 | |||
1639 | left = min((int)(data_len - con->in_msg_pos.data_pos), | 1783 | left = min((int)(data_len - con->in_msg_pos.data_pos), |
1640 | (int)(bv->bv_len - con->in_msg_pos.page_pos)); | 1784 | (int)(bv->bv_len - con->in_msg_pos.page_pos)); |
1641 | 1785 | ||
@@ -1672,7 +1816,6 @@ static int read_partial_message(struct ceph_connection *con) | |||
1672 | int ret; | 1816 | int ret; |
1673 | unsigned int front_len, middle_len, data_len; | 1817 | unsigned int front_len, middle_len, data_len; |
1674 | bool do_datacrc = !con->msgr->nocrc; | 1818 | bool do_datacrc = !con->msgr->nocrc; |
1675 | int skip; | ||
1676 | u64 seq; | 1819 | u64 seq; |
1677 | u32 crc; | 1820 | u32 crc; |
1678 | 1821 | ||
@@ -1723,10 +1866,13 @@ static int read_partial_message(struct ceph_connection *con) | |||
1723 | 1866 | ||
1724 | /* allocate message? */ | 1867 | /* allocate message? */ |
1725 | if (!con->in_msg) { | 1868 | if (!con->in_msg) { |
1869 | int skip = 0; | ||
1870 | |||
1726 | dout("got hdr type %d front %d data %d\n", con->in_hdr.type, | 1871 | dout("got hdr type %d front %d data %d\n", con->in_hdr.type, |
1727 | con->in_hdr.front_len, con->in_hdr.data_len); | 1872 | con->in_hdr.front_len, con->in_hdr.data_len); |
1728 | skip = 0; | 1873 | ret = ceph_con_in_msg_alloc(con, &skip); |
1729 | con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); | 1874 | if (ret < 0) |
1875 | return ret; | ||
1730 | if (skip) { | 1876 | if (skip) { |
1731 | /* skip this message */ | 1877 | /* skip this message */ |
1732 | dout("alloc_msg said skip message\n"); | 1878 | dout("alloc_msg said skip message\n"); |
@@ -1737,11 +1883,9 @@ static int read_partial_message(struct ceph_connection *con) | |||
1737 | con->in_seq++; | 1883 | con->in_seq++; |
1738 | return 0; | 1884 | return 0; |
1739 | } | 1885 | } |
1740 | if (!con->in_msg) { | 1886 | |
1741 | con->error_msg = | 1887 | BUG_ON(!con->in_msg); |
1742 | "error allocating memory for incoming message"; | 1888 | BUG_ON(con->in_msg->con != con); |
1743 | return -ENOMEM; | ||
1744 | } | ||
1745 | m = con->in_msg; | 1889 | m = con->in_msg; |
1746 | m->front.iov_len = 0; /* haven't read it yet */ | 1890 | m->front.iov_len = 0; /* haven't read it yet */ |
1747 | if (m->middle) | 1891 | if (m->middle) |
@@ -1753,6 +1897,11 @@ static int read_partial_message(struct ceph_connection *con) | |||
1753 | else | 1897 | else |
1754 | con->in_msg_pos.page_pos = 0; | 1898 | con->in_msg_pos.page_pos = 0; |
1755 | con->in_msg_pos.data_pos = 0; | 1899 | con->in_msg_pos.data_pos = 0; |
1900 | |||
1901 | #ifdef CONFIG_BLOCK | ||
1902 | if (m->bio) | ||
1903 | init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); | ||
1904 | #endif | ||
1756 | } | 1905 | } |
1757 | 1906 | ||
1758 | /* front */ | 1907 | /* front */ |
@@ -1769,10 +1918,6 @@ static int read_partial_message(struct ceph_connection *con) | |||
1769 | if (ret <= 0) | 1918 | if (ret <= 0) |
1770 | return ret; | 1919 | return ret; |
1771 | } | 1920 | } |
1772 | #ifdef CONFIG_BLOCK | ||
1773 | if (m->bio && !m->bio_iter) | ||
1774 | init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); | ||
1775 | #endif | ||
1776 | 1921 | ||
1777 | /* (page) data */ | 1922 | /* (page) data */ |
1778 | while (con->in_msg_pos.data_pos < data_len) { | 1923 | while (con->in_msg_pos.data_pos < data_len) { |
@@ -1783,7 +1928,7 @@ static int read_partial_message(struct ceph_connection *con) | |||
1783 | return ret; | 1928 | return ret; |
1784 | #ifdef CONFIG_BLOCK | 1929 | #ifdef CONFIG_BLOCK |
1785 | } else if (m->bio) { | 1930 | } else if (m->bio) { |
1786 | 1931 | BUG_ON(!m->bio_iter); | |
1787 | ret = read_partial_message_bio(con, | 1932 | ret = read_partial_message_bio(con, |
1788 | &m->bio_iter, &m->bio_seg, | 1933 | &m->bio_iter, &m->bio_seg, |
1789 | data_len, do_datacrc); | 1934 | data_len, do_datacrc); |
@@ -1837,8 +1982,11 @@ static void process_message(struct ceph_connection *con) | |||
1837 | { | 1982 | { |
1838 | struct ceph_msg *msg; | 1983 | struct ceph_msg *msg; |
1839 | 1984 | ||
1985 | BUG_ON(con->in_msg->con != con); | ||
1986 | con->in_msg->con = NULL; | ||
1840 | msg = con->in_msg; | 1987 | msg = con->in_msg; |
1841 | con->in_msg = NULL; | 1988 | con->in_msg = NULL; |
1989 | con->ops->put(con); | ||
1842 | 1990 | ||
1843 | /* if first message, set peer_name */ | 1991 | /* if first message, set peer_name */ |
1844 | if (con->peer_name.type == 0) | 1992 | if (con->peer_name.type == 0) |
@@ -1858,7 +2006,6 @@ static void process_message(struct ceph_connection *con) | |||
1858 | con->ops->dispatch(con, msg); | 2006 | con->ops->dispatch(con, msg); |
1859 | 2007 | ||
1860 | mutex_lock(&con->mutex); | 2008 | mutex_lock(&con->mutex); |
1861 | prepare_read_tag(con); | ||
1862 | } | 2009 | } |
1863 | 2010 | ||
1864 | 2011 | ||
@@ -1870,22 +2017,19 @@ static int try_write(struct ceph_connection *con) | |||
1870 | { | 2017 | { |
1871 | int ret = 1; | 2018 | int ret = 1; |
1872 | 2019 | ||
1873 | dout("try_write start %p state %lu nref %d\n", con, con->state, | 2020 | dout("try_write start %p state %lu\n", con, con->state); |
1874 | atomic_read(&con->nref)); | ||
1875 | 2021 | ||
1876 | more: | 2022 | more: |
1877 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); | 2023 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); |
1878 | 2024 | ||
1879 | /* open the socket first? */ | 2025 | /* open the socket first? */ |
1880 | if (con->sock == NULL) { | 2026 | if (con->state == CON_STATE_PREOPEN) { |
1881 | ceph_con_out_kvec_reset(con); | 2027 | BUG_ON(con->sock); |
2028 | con->state = CON_STATE_CONNECTING; | ||
2029 | |||
2030 | con_out_kvec_reset(con); | ||
1882 | prepare_write_banner(con); | 2031 | prepare_write_banner(con); |
1883 | ret = prepare_write_connect(con); | ||
1884 | if (ret < 0) | ||
1885 | goto out; | ||
1886 | prepare_read_banner(con); | 2032 | prepare_read_banner(con); |
1887 | set_bit(CONNECTING, &con->state); | ||
1888 | clear_bit(NEGOTIATING, &con->state); | ||
1889 | 2033 | ||
1890 | BUG_ON(con->in_msg); | 2034 | BUG_ON(con->in_msg); |
1891 | con->in_tag = CEPH_MSGR_TAG_READY; | 2035 | con->in_tag = CEPH_MSGR_TAG_READY; |
@@ -1932,7 +2076,7 @@ more_kvec: | |||
1932 | } | 2076 | } |
1933 | 2077 | ||
1934 | do_next: | 2078 | do_next: |
1935 | if (!test_bit(CONNECTING, &con->state)) { | 2079 | if (con->state == CON_STATE_OPEN) { |
1936 | /* is anything else pending? */ | 2080 | /* is anything else pending? */ |
1937 | if (!list_empty(&con->out_queue)) { | 2081 | if (!list_empty(&con->out_queue)) { |
1938 | prepare_write_message(con); | 2082 | prepare_write_message(con); |
@@ -1942,14 +2086,15 @@ do_next: | |||
1942 | prepare_write_ack(con); | 2086 | prepare_write_ack(con); |
1943 | goto more; | 2087 | goto more; |
1944 | } | 2088 | } |
1945 | if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { | 2089 | if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING, |
2090 | &con->flags)) { | ||
1946 | prepare_write_keepalive(con); | 2091 | prepare_write_keepalive(con); |
1947 | goto more; | 2092 | goto more; |
1948 | } | 2093 | } |
1949 | } | 2094 | } |
1950 | 2095 | ||
1951 | /* Nothing to do! */ | 2096 | /* Nothing to do! */ |
1952 | clear_bit(WRITE_PENDING, &con->state); | 2097 | clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
1953 | dout("try_write nothing else to write.\n"); | 2098 | dout("try_write nothing else to write.\n"); |
1954 | ret = 0; | 2099 | ret = 0; |
1955 | out: | 2100 | out: |
@@ -1966,38 +2111,42 @@ static int try_read(struct ceph_connection *con) | |||
1966 | { | 2111 | { |
1967 | int ret = -1; | 2112 | int ret = -1; |
1968 | 2113 | ||
1969 | if (!con->sock) | 2114 | more: |
1970 | return 0; | 2115 | dout("try_read start on %p state %lu\n", con, con->state); |
1971 | 2116 | if (con->state != CON_STATE_CONNECTING && | |
1972 | if (test_bit(STANDBY, &con->state)) | 2117 | con->state != CON_STATE_NEGOTIATING && |
2118 | con->state != CON_STATE_OPEN) | ||
1973 | return 0; | 2119 | return 0; |
1974 | 2120 | ||
1975 | dout("try_read start on %p\n", con); | 2121 | BUG_ON(!con->sock); |
1976 | 2122 | ||
1977 | more: | ||
1978 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, | 2123 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, |
1979 | con->in_base_pos); | 2124 | con->in_base_pos); |
1980 | 2125 | ||
1981 | /* | 2126 | if (con->state == CON_STATE_CONNECTING) { |
1982 | * process_connect and process_message drop and re-take | 2127 | dout("try_read connecting\n"); |
1983 | * con->mutex. make sure we handle a racing close or reopen. | 2128 | ret = read_partial_banner(con); |
1984 | */ | 2129 | if (ret <= 0) |
1985 | if (test_bit(CLOSED, &con->state) || | 2130 | goto out; |
1986 | test_bit(OPENING, &con->state)) { | 2131 | ret = process_banner(con); |
1987 | ret = -EAGAIN; | 2132 | if (ret < 0) |
2133 | goto out; | ||
2134 | |||
2135 | BUG_ON(con->state != CON_STATE_CONNECTING); | ||
2136 | con->state = CON_STATE_NEGOTIATING; | ||
2137 | |||
2138 | /* Banner is good, exchange connection info */ | ||
2139 | ret = prepare_write_connect(con); | ||
2140 | if (ret < 0) | ||
2141 | goto out; | ||
2142 | prepare_read_connect(con); | ||
2143 | |||
2144 | /* Send connection info before awaiting response */ | ||
1988 | goto out; | 2145 | goto out; |
1989 | } | 2146 | } |
1990 | 2147 | ||
1991 | if (test_bit(CONNECTING, &con->state)) { | 2148 | if (con->state == CON_STATE_NEGOTIATING) { |
1992 | if (!test_bit(NEGOTIATING, &con->state)) { | 2149 | dout("try_read negotiating\n"); |
1993 | dout("try_read connecting\n"); | ||
1994 | ret = read_partial_banner(con); | ||
1995 | if (ret <= 0) | ||
1996 | goto out; | ||
1997 | ret = process_banner(con); | ||
1998 | if (ret < 0) | ||
1999 | goto out; | ||
2000 | } | ||
2001 | ret = read_partial_connect(con); | 2150 | ret = read_partial_connect(con); |
2002 | if (ret <= 0) | 2151 | if (ret <= 0) |
2003 | goto out; | 2152 | goto out; |
@@ -2007,6 +2156,8 @@ more: | |||
2007 | goto more; | 2156 | goto more; |
2008 | } | 2157 | } |
2009 | 2158 | ||
2159 | BUG_ON(con->state != CON_STATE_OPEN); | ||
2160 | |||
2010 | if (con->in_base_pos < 0) { | 2161 | if (con->in_base_pos < 0) { |
2011 | /* | 2162 | /* |
2012 | * skipping + discarding content. | 2163 | * skipping + discarding content. |
@@ -2040,7 +2191,8 @@ more: | |||
2040 | prepare_read_ack(con); | 2191 | prepare_read_ack(con); |
2041 | break; | 2192 | break; |
2042 | case CEPH_MSGR_TAG_CLOSE: | 2193 | case CEPH_MSGR_TAG_CLOSE: |
2043 | set_bit(CLOSED, &con->state); /* fixme */ | 2194 | con_close_socket(con); |
2195 | con->state = CON_STATE_CLOSED; | ||
2044 | goto out; | 2196 | goto out; |
2045 | default: | 2197 | default: |
2046 | goto bad_tag; | 2198 | goto bad_tag; |
@@ -2063,6 +2215,8 @@ more: | |||
2063 | if (con->in_tag == CEPH_MSGR_TAG_READY) | 2215 | if (con->in_tag == CEPH_MSGR_TAG_READY) |
2064 | goto more; | 2216 | goto more; |
2065 | process_message(con); | 2217 | process_message(con); |
2218 | if (con->state == CON_STATE_OPEN) | ||
2219 | prepare_read_tag(con); | ||
2066 | goto more; | 2220 | goto more; |
2067 | } | 2221 | } |
2068 | if (con->in_tag == CEPH_MSGR_TAG_ACK) { | 2222 | if (con->in_tag == CEPH_MSGR_TAG_ACK) { |
@@ -2091,12 +2245,6 @@ bad_tag: | |||
2091 | */ | 2245 | */ |
2092 | static void queue_con(struct ceph_connection *con) | 2246 | static void queue_con(struct ceph_connection *con) |
2093 | { | 2247 | { |
2094 | if (test_bit(DEAD, &con->state)) { | ||
2095 | dout("queue_con %p ignoring: DEAD\n", | ||
2096 | con); | ||
2097 | return; | ||
2098 | } | ||
2099 | |||
2100 | if (!con->ops->get(con)) { | 2248 | if (!con->ops->get(con)) { |
2101 | dout("queue_con %p ref count 0\n", con); | 2249 | dout("queue_con %p ref count 0\n", con); |
2102 | return; | 2250 | return; |
@@ -2121,7 +2269,26 @@ static void con_work(struct work_struct *work) | |||
2121 | 2269 | ||
2122 | mutex_lock(&con->mutex); | 2270 | mutex_lock(&con->mutex); |
2123 | restart: | 2271 | restart: |
2124 | if (test_and_clear_bit(BACKOFF, &con->state)) { | 2272 | if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) { |
2273 | switch (con->state) { | ||
2274 | case CON_STATE_CONNECTING: | ||
2275 | con->error_msg = "connection failed"; | ||
2276 | break; | ||
2277 | case CON_STATE_NEGOTIATING: | ||
2278 | con->error_msg = "negotiation failed"; | ||
2279 | break; | ||
2280 | case CON_STATE_OPEN: | ||
2281 | con->error_msg = "socket closed"; | ||
2282 | break; | ||
2283 | default: | ||
2284 | dout("unrecognized con state %d\n", (int)con->state); | ||
2285 | con->error_msg = "unrecognized con state"; | ||
2286 | BUG(); | ||
2287 | } | ||
2288 | goto fault; | ||
2289 | } | ||
2290 | |||
2291 | if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { | ||
2125 | dout("con_work %p backing off\n", con); | 2292 | dout("con_work %p backing off\n", con); |
2126 | if (queue_delayed_work(ceph_msgr_wq, &con->work, | 2293 | if (queue_delayed_work(ceph_msgr_wq, &con->work, |
2127 | round_jiffies_relative(con->delay))) { | 2294 | round_jiffies_relative(con->delay))) { |
@@ -2135,35 +2302,35 @@ restart: | |||
2135 | } | 2302 | } |
2136 | } | 2303 | } |
2137 | 2304 | ||
2138 | if (test_bit(STANDBY, &con->state)) { | 2305 | if (con->state == CON_STATE_STANDBY) { |
2139 | dout("con_work %p STANDBY\n", con); | 2306 | dout("con_work %p STANDBY\n", con); |
2140 | goto done; | 2307 | goto done; |
2141 | } | 2308 | } |
2142 | if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ | 2309 | if (con->state == CON_STATE_CLOSED) { |
2143 | dout("con_work CLOSED\n"); | 2310 | dout("con_work %p CLOSED\n", con); |
2144 | con_close_socket(con); | 2311 | BUG_ON(con->sock); |
2145 | goto done; | 2312 | goto done; |
2146 | } | 2313 | } |
2147 | if (test_and_clear_bit(OPENING, &con->state)) { | 2314 | if (con->state == CON_STATE_PREOPEN) { |
2148 | /* reopen w/ new peer */ | ||
2149 | dout("con_work OPENING\n"); | 2315 | dout("con_work OPENING\n"); |
2150 | con_close_socket(con); | 2316 | BUG_ON(con->sock); |
2151 | } | 2317 | } |
2152 | 2318 | ||
2153 | if (test_and_clear_bit(SOCK_CLOSED, &con->state)) | ||
2154 | goto fault; | ||
2155 | |||
2156 | ret = try_read(con); | 2319 | ret = try_read(con); |
2157 | if (ret == -EAGAIN) | 2320 | if (ret == -EAGAIN) |
2158 | goto restart; | 2321 | goto restart; |
2159 | if (ret < 0) | 2322 | if (ret < 0) { |
2323 | con->error_msg = "socket error on read"; | ||
2160 | goto fault; | 2324 | goto fault; |
2325 | } | ||
2161 | 2326 | ||
2162 | ret = try_write(con); | 2327 | ret = try_write(con); |
2163 | if (ret == -EAGAIN) | 2328 | if (ret == -EAGAIN) |
2164 | goto restart; | 2329 | goto restart; |
2165 | if (ret < 0) | 2330 | if (ret < 0) { |
2331 | con->error_msg = "socket error on write"; | ||
2166 | goto fault; | 2332 | goto fault; |
2333 | } | ||
2167 | 2334 | ||
2168 | done: | 2335 | done: |
2169 | mutex_unlock(&con->mutex); | 2336 | mutex_unlock(&con->mutex); |
@@ -2172,7 +2339,6 @@ done_unlocked: | |||
2172 | return; | 2339 | return; |
2173 | 2340 | ||
2174 | fault: | 2341 | fault: |
2175 | mutex_unlock(&con->mutex); | ||
2176 | ceph_fault(con); /* error/fault path */ | 2342 | ceph_fault(con); /* error/fault path */ |
2177 | goto done_unlocked; | 2343 | goto done_unlocked; |
2178 | } | 2344 | } |
@@ -2183,26 +2349,31 @@ fault: | |||
2183 | * exponential backoff | 2349 | * exponential backoff |
2184 | */ | 2350 | */ |
2185 | static void ceph_fault(struct ceph_connection *con) | 2351 | static void ceph_fault(struct ceph_connection *con) |
2352 | __releases(con->mutex) | ||
2186 | { | 2353 | { |
2187 | pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), | 2354 | pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), |
2188 | ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); | 2355 | ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); |
2189 | dout("fault %p state %lu to peer %s\n", | 2356 | dout("fault %p state %lu to peer %s\n", |
2190 | con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); | 2357 | con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); |
2191 | 2358 | ||
2192 | if (test_bit(LOSSYTX, &con->state)) { | 2359 | BUG_ON(con->state != CON_STATE_CONNECTING && |
2193 | dout("fault on LOSSYTX channel\n"); | 2360 | con->state != CON_STATE_NEGOTIATING && |
2194 | goto out; | 2361 | con->state != CON_STATE_OPEN); |
2195 | } | ||
2196 | |||
2197 | mutex_lock(&con->mutex); | ||
2198 | if (test_bit(CLOSED, &con->state)) | ||
2199 | goto out_unlock; | ||
2200 | 2362 | ||
2201 | con_close_socket(con); | 2363 | con_close_socket(con); |
2202 | 2364 | ||
2365 | if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) { | ||
2366 | dout("fault on LOSSYTX channel, marking CLOSED\n"); | ||
2367 | con->state = CON_STATE_CLOSED; | ||
2368 | goto out_unlock; | ||
2369 | } | ||
2370 | |||
2203 | if (con->in_msg) { | 2371 | if (con->in_msg) { |
2372 | BUG_ON(con->in_msg->con != con); | ||
2373 | con->in_msg->con = NULL; | ||
2204 | ceph_msg_put(con->in_msg); | 2374 | ceph_msg_put(con->in_msg); |
2205 | con->in_msg = NULL; | 2375 | con->in_msg = NULL; |
2376 | con->ops->put(con); | ||
2206 | } | 2377 | } |
2207 | 2378 | ||
2208 | /* Requeue anything that hasn't been acked */ | 2379 | /* Requeue anything that hasn't been acked */ |
@@ -2211,12 +2382,13 @@ static void ceph_fault(struct ceph_connection *con) | |||
2211 | /* If there are no messages queued or keepalive pending, place | 2382 | /* If there are no messages queued or keepalive pending, place |
2212 | * the connection in a STANDBY state */ | 2383 | * the connection in a STANDBY state */ |
2213 | if (list_empty(&con->out_queue) && | 2384 | if (list_empty(&con->out_queue) && |
2214 | !test_bit(KEEPALIVE_PENDING, &con->state)) { | 2385 | !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) { |
2215 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); | 2386 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); |
2216 | clear_bit(WRITE_PENDING, &con->state); | 2387 | clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
2217 | set_bit(STANDBY, &con->state); | 2388 | con->state = CON_STATE_STANDBY; |
2218 | } else { | 2389 | } else { |
2219 | /* retry after a delay. */ | 2390 | /* retry after a delay. */ |
2391 | con->state = CON_STATE_PREOPEN; | ||
2220 | if (con->delay == 0) | 2392 | if (con->delay == 0) |
2221 | con->delay = BASE_DELAY_INTERVAL; | 2393 | con->delay = BASE_DELAY_INTERVAL; |
2222 | else if (con->delay < MAX_DELAY_INTERVAL) | 2394 | else if (con->delay < MAX_DELAY_INTERVAL) |
@@ -2237,13 +2409,12 @@ static void ceph_fault(struct ceph_connection *con) | |||
2237 | * that when con_work restarts we schedule the | 2409 | * that when con_work restarts we schedule the |
2238 | * delay then. | 2410 | * delay then. |
2239 | */ | 2411 | */ |
2240 | set_bit(BACKOFF, &con->state); | 2412 | set_bit(CON_FLAG_BACKOFF, &con->flags); |
2241 | } | 2413 | } |
2242 | } | 2414 | } |
2243 | 2415 | ||
2244 | out_unlock: | 2416 | out_unlock: |
2245 | mutex_unlock(&con->mutex); | 2417 | mutex_unlock(&con->mutex); |
2246 | out: | ||
2247 | /* | 2418 | /* |
2248 | * in case we faulted due to authentication, invalidate our | 2419 | * in case we faulted due to authentication, invalidate our |
2249 | * current tickets so that we can get new ones. | 2420 | * current tickets so that we can get new ones. |
@@ -2260,18 +2431,14 @@ out: | |||
2260 | 2431 | ||
2261 | 2432 | ||
2262 | /* | 2433 | /* |
2263 | * create a new messenger instance | 2434 | * initialize a new messenger instance |
2264 | */ | 2435 | */ |
2265 | struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, | 2436 | void ceph_messenger_init(struct ceph_messenger *msgr, |
2266 | u32 supported_features, | 2437 | struct ceph_entity_addr *myaddr, |
2267 | u32 required_features) | 2438 | u32 supported_features, |
2439 | u32 required_features, | ||
2440 | bool nocrc) | ||
2268 | { | 2441 | { |
2269 | struct ceph_messenger *msgr; | ||
2270 | |||
2271 | msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); | ||
2272 | if (msgr == NULL) | ||
2273 | return ERR_PTR(-ENOMEM); | ||
2274 | |||
2275 | msgr->supported_features = supported_features; | 2442 | msgr->supported_features = supported_features; |
2276 | msgr->required_features = required_features; | 2443 | msgr->required_features = required_features; |
2277 | 2444 | ||
@@ -2284,30 +2451,23 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, | |||
2284 | msgr->inst.addr.type = 0; | 2451 | msgr->inst.addr.type = 0; |
2285 | get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); | 2452 | get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); |
2286 | encode_my_addr(msgr); | 2453 | encode_my_addr(msgr); |
2454 | msgr->nocrc = nocrc; | ||
2287 | 2455 | ||
2288 | dout("messenger_create %p\n", msgr); | 2456 | atomic_set(&msgr->stopping, 0); |
2289 | return msgr; | ||
2290 | } | ||
2291 | EXPORT_SYMBOL(ceph_messenger_create); | ||
2292 | 2457 | ||
2293 | void ceph_messenger_destroy(struct ceph_messenger *msgr) | 2458 | dout("%s %p\n", __func__, msgr); |
2294 | { | ||
2295 | dout("destroy %p\n", msgr); | ||
2296 | kfree(msgr); | ||
2297 | dout("destroyed messenger %p\n", msgr); | ||
2298 | } | 2459 | } |
2299 | EXPORT_SYMBOL(ceph_messenger_destroy); | 2460 | EXPORT_SYMBOL(ceph_messenger_init); |
2300 | 2461 | ||
2301 | static void clear_standby(struct ceph_connection *con) | 2462 | static void clear_standby(struct ceph_connection *con) |
2302 | { | 2463 | { |
2303 | /* come back from STANDBY? */ | 2464 | /* come back from STANDBY? */ |
2304 | if (test_and_clear_bit(STANDBY, &con->state)) { | 2465 | if (con->state == CON_STATE_STANDBY) { |
2305 | mutex_lock(&con->mutex); | ||
2306 | dout("clear_standby %p and ++connect_seq\n", con); | 2466 | dout("clear_standby %p and ++connect_seq\n", con); |
2467 | con->state = CON_STATE_PREOPEN; | ||
2307 | con->connect_seq++; | 2468 | con->connect_seq++; |
2308 | WARN_ON(test_bit(WRITE_PENDING, &con->state)); | 2469 | WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags)); |
2309 | WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); | 2470 | WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)); |
2310 | mutex_unlock(&con->mutex); | ||
2311 | } | 2471 | } |
2312 | } | 2472 | } |
2313 | 2473 | ||
@@ -2316,21 +2476,24 @@ static void clear_standby(struct ceph_connection *con) | |||
2316 | */ | 2476 | */ |
2317 | void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | 2477 | void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) |
2318 | { | 2478 | { |
2319 | if (test_bit(CLOSED, &con->state)) { | ||
2320 | dout("con_send %p closed, dropping %p\n", con, msg); | ||
2321 | ceph_msg_put(msg); | ||
2322 | return; | ||
2323 | } | ||
2324 | |||
2325 | /* set src+dst */ | 2479 | /* set src+dst */ |
2326 | msg->hdr.src = con->msgr->inst.name; | 2480 | msg->hdr.src = con->msgr->inst.name; |
2327 | |||
2328 | BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); | 2481 | BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); |
2329 | |||
2330 | msg->needs_out_seq = true; | 2482 | msg->needs_out_seq = true; |
2331 | 2483 | ||
2332 | /* queue */ | ||
2333 | mutex_lock(&con->mutex); | 2484 | mutex_lock(&con->mutex); |
2485 | |||
2486 | if (con->state == CON_STATE_CLOSED) { | ||
2487 | dout("con_send %p closed, dropping %p\n", con, msg); | ||
2488 | ceph_msg_put(msg); | ||
2489 | mutex_unlock(&con->mutex); | ||
2490 | return; | ||
2491 | } | ||
2492 | |||
2493 | BUG_ON(msg->con != NULL); | ||
2494 | msg->con = con->ops->get(con); | ||
2495 | BUG_ON(msg->con == NULL); | ||
2496 | |||
2334 | BUG_ON(!list_empty(&msg->list_head)); | 2497 | BUG_ON(!list_empty(&msg->list_head)); |
2335 | list_add_tail(&msg->list_head, &con->out_queue); | 2498 | list_add_tail(&msg->list_head, &con->out_queue); |
2336 | dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, | 2499 | dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, |
@@ -2339,12 +2502,13 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
2339 | le32_to_cpu(msg->hdr.front_len), | 2502 | le32_to_cpu(msg->hdr.front_len), |
2340 | le32_to_cpu(msg->hdr.middle_len), | 2503 | le32_to_cpu(msg->hdr.middle_len), |
2341 | le32_to_cpu(msg->hdr.data_len)); | 2504 | le32_to_cpu(msg->hdr.data_len)); |
2505 | |||
2506 | clear_standby(con); | ||
2342 | mutex_unlock(&con->mutex); | 2507 | mutex_unlock(&con->mutex); |
2343 | 2508 | ||
2344 | /* if there wasn't anything waiting to send before, queue | 2509 | /* if there wasn't anything waiting to send before, queue |
2345 | * new work */ | 2510 | * new work */ |
2346 | clear_standby(con); | 2511 | if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) |
2347 | if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) | ||
2348 | queue_con(con); | 2512 | queue_con(con); |
2349 | } | 2513 | } |
2350 | EXPORT_SYMBOL(ceph_con_send); | 2514 | EXPORT_SYMBOL(ceph_con_send); |
@@ -2352,24 +2516,34 @@ EXPORT_SYMBOL(ceph_con_send); | |||
2352 | /* | 2516 | /* |
2353 | * Revoke a message that was previously queued for send | 2517 | * Revoke a message that was previously queued for send |
2354 | */ | 2518 | */ |
2355 | void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) | 2519 | void ceph_msg_revoke(struct ceph_msg *msg) |
2356 | { | 2520 | { |
2521 | struct ceph_connection *con = msg->con; | ||
2522 | |||
2523 | if (!con) | ||
2524 | return; /* Message not in our possession */ | ||
2525 | |||
2357 | mutex_lock(&con->mutex); | 2526 | mutex_lock(&con->mutex); |
2358 | if (!list_empty(&msg->list_head)) { | 2527 | if (!list_empty(&msg->list_head)) { |
2359 | dout("con_revoke %p msg %p - was on queue\n", con, msg); | 2528 | dout("%s %p msg %p - was on queue\n", __func__, con, msg); |
2360 | list_del_init(&msg->list_head); | 2529 | list_del_init(&msg->list_head); |
2361 | ceph_msg_put(msg); | 2530 | BUG_ON(msg->con == NULL); |
2531 | msg->con->ops->put(msg->con); | ||
2532 | msg->con = NULL; | ||
2362 | msg->hdr.seq = 0; | 2533 | msg->hdr.seq = 0; |
2534 | |||
2535 | ceph_msg_put(msg); | ||
2363 | } | 2536 | } |
2364 | if (con->out_msg == msg) { | 2537 | if (con->out_msg == msg) { |
2365 | dout("con_revoke %p msg %p - was sending\n", con, msg); | 2538 | dout("%s %p msg %p - was sending\n", __func__, con, msg); |
2366 | con->out_msg = NULL; | 2539 | con->out_msg = NULL; |
2367 | if (con->out_kvec_is_msg) { | 2540 | if (con->out_kvec_is_msg) { |
2368 | con->out_skip = con->out_kvec_bytes; | 2541 | con->out_skip = con->out_kvec_bytes; |
2369 | con->out_kvec_is_msg = false; | 2542 | con->out_kvec_is_msg = false; |
2370 | } | 2543 | } |
2371 | ceph_msg_put(msg); | ||
2372 | msg->hdr.seq = 0; | 2544 | msg->hdr.seq = 0; |
2545 | |||
2546 | ceph_msg_put(msg); | ||
2373 | } | 2547 | } |
2374 | mutex_unlock(&con->mutex); | 2548 | mutex_unlock(&con->mutex); |
2375 | } | 2549 | } |
@@ -2377,17 +2551,27 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) | |||
2377 | /* | 2551 | /* |
2378 | * Revoke a message that we may be reading data into | 2552 | * Revoke a message that we may be reading data into |
2379 | */ | 2553 | */ |
2380 | void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) | 2554 | void ceph_msg_revoke_incoming(struct ceph_msg *msg) |
2381 | { | 2555 | { |
2556 | struct ceph_connection *con; | ||
2557 | |||
2558 | BUG_ON(msg == NULL); | ||
2559 | if (!msg->con) { | ||
2560 | dout("%s msg %p null con\n", __func__, msg); | ||
2561 | |||
2562 | return; /* Message not in our possession */ | ||
2563 | } | ||
2564 | |||
2565 | con = msg->con; | ||
2382 | mutex_lock(&con->mutex); | 2566 | mutex_lock(&con->mutex); |
2383 | if (con->in_msg && con->in_msg == msg) { | 2567 | if (con->in_msg == msg) { |
2384 | unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); | 2568 | unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); |
2385 | unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); | 2569 | unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); |
2386 | unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); | 2570 | unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); |
2387 | 2571 | ||
2388 | /* skip rest of message */ | 2572 | /* skip rest of message */ |
2389 | dout("con_revoke_pages %p msg %p revoked\n", con, msg); | 2573 | dout("%s %p msg %p revoked\n", __func__, con, msg); |
2390 | con->in_base_pos = con->in_base_pos - | 2574 | con->in_base_pos = con->in_base_pos - |
2391 | sizeof(struct ceph_msg_header) - | 2575 | sizeof(struct ceph_msg_header) - |
2392 | front_len - | 2576 | front_len - |
2393 | middle_len - | 2577 | middle_len - |
@@ -2398,8 +2582,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) | |||
2398 | con->in_tag = CEPH_MSGR_TAG_READY; | 2582 | con->in_tag = CEPH_MSGR_TAG_READY; |
2399 | con->in_seq++; | 2583 | con->in_seq++; |
2400 | } else { | 2584 | } else { |
2401 | dout("con_revoke_pages %p msg %p pages %p no-op\n", | 2585 | dout("%s %p in_msg %p msg %p no-op\n", |
2402 | con, con->in_msg, msg); | 2586 | __func__, con, con->in_msg, msg); |
2403 | } | 2587 | } |
2404 | mutex_unlock(&con->mutex); | 2588 | mutex_unlock(&con->mutex); |
2405 | } | 2589 | } |
@@ -2410,9 +2594,11 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) | |||
2410 | void ceph_con_keepalive(struct ceph_connection *con) | 2594 | void ceph_con_keepalive(struct ceph_connection *con) |
2411 | { | 2595 | { |
2412 | dout("con_keepalive %p\n", con); | 2596 | dout("con_keepalive %p\n", con); |
2597 | mutex_lock(&con->mutex); | ||
2413 | clear_standby(con); | 2598 | clear_standby(con); |
2414 | if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && | 2599 | mutex_unlock(&con->mutex); |
2415 | test_and_set_bit(WRITE_PENDING, &con->state) == 0) | 2600 | if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 && |
2601 | test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) | ||
2416 | queue_con(con); | 2602 | queue_con(con); |
2417 | } | 2603 | } |
2418 | EXPORT_SYMBOL(ceph_con_keepalive); | 2604 | EXPORT_SYMBOL(ceph_con_keepalive); |
@@ -2431,6 +2617,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, | |||
2431 | if (m == NULL) | 2617 | if (m == NULL) |
2432 | goto out; | 2618 | goto out; |
2433 | kref_init(&m->kref); | 2619 | kref_init(&m->kref); |
2620 | |||
2621 | m->con = NULL; | ||
2434 | INIT_LIST_HEAD(&m->list_head); | 2622 | INIT_LIST_HEAD(&m->list_head); |
2435 | 2623 | ||
2436 | m->hdr.tid = 0; | 2624 | m->hdr.tid = 0; |
@@ -2526,46 +2714,77 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) | |||
2526 | } | 2714 | } |
2527 | 2715 | ||
2528 | /* | 2716 | /* |
2529 | * Generic message allocator, for incoming messages. | 2717 | * Allocate a message for receiving an incoming message on a |
2718 | * connection, and save the result in con->in_msg. Uses the | ||
2719 | * connection's private alloc_msg op if available. | ||
2720 | * | ||
2721 | * Returns 0 on success, or a negative error code. | ||
2722 | * | ||
2723 | * On success, if we set *skip = 1: | ||
2724 | * - the next message should be skipped and ignored. | ||
2725 | * - con->in_msg == NULL | ||
2726 | * or if we set *skip = 0: | ||
2727 | * - con->in_msg is non-null. | ||
2728 | * On error (ENOMEM, EAGAIN, ...), | ||
2729 | * - con->in_msg == NULL | ||
2530 | */ | 2730 | */ |
2531 | static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | 2731 | static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) |
2532 | struct ceph_msg_header *hdr, | ||
2533 | int *skip) | ||
2534 | { | 2732 | { |
2733 | struct ceph_msg_header *hdr = &con->in_hdr; | ||
2535 | int type = le16_to_cpu(hdr->type); | 2734 | int type = le16_to_cpu(hdr->type); |
2536 | int front_len = le32_to_cpu(hdr->front_len); | 2735 | int front_len = le32_to_cpu(hdr->front_len); |
2537 | int middle_len = le32_to_cpu(hdr->middle_len); | 2736 | int middle_len = le32_to_cpu(hdr->middle_len); |
2538 | struct ceph_msg *msg = NULL; | 2737 | int ret = 0; |
2539 | int ret; | 2738 | |
2739 | BUG_ON(con->in_msg != NULL); | ||
2540 | 2740 | ||
2541 | if (con->ops->alloc_msg) { | 2741 | if (con->ops->alloc_msg) { |
2742 | struct ceph_msg *msg; | ||
2743 | |||
2542 | mutex_unlock(&con->mutex); | 2744 | mutex_unlock(&con->mutex); |
2543 | msg = con->ops->alloc_msg(con, hdr, skip); | 2745 | msg = con->ops->alloc_msg(con, hdr, skip); |
2544 | mutex_lock(&con->mutex); | 2746 | mutex_lock(&con->mutex); |
2545 | if (!msg || *skip) | 2747 | if (con->state != CON_STATE_OPEN) { |
2546 | return NULL; | 2748 | ceph_msg_put(msg); |
2749 | return -EAGAIN; | ||
2750 | } | ||
2751 | con->in_msg = msg; | ||
2752 | if (con->in_msg) { | ||
2753 | con->in_msg->con = con->ops->get(con); | ||
2754 | BUG_ON(con->in_msg->con == NULL); | ||
2755 | } | ||
2756 | if (*skip) { | ||
2757 | con->in_msg = NULL; | ||
2758 | return 0; | ||
2759 | } | ||
2760 | if (!con->in_msg) { | ||
2761 | con->error_msg = | ||
2762 | "error allocating memory for incoming message"; | ||
2763 | return -ENOMEM; | ||
2764 | } | ||
2547 | } | 2765 | } |
2548 | if (!msg) { | 2766 | if (!con->in_msg) { |
2549 | *skip = 0; | 2767 | con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false); |
2550 | msg = ceph_msg_new(type, front_len, GFP_NOFS, false); | 2768 | if (!con->in_msg) { |
2551 | if (!msg) { | ||
2552 | pr_err("unable to allocate msg type %d len %d\n", | 2769 | pr_err("unable to allocate msg type %d len %d\n", |
2553 | type, front_len); | 2770 | type, front_len); |
2554 | return NULL; | 2771 | return -ENOMEM; |
2555 | } | 2772 | } |
2556 | msg->page_alignment = le16_to_cpu(hdr->data_off); | 2773 | con->in_msg->con = con->ops->get(con); |
2774 | BUG_ON(con->in_msg->con == NULL); | ||
2775 | con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); | ||
2557 | } | 2776 | } |
2558 | memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); | 2777 | memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); |
2559 | 2778 | ||
2560 | if (middle_len && !msg->middle) { | 2779 | if (middle_len && !con->in_msg->middle) { |
2561 | ret = ceph_alloc_middle(con, msg); | 2780 | ret = ceph_alloc_middle(con, con->in_msg); |
2562 | if (ret < 0) { | 2781 | if (ret < 0) { |
2563 | ceph_msg_put(msg); | 2782 | ceph_msg_put(con->in_msg); |
2564 | return NULL; | 2783 | con->in_msg = NULL; |
2565 | } | 2784 | } |
2566 | } | 2785 | } |
2567 | 2786 | ||
2568 | return msg; | 2787 | return ret; |
2569 | } | 2788 | } |
2570 | 2789 | ||
2571 | 2790 | ||
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index d0649a9655b..105d533b55f 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c | |||
@@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) | |||
106 | monc->pending_auth = 1; | 106 | monc->pending_auth = 1; |
107 | monc->m_auth->front.iov_len = len; | 107 | monc->m_auth->front.iov_len = len; |
108 | monc->m_auth->hdr.front_len = cpu_to_le32(len); | 108 | monc->m_auth->hdr.front_len = cpu_to_le32(len); |
109 | ceph_con_revoke(monc->con, monc->m_auth); | 109 | ceph_msg_revoke(monc->m_auth); |
110 | ceph_msg_get(monc->m_auth); /* keep our ref */ | 110 | ceph_msg_get(monc->m_auth); /* keep our ref */ |
111 | ceph_con_send(monc->con, monc->m_auth); | 111 | ceph_con_send(&monc->con, monc->m_auth); |
112 | } | 112 | } |
113 | 113 | ||
114 | /* | 114 | /* |
@@ -117,8 +117,11 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) | |||
117 | static void __close_session(struct ceph_mon_client *monc) | 117 | static void __close_session(struct ceph_mon_client *monc) |
118 | { | 118 | { |
119 | dout("__close_session closing mon%d\n", monc->cur_mon); | 119 | dout("__close_session closing mon%d\n", monc->cur_mon); |
120 | ceph_con_revoke(monc->con, monc->m_auth); | 120 | ceph_msg_revoke(monc->m_auth); |
121 | ceph_con_close(monc->con); | 121 | ceph_msg_revoke_incoming(monc->m_auth_reply); |
122 | ceph_msg_revoke(monc->m_subscribe); | ||
123 | ceph_msg_revoke_incoming(monc->m_subscribe_ack); | ||
124 | ceph_con_close(&monc->con); | ||
122 | monc->cur_mon = -1; | 125 | monc->cur_mon = -1; |
123 | monc->pending_auth = 0; | 126 | monc->pending_auth = 0; |
124 | ceph_auth_reset(monc->auth); | 127 | ceph_auth_reset(monc->auth); |
@@ -142,9 +145,8 @@ static int __open_session(struct ceph_mon_client *monc) | |||
142 | monc->want_next_osdmap = !!monc->want_next_osdmap; | 145 | monc->want_next_osdmap = !!monc->want_next_osdmap; |
143 | 146 | ||
144 | dout("open_session mon%d opening\n", monc->cur_mon); | 147 | dout("open_session mon%d opening\n", monc->cur_mon); |
145 | monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON; | 148 | ceph_con_open(&monc->con, |
146 | monc->con->peer_name.num = cpu_to_le64(monc->cur_mon); | 149 | CEPH_ENTITY_TYPE_MON, monc->cur_mon, |
147 | ceph_con_open(monc->con, | ||
148 | &monc->monmap->mon_inst[monc->cur_mon].addr); | 150 | &monc->monmap->mon_inst[monc->cur_mon].addr); |
149 | 151 | ||
150 | /* initiatiate authentication handshake */ | 152 | /* initiatiate authentication handshake */ |
@@ -226,8 +228,8 @@ static void __send_subscribe(struct ceph_mon_client *monc) | |||
226 | 228 | ||
227 | msg->front.iov_len = p - msg->front.iov_base; | 229 | msg->front.iov_len = p - msg->front.iov_base; |
228 | msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); | 230 | msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); |
229 | ceph_con_revoke(monc->con, msg); | 231 | ceph_msg_revoke(msg); |
230 | ceph_con_send(monc->con, ceph_msg_get(msg)); | 232 | ceph_con_send(&monc->con, ceph_msg_get(msg)); |
231 | 233 | ||
232 | monc->sub_sent = jiffies | 1; /* never 0 */ | 234 | monc->sub_sent = jiffies | 1; /* never 0 */ |
233 | } | 235 | } |
@@ -247,7 +249,7 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc, | |||
247 | if (monc->hunting) { | 249 | if (monc->hunting) { |
248 | pr_info("mon%d %s session established\n", | 250 | pr_info("mon%d %s session established\n", |
249 | monc->cur_mon, | 251 | monc->cur_mon, |
250 | ceph_pr_addr(&monc->con->peer_addr.in_addr)); | 252 | ceph_pr_addr(&monc->con.peer_addr.in_addr)); |
251 | monc->hunting = false; | 253 | monc->hunting = false; |
252 | } | 254 | } |
253 | dout("handle_subscribe_ack after %d seconds\n", seconds); | 255 | dout("handle_subscribe_ack after %d seconds\n", seconds); |
@@ -439,6 +441,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con, | |||
439 | m = NULL; | 441 | m = NULL; |
440 | } else { | 442 | } else { |
441 | dout("get_generic_reply %lld got %p\n", tid, req->reply); | 443 | dout("get_generic_reply %lld got %p\n", tid, req->reply); |
444 | *skip = 0; | ||
442 | m = ceph_msg_get(req->reply); | 445 | m = ceph_msg_get(req->reply); |
443 | /* | 446 | /* |
444 | * we don't need to track the connection reading into | 447 | * we don't need to track the connection reading into |
@@ -461,7 +464,7 @@ static int do_generic_request(struct ceph_mon_client *monc, | |||
461 | req->request->hdr.tid = cpu_to_le64(req->tid); | 464 | req->request->hdr.tid = cpu_to_le64(req->tid); |
462 | __insert_generic_request(monc, req); | 465 | __insert_generic_request(monc, req); |
463 | monc->num_generic_requests++; | 466 | monc->num_generic_requests++; |
464 | ceph_con_send(monc->con, ceph_msg_get(req->request)); | 467 | ceph_con_send(&monc->con, ceph_msg_get(req->request)); |
465 | mutex_unlock(&monc->mutex); | 468 | mutex_unlock(&monc->mutex); |
466 | 469 | ||
467 | err = wait_for_completion_interruptible(&req->completion); | 470 | err = wait_for_completion_interruptible(&req->completion); |
@@ -684,8 +687,9 @@ static void __resend_generic_request(struct ceph_mon_client *monc) | |||
684 | 687 | ||
685 | for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { | 688 | for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { |
686 | req = rb_entry(p, struct ceph_mon_generic_request, node); | 689 | req = rb_entry(p, struct ceph_mon_generic_request, node); |
687 | ceph_con_revoke(monc->con, req->request); | 690 | ceph_msg_revoke(req->request); |
688 | ceph_con_send(monc->con, ceph_msg_get(req->request)); | 691 | ceph_msg_revoke_incoming(req->reply); |
692 | ceph_con_send(&monc->con, ceph_msg_get(req->request)); | ||
689 | } | 693 | } |
690 | } | 694 | } |
691 | 695 | ||
@@ -705,7 +709,7 @@ static void delayed_work(struct work_struct *work) | |||
705 | __close_session(monc); | 709 | __close_session(monc); |
706 | __open_session(monc); /* continue hunting */ | 710 | __open_session(monc); /* continue hunting */ |
707 | } else { | 711 | } else { |
708 | ceph_con_keepalive(monc->con); | 712 | ceph_con_keepalive(&monc->con); |
709 | 713 | ||
710 | __validate_auth(monc); | 714 | __validate_auth(monc); |
711 | 715 | ||
@@ -760,19 +764,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) | |||
760 | goto out; | 764 | goto out; |
761 | 765 | ||
762 | /* connection */ | 766 | /* connection */ |
763 | monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL); | ||
764 | if (!monc->con) | ||
765 | goto out_monmap; | ||
766 | ceph_con_init(monc->client->msgr, monc->con); | ||
767 | monc->con->private = monc; | ||
768 | monc->con->ops = &mon_con_ops; | ||
769 | |||
770 | /* authentication */ | 767 | /* authentication */ |
771 | monc->auth = ceph_auth_init(cl->options->name, | 768 | monc->auth = ceph_auth_init(cl->options->name, |
772 | cl->options->key); | 769 | cl->options->key); |
773 | if (IS_ERR(monc->auth)) { | 770 | if (IS_ERR(monc->auth)) { |
774 | err = PTR_ERR(monc->auth); | 771 | err = PTR_ERR(monc->auth); |
775 | goto out_con; | 772 | goto out_monmap; |
776 | } | 773 | } |
777 | monc->auth->want_keys = | 774 | monc->auth->want_keys = |
778 | CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | | 775 | CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | |
@@ -801,6 +798,9 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) | |||
801 | if (!monc->m_auth) | 798 | if (!monc->m_auth) |
802 | goto out_auth_reply; | 799 | goto out_auth_reply; |
803 | 800 | ||
801 | ceph_con_init(&monc->con, monc, &mon_con_ops, | ||
802 | &monc->client->msgr); | ||
803 | |||
804 | monc->cur_mon = -1; | 804 | monc->cur_mon = -1; |
805 | monc->hunting = true; | 805 | monc->hunting = true; |
806 | monc->sub_renew_after = jiffies; | 806 | monc->sub_renew_after = jiffies; |
@@ -824,8 +824,6 @@ out_subscribe_ack: | |||
824 | ceph_msg_put(monc->m_subscribe_ack); | 824 | ceph_msg_put(monc->m_subscribe_ack); |
825 | out_auth: | 825 | out_auth: |
826 | ceph_auth_destroy(monc->auth); | 826 | ceph_auth_destroy(monc->auth); |
827 | out_con: | ||
828 | monc->con->ops->put(monc->con); | ||
829 | out_monmap: | 827 | out_monmap: |
830 | kfree(monc->monmap); | 828 | kfree(monc->monmap); |
831 | out: | 829 | out: |
@@ -841,10 +839,6 @@ void ceph_monc_stop(struct ceph_mon_client *monc) | |||
841 | mutex_lock(&monc->mutex); | 839 | mutex_lock(&monc->mutex); |
842 | __close_session(monc); | 840 | __close_session(monc); |
843 | 841 | ||
844 | monc->con->private = NULL; | ||
845 | monc->con->ops->put(monc->con); | ||
846 | monc->con = NULL; | ||
847 | |||
848 | mutex_unlock(&monc->mutex); | 842 | mutex_unlock(&monc->mutex); |
849 | 843 | ||
850 | /* | 844 | /* |
@@ -888,8 +882,8 @@ static void handle_auth_reply(struct ceph_mon_client *monc, | |||
888 | } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) { | 882 | } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) { |
889 | dout("authenticated, starting session\n"); | 883 | dout("authenticated, starting session\n"); |
890 | 884 | ||
891 | monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; | 885 | monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT; |
892 | monc->client->msgr->inst.name.num = | 886 | monc->client->msgr.inst.name.num = |
893 | cpu_to_le64(monc->auth->global_id); | 887 | cpu_to_le64(monc->auth->global_id); |
894 | 888 | ||
895 | __send_subscribe(monc); | 889 | __send_subscribe(monc); |
@@ -1000,6 +994,8 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, | |||
1000 | case CEPH_MSG_MDS_MAP: | 994 | case CEPH_MSG_MDS_MAP: |
1001 | case CEPH_MSG_OSD_MAP: | 995 | case CEPH_MSG_OSD_MAP: |
1002 | m = ceph_msg_new(type, front_len, GFP_NOFS, false); | 996 | m = ceph_msg_new(type, front_len, GFP_NOFS, false); |
997 | if (!m) | ||
998 | return NULL; /* ENOMEM--return skip == 0 */ | ||
1003 | break; | 999 | break; |
1004 | } | 1000 | } |
1005 | 1001 | ||
@@ -1029,7 +1025,7 @@ static void mon_fault(struct ceph_connection *con) | |||
1029 | if (!monc->hunting) | 1025 | if (!monc->hunting) |
1030 | pr_info("mon%d %s session lost, " | 1026 | pr_info("mon%d %s session lost, " |
1031 | "hunting for new mon\n", monc->cur_mon, | 1027 | "hunting for new mon\n", monc->cur_mon, |
1032 | ceph_pr_addr(&monc->con->peer_addr.in_addr)); | 1028 | ceph_pr_addr(&monc->con.peer_addr.in_addr)); |
1033 | 1029 | ||
1034 | __close_session(monc); | 1030 | __close_session(monc); |
1035 | if (!monc->hunting) { | 1031 | if (!monc->hunting) { |
@@ -1044,9 +1040,23 @@ out: | |||
1044 | mutex_unlock(&monc->mutex); | 1040 | mutex_unlock(&monc->mutex); |
1045 | } | 1041 | } |
1046 | 1042 | ||
1043 | /* | ||
1044 | * We can ignore refcounting on the connection struct, as all references | ||
1045 | * will come from the messenger workqueue, which is drained prior to | ||
1046 | * mon_client destruction. | ||
1047 | */ | ||
1048 | static struct ceph_connection *con_get(struct ceph_connection *con) | ||
1049 | { | ||
1050 | return con; | ||
1051 | } | ||
1052 | |||
1053 | static void con_put(struct ceph_connection *con) | ||
1054 | { | ||
1055 | } | ||
1056 | |||
1047 | static const struct ceph_connection_operations mon_con_ops = { | 1057 | static const struct ceph_connection_operations mon_con_ops = { |
1048 | .get = ceph_con_get, | 1058 | .get = con_get, |
1049 | .put = ceph_con_put, | 1059 | .put = con_put, |
1050 | .dispatch = dispatch, | 1060 | .dispatch = dispatch, |
1051 | .fault = mon_fault, | 1061 | .fault = mon_fault, |
1052 | .alloc_msg = mon_alloc_msg, | 1062 | .alloc_msg = mon_alloc_msg, |
diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c index 11d5f4196a7..ddec1c10ac8 100644 --- a/net/ceph/msgpool.c +++ b/net/ceph/msgpool.c | |||
@@ -12,7 +12,7 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg) | |||
12 | struct ceph_msgpool *pool = arg; | 12 | struct ceph_msgpool *pool = arg; |
13 | struct ceph_msg *msg; | 13 | struct ceph_msg *msg; |
14 | 14 | ||
15 | msg = ceph_msg_new(0, pool->front_len, gfp_mask, true); | 15 | msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true); |
16 | if (!msg) { | 16 | if (!msg) { |
17 | dout("msgpool_alloc %s failed\n", pool->name); | 17 | dout("msgpool_alloc %s failed\n", pool->name); |
18 | } else { | 18 | } else { |
@@ -32,10 +32,11 @@ static void msgpool_free(void *element, void *arg) | |||
32 | ceph_msg_put(msg); | 32 | ceph_msg_put(msg); |
33 | } | 33 | } |
34 | 34 | ||
35 | int ceph_msgpool_init(struct ceph_msgpool *pool, | 35 | int ceph_msgpool_init(struct ceph_msgpool *pool, int type, |
36 | int front_len, int size, bool blocking, const char *name) | 36 | int front_len, int size, bool blocking, const char *name) |
37 | { | 37 | { |
38 | dout("msgpool %s init\n", name); | 38 | dout("msgpool %s init\n", name); |
39 | pool->type = type; | ||
39 | pool->front_len = front_len; | 40 | pool->front_len = front_len; |
40 | pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); | 41 | pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); |
41 | if (!pool->pool) | 42 | if (!pool->pool) |
@@ -61,7 +62,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, | |||
61 | WARN_ON(1); | 62 | WARN_ON(1); |
62 | 63 | ||
63 | /* try to alloc a fresh message */ | 64 | /* try to alloc a fresh message */ |
64 | return ceph_msg_new(0, front_len, GFP_NOFS, false); | 65 | return ceph_msg_new(pool->type, front_len, GFP_NOFS, false); |
65 | } | 66 | } |
66 | 67 | ||
67 | msg = mempool_alloc(pool->pool, GFP_NOFS); | 68 | msg = mempool_alloc(pool->pool, GFP_NOFS); |
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ca59e66c978..42119c05e82 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c | |||
@@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref) | |||
140 | if (req->r_request) | 140 | if (req->r_request) |
141 | ceph_msg_put(req->r_request); | 141 | ceph_msg_put(req->r_request); |
142 | if (req->r_con_filling_msg) { | 142 | if (req->r_con_filling_msg) { |
143 | dout("release_request revoking pages %p from con %p\n", | 143 | dout("%s revoking pages %p from con %p\n", __func__, |
144 | req->r_pages, req->r_con_filling_msg); | 144 | req->r_pages, req->r_con_filling_msg); |
145 | ceph_con_revoke_message(req->r_con_filling_msg, | 145 | ceph_msg_revoke_incoming(req->r_reply); |
146 | req->r_reply); | ||
147 | req->r_con_filling_msg->ops->put(req->r_con_filling_msg); | 146 | req->r_con_filling_msg->ops->put(req->r_con_filling_msg); |
148 | } | 147 | } |
149 | if (req->r_reply) | 148 | if (req->r_reply) |
@@ -214,10 +213,13 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, | |||
214 | kref_init(&req->r_kref); | 213 | kref_init(&req->r_kref); |
215 | init_completion(&req->r_completion); | 214 | init_completion(&req->r_completion); |
216 | init_completion(&req->r_safe_completion); | 215 | init_completion(&req->r_safe_completion); |
216 | rb_init_node(&req->r_node); | ||
217 | INIT_LIST_HEAD(&req->r_unsafe_item); | 217 | INIT_LIST_HEAD(&req->r_unsafe_item); |
218 | INIT_LIST_HEAD(&req->r_linger_item); | 218 | INIT_LIST_HEAD(&req->r_linger_item); |
219 | INIT_LIST_HEAD(&req->r_linger_osd); | 219 | INIT_LIST_HEAD(&req->r_linger_osd); |
220 | INIT_LIST_HEAD(&req->r_req_lru_item); | 220 | INIT_LIST_HEAD(&req->r_req_lru_item); |
221 | INIT_LIST_HEAD(&req->r_osd_item); | ||
222 | |||
221 | req->r_flags = flags; | 223 | req->r_flags = flags; |
222 | 224 | ||
223 | WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); | 225 | WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); |
@@ -243,6 +245,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, | |||
243 | } | 245 | } |
244 | ceph_pagelist_init(req->r_trail); | 246 | ceph_pagelist_init(req->r_trail); |
245 | } | 247 | } |
248 | |||
246 | /* create request message; allow space for oid */ | 249 | /* create request message; allow space for oid */ |
247 | msg_size += MAX_OBJ_NAME_SIZE; | 250 | msg_size += MAX_OBJ_NAME_SIZE; |
248 | if (snapc) | 251 | if (snapc) |
@@ -256,7 +259,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, | |||
256 | return NULL; | 259 | return NULL; |
257 | } | 260 | } |
258 | 261 | ||
259 | msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); | ||
260 | memset(msg->front.iov_base, 0, msg->front.iov_len); | 262 | memset(msg->front.iov_base, 0, msg->front.iov_len); |
261 | 263 | ||
262 | req->r_request = msg; | 264 | req->r_request = msg; |
@@ -624,7 +626,7 @@ static void osd_reset(struct ceph_connection *con) | |||
624 | /* | 626 | /* |
625 | * Track open sessions with osds. | 627 | * Track open sessions with osds. |
626 | */ | 628 | */ |
627 | static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) | 629 | static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) |
628 | { | 630 | { |
629 | struct ceph_osd *osd; | 631 | struct ceph_osd *osd; |
630 | 632 | ||
@@ -634,15 +636,13 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) | |||
634 | 636 | ||
635 | atomic_set(&osd->o_ref, 1); | 637 | atomic_set(&osd->o_ref, 1); |
636 | osd->o_osdc = osdc; | 638 | osd->o_osdc = osdc; |
639 | osd->o_osd = onum; | ||
637 | INIT_LIST_HEAD(&osd->o_requests); | 640 | INIT_LIST_HEAD(&osd->o_requests); |
638 | INIT_LIST_HEAD(&osd->o_linger_requests); | 641 | INIT_LIST_HEAD(&osd->o_linger_requests); |
639 | INIT_LIST_HEAD(&osd->o_osd_lru); | 642 | INIT_LIST_HEAD(&osd->o_osd_lru); |
640 | osd->o_incarnation = 1; | 643 | osd->o_incarnation = 1; |
641 | 644 | ||
642 | ceph_con_init(osdc->client->msgr, &osd->o_con); | 645 | ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); |
643 | osd->o_con.private = osd; | ||
644 | osd->o_con.ops = &osd_con_ops; | ||
645 | osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; | ||
646 | 646 | ||
647 | INIT_LIST_HEAD(&osd->o_keepalive_item); | 647 | INIT_LIST_HEAD(&osd->o_keepalive_item); |
648 | return osd; | 648 | return osd; |
@@ -688,7 +688,7 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) | |||
688 | 688 | ||
689 | static void remove_all_osds(struct ceph_osd_client *osdc) | 689 | static void remove_all_osds(struct ceph_osd_client *osdc) |
690 | { | 690 | { |
691 | dout("__remove_old_osds %p\n", osdc); | 691 | dout("%s %p\n", __func__, osdc); |
692 | mutex_lock(&osdc->request_mutex); | 692 | mutex_lock(&osdc->request_mutex); |
693 | while (!RB_EMPTY_ROOT(&osdc->osds)) { | 693 | while (!RB_EMPTY_ROOT(&osdc->osds)) { |
694 | struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), | 694 | struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), |
@@ -752,7 +752,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) | |||
752 | ret = -EAGAIN; | 752 | ret = -EAGAIN; |
753 | } else { | 753 | } else { |
754 | ceph_con_close(&osd->o_con); | 754 | ceph_con_close(&osd->o_con); |
755 | ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]); | 755 | ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, |
756 | &osdc->osdmap->osd_addr[osd->o_osd]); | ||
756 | osd->o_incarnation++; | 757 | osd->o_incarnation++; |
757 | } | 758 | } |
758 | return ret; | 759 | return ret; |
@@ -853,7 +854,7 @@ static void __unregister_request(struct ceph_osd_client *osdc, | |||
853 | 854 | ||
854 | if (req->r_osd) { | 855 | if (req->r_osd) { |
855 | /* make sure the original request isn't in flight. */ | 856 | /* make sure the original request isn't in flight. */ |
856 | ceph_con_revoke(&req->r_osd->o_con, req->r_request); | 857 | ceph_msg_revoke(req->r_request); |
857 | 858 | ||
858 | list_del_init(&req->r_osd_item); | 859 | list_del_init(&req->r_osd_item); |
859 | if (list_empty(&req->r_osd->o_requests) && | 860 | if (list_empty(&req->r_osd->o_requests) && |
@@ -880,7 +881,7 @@ static void __unregister_request(struct ceph_osd_client *osdc, | |||
880 | static void __cancel_request(struct ceph_osd_request *req) | 881 | static void __cancel_request(struct ceph_osd_request *req) |
881 | { | 882 | { |
882 | if (req->r_sent && req->r_osd) { | 883 | if (req->r_sent && req->r_osd) { |
883 | ceph_con_revoke(&req->r_osd->o_con, req->r_request); | 884 | ceph_msg_revoke(req->r_request); |
884 | req->r_sent = 0; | 885 | req->r_sent = 0; |
885 | } | 886 | } |
886 | } | 887 | } |
@@ -890,7 +891,9 @@ static void __register_linger_request(struct ceph_osd_client *osdc, | |||
890 | { | 891 | { |
891 | dout("__register_linger_request %p\n", req); | 892 | dout("__register_linger_request %p\n", req); |
892 | list_add_tail(&req->r_linger_item, &osdc->req_linger); | 893 | list_add_tail(&req->r_linger_item, &osdc->req_linger); |
893 | list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests); | 894 | if (req->r_osd) |
895 | list_add_tail(&req->r_linger_osd, | ||
896 | &req->r_osd->o_linger_requests); | ||
894 | } | 897 | } |
895 | 898 | ||
896 | static void __unregister_linger_request(struct ceph_osd_client *osdc, | 899 | static void __unregister_linger_request(struct ceph_osd_client *osdc, |
@@ -998,18 +1001,18 @@ static int __map_request(struct ceph_osd_client *osdc, | |||
998 | req->r_osd = __lookup_osd(osdc, o); | 1001 | req->r_osd = __lookup_osd(osdc, o); |
999 | if (!req->r_osd && o >= 0) { | 1002 | if (!req->r_osd && o >= 0) { |
1000 | err = -ENOMEM; | 1003 | err = -ENOMEM; |
1001 | req->r_osd = create_osd(osdc); | 1004 | req->r_osd = create_osd(osdc, o); |
1002 | if (!req->r_osd) { | 1005 | if (!req->r_osd) { |
1003 | list_move(&req->r_req_lru_item, &osdc->req_notarget); | 1006 | list_move(&req->r_req_lru_item, &osdc->req_notarget); |
1004 | goto out; | 1007 | goto out; |
1005 | } | 1008 | } |
1006 | 1009 | ||
1007 | dout("map_request osd %p is osd%d\n", req->r_osd, o); | 1010 | dout("map_request osd %p is osd%d\n", req->r_osd, o); |
1008 | req->r_osd->o_osd = o; | ||
1009 | req->r_osd->o_con.peer_name.num = cpu_to_le64(o); | ||
1010 | __insert_osd(osdc, req->r_osd); | 1011 | __insert_osd(osdc, req->r_osd); |
1011 | 1012 | ||
1012 | ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]); | 1013 | ceph_con_open(&req->r_osd->o_con, |
1014 | CEPH_ENTITY_TYPE_OSD, o, | ||
1015 | &osdc->osdmap->osd_addr[o]); | ||
1013 | } | 1016 | } |
1014 | 1017 | ||
1015 | if (req->r_osd) { | 1018 | if (req->r_osd) { |
@@ -1304,8 +1307,9 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) | |||
1304 | 1307 | ||
1305 | dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); | 1308 | dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); |
1306 | mutex_lock(&osdc->request_mutex); | 1309 | mutex_lock(&osdc->request_mutex); |
1307 | for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { | 1310 | for (p = rb_first(&osdc->requests); p; ) { |
1308 | req = rb_entry(p, struct ceph_osd_request, r_node); | 1311 | req = rb_entry(p, struct ceph_osd_request, r_node); |
1312 | p = rb_next(p); | ||
1309 | err = __map_request(osdc, req, force_resend); | 1313 | err = __map_request(osdc, req, force_resend); |
1310 | if (err < 0) | 1314 | if (err < 0) |
1311 | continue; /* error */ | 1315 | continue; /* error */ |
@@ -1313,10 +1317,23 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) | |||
1313 | dout("%p tid %llu maps to no osd\n", req, req->r_tid); | 1317 | dout("%p tid %llu maps to no osd\n", req, req->r_tid); |
1314 | needmap++; /* request a newer map */ | 1318 | needmap++; /* request a newer map */ |
1315 | } else if (err > 0) { | 1319 | } else if (err > 0) { |
1316 | dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, | 1320 | if (!req->r_linger) { |
1317 | req->r_osd ? req->r_osd->o_osd : -1); | 1321 | dout("%p tid %llu requeued on osd%d\n", req, |
1318 | if (!req->r_linger) | 1322 | req->r_tid, |
1323 | req->r_osd ? req->r_osd->o_osd : -1); | ||
1319 | req->r_flags |= CEPH_OSD_FLAG_RETRY; | 1324 | req->r_flags |= CEPH_OSD_FLAG_RETRY; |
1325 | } | ||
1326 | } | ||
1327 | if (req->r_linger && list_empty(&req->r_linger_item)) { | ||
1328 | /* | ||
1329 | * register as a linger so that we will | ||
1330 | * re-submit below and get a new tid | ||
1331 | */ | ||
1332 | dout("%p tid %llu restart on osd%d\n", | ||
1333 | req, req->r_tid, | ||
1334 | req->r_osd ? req->r_osd->o_osd : -1); | ||
1335 | __register_linger_request(osdc, req); | ||
1336 | __unregister_request(osdc, req); | ||
1320 | } | 1337 | } |
1321 | } | 1338 | } |
1322 | 1339 | ||
@@ -1391,7 +1408,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) | |||
1391 | epoch, maplen); | 1408 | epoch, maplen); |
1392 | newmap = osdmap_apply_incremental(&p, next, | 1409 | newmap = osdmap_apply_incremental(&p, next, |
1393 | osdc->osdmap, | 1410 | osdc->osdmap, |
1394 | osdc->client->msgr); | 1411 | &osdc->client->msgr); |
1395 | if (IS_ERR(newmap)) { | 1412 | if (IS_ERR(newmap)) { |
1396 | err = PTR_ERR(newmap); | 1413 | err = PTR_ERR(newmap); |
1397 | goto bad; | 1414 | goto bad; |
@@ -1839,11 +1856,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) | |||
1839 | if (!osdc->req_mempool) | 1856 | if (!osdc->req_mempool) |
1840 | goto out; | 1857 | goto out; |
1841 | 1858 | ||
1842 | err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true, | 1859 | err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, |
1860 | OSD_OP_FRONT_LEN, 10, true, | ||
1843 | "osd_op"); | 1861 | "osd_op"); |
1844 | if (err < 0) | 1862 | if (err < 0) |
1845 | goto out_mempool; | 1863 | goto out_mempool; |
1846 | err = ceph_msgpool_init(&osdc->msgpool_op_reply, | 1864 | err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, |
1847 | OSD_OPREPLY_FRONT_LEN, 10, true, | 1865 | OSD_OPREPLY_FRONT_LEN, 10, true, |
1848 | "osd_op_reply"); | 1866 | "osd_op_reply"); |
1849 | if (err < 0) | 1867 | if (err < 0) |
@@ -2019,15 +2037,15 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, | |||
2019 | if (!req) { | 2037 | if (!req) { |
2020 | *skip = 1; | 2038 | *skip = 1; |
2021 | m = NULL; | 2039 | m = NULL; |
2022 | pr_info("get_reply unknown tid %llu from osd%d\n", tid, | 2040 | dout("get_reply unknown tid %llu from osd%d\n", tid, |
2023 | osd->o_osd); | 2041 | osd->o_osd); |
2024 | goto out; | 2042 | goto out; |
2025 | } | 2043 | } |
2026 | 2044 | ||
2027 | if (req->r_con_filling_msg) { | 2045 | if (req->r_con_filling_msg) { |
2028 | dout("get_reply revoking msg %p from old con %p\n", | 2046 | dout("%s revoking msg %p from old con %p\n", __func__, |
2029 | req->r_reply, req->r_con_filling_msg); | 2047 | req->r_reply, req->r_con_filling_msg); |
2030 | ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); | 2048 | ceph_msg_revoke_incoming(req->r_reply); |
2031 | req->r_con_filling_msg->ops->put(req->r_con_filling_msg); | 2049 | req->r_con_filling_msg->ops->put(req->r_con_filling_msg); |
2032 | req->r_con_filling_msg = NULL; | 2050 | req->r_con_filling_msg = NULL; |
2033 | } | 2051 | } |
@@ -2080,6 +2098,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, | |||
2080 | int type = le16_to_cpu(hdr->type); | 2098 | int type = le16_to_cpu(hdr->type); |
2081 | int front = le32_to_cpu(hdr->front_len); | 2099 | int front = le32_to_cpu(hdr->front_len); |
2082 | 2100 | ||
2101 | *skip = 0; | ||
2083 | switch (type) { | 2102 | switch (type) { |
2084 | case CEPH_MSG_OSD_MAP: | 2103 | case CEPH_MSG_OSD_MAP: |
2085 | case CEPH_MSG_WATCH_NOTIFY: | 2104 | case CEPH_MSG_WATCH_NOTIFY: |
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 81e3b84a77e..3124b71a888 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c | |||
@@ -135,6 +135,21 @@ bad: | |||
135 | return -EINVAL; | 135 | return -EINVAL; |
136 | } | 136 | } |
137 | 137 | ||
138 | static int skip_name_map(void **p, void *end) | ||
139 | { | ||
140 | int len; | ||
141 | ceph_decode_32_safe(p, end, len ,bad); | ||
142 | while (len--) { | ||
143 | int strlen; | ||
144 | *p += sizeof(u32); | ||
145 | ceph_decode_32_safe(p, end, strlen, bad); | ||
146 | *p += strlen; | ||
147 | } | ||
148 | return 0; | ||
149 | bad: | ||
150 | return -EINVAL; | ||
151 | } | ||
152 | |||
138 | static struct crush_map *crush_decode(void *pbyval, void *end) | 153 | static struct crush_map *crush_decode(void *pbyval, void *end) |
139 | { | 154 | { |
140 | struct crush_map *c; | 155 | struct crush_map *c; |
@@ -143,6 +158,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end) | |||
143 | void **p = &pbyval; | 158 | void **p = &pbyval; |
144 | void *start = pbyval; | 159 | void *start = pbyval; |
145 | u32 magic; | 160 | u32 magic; |
161 | u32 num_name_maps; | ||
146 | 162 | ||
147 | dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p)); | 163 | dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p)); |
148 | 164 | ||
@@ -150,6 +166,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end) | |||
150 | if (c == NULL) | 166 | if (c == NULL) |
151 | return ERR_PTR(-ENOMEM); | 167 | return ERR_PTR(-ENOMEM); |
152 | 168 | ||
169 | /* set tunables to default values */ | ||
170 | c->choose_local_tries = 2; | ||
171 | c->choose_local_fallback_tries = 5; | ||
172 | c->choose_total_tries = 19; | ||
173 | |||
153 | ceph_decode_need(p, end, 4*sizeof(u32), bad); | 174 | ceph_decode_need(p, end, 4*sizeof(u32), bad); |
154 | magic = ceph_decode_32(p); | 175 | magic = ceph_decode_32(p); |
155 | if (magic != CRUSH_MAGIC) { | 176 | if (magic != CRUSH_MAGIC) { |
@@ -297,7 +318,25 @@ static struct crush_map *crush_decode(void *pbyval, void *end) | |||
297 | } | 318 | } |
298 | 319 | ||
299 | /* ignore trailing name maps. */ | 320 | /* ignore trailing name maps. */ |
321 | for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) { | ||
322 | err = skip_name_map(p, end); | ||
323 | if (err < 0) | ||
324 | goto done; | ||
325 | } | ||
326 | |||
327 | /* tunables */ | ||
328 | ceph_decode_need(p, end, 3*sizeof(u32), done); | ||
329 | c->choose_local_tries = ceph_decode_32(p); | ||
330 | c->choose_local_fallback_tries = ceph_decode_32(p); | ||
331 | c->choose_total_tries = ceph_decode_32(p); | ||
332 | dout("crush decode tunable choose_local_tries = %d", | ||
333 | c->choose_local_tries); | ||
334 | dout("crush decode tunable choose_local_fallback_tries = %d", | ||
335 | c->choose_local_fallback_tries); | ||
336 | dout("crush decode tunable choose_total_tries = %d", | ||
337 | c->choose_total_tries); | ||
300 | 338 | ||
339 | done: | ||
301 | dout("crush_decode success\n"); | 340 | dout("crush_decode success\n"); |
302 | return c; | 341 | return c; |
303 | 342 | ||
@@ -488,15 +527,16 @@ static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) | |||
488 | ceph_decode_32_safe(p, end, pool, bad); | 527 | ceph_decode_32_safe(p, end, pool, bad); |
489 | ceph_decode_32_safe(p, end, len, bad); | 528 | ceph_decode_32_safe(p, end, len, bad); |
490 | dout(" pool %d len %d\n", pool, len); | 529 | dout(" pool %d len %d\n", pool, len); |
530 | ceph_decode_need(p, end, len, bad); | ||
491 | pi = __lookup_pg_pool(&map->pg_pools, pool); | 531 | pi = __lookup_pg_pool(&map->pg_pools, pool); |
492 | if (pi) { | 532 | if (pi) { |
533 | char *name = kstrndup(*p, len, GFP_NOFS); | ||
534 | |||
535 | if (!name) | ||
536 | return -ENOMEM; | ||
493 | kfree(pi->name); | 537 | kfree(pi->name); |
494 | pi->name = kmalloc(len + 1, GFP_NOFS); | 538 | pi->name = name; |
495 | if (pi->name) { | 539 | dout(" name is %s\n", pi->name); |
496 | memcpy(pi->name, *p, len); | ||
497 | pi->name[len] = '\0'; | ||
498 | dout(" name is %s\n", pi->name); | ||
499 | } | ||
500 | } | 540 | } |
501 | *p += len; | 541 | *p += len; |
502 | } | 542 | } |
@@ -666,6 +706,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) | |||
666 | ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad); | 706 | ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad); |
667 | ceph_decode_copy(p, &pgid, sizeof(pgid)); | 707 | ceph_decode_copy(p, &pgid, sizeof(pgid)); |
668 | n = ceph_decode_32(p); | 708 | n = ceph_decode_32(p); |
709 | err = -EINVAL; | ||
710 | if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) | ||
711 | goto bad; | ||
669 | ceph_decode_need(p, end, n * sizeof(u32), bad); | 712 | ceph_decode_need(p, end, n * sizeof(u32), bad); |
670 | err = -ENOMEM; | 713 | err = -ENOMEM; |
671 | pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS); | 714 | pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS); |
@@ -889,6 +932,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, | |||
889 | (void) __remove_pg_mapping(&map->pg_temp, pgid); | 932 | (void) __remove_pg_mapping(&map->pg_temp, pgid); |
890 | 933 | ||
891 | /* insert */ | 934 | /* insert */ |
935 | if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) { | ||
936 | err = -EINVAL; | ||
937 | goto bad; | ||
938 | } | ||
892 | pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); | 939 | pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); |
893 | if (!pg) { | 940 | if (!pg) { |
894 | err = -ENOMEM; | 941 | err = -ENOMEM; |