diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2012-07-31 17:35:28 -0400 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2012-07-31 17:35:28 -0400 |
commit | cc8362b1f6d724e46f515121d442779924b19fec (patch) | |
tree | 86fb5c3767e538ec9ded57dd7b3ce5d69dcde691 /net/ceph/messenger.c | |
parent | 2e3ee613480563a6d5c01b57d342e65cc58c06df (diff) | |
parent | 1fe5e9932156f6122c3b1ff6ba7541c27c86718c (diff) |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Sage Weil:
"Lots of stuff this time around:
- lots of cleanup and refactoring in the libceph messenger code, and
many hard to hit races and bugs closed as a result.
- lots of cleanup and refactoring in the rbd code from Alex Elder,
mostly in preparation for the layering functionality that will be
coming in 3.7.
- some misc rbd cleanups from Josh Durgin that are finally going
upstream
- support for CRUSH tunables (used by newer clusters to improve the
data placement)
- some cleanup in our use of d_parent that Al brought up a while back
- a random collection of fixes across the tree
There is another patch coming that fixes up our ->atomic_open()
behavior, but I'm going to hammer on it a bit more before sending it."
Fix up conflicts due to commits that were already committed earlier in
drivers/block/rbd.c, net/ceph/{messenger.c, osd_client.c}
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (132 commits)
rbd: create rbd_refresh_helper()
rbd: return obj version in __rbd_refresh_header()
rbd: fixes in rbd_header_from_disk()
rbd: always pass ops array to rbd_req_sync_op()
rbd: pass null version pointer in add_snap()
rbd: make rbd_create_rw_ops() return a pointer
rbd: have __rbd_add_snap_dev() return a pointer
libceph: recheck con state after allocating incoming message
libceph: change ceph_con_in_msg_alloc convention to be less weird
libceph: avoid dropping con mutex before fault
libceph: verify state after retaking con lock after dispatch
libceph: revoke mon_client messages on session restart
libceph: fix handling of immediate socket connect failure
ceph: update MAINTAINERS file
libceph: be less chatty about stray replies
libceph: clear all flags on con_close
libceph: clean up con flags
libceph: replace connection state bits with states
libceph: drop unnecessary CLOSED check in socket state change callback
libceph: close socket directly from ceph_con_close()
...
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r-- | net/ceph/messenger.c | 925 |
1 files changed, 572 insertions, 353 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 10255e81be79..b9796750034a 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c | |||
@@ -29,6 +29,74 @@ | |||
29 | * the sender. | 29 | * the sender. |
30 | */ | 30 | */ |
31 | 31 | ||
32 | /* | ||
33 | * We track the state of the socket on a given connection using | ||
34 | * values defined below. The transition to a new socket state is | ||
35 | * handled by a function which verifies we aren't coming from an | ||
36 | * unexpected state. | ||
37 | * | ||
38 | * -------- | ||
39 | * | NEW* | transient initial state | ||
40 | * -------- | ||
41 | * | con_sock_state_init() | ||
42 | * v | ||
43 | * ---------- | ||
44 | * | CLOSED | initialized, but no socket (and no | ||
45 | * ---------- TCP connection) | ||
46 | * ^ \ | ||
47 | * | \ con_sock_state_connecting() | ||
48 | * | ---------------------- | ||
49 | * | \ | ||
50 | * + con_sock_state_closed() \ | ||
51 | * |+--------------------------- \ | ||
52 | * | \ \ \ | ||
53 | * | ----------- \ \ | ||
54 | * | | CLOSING | socket event; \ \ | ||
55 | * | ----------- await close \ \ | ||
56 | * | ^ \ | | ||
57 | * | | \ | | ||
58 | * | + con_sock_state_closing() \ | | ||
59 | * | / \ | | | ||
60 | * | / --------------- | | | ||
61 | * | / \ v v | ||
62 | * | / -------------- | ||
63 | * | / -----------------| CONNECTING | socket created, TCP | ||
64 | * | | / -------------- connect initiated | ||
65 | * | | | con_sock_state_connected() | ||
66 | * | | v | ||
67 | * ------------- | ||
68 | * | CONNECTED | TCP connection established | ||
69 | * ------------- | ||
70 | * | ||
71 | * State values for ceph_connection->sock_state; NEW is assumed to be 0. | ||
72 | */ | ||
73 | |||
74 | #define CON_SOCK_STATE_NEW 0 /* -> CLOSED */ | ||
75 | #define CON_SOCK_STATE_CLOSED 1 /* -> CONNECTING */ | ||
76 | #define CON_SOCK_STATE_CONNECTING 2 /* -> CONNECTED or -> CLOSING */ | ||
77 | #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ | ||
78 | #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ | ||
79 | |||
80 | /* | ||
81 | * connection states | ||
82 | */ | ||
83 | #define CON_STATE_CLOSED 1 /* -> PREOPEN */ | ||
84 | #define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */ | ||
85 | #define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */ | ||
86 | #define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */ | ||
87 | #define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */ | ||
88 | #define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */ | ||
89 | |||
90 | /* | ||
91 | * ceph_connection flag bits | ||
92 | */ | ||
93 | #define CON_FLAG_LOSSYTX 0 /* we can close channel or drop | ||
94 | * messages on errors */ | ||
95 | #define CON_FLAG_KEEPALIVE_PENDING 1 /* we need to send a keepalive */ | ||
96 | #define CON_FLAG_WRITE_PENDING 2 /* we have data ready to send */ | ||
97 | #define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */ | ||
98 | #define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */ | ||
99 | |||
32 | /* static tag bytes (protocol control messages) */ | 100 | /* static tag bytes (protocol control messages) */ |
33 | static char tag_msg = CEPH_MSGR_TAG_MSG; | 101 | static char tag_msg = CEPH_MSGR_TAG_MSG; |
34 | static char tag_ack = CEPH_MSGR_TAG_ACK; | 102 | static char tag_ack = CEPH_MSGR_TAG_ACK; |
@@ -147,72 +215,130 @@ void ceph_msgr_flush(void) | |||
147 | } | 215 | } |
148 | EXPORT_SYMBOL(ceph_msgr_flush); | 216 | EXPORT_SYMBOL(ceph_msgr_flush); |
149 | 217 | ||
218 | /* Connection socket state transition functions */ | ||
219 | |||
220 | static void con_sock_state_init(struct ceph_connection *con) | ||
221 | { | ||
222 | int old_state; | ||
223 | |||
224 | old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); | ||
225 | if (WARN_ON(old_state != CON_SOCK_STATE_NEW)) | ||
226 | printk("%s: unexpected old state %d\n", __func__, old_state); | ||
227 | dout("%s con %p sock %d -> %d\n", __func__, con, old_state, | ||
228 | CON_SOCK_STATE_CLOSED); | ||
229 | } | ||
230 | |||
231 | static void con_sock_state_connecting(struct ceph_connection *con) | ||
232 | { | ||
233 | int old_state; | ||
234 | |||
235 | old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); | ||
236 | if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED)) | ||
237 | printk("%s: unexpected old state %d\n", __func__, old_state); | ||
238 | dout("%s con %p sock %d -> %d\n", __func__, con, old_state, | ||
239 | CON_SOCK_STATE_CONNECTING); | ||
240 | } | ||
241 | |||
242 | static void con_sock_state_connected(struct ceph_connection *con) | ||
243 | { | ||
244 | int old_state; | ||
245 | |||
246 | old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); | ||
247 | if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING)) | ||
248 | printk("%s: unexpected old state %d\n", __func__, old_state); | ||
249 | dout("%s con %p sock %d -> %d\n", __func__, con, old_state, | ||
250 | CON_SOCK_STATE_CONNECTED); | ||
251 | } | ||
252 | |||
253 | static void con_sock_state_closing(struct ceph_connection *con) | ||
254 | { | ||
255 | int old_state; | ||
256 | |||
257 | old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); | ||
258 | if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING && | ||
259 | old_state != CON_SOCK_STATE_CONNECTED && | ||
260 | old_state != CON_SOCK_STATE_CLOSING)) | ||
261 | printk("%s: unexpected old state %d\n", __func__, old_state); | ||
262 | dout("%s con %p sock %d -> %d\n", __func__, con, old_state, | ||
263 | CON_SOCK_STATE_CLOSING); | ||
264 | } | ||
265 | |||
266 | static void con_sock_state_closed(struct ceph_connection *con) | ||
267 | { | ||
268 | int old_state; | ||
269 | |||
270 | old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); | ||
271 | if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED && | ||
272 | old_state != CON_SOCK_STATE_CLOSING && | ||
273 | old_state != CON_SOCK_STATE_CONNECTING && | ||
274 | old_state != CON_SOCK_STATE_CLOSED)) | ||
275 | printk("%s: unexpected old state %d\n", __func__, old_state); | ||
276 | dout("%s con %p sock %d -> %d\n", __func__, con, old_state, | ||
277 | CON_SOCK_STATE_CLOSED); | ||
278 | } | ||
150 | 279 | ||
151 | /* | 280 | /* |
152 | * socket callback functions | 281 | * socket callback functions |
153 | */ | 282 | */ |
154 | 283 | ||
155 | /* data available on socket, or listen socket received a connect */ | 284 | /* data available on socket, or listen socket received a connect */ |
156 | static void ceph_data_ready(struct sock *sk, int count_unused) | 285 | static void ceph_sock_data_ready(struct sock *sk, int count_unused) |
157 | { | 286 | { |
158 | struct ceph_connection *con = sk->sk_user_data; | 287 | struct ceph_connection *con = sk->sk_user_data; |
288 | if (atomic_read(&con->msgr->stopping)) { | ||
289 | return; | ||
290 | } | ||
159 | 291 | ||
160 | if (sk->sk_state != TCP_CLOSE_WAIT) { | 292 | if (sk->sk_state != TCP_CLOSE_WAIT) { |
161 | dout("ceph_data_ready on %p state = %lu, queueing work\n", | 293 | dout("%s on %p state = %lu, queueing work\n", __func__, |
162 | con, con->state); | 294 | con, con->state); |
163 | queue_con(con); | 295 | queue_con(con); |
164 | } | 296 | } |
165 | } | 297 | } |
166 | 298 | ||
167 | /* socket has buffer space for writing */ | 299 | /* socket has buffer space for writing */ |
168 | static void ceph_write_space(struct sock *sk) | 300 | static void ceph_sock_write_space(struct sock *sk) |
169 | { | 301 | { |
170 | struct ceph_connection *con = sk->sk_user_data; | 302 | struct ceph_connection *con = sk->sk_user_data; |
171 | 303 | ||
172 | /* only queue to workqueue if there is data we want to write, | 304 | /* only queue to workqueue if there is data we want to write, |
173 | * and there is sufficient space in the socket buffer to accept | 305 | * and there is sufficient space in the socket buffer to accept |
174 | * more data. clear SOCK_NOSPACE so that ceph_write_space() | 306 | * more data. clear SOCK_NOSPACE so that ceph_sock_write_space() |
175 | * doesn't get called again until try_write() fills the socket | 307 | * doesn't get called again until try_write() fills the socket |
176 | * buffer. See net/ipv4/tcp_input.c:tcp_check_space() | 308 | * buffer. See net/ipv4/tcp_input.c:tcp_check_space() |
177 | * and net/core/stream.c:sk_stream_write_space(). | 309 | * and net/core/stream.c:sk_stream_write_space(). |
178 | */ | 310 | */ |
179 | if (test_bit(WRITE_PENDING, &con->state)) { | 311 | if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) { |
180 | if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { | 312 | if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { |
181 | dout("ceph_write_space %p queueing write work\n", con); | 313 | dout("%s %p queueing write work\n", __func__, con); |
182 | clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); | 314 | clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); |
183 | queue_con(con); | 315 | queue_con(con); |
184 | } | 316 | } |
185 | } else { | 317 | } else { |
186 | dout("ceph_write_space %p nothing to write\n", con); | 318 | dout("%s %p nothing to write\n", __func__, con); |
187 | } | 319 | } |
188 | } | 320 | } |
189 | 321 | ||
190 | /* socket's state has changed */ | 322 | /* socket's state has changed */ |
191 | static void ceph_state_change(struct sock *sk) | 323 | static void ceph_sock_state_change(struct sock *sk) |
192 | { | 324 | { |
193 | struct ceph_connection *con = sk->sk_user_data; | 325 | struct ceph_connection *con = sk->sk_user_data; |
194 | 326 | ||
195 | dout("ceph_state_change %p state = %lu sk_state = %u\n", | 327 | dout("%s %p state = %lu sk_state = %u\n", __func__, |
196 | con, con->state, sk->sk_state); | 328 | con, con->state, sk->sk_state); |
197 | 329 | ||
198 | if (test_bit(CLOSED, &con->state)) | ||
199 | return; | ||
200 | |||
201 | switch (sk->sk_state) { | 330 | switch (sk->sk_state) { |
202 | case TCP_CLOSE: | 331 | case TCP_CLOSE: |
203 | dout("ceph_state_change TCP_CLOSE\n"); | 332 | dout("%s TCP_CLOSE\n", __func__); |
204 | case TCP_CLOSE_WAIT: | 333 | case TCP_CLOSE_WAIT: |
205 | dout("ceph_state_change TCP_CLOSE_WAIT\n"); | 334 | dout("%s TCP_CLOSE_WAIT\n", __func__); |
206 | if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { | 335 | con_sock_state_closing(con); |
207 | if (test_bit(CONNECTING, &con->state)) | 336 | set_bit(CON_FLAG_SOCK_CLOSED, &con->flags); |
208 | con->error_msg = "connection failed"; | 337 | queue_con(con); |
209 | else | ||
210 | con->error_msg = "socket closed"; | ||
211 | queue_con(con); | ||
212 | } | ||
213 | break; | 338 | break; |
214 | case TCP_ESTABLISHED: | 339 | case TCP_ESTABLISHED: |
215 | dout("ceph_state_change TCP_ESTABLISHED\n"); | 340 | dout("%s TCP_ESTABLISHED\n", __func__); |
341 | con_sock_state_connected(con); | ||
216 | queue_con(con); | 342 | queue_con(con); |
217 | break; | 343 | break; |
218 | default: /* Everything else is uninteresting */ | 344 | default: /* Everything else is uninteresting */ |
@@ -228,9 +354,9 @@ static void set_sock_callbacks(struct socket *sock, | |||
228 | { | 354 | { |
229 | struct sock *sk = sock->sk; | 355 | struct sock *sk = sock->sk; |
230 | sk->sk_user_data = con; | 356 | sk->sk_user_data = con; |
231 | sk->sk_data_ready = ceph_data_ready; | 357 | sk->sk_data_ready = ceph_sock_data_ready; |
232 | sk->sk_write_space = ceph_write_space; | 358 | sk->sk_write_space = ceph_sock_write_space; |
233 | sk->sk_state_change = ceph_state_change; | 359 | sk->sk_state_change = ceph_sock_state_change; |
234 | } | 360 | } |
235 | 361 | ||
236 | 362 | ||
@@ -262,6 +388,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) | |||
262 | 388 | ||
263 | dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); | 389 | dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); |
264 | 390 | ||
391 | con_sock_state_connecting(con); | ||
265 | ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), | 392 | ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), |
266 | O_NONBLOCK); | 393 | O_NONBLOCK); |
267 | if (ret == -EINPROGRESS) { | 394 | if (ret == -EINPROGRESS) { |
@@ -277,7 +404,6 @@ static int ceph_tcp_connect(struct ceph_connection *con) | |||
277 | return ret; | 404 | return ret; |
278 | } | 405 | } |
279 | con->sock = sock; | 406 | con->sock = sock; |
280 | |||
281 | return 0; | 407 | return 0; |
282 | } | 408 | } |
283 | 409 | ||
@@ -333,16 +459,24 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page, | |||
333 | */ | 459 | */ |
334 | static int con_close_socket(struct ceph_connection *con) | 460 | static int con_close_socket(struct ceph_connection *con) |
335 | { | 461 | { |
336 | int rc; | 462 | int rc = 0; |
337 | 463 | ||
338 | dout("con_close_socket on %p sock %p\n", con, con->sock); | 464 | dout("con_close_socket on %p sock %p\n", con, con->sock); |
339 | if (!con->sock) | 465 | if (con->sock) { |
340 | return 0; | 466 | rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); |
341 | set_bit(SOCK_CLOSED, &con->state); | 467 | sock_release(con->sock); |
342 | rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); | 468 | con->sock = NULL; |
343 | sock_release(con->sock); | 469 | } |
344 | con->sock = NULL; | 470 | |
345 | clear_bit(SOCK_CLOSED, &con->state); | 471 | /* |
472 | * Forcibly clear the SOCK_CLOSED flag. It gets set | ||
473 | * independent of the connection mutex, and we could have | ||
474 | * received a socket close event before we had the chance to | ||
475 | * shut the socket down. | ||
476 | */ | ||
477 | clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags); | ||
478 | |||
479 | con_sock_state_closed(con); | ||
346 | return rc; | 480 | return rc; |
347 | } | 481 | } |
348 | 482 | ||
@@ -353,6 +487,10 @@ static int con_close_socket(struct ceph_connection *con) | |||
353 | static void ceph_msg_remove(struct ceph_msg *msg) | 487 | static void ceph_msg_remove(struct ceph_msg *msg) |
354 | { | 488 | { |
355 | list_del_init(&msg->list_head); | 489 | list_del_init(&msg->list_head); |
490 | BUG_ON(msg->con == NULL); | ||
491 | msg->con->ops->put(msg->con); | ||
492 | msg->con = NULL; | ||
493 | |||
356 | ceph_msg_put(msg); | 494 | ceph_msg_put(msg); |
357 | } | 495 | } |
358 | static void ceph_msg_remove_list(struct list_head *head) | 496 | static void ceph_msg_remove_list(struct list_head *head) |
@@ -372,8 +510,11 @@ static void reset_connection(struct ceph_connection *con) | |||
372 | ceph_msg_remove_list(&con->out_sent); | 510 | ceph_msg_remove_list(&con->out_sent); |
373 | 511 | ||
374 | if (con->in_msg) { | 512 | if (con->in_msg) { |
513 | BUG_ON(con->in_msg->con != con); | ||
514 | con->in_msg->con = NULL; | ||
375 | ceph_msg_put(con->in_msg); | 515 | ceph_msg_put(con->in_msg); |
376 | con->in_msg = NULL; | 516 | con->in_msg = NULL; |
517 | con->ops->put(con); | ||
377 | } | 518 | } |
378 | 519 | ||
379 | con->connect_seq = 0; | 520 | con->connect_seq = 0; |
@@ -391,32 +532,44 @@ static void reset_connection(struct ceph_connection *con) | |||
391 | */ | 532 | */ |
392 | void ceph_con_close(struct ceph_connection *con) | 533 | void ceph_con_close(struct ceph_connection *con) |
393 | { | 534 | { |
535 | mutex_lock(&con->mutex); | ||
394 | dout("con_close %p peer %s\n", con, | 536 | dout("con_close %p peer %s\n", con, |
395 | ceph_pr_addr(&con->peer_addr.in_addr)); | 537 | ceph_pr_addr(&con->peer_addr.in_addr)); |
396 | set_bit(CLOSED, &con->state); /* in case there's queued work */ | 538 | con->state = CON_STATE_CLOSED; |
397 | clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ | 539 | |
398 | clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ | 540 | clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */ |
399 | clear_bit(KEEPALIVE_PENDING, &con->state); | 541 | clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); |
400 | clear_bit(WRITE_PENDING, &con->state); | 542 | clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
401 | mutex_lock(&con->mutex); | 543 | clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); |
544 | clear_bit(CON_FLAG_BACKOFF, &con->flags); | ||
545 | |||
402 | reset_connection(con); | 546 | reset_connection(con); |
403 | con->peer_global_seq = 0; | 547 | con->peer_global_seq = 0; |
404 | cancel_delayed_work(&con->work); | 548 | cancel_delayed_work(&con->work); |
549 | con_close_socket(con); | ||
405 | mutex_unlock(&con->mutex); | 550 | mutex_unlock(&con->mutex); |
406 | queue_con(con); | ||
407 | } | 551 | } |
408 | EXPORT_SYMBOL(ceph_con_close); | 552 | EXPORT_SYMBOL(ceph_con_close); |
409 | 553 | ||
410 | /* | 554 | /* |
411 | * Reopen a closed connection, with a new peer address. | 555 | * Reopen a closed connection, with a new peer address. |
412 | */ | 556 | */ |
413 | void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) | 557 | void ceph_con_open(struct ceph_connection *con, |
558 | __u8 entity_type, __u64 entity_num, | ||
559 | struct ceph_entity_addr *addr) | ||
414 | { | 560 | { |
561 | mutex_lock(&con->mutex); | ||
415 | dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); | 562 | dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); |
416 | set_bit(OPENING, &con->state); | 563 | |
417 | clear_bit(CLOSED, &con->state); | 564 | BUG_ON(con->state != CON_STATE_CLOSED); |
565 | con->state = CON_STATE_PREOPEN; | ||
566 | |||
567 | con->peer_name.type = (__u8) entity_type; | ||
568 | con->peer_name.num = cpu_to_le64(entity_num); | ||
569 | |||
418 | memcpy(&con->peer_addr, addr, sizeof(*addr)); | 570 | memcpy(&con->peer_addr, addr, sizeof(*addr)); |
419 | con->delay = 0; /* reset backoff memory */ | 571 | con->delay = 0; /* reset backoff memory */ |
572 | mutex_unlock(&con->mutex); | ||
420 | queue_con(con); | 573 | queue_con(con); |
421 | } | 574 | } |
422 | EXPORT_SYMBOL(ceph_con_open); | 575 | EXPORT_SYMBOL(ceph_con_open); |
@@ -430,42 +583,26 @@ bool ceph_con_opened(struct ceph_connection *con) | |||
430 | } | 583 | } |
431 | 584 | ||
432 | /* | 585 | /* |
433 | * generic get/put | ||
434 | */ | ||
435 | struct ceph_connection *ceph_con_get(struct ceph_connection *con) | ||
436 | { | ||
437 | int nref = __atomic_add_unless(&con->nref, 1, 0); | ||
438 | |||
439 | dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1); | ||
440 | |||
441 | return nref ? con : NULL; | ||
442 | } | ||
443 | |||
444 | void ceph_con_put(struct ceph_connection *con) | ||
445 | { | ||
446 | int nref = atomic_dec_return(&con->nref); | ||
447 | |||
448 | BUG_ON(nref < 0); | ||
449 | if (nref == 0) { | ||
450 | BUG_ON(con->sock); | ||
451 | kfree(con); | ||
452 | } | ||
453 | dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref); | ||
454 | } | ||
455 | |||
456 | /* | ||
457 | * initialize a new connection. | 586 | * initialize a new connection. |
458 | */ | 587 | */ |
459 | void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) | 588 | void ceph_con_init(struct ceph_connection *con, void *private, |
589 | const struct ceph_connection_operations *ops, | ||
590 | struct ceph_messenger *msgr) | ||
460 | { | 591 | { |
461 | dout("con_init %p\n", con); | 592 | dout("con_init %p\n", con); |
462 | memset(con, 0, sizeof(*con)); | 593 | memset(con, 0, sizeof(*con)); |
463 | atomic_set(&con->nref, 1); | 594 | con->private = private; |
595 | con->ops = ops; | ||
464 | con->msgr = msgr; | 596 | con->msgr = msgr; |
597 | |||
598 | con_sock_state_init(con); | ||
599 | |||
465 | mutex_init(&con->mutex); | 600 | mutex_init(&con->mutex); |
466 | INIT_LIST_HEAD(&con->out_queue); | 601 | INIT_LIST_HEAD(&con->out_queue); |
467 | INIT_LIST_HEAD(&con->out_sent); | 602 | INIT_LIST_HEAD(&con->out_sent); |
468 | INIT_DELAYED_WORK(&con->work, con_work); | 603 | INIT_DELAYED_WORK(&con->work, con_work); |
604 | |||
605 | con->state = CON_STATE_CLOSED; | ||
469 | } | 606 | } |
470 | EXPORT_SYMBOL(ceph_con_init); | 607 | EXPORT_SYMBOL(ceph_con_init); |
471 | 608 | ||
@@ -486,14 +623,14 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) | |||
486 | return ret; | 623 | return ret; |
487 | } | 624 | } |
488 | 625 | ||
489 | static void ceph_con_out_kvec_reset(struct ceph_connection *con) | 626 | static void con_out_kvec_reset(struct ceph_connection *con) |
490 | { | 627 | { |
491 | con->out_kvec_left = 0; | 628 | con->out_kvec_left = 0; |
492 | con->out_kvec_bytes = 0; | 629 | con->out_kvec_bytes = 0; |
493 | con->out_kvec_cur = &con->out_kvec[0]; | 630 | con->out_kvec_cur = &con->out_kvec[0]; |
494 | } | 631 | } |
495 | 632 | ||
496 | static void ceph_con_out_kvec_add(struct ceph_connection *con, | 633 | static void con_out_kvec_add(struct ceph_connection *con, |
497 | size_t size, void *data) | 634 | size_t size, void *data) |
498 | { | 635 | { |
499 | int index; | 636 | int index; |
@@ -507,6 +644,53 @@ static void ceph_con_out_kvec_add(struct ceph_connection *con, | |||
507 | con->out_kvec_bytes += size; | 644 | con->out_kvec_bytes += size; |
508 | } | 645 | } |
509 | 646 | ||
647 | #ifdef CONFIG_BLOCK | ||
648 | static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) | ||
649 | { | ||
650 | if (!bio) { | ||
651 | *iter = NULL; | ||
652 | *seg = 0; | ||
653 | return; | ||
654 | } | ||
655 | *iter = bio; | ||
656 | *seg = bio->bi_idx; | ||
657 | } | ||
658 | |||
659 | static void iter_bio_next(struct bio **bio_iter, int *seg) | ||
660 | { | ||
661 | if (*bio_iter == NULL) | ||
662 | return; | ||
663 | |||
664 | BUG_ON(*seg >= (*bio_iter)->bi_vcnt); | ||
665 | |||
666 | (*seg)++; | ||
667 | if (*seg == (*bio_iter)->bi_vcnt) | ||
668 | init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); | ||
669 | } | ||
670 | #endif | ||
671 | |||
672 | static void prepare_write_message_data(struct ceph_connection *con) | ||
673 | { | ||
674 | struct ceph_msg *msg = con->out_msg; | ||
675 | |||
676 | BUG_ON(!msg); | ||
677 | BUG_ON(!msg->hdr.data_len); | ||
678 | |||
679 | /* initialize page iterator */ | ||
680 | con->out_msg_pos.page = 0; | ||
681 | if (msg->pages) | ||
682 | con->out_msg_pos.page_pos = msg->page_alignment; | ||
683 | else | ||
684 | con->out_msg_pos.page_pos = 0; | ||
685 | #ifdef CONFIG_BLOCK | ||
686 | if (msg->bio) | ||
687 | init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); | ||
688 | #endif | ||
689 | con->out_msg_pos.data_pos = 0; | ||
690 | con->out_msg_pos.did_page_crc = false; | ||
691 | con->out_more = 1; /* data + footer will follow */ | ||
692 | } | ||
693 | |||
510 | /* | 694 | /* |
511 | * Prepare footer for currently outgoing message, and finish things | 695 | * Prepare footer for currently outgoing message, and finish things |
512 | * off. Assumes out_kvec* are already valid.. we just add on to the end. | 696 | * off. Assumes out_kvec* are already valid.. we just add on to the end. |
@@ -516,6 +700,8 @@ static void prepare_write_message_footer(struct ceph_connection *con) | |||
516 | struct ceph_msg *m = con->out_msg; | 700 | struct ceph_msg *m = con->out_msg; |
517 | int v = con->out_kvec_left; | 701 | int v = con->out_kvec_left; |
518 | 702 | ||
703 | m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; | ||
704 | |||
519 | dout("prepare_write_message_footer %p\n", con); | 705 | dout("prepare_write_message_footer %p\n", con); |
520 | con->out_kvec_is_msg = true; | 706 | con->out_kvec_is_msg = true; |
521 | con->out_kvec[v].iov_base = &m->footer; | 707 | con->out_kvec[v].iov_base = &m->footer; |
@@ -534,7 +720,7 @@ static void prepare_write_message(struct ceph_connection *con) | |||
534 | struct ceph_msg *m; | 720 | struct ceph_msg *m; |
535 | u32 crc; | 721 | u32 crc; |
536 | 722 | ||
537 | ceph_con_out_kvec_reset(con); | 723 | con_out_kvec_reset(con); |
538 | con->out_kvec_is_msg = true; | 724 | con->out_kvec_is_msg = true; |
539 | con->out_msg_done = false; | 725 | con->out_msg_done = false; |
540 | 726 | ||
@@ -542,14 +728,16 @@ static void prepare_write_message(struct ceph_connection *con) | |||
542 | * TCP packet that's a good thing. */ | 728 | * TCP packet that's a good thing. */ |
543 | if (con->in_seq > con->in_seq_acked) { | 729 | if (con->in_seq > con->in_seq_acked) { |
544 | con->in_seq_acked = con->in_seq; | 730 | con->in_seq_acked = con->in_seq; |
545 | ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); | 731 | con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); |
546 | con->out_temp_ack = cpu_to_le64(con->in_seq_acked); | 732 | con->out_temp_ack = cpu_to_le64(con->in_seq_acked); |
547 | ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), | 733 | con_out_kvec_add(con, sizeof (con->out_temp_ack), |
548 | &con->out_temp_ack); | 734 | &con->out_temp_ack); |
549 | } | 735 | } |
550 | 736 | ||
737 | BUG_ON(list_empty(&con->out_queue)); | ||
551 | m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); | 738 | m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); |
552 | con->out_msg = m; | 739 | con->out_msg = m; |
740 | BUG_ON(m->con != con); | ||
553 | 741 | ||
554 | /* put message on sent list */ | 742 | /* put message on sent list */ |
555 | ceph_msg_get(m); | 743 | ceph_msg_get(m); |
@@ -576,18 +764,18 @@ static void prepare_write_message(struct ceph_connection *con) | |||
576 | BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); | 764 | BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); |
577 | 765 | ||
578 | /* tag + hdr + front + middle */ | 766 | /* tag + hdr + front + middle */ |
579 | ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); | 767 | con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); |
580 | ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); | 768 | con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); |
581 | ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); | 769 | con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); |
582 | 770 | ||
583 | if (m->middle) | 771 | if (m->middle) |
584 | ceph_con_out_kvec_add(con, m->middle->vec.iov_len, | 772 | con_out_kvec_add(con, m->middle->vec.iov_len, |
585 | m->middle->vec.iov_base); | 773 | m->middle->vec.iov_base); |
586 | 774 | ||
587 | /* fill in crc (except data pages), footer */ | 775 | /* fill in crc (except data pages), footer */ |
588 | crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); | 776 | crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); |
589 | con->out_msg->hdr.crc = cpu_to_le32(crc); | 777 | con->out_msg->hdr.crc = cpu_to_le32(crc); |
590 | con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; | 778 | con->out_msg->footer.flags = 0; |
591 | 779 | ||
592 | crc = crc32c(0, m->front.iov_base, m->front.iov_len); | 780 | crc = crc32c(0, m->front.iov_base, m->front.iov_len); |
593 | con->out_msg->footer.front_crc = cpu_to_le32(crc); | 781 | con->out_msg->footer.front_crc = cpu_to_le32(crc); |
@@ -597,28 +785,19 @@ static void prepare_write_message(struct ceph_connection *con) | |||
597 | con->out_msg->footer.middle_crc = cpu_to_le32(crc); | 785 | con->out_msg->footer.middle_crc = cpu_to_le32(crc); |
598 | } else | 786 | } else |
599 | con->out_msg->footer.middle_crc = 0; | 787 | con->out_msg->footer.middle_crc = 0; |
600 | con->out_msg->footer.data_crc = 0; | 788 | dout("%s front_crc %u middle_crc %u\n", __func__, |
601 | dout("prepare_write_message front_crc %u data_crc %u\n", | ||
602 | le32_to_cpu(con->out_msg->footer.front_crc), | 789 | le32_to_cpu(con->out_msg->footer.front_crc), |
603 | le32_to_cpu(con->out_msg->footer.middle_crc)); | 790 | le32_to_cpu(con->out_msg->footer.middle_crc)); |
604 | 791 | ||
605 | /* is there a data payload? */ | 792 | /* is there a data payload? */ |
606 | if (le32_to_cpu(m->hdr.data_len) > 0) { | 793 | con->out_msg->footer.data_crc = 0; |
607 | /* initialize page iterator */ | 794 | if (m->hdr.data_len) |
608 | con->out_msg_pos.page = 0; | 795 | prepare_write_message_data(con); |
609 | if (m->pages) | 796 | else |
610 | con->out_msg_pos.page_pos = m->page_alignment; | ||
611 | else | ||
612 | con->out_msg_pos.page_pos = 0; | ||
613 | con->out_msg_pos.data_pos = 0; | ||
614 | con->out_msg_pos.did_page_crc = false; | ||
615 | con->out_more = 1; /* data + footer will follow */ | ||
616 | } else { | ||
617 | /* no, queue up footer too and be done */ | 797 | /* no, queue up footer too and be done */ |
618 | prepare_write_message_footer(con); | 798 | prepare_write_message_footer(con); |
619 | } | ||
620 | 799 | ||
621 | set_bit(WRITE_PENDING, &con->state); | 800 | set_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
622 | } | 801 | } |
623 | 802 | ||
624 | /* | 803 | /* |
@@ -630,16 +809,16 @@ static void prepare_write_ack(struct ceph_connection *con) | |||
630 | con->in_seq_acked, con->in_seq); | 809 | con->in_seq_acked, con->in_seq); |
631 | con->in_seq_acked = con->in_seq; | 810 | con->in_seq_acked = con->in_seq; |
632 | 811 | ||
633 | ceph_con_out_kvec_reset(con); | 812 | con_out_kvec_reset(con); |
634 | 813 | ||
635 | ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); | 814 | con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); |
636 | 815 | ||
637 | con->out_temp_ack = cpu_to_le64(con->in_seq_acked); | 816 | con->out_temp_ack = cpu_to_le64(con->in_seq_acked); |
638 | ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), | 817 | con_out_kvec_add(con, sizeof (con->out_temp_ack), |
639 | &con->out_temp_ack); | 818 | &con->out_temp_ack); |
640 | 819 | ||
641 | con->out_more = 1; /* more will follow.. eventually.. */ | 820 | con->out_more = 1; /* more will follow.. eventually.. */ |
642 | set_bit(WRITE_PENDING, &con->state); | 821 | set_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
643 | } | 822 | } |
644 | 823 | ||
645 | /* | 824 | /* |
@@ -648,9 +827,9 @@ static void prepare_write_ack(struct ceph_connection *con) | |||
648 | static void prepare_write_keepalive(struct ceph_connection *con) | 827 | static void prepare_write_keepalive(struct ceph_connection *con) |
649 | { | 828 | { |
650 | dout("prepare_write_keepalive %p\n", con); | 829 | dout("prepare_write_keepalive %p\n", con); |
651 | ceph_con_out_kvec_reset(con); | 830 | con_out_kvec_reset(con); |
652 | ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); | 831 | con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); |
653 | set_bit(WRITE_PENDING, &con->state); | 832 | set_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
654 | } | 833 | } |
655 | 834 | ||
656 | /* | 835 | /* |
@@ -665,27 +844,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection | |||
665 | if (!con->ops->get_authorizer) { | 844 | if (!con->ops->get_authorizer) { |
666 | con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; | 845 | con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; |
667 | con->out_connect.authorizer_len = 0; | 846 | con->out_connect.authorizer_len = 0; |
668 | |||
669 | return NULL; | 847 | return NULL; |
670 | } | 848 | } |
671 | 849 | ||
672 | /* Can't hold the mutex while getting authorizer */ | 850 | /* Can't hold the mutex while getting authorizer */ |
673 | |||
674 | mutex_unlock(&con->mutex); | 851 | mutex_unlock(&con->mutex); |
675 | |||
676 | auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); | 852 | auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); |
677 | |||
678 | mutex_lock(&con->mutex); | 853 | mutex_lock(&con->mutex); |
679 | 854 | ||
680 | if (IS_ERR(auth)) | 855 | if (IS_ERR(auth)) |
681 | return auth; | 856 | return auth; |
682 | if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) | 857 | if (con->state != CON_STATE_NEGOTIATING) |
683 | return ERR_PTR(-EAGAIN); | 858 | return ERR_PTR(-EAGAIN); |
684 | 859 | ||
685 | con->auth_reply_buf = auth->authorizer_reply_buf; | 860 | con->auth_reply_buf = auth->authorizer_reply_buf; |
686 | con->auth_reply_buf_len = auth->authorizer_reply_buf_len; | 861 | con->auth_reply_buf_len = auth->authorizer_reply_buf_len; |
687 | |||
688 | |||
689 | return auth; | 862 | return auth; |
690 | } | 863 | } |
691 | 864 | ||
@@ -694,12 +867,12 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection | |||
694 | */ | 867 | */ |
695 | static void prepare_write_banner(struct ceph_connection *con) | 868 | static void prepare_write_banner(struct ceph_connection *con) |
696 | { | 869 | { |
697 | ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); | 870 | con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); |
698 | ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), | 871 | con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), |
699 | &con->msgr->my_enc_addr); | 872 | &con->msgr->my_enc_addr); |
700 | 873 | ||
701 | con->out_more = 0; | 874 | con->out_more = 0; |
702 | set_bit(WRITE_PENDING, &con->state); | 875 | set_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
703 | } | 876 | } |
704 | 877 | ||
705 | static int prepare_write_connect(struct ceph_connection *con) | 878 | static int prepare_write_connect(struct ceph_connection *con) |
@@ -742,14 +915,15 @@ static int prepare_write_connect(struct ceph_connection *con) | |||
742 | con->out_connect.authorizer_len = auth ? | 915 | con->out_connect.authorizer_len = auth ? |
743 | cpu_to_le32(auth->authorizer_buf_len) : 0; | 916 | cpu_to_le32(auth->authorizer_buf_len) : 0; |
744 | 917 | ||
745 | ceph_con_out_kvec_add(con, sizeof (con->out_connect), | 918 | con_out_kvec_reset(con); |
919 | con_out_kvec_add(con, sizeof (con->out_connect), | ||
746 | &con->out_connect); | 920 | &con->out_connect); |
747 | if (auth && auth->authorizer_buf_len) | 921 | if (auth && auth->authorizer_buf_len) |
748 | ceph_con_out_kvec_add(con, auth->authorizer_buf_len, | 922 | con_out_kvec_add(con, auth->authorizer_buf_len, |
749 | auth->authorizer_buf); | 923 | auth->authorizer_buf); |
750 | 924 | ||
751 | con->out_more = 0; | 925 | con->out_more = 0; |
752 | set_bit(WRITE_PENDING, &con->state); | 926 | set_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
753 | 927 | ||
754 | return 0; | 928 | return 0; |
755 | } | 929 | } |
@@ -797,30 +971,34 @@ out: | |||
797 | return ret; /* done! */ | 971 | return ret; /* done! */ |
798 | } | 972 | } |
799 | 973 | ||
800 | #ifdef CONFIG_BLOCK | 974 | static void out_msg_pos_next(struct ceph_connection *con, struct page *page, |
801 | static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) | 975 | size_t len, size_t sent, bool in_trail) |
802 | { | 976 | { |
803 | if (!bio) { | 977 | struct ceph_msg *msg = con->out_msg; |
804 | *iter = NULL; | ||
805 | *seg = 0; | ||
806 | return; | ||
807 | } | ||
808 | *iter = bio; | ||
809 | *seg = bio->bi_idx; | ||
810 | } | ||
811 | 978 | ||
812 | static void iter_bio_next(struct bio **bio_iter, int *seg) | 979 | BUG_ON(!msg); |
813 | { | 980 | BUG_ON(!sent); |
814 | if (*bio_iter == NULL) | ||
815 | return; | ||
816 | 981 | ||
817 | BUG_ON(*seg >= (*bio_iter)->bi_vcnt); | 982 | con->out_msg_pos.data_pos += sent; |
983 | con->out_msg_pos.page_pos += sent; | ||
984 | if (sent < len) | ||
985 | return; | ||
818 | 986 | ||
819 | (*seg)++; | 987 | BUG_ON(sent != len); |
820 | if (*seg == (*bio_iter)->bi_vcnt) | 988 | con->out_msg_pos.page_pos = 0; |
821 | init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); | 989 | con->out_msg_pos.page++; |
822 | } | 990 | con->out_msg_pos.did_page_crc = false; |
991 | if (in_trail) | ||
992 | list_move_tail(&page->lru, | ||
993 | &msg->trail->head); | ||
994 | else if (msg->pagelist) | ||
995 | list_move_tail(&page->lru, | ||
996 | &msg->pagelist->head); | ||
997 | #ifdef CONFIG_BLOCK | ||
998 | else if (msg->bio) | ||
999 | iter_bio_next(&msg->bio_iter, &msg->bio_seg); | ||
823 | #endif | 1000 | #endif |
1001 | } | ||
824 | 1002 | ||
825 | /* | 1003 | /* |
826 | * Write as much message data payload as we can. If we finish, queue | 1004 | * Write as much message data payload as we can. If we finish, queue |
@@ -837,41 +1015,36 @@ static int write_partial_msg_pages(struct ceph_connection *con) | |||
837 | bool do_datacrc = !con->msgr->nocrc; | 1015 | bool do_datacrc = !con->msgr->nocrc; |
838 | int ret; | 1016 | int ret; |
839 | int total_max_write; | 1017 | int total_max_write; |
840 | int in_trail = 0; | 1018 | bool in_trail = false; |
841 | size_t trail_len = (msg->trail ? msg->trail->length : 0); | 1019 | const size_t trail_len = (msg->trail ? msg->trail->length : 0); |
1020 | const size_t trail_off = data_len - trail_len; | ||
842 | 1021 | ||
843 | dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", | 1022 | dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", |
844 | con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, | 1023 | con, msg, con->out_msg_pos.page, msg->nr_pages, |
845 | con->out_msg_pos.page_pos); | 1024 | con->out_msg_pos.page_pos); |
846 | 1025 | ||
847 | #ifdef CONFIG_BLOCK | 1026 | /* |
848 | if (msg->bio && !msg->bio_iter) | 1027 | * Iterate through each page that contains data to be |
849 | init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); | 1028 | * written, and send as much as possible for each. |
850 | #endif | 1029 | * |
851 | 1030 | * If we are calculating the data crc (the default), we will | |
1031 | * need to map the page. If we have no pages, they have | ||
1032 | * been revoked, so use the zero page. | ||
1033 | */ | ||
852 | while (data_len > con->out_msg_pos.data_pos) { | 1034 | while (data_len > con->out_msg_pos.data_pos) { |
853 | struct page *page = NULL; | 1035 | struct page *page = NULL; |
854 | int max_write = PAGE_SIZE; | 1036 | int max_write = PAGE_SIZE; |
855 | int bio_offset = 0; | 1037 | int bio_offset = 0; |
856 | 1038 | ||
857 | total_max_write = data_len - trail_len - | 1039 | in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off; |
858 | con->out_msg_pos.data_pos; | 1040 | if (!in_trail) |
859 | 1041 | total_max_write = trail_off - con->out_msg_pos.data_pos; | |
860 | /* | ||
861 | * if we are calculating the data crc (the default), we need | ||
862 | * to map the page. if our pages[] has been revoked, use the | ||
863 | * zero page. | ||
864 | */ | ||
865 | |||
866 | /* have we reached the trail part of the data? */ | ||
867 | if (con->out_msg_pos.data_pos >= data_len - trail_len) { | ||
868 | in_trail = 1; | ||
869 | 1042 | ||
1043 | if (in_trail) { | ||
870 | total_max_write = data_len - con->out_msg_pos.data_pos; | 1044 | total_max_write = data_len - con->out_msg_pos.data_pos; |
871 | 1045 | ||
872 | page = list_first_entry(&msg->trail->head, | 1046 | page = list_first_entry(&msg->trail->head, |
873 | struct page, lru); | 1047 | struct page, lru); |
874 | max_write = PAGE_SIZE; | ||
875 | } else if (msg->pages) { | 1048 | } else if (msg->pages) { |
876 | page = msg->pages[con->out_msg_pos.page]; | 1049 | page = msg->pages[con->out_msg_pos.page]; |
877 | } else if (msg->pagelist) { | 1050 | } else if (msg->pagelist) { |
@@ -894,15 +1067,14 @@ static int write_partial_msg_pages(struct ceph_connection *con) | |||
894 | 1067 | ||
895 | if (do_datacrc && !con->out_msg_pos.did_page_crc) { | 1068 | if (do_datacrc && !con->out_msg_pos.did_page_crc) { |
896 | void *base; | 1069 | void *base; |
897 | u32 crc; | 1070 | u32 crc = le32_to_cpu(msg->footer.data_crc); |
898 | u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); | ||
899 | char *kaddr; | 1071 | char *kaddr; |
900 | 1072 | ||
901 | kaddr = kmap(page); | 1073 | kaddr = kmap(page); |
902 | BUG_ON(kaddr == NULL); | 1074 | BUG_ON(kaddr == NULL); |
903 | base = kaddr + con->out_msg_pos.page_pos + bio_offset; | 1075 | base = kaddr + con->out_msg_pos.page_pos + bio_offset; |
904 | crc = crc32c(tmpcrc, base, len); | 1076 | crc = crc32c(crc, base, len); |
905 | con->out_msg->footer.data_crc = cpu_to_le32(crc); | 1077 | msg->footer.data_crc = cpu_to_le32(crc); |
906 | con->out_msg_pos.did_page_crc = true; | 1078 | con->out_msg_pos.did_page_crc = true; |
907 | } | 1079 | } |
908 | ret = ceph_tcp_sendpage(con->sock, page, | 1080 | ret = ceph_tcp_sendpage(con->sock, page, |
@@ -915,31 +1087,15 @@ static int write_partial_msg_pages(struct ceph_connection *con) | |||
915 | if (ret <= 0) | 1087 | if (ret <= 0) |
916 | goto out; | 1088 | goto out; |
917 | 1089 | ||
918 | con->out_msg_pos.data_pos += ret; | 1090 | out_msg_pos_next(con, page, len, (size_t) ret, in_trail); |
919 | con->out_msg_pos.page_pos += ret; | ||
920 | if (ret == len) { | ||
921 | con->out_msg_pos.page_pos = 0; | ||
922 | con->out_msg_pos.page++; | ||
923 | con->out_msg_pos.did_page_crc = false; | ||
924 | if (in_trail) | ||
925 | list_move_tail(&page->lru, | ||
926 | &msg->trail->head); | ||
927 | else if (msg->pagelist) | ||
928 | list_move_tail(&page->lru, | ||
929 | &msg->pagelist->head); | ||
930 | #ifdef CONFIG_BLOCK | ||
931 | else if (msg->bio) | ||
932 | iter_bio_next(&msg->bio_iter, &msg->bio_seg); | ||
933 | #endif | ||
934 | } | ||
935 | } | 1091 | } |
936 | 1092 | ||
937 | dout("write_partial_msg_pages %p msg %p done\n", con, msg); | 1093 | dout("write_partial_msg_pages %p msg %p done\n", con, msg); |
938 | 1094 | ||
939 | /* prepare and queue up footer, too */ | 1095 | /* prepare and queue up footer, too */ |
940 | if (!do_datacrc) | 1096 | if (!do_datacrc) |
941 | con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; | 1097 | msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; |
942 | ceph_con_out_kvec_reset(con); | 1098 | con_out_kvec_reset(con); |
943 | prepare_write_message_footer(con); | 1099 | prepare_write_message_footer(con); |
944 | ret = 1; | 1100 | ret = 1; |
945 | out: | 1101 | out: |
@@ -1351,20 +1507,14 @@ static int process_banner(struct ceph_connection *con) | |||
1351 | ceph_pr_addr(&con->msgr->inst.addr.in_addr)); | 1507 | ceph_pr_addr(&con->msgr->inst.addr.in_addr)); |
1352 | } | 1508 | } |
1353 | 1509 | ||
1354 | set_bit(NEGOTIATING, &con->state); | ||
1355 | prepare_read_connect(con); | ||
1356 | return 0; | 1510 | return 0; |
1357 | } | 1511 | } |
1358 | 1512 | ||
1359 | static void fail_protocol(struct ceph_connection *con) | 1513 | static void fail_protocol(struct ceph_connection *con) |
1360 | { | 1514 | { |
1361 | reset_connection(con); | 1515 | reset_connection(con); |
1362 | set_bit(CLOSED, &con->state); /* in case there's queued work */ | 1516 | BUG_ON(con->state != CON_STATE_NEGOTIATING); |
1363 | 1517 | con->state = CON_STATE_CLOSED; | |
1364 | mutex_unlock(&con->mutex); | ||
1365 | if (con->ops->bad_proto) | ||
1366 | con->ops->bad_proto(con); | ||
1367 | mutex_lock(&con->mutex); | ||
1368 | } | 1518 | } |
1369 | 1519 | ||
1370 | static int process_connect(struct ceph_connection *con) | 1520 | static int process_connect(struct ceph_connection *con) |
@@ -1407,7 +1557,6 @@ static int process_connect(struct ceph_connection *con) | |||
1407 | return -1; | 1557 | return -1; |
1408 | } | 1558 | } |
1409 | con->auth_retry = 1; | 1559 | con->auth_retry = 1; |
1410 | ceph_con_out_kvec_reset(con); | ||
1411 | ret = prepare_write_connect(con); | 1560 | ret = prepare_write_connect(con); |
1412 | if (ret < 0) | 1561 | if (ret < 0) |
1413 | return ret; | 1562 | return ret; |
@@ -1428,7 +1577,6 @@ static int process_connect(struct ceph_connection *con) | |||
1428 | ENTITY_NAME(con->peer_name), | 1577 | ENTITY_NAME(con->peer_name), |
1429 | ceph_pr_addr(&con->peer_addr.in_addr)); | 1578 | ceph_pr_addr(&con->peer_addr.in_addr)); |
1430 | reset_connection(con); | 1579 | reset_connection(con); |
1431 | ceph_con_out_kvec_reset(con); | ||
1432 | ret = prepare_write_connect(con); | 1580 | ret = prepare_write_connect(con); |
1433 | if (ret < 0) | 1581 | if (ret < 0) |
1434 | return ret; | 1582 | return ret; |
@@ -1440,8 +1588,7 @@ static int process_connect(struct ceph_connection *con) | |||
1440 | if (con->ops->peer_reset) | 1588 | if (con->ops->peer_reset) |
1441 | con->ops->peer_reset(con); | 1589 | con->ops->peer_reset(con); |
1442 | mutex_lock(&con->mutex); | 1590 | mutex_lock(&con->mutex); |
1443 | if (test_bit(CLOSED, &con->state) || | 1591 | if (con->state != CON_STATE_NEGOTIATING) |
1444 | test_bit(OPENING, &con->state)) | ||
1445 | return -EAGAIN; | 1592 | return -EAGAIN; |
1446 | break; | 1593 | break; |
1447 | 1594 | ||
@@ -1454,7 +1601,6 @@ static int process_connect(struct ceph_connection *con) | |||
1454 | le32_to_cpu(con->out_connect.connect_seq), | 1601 | le32_to_cpu(con->out_connect.connect_seq), |
1455 | le32_to_cpu(con->in_reply.connect_seq)); | 1602 | le32_to_cpu(con->in_reply.connect_seq)); |
1456 | con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); | 1603 | con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); |
1457 | ceph_con_out_kvec_reset(con); | ||
1458 | ret = prepare_write_connect(con); | 1604 | ret = prepare_write_connect(con); |
1459 | if (ret < 0) | 1605 | if (ret < 0) |
1460 | return ret; | 1606 | return ret; |
@@ -1471,7 +1617,6 @@ static int process_connect(struct ceph_connection *con) | |||
1471 | le32_to_cpu(con->in_reply.global_seq)); | 1617 | le32_to_cpu(con->in_reply.global_seq)); |
1472 | get_global_seq(con->msgr, | 1618 | get_global_seq(con->msgr, |
1473 | le32_to_cpu(con->in_reply.global_seq)); | 1619 | le32_to_cpu(con->in_reply.global_seq)); |
1474 | ceph_con_out_kvec_reset(con); | ||
1475 | ret = prepare_write_connect(con); | 1620 | ret = prepare_write_connect(con); |
1476 | if (ret < 0) | 1621 | if (ret < 0) |
1477 | return ret; | 1622 | return ret; |
@@ -1489,7 +1634,10 @@ static int process_connect(struct ceph_connection *con) | |||
1489 | fail_protocol(con); | 1634 | fail_protocol(con); |
1490 | return -1; | 1635 | return -1; |
1491 | } | 1636 | } |
1492 | clear_bit(CONNECTING, &con->state); | 1637 | |
1638 | BUG_ON(con->state != CON_STATE_NEGOTIATING); | ||
1639 | con->state = CON_STATE_OPEN; | ||
1640 | |||
1493 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); | 1641 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); |
1494 | con->connect_seq++; | 1642 | con->connect_seq++; |
1495 | con->peer_features = server_feat; | 1643 | con->peer_features = server_feat; |
@@ -1501,7 +1649,9 @@ static int process_connect(struct ceph_connection *con) | |||
1501 | le32_to_cpu(con->in_reply.connect_seq)); | 1649 | le32_to_cpu(con->in_reply.connect_seq)); |
1502 | 1650 | ||
1503 | if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) | 1651 | if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) |
1504 | set_bit(LOSSYTX, &con->state); | 1652 | set_bit(CON_FLAG_LOSSYTX, &con->flags); |
1653 | |||
1654 | con->delay = 0; /* reset backoff memory */ | ||
1505 | 1655 | ||
1506 | prepare_read_tag(con); | 1656 | prepare_read_tag(con); |
1507 | break; | 1657 | break; |
@@ -1587,10 +1737,7 @@ static int read_partial_message_section(struct ceph_connection *con, | |||
1587 | return 1; | 1737 | return 1; |
1588 | } | 1738 | } |
1589 | 1739 | ||
1590 | static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | 1740 | static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip); |
1591 | struct ceph_msg_header *hdr, | ||
1592 | int *skip); | ||
1593 | |||
1594 | 1741 | ||
1595 | static int read_partial_message_pages(struct ceph_connection *con, | 1742 | static int read_partial_message_pages(struct ceph_connection *con, |
1596 | struct page **pages, | 1743 | struct page **pages, |
@@ -1633,9 +1780,6 @@ static int read_partial_message_bio(struct ceph_connection *con, | |||
1633 | void *p; | 1780 | void *p; |
1634 | int ret, left; | 1781 | int ret, left; |
1635 | 1782 | ||
1636 | if (IS_ERR(bv)) | ||
1637 | return PTR_ERR(bv); | ||
1638 | |||
1639 | left = min((int)(data_len - con->in_msg_pos.data_pos), | 1783 | left = min((int)(data_len - con->in_msg_pos.data_pos), |
1640 | (int)(bv->bv_len - con->in_msg_pos.page_pos)); | 1784 | (int)(bv->bv_len - con->in_msg_pos.page_pos)); |
1641 | 1785 | ||
@@ -1672,7 +1816,6 @@ static int read_partial_message(struct ceph_connection *con) | |||
1672 | int ret; | 1816 | int ret; |
1673 | unsigned int front_len, middle_len, data_len; | 1817 | unsigned int front_len, middle_len, data_len; |
1674 | bool do_datacrc = !con->msgr->nocrc; | 1818 | bool do_datacrc = !con->msgr->nocrc; |
1675 | int skip; | ||
1676 | u64 seq; | 1819 | u64 seq; |
1677 | u32 crc; | 1820 | u32 crc; |
1678 | 1821 | ||
@@ -1723,10 +1866,13 @@ static int read_partial_message(struct ceph_connection *con) | |||
1723 | 1866 | ||
1724 | /* allocate message? */ | 1867 | /* allocate message? */ |
1725 | if (!con->in_msg) { | 1868 | if (!con->in_msg) { |
1869 | int skip = 0; | ||
1870 | |||
1726 | dout("got hdr type %d front %d data %d\n", con->in_hdr.type, | 1871 | dout("got hdr type %d front %d data %d\n", con->in_hdr.type, |
1727 | con->in_hdr.front_len, con->in_hdr.data_len); | 1872 | con->in_hdr.front_len, con->in_hdr.data_len); |
1728 | skip = 0; | 1873 | ret = ceph_con_in_msg_alloc(con, &skip); |
1729 | con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); | 1874 | if (ret < 0) |
1875 | return ret; | ||
1730 | if (skip) { | 1876 | if (skip) { |
1731 | /* skip this message */ | 1877 | /* skip this message */ |
1732 | dout("alloc_msg said skip message\n"); | 1878 | dout("alloc_msg said skip message\n"); |
@@ -1737,11 +1883,9 @@ static int read_partial_message(struct ceph_connection *con) | |||
1737 | con->in_seq++; | 1883 | con->in_seq++; |
1738 | return 0; | 1884 | return 0; |
1739 | } | 1885 | } |
1740 | if (!con->in_msg) { | 1886 | |
1741 | con->error_msg = | 1887 | BUG_ON(!con->in_msg); |
1742 | "error allocating memory for incoming message"; | 1888 | BUG_ON(con->in_msg->con != con); |
1743 | return -ENOMEM; | ||
1744 | } | ||
1745 | m = con->in_msg; | 1889 | m = con->in_msg; |
1746 | m->front.iov_len = 0; /* haven't read it yet */ | 1890 | m->front.iov_len = 0; /* haven't read it yet */ |
1747 | if (m->middle) | 1891 | if (m->middle) |
@@ -1753,6 +1897,11 @@ static int read_partial_message(struct ceph_connection *con) | |||
1753 | else | 1897 | else |
1754 | con->in_msg_pos.page_pos = 0; | 1898 | con->in_msg_pos.page_pos = 0; |
1755 | con->in_msg_pos.data_pos = 0; | 1899 | con->in_msg_pos.data_pos = 0; |
1900 | |||
1901 | #ifdef CONFIG_BLOCK | ||
1902 | if (m->bio) | ||
1903 | init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); | ||
1904 | #endif | ||
1756 | } | 1905 | } |
1757 | 1906 | ||
1758 | /* front */ | 1907 | /* front */ |
@@ -1769,10 +1918,6 @@ static int read_partial_message(struct ceph_connection *con) | |||
1769 | if (ret <= 0) | 1918 | if (ret <= 0) |
1770 | return ret; | 1919 | return ret; |
1771 | } | 1920 | } |
1772 | #ifdef CONFIG_BLOCK | ||
1773 | if (m->bio && !m->bio_iter) | ||
1774 | init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); | ||
1775 | #endif | ||
1776 | 1921 | ||
1777 | /* (page) data */ | 1922 | /* (page) data */ |
1778 | while (con->in_msg_pos.data_pos < data_len) { | 1923 | while (con->in_msg_pos.data_pos < data_len) { |
@@ -1783,7 +1928,7 @@ static int read_partial_message(struct ceph_connection *con) | |||
1783 | return ret; | 1928 | return ret; |
1784 | #ifdef CONFIG_BLOCK | 1929 | #ifdef CONFIG_BLOCK |
1785 | } else if (m->bio) { | 1930 | } else if (m->bio) { |
1786 | 1931 | BUG_ON(!m->bio_iter); | |
1787 | ret = read_partial_message_bio(con, | 1932 | ret = read_partial_message_bio(con, |
1788 | &m->bio_iter, &m->bio_seg, | 1933 | &m->bio_iter, &m->bio_seg, |
1789 | data_len, do_datacrc); | 1934 | data_len, do_datacrc); |
@@ -1837,8 +1982,11 @@ static void process_message(struct ceph_connection *con) | |||
1837 | { | 1982 | { |
1838 | struct ceph_msg *msg; | 1983 | struct ceph_msg *msg; |
1839 | 1984 | ||
1985 | BUG_ON(con->in_msg->con != con); | ||
1986 | con->in_msg->con = NULL; | ||
1840 | msg = con->in_msg; | 1987 | msg = con->in_msg; |
1841 | con->in_msg = NULL; | 1988 | con->in_msg = NULL; |
1989 | con->ops->put(con); | ||
1842 | 1990 | ||
1843 | /* if first message, set peer_name */ | 1991 | /* if first message, set peer_name */ |
1844 | if (con->peer_name.type == 0) | 1992 | if (con->peer_name.type == 0) |
@@ -1858,7 +2006,6 @@ static void process_message(struct ceph_connection *con) | |||
1858 | con->ops->dispatch(con, msg); | 2006 | con->ops->dispatch(con, msg); |
1859 | 2007 | ||
1860 | mutex_lock(&con->mutex); | 2008 | mutex_lock(&con->mutex); |
1861 | prepare_read_tag(con); | ||
1862 | } | 2009 | } |
1863 | 2010 | ||
1864 | 2011 | ||
@@ -1870,22 +2017,19 @@ static int try_write(struct ceph_connection *con) | |||
1870 | { | 2017 | { |
1871 | int ret = 1; | 2018 | int ret = 1; |
1872 | 2019 | ||
1873 | dout("try_write start %p state %lu nref %d\n", con, con->state, | 2020 | dout("try_write start %p state %lu\n", con, con->state); |
1874 | atomic_read(&con->nref)); | ||
1875 | 2021 | ||
1876 | more: | 2022 | more: |
1877 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); | 2023 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); |
1878 | 2024 | ||
1879 | /* open the socket first? */ | 2025 | /* open the socket first? */ |
1880 | if (con->sock == NULL) { | 2026 | if (con->state == CON_STATE_PREOPEN) { |
1881 | ceph_con_out_kvec_reset(con); | 2027 | BUG_ON(con->sock); |
2028 | con->state = CON_STATE_CONNECTING; | ||
2029 | |||
2030 | con_out_kvec_reset(con); | ||
1882 | prepare_write_banner(con); | 2031 | prepare_write_banner(con); |
1883 | ret = prepare_write_connect(con); | ||
1884 | if (ret < 0) | ||
1885 | goto out; | ||
1886 | prepare_read_banner(con); | 2032 | prepare_read_banner(con); |
1887 | set_bit(CONNECTING, &con->state); | ||
1888 | clear_bit(NEGOTIATING, &con->state); | ||
1889 | 2033 | ||
1890 | BUG_ON(con->in_msg); | 2034 | BUG_ON(con->in_msg); |
1891 | con->in_tag = CEPH_MSGR_TAG_READY; | 2035 | con->in_tag = CEPH_MSGR_TAG_READY; |
@@ -1932,7 +2076,7 @@ more_kvec: | |||
1932 | } | 2076 | } |
1933 | 2077 | ||
1934 | do_next: | 2078 | do_next: |
1935 | if (!test_bit(CONNECTING, &con->state)) { | 2079 | if (con->state == CON_STATE_OPEN) { |
1936 | /* is anything else pending? */ | 2080 | /* is anything else pending? */ |
1937 | if (!list_empty(&con->out_queue)) { | 2081 | if (!list_empty(&con->out_queue)) { |
1938 | prepare_write_message(con); | 2082 | prepare_write_message(con); |
@@ -1942,14 +2086,15 @@ do_next: | |||
1942 | prepare_write_ack(con); | 2086 | prepare_write_ack(con); |
1943 | goto more; | 2087 | goto more; |
1944 | } | 2088 | } |
1945 | if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { | 2089 | if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING, |
2090 | &con->flags)) { | ||
1946 | prepare_write_keepalive(con); | 2091 | prepare_write_keepalive(con); |
1947 | goto more; | 2092 | goto more; |
1948 | } | 2093 | } |
1949 | } | 2094 | } |
1950 | 2095 | ||
1951 | /* Nothing to do! */ | 2096 | /* Nothing to do! */ |
1952 | clear_bit(WRITE_PENDING, &con->state); | 2097 | clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
1953 | dout("try_write nothing else to write.\n"); | 2098 | dout("try_write nothing else to write.\n"); |
1954 | ret = 0; | 2099 | ret = 0; |
1955 | out: | 2100 | out: |
@@ -1966,38 +2111,42 @@ static int try_read(struct ceph_connection *con) | |||
1966 | { | 2111 | { |
1967 | int ret = -1; | 2112 | int ret = -1; |
1968 | 2113 | ||
1969 | if (!con->sock) | 2114 | more: |
1970 | return 0; | 2115 | dout("try_read start on %p state %lu\n", con, con->state); |
1971 | 2116 | if (con->state != CON_STATE_CONNECTING && | |
1972 | if (test_bit(STANDBY, &con->state)) | 2117 | con->state != CON_STATE_NEGOTIATING && |
2118 | con->state != CON_STATE_OPEN) | ||
1973 | return 0; | 2119 | return 0; |
1974 | 2120 | ||
1975 | dout("try_read start on %p\n", con); | 2121 | BUG_ON(!con->sock); |
1976 | 2122 | ||
1977 | more: | ||
1978 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, | 2123 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, |
1979 | con->in_base_pos); | 2124 | con->in_base_pos); |
1980 | 2125 | ||
1981 | /* | 2126 | if (con->state == CON_STATE_CONNECTING) { |
1982 | * process_connect and process_message drop and re-take | 2127 | dout("try_read connecting\n"); |
1983 | * con->mutex. make sure we handle a racing close or reopen. | 2128 | ret = read_partial_banner(con); |
1984 | */ | 2129 | if (ret <= 0) |
1985 | if (test_bit(CLOSED, &con->state) || | 2130 | goto out; |
1986 | test_bit(OPENING, &con->state)) { | 2131 | ret = process_banner(con); |
1987 | ret = -EAGAIN; | 2132 | if (ret < 0) |
2133 | goto out; | ||
2134 | |||
2135 | BUG_ON(con->state != CON_STATE_CONNECTING); | ||
2136 | con->state = CON_STATE_NEGOTIATING; | ||
2137 | |||
2138 | /* Banner is good, exchange connection info */ | ||
2139 | ret = prepare_write_connect(con); | ||
2140 | if (ret < 0) | ||
2141 | goto out; | ||
2142 | prepare_read_connect(con); | ||
2143 | |||
2144 | /* Send connection info before awaiting response */ | ||
1988 | goto out; | 2145 | goto out; |
1989 | } | 2146 | } |
1990 | 2147 | ||
1991 | if (test_bit(CONNECTING, &con->state)) { | 2148 | if (con->state == CON_STATE_NEGOTIATING) { |
1992 | if (!test_bit(NEGOTIATING, &con->state)) { | 2149 | dout("try_read negotiating\n"); |
1993 | dout("try_read connecting\n"); | ||
1994 | ret = read_partial_banner(con); | ||
1995 | if (ret <= 0) | ||
1996 | goto out; | ||
1997 | ret = process_banner(con); | ||
1998 | if (ret < 0) | ||
1999 | goto out; | ||
2000 | } | ||
2001 | ret = read_partial_connect(con); | 2150 | ret = read_partial_connect(con); |
2002 | if (ret <= 0) | 2151 | if (ret <= 0) |
2003 | goto out; | 2152 | goto out; |
@@ -2007,6 +2156,8 @@ more: | |||
2007 | goto more; | 2156 | goto more; |
2008 | } | 2157 | } |
2009 | 2158 | ||
2159 | BUG_ON(con->state != CON_STATE_OPEN); | ||
2160 | |||
2010 | if (con->in_base_pos < 0) { | 2161 | if (con->in_base_pos < 0) { |
2011 | /* | 2162 | /* |
2012 | * skipping + discarding content. | 2163 | * skipping + discarding content. |
@@ -2040,7 +2191,8 @@ more: | |||
2040 | prepare_read_ack(con); | 2191 | prepare_read_ack(con); |
2041 | break; | 2192 | break; |
2042 | case CEPH_MSGR_TAG_CLOSE: | 2193 | case CEPH_MSGR_TAG_CLOSE: |
2043 | set_bit(CLOSED, &con->state); /* fixme */ | 2194 | con_close_socket(con); |
2195 | con->state = CON_STATE_CLOSED; | ||
2044 | goto out; | 2196 | goto out; |
2045 | default: | 2197 | default: |
2046 | goto bad_tag; | 2198 | goto bad_tag; |
@@ -2063,6 +2215,8 @@ more: | |||
2063 | if (con->in_tag == CEPH_MSGR_TAG_READY) | 2215 | if (con->in_tag == CEPH_MSGR_TAG_READY) |
2064 | goto more; | 2216 | goto more; |
2065 | process_message(con); | 2217 | process_message(con); |
2218 | if (con->state == CON_STATE_OPEN) | ||
2219 | prepare_read_tag(con); | ||
2066 | goto more; | 2220 | goto more; |
2067 | } | 2221 | } |
2068 | if (con->in_tag == CEPH_MSGR_TAG_ACK) { | 2222 | if (con->in_tag == CEPH_MSGR_TAG_ACK) { |
@@ -2091,12 +2245,6 @@ bad_tag: | |||
2091 | */ | 2245 | */ |
2092 | static void queue_con(struct ceph_connection *con) | 2246 | static void queue_con(struct ceph_connection *con) |
2093 | { | 2247 | { |
2094 | if (test_bit(DEAD, &con->state)) { | ||
2095 | dout("queue_con %p ignoring: DEAD\n", | ||
2096 | con); | ||
2097 | return; | ||
2098 | } | ||
2099 | |||
2100 | if (!con->ops->get(con)) { | 2248 | if (!con->ops->get(con)) { |
2101 | dout("queue_con %p ref count 0\n", con); | 2249 | dout("queue_con %p ref count 0\n", con); |
2102 | return; | 2250 | return; |
@@ -2121,7 +2269,26 @@ static void con_work(struct work_struct *work) | |||
2121 | 2269 | ||
2122 | mutex_lock(&con->mutex); | 2270 | mutex_lock(&con->mutex); |
2123 | restart: | 2271 | restart: |
2124 | if (test_and_clear_bit(BACKOFF, &con->state)) { | 2272 | if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) { |
2273 | switch (con->state) { | ||
2274 | case CON_STATE_CONNECTING: | ||
2275 | con->error_msg = "connection failed"; | ||
2276 | break; | ||
2277 | case CON_STATE_NEGOTIATING: | ||
2278 | con->error_msg = "negotiation failed"; | ||
2279 | break; | ||
2280 | case CON_STATE_OPEN: | ||
2281 | con->error_msg = "socket closed"; | ||
2282 | break; | ||
2283 | default: | ||
2284 | dout("unrecognized con state %d\n", (int)con->state); | ||
2285 | con->error_msg = "unrecognized con state"; | ||
2286 | BUG(); | ||
2287 | } | ||
2288 | goto fault; | ||
2289 | } | ||
2290 | |||
2291 | if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { | ||
2125 | dout("con_work %p backing off\n", con); | 2292 | dout("con_work %p backing off\n", con); |
2126 | if (queue_delayed_work(ceph_msgr_wq, &con->work, | 2293 | if (queue_delayed_work(ceph_msgr_wq, &con->work, |
2127 | round_jiffies_relative(con->delay))) { | 2294 | round_jiffies_relative(con->delay))) { |
@@ -2135,35 +2302,35 @@ restart: | |||
2135 | } | 2302 | } |
2136 | } | 2303 | } |
2137 | 2304 | ||
2138 | if (test_bit(STANDBY, &con->state)) { | 2305 | if (con->state == CON_STATE_STANDBY) { |
2139 | dout("con_work %p STANDBY\n", con); | 2306 | dout("con_work %p STANDBY\n", con); |
2140 | goto done; | 2307 | goto done; |
2141 | } | 2308 | } |
2142 | if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ | 2309 | if (con->state == CON_STATE_CLOSED) { |
2143 | dout("con_work CLOSED\n"); | 2310 | dout("con_work %p CLOSED\n", con); |
2144 | con_close_socket(con); | 2311 | BUG_ON(con->sock); |
2145 | goto done; | 2312 | goto done; |
2146 | } | 2313 | } |
2147 | if (test_and_clear_bit(OPENING, &con->state)) { | 2314 | if (con->state == CON_STATE_PREOPEN) { |
2148 | /* reopen w/ new peer */ | ||
2149 | dout("con_work OPENING\n"); | 2315 | dout("con_work OPENING\n"); |
2150 | con_close_socket(con); | 2316 | BUG_ON(con->sock); |
2151 | } | 2317 | } |
2152 | 2318 | ||
2153 | if (test_and_clear_bit(SOCK_CLOSED, &con->state)) | ||
2154 | goto fault; | ||
2155 | |||
2156 | ret = try_read(con); | 2319 | ret = try_read(con); |
2157 | if (ret == -EAGAIN) | 2320 | if (ret == -EAGAIN) |
2158 | goto restart; | 2321 | goto restart; |
2159 | if (ret < 0) | 2322 | if (ret < 0) { |
2323 | con->error_msg = "socket error on read"; | ||
2160 | goto fault; | 2324 | goto fault; |
2325 | } | ||
2161 | 2326 | ||
2162 | ret = try_write(con); | 2327 | ret = try_write(con); |
2163 | if (ret == -EAGAIN) | 2328 | if (ret == -EAGAIN) |
2164 | goto restart; | 2329 | goto restart; |
2165 | if (ret < 0) | 2330 | if (ret < 0) { |
2331 | con->error_msg = "socket error on write"; | ||
2166 | goto fault; | 2332 | goto fault; |
2333 | } | ||
2167 | 2334 | ||
2168 | done: | 2335 | done: |
2169 | mutex_unlock(&con->mutex); | 2336 | mutex_unlock(&con->mutex); |
@@ -2172,7 +2339,6 @@ done_unlocked: | |||
2172 | return; | 2339 | return; |
2173 | 2340 | ||
2174 | fault: | 2341 | fault: |
2175 | mutex_unlock(&con->mutex); | ||
2176 | ceph_fault(con); /* error/fault path */ | 2342 | ceph_fault(con); /* error/fault path */ |
2177 | goto done_unlocked; | 2343 | goto done_unlocked; |
2178 | } | 2344 | } |
@@ -2183,26 +2349,31 @@ fault: | |||
2183 | * exponential backoff | 2349 | * exponential backoff |
2184 | */ | 2350 | */ |
2185 | static void ceph_fault(struct ceph_connection *con) | 2351 | static void ceph_fault(struct ceph_connection *con) |
2352 | __releases(con->mutex) | ||
2186 | { | 2353 | { |
2187 | pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), | 2354 | pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), |
2188 | ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); | 2355 | ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); |
2189 | dout("fault %p state %lu to peer %s\n", | 2356 | dout("fault %p state %lu to peer %s\n", |
2190 | con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); | 2357 | con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); |
2191 | 2358 | ||
2192 | if (test_bit(LOSSYTX, &con->state)) { | 2359 | BUG_ON(con->state != CON_STATE_CONNECTING && |
2193 | dout("fault on LOSSYTX channel\n"); | 2360 | con->state != CON_STATE_NEGOTIATING && |
2194 | goto out; | 2361 | con->state != CON_STATE_OPEN); |
2195 | } | ||
2196 | |||
2197 | mutex_lock(&con->mutex); | ||
2198 | if (test_bit(CLOSED, &con->state)) | ||
2199 | goto out_unlock; | ||
2200 | 2362 | ||
2201 | con_close_socket(con); | 2363 | con_close_socket(con); |
2202 | 2364 | ||
2365 | if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) { | ||
2366 | dout("fault on LOSSYTX channel, marking CLOSED\n"); | ||
2367 | con->state = CON_STATE_CLOSED; | ||
2368 | goto out_unlock; | ||
2369 | } | ||
2370 | |||
2203 | if (con->in_msg) { | 2371 | if (con->in_msg) { |
2372 | BUG_ON(con->in_msg->con != con); | ||
2373 | con->in_msg->con = NULL; | ||
2204 | ceph_msg_put(con->in_msg); | 2374 | ceph_msg_put(con->in_msg); |
2205 | con->in_msg = NULL; | 2375 | con->in_msg = NULL; |
2376 | con->ops->put(con); | ||
2206 | } | 2377 | } |
2207 | 2378 | ||
2208 | /* Requeue anything that hasn't been acked */ | 2379 | /* Requeue anything that hasn't been acked */ |
@@ -2211,12 +2382,13 @@ static void ceph_fault(struct ceph_connection *con) | |||
2211 | /* If there are no messages queued or keepalive pending, place | 2382 | /* If there are no messages queued or keepalive pending, place |
2212 | * the connection in a STANDBY state */ | 2383 | * the connection in a STANDBY state */ |
2213 | if (list_empty(&con->out_queue) && | 2384 | if (list_empty(&con->out_queue) && |
2214 | !test_bit(KEEPALIVE_PENDING, &con->state)) { | 2385 | !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) { |
2215 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); | 2386 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); |
2216 | clear_bit(WRITE_PENDING, &con->state); | 2387 | clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); |
2217 | set_bit(STANDBY, &con->state); | 2388 | con->state = CON_STATE_STANDBY; |
2218 | } else { | 2389 | } else { |
2219 | /* retry after a delay. */ | 2390 | /* retry after a delay. */ |
2391 | con->state = CON_STATE_PREOPEN; | ||
2220 | if (con->delay == 0) | 2392 | if (con->delay == 0) |
2221 | con->delay = BASE_DELAY_INTERVAL; | 2393 | con->delay = BASE_DELAY_INTERVAL; |
2222 | else if (con->delay < MAX_DELAY_INTERVAL) | 2394 | else if (con->delay < MAX_DELAY_INTERVAL) |
@@ -2237,13 +2409,12 @@ static void ceph_fault(struct ceph_connection *con) | |||
2237 | * that when con_work restarts we schedule the | 2409 | * that when con_work restarts we schedule the |
2238 | * delay then. | 2410 | * delay then. |
2239 | */ | 2411 | */ |
2240 | set_bit(BACKOFF, &con->state); | 2412 | set_bit(CON_FLAG_BACKOFF, &con->flags); |
2241 | } | 2413 | } |
2242 | } | 2414 | } |
2243 | 2415 | ||
2244 | out_unlock: | 2416 | out_unlock: |
2245 | mutex_unlock(&con->mutex); | 2417 | mutex_unlock(&con->mutex); |
2246 | out: | ||
2247 | /* | 2418 | /* |
2248 | * in case we faulted due to authentication, invalidate our | 2419 | * in case we faulted due to authentication, invalidate our |
2249 | * current tickets so that we can get new ones. | 2420 | * current tickets so that we can get new ones. |
@@ -2260,18 +2431,14 @@ out: | |||
2260 | 2431 | ||
2261 | 2432 | ||
2262 | /* | 2433 | /* |
2263 | * create a new messenger instance | 2434 | * initialize a new messenger instance |
2264 | */ | 2435 | */ |
2265 | struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, | 2436 | void ceph_messenger_init(struct ceph_messenger *msgr, |
2266 | u32 supported_features, | 2437 | struct ceph_entity_addr *myaddr, |
2267 | u32 required_features) | 2438 | u32 supported_features, |
2439 | u32 required_features, | ||
2440 | bool nocrc) | ||
2268 | { | 2441 | { |
2269 | struct ceph_messenger *msgr; | ||
2270 | |||
2271 | msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); | ||
2272 | if (msgr == NULL) | ||
2273 | return ERR_PTR(-ENOMEM); | ||
2274 | |||
2275 | msgr->supported_features = supported_features; | 2442 | msgr->supported_features = supported_features; |
2276 | msgr->required_features = required_features; | 2443 | msgr->required_features = required_features; |
2277 | 2444 | ||
@@ -2284,30 +2451,23 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, | |||
2284 | msgr->inst.addr.type = 0; | 2451 | msgr->inst.addr.type = 0; |
2285 | get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); | 2452 | get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); |
2286 | encode_my_addr(msgr); | 2453 | encode_my_addr(msgr); |
2454 | msgr->nocrc = nocrc; | ||
2287 | 2455 | ||
2288 | dout("messenger_create %p\n", msgr); | 2456 | atomic_set(&msgr->stopping, 0); |
2289 | return msgr; | ||
2290 | } | ||
2291 | EXPORT_SYMBOL(ceph_messenger_create); | ||
2292 | 2457 | ||
2293 | void ceph_messenger_destroy(struct ceph_messenger *msgr) | 2458 | dout("%s %p\n", __func__, msgr); |
2294 | { | ||
2295 | dout("destroy %p\n", msgr); | ||
2296 | kfree(msgr); | ||
2297 | dout("destroyed messenger %p\n", msgr); | ||
2298 | } | 2459 | } |
2299 | EXPORT_SYMBOL(ceph_messenger_destroy); | 2460 | EXPORT_SYMBOL(ceph_messenger_init); |
2300 | 2461 | ||
2301 | static void clear_standby(struct ceph_connection *con) | 2462 | static void clear_standby(struct ceph_connection *con) |
2302 | { | 2463 | { |
2303 | /* come back from STANDBY? */ | 2464 | /* come back from STANDBY? */ |
2304 | if (test_and_clear_bit(STANDBY, &con->state)) { | 2465 | if (con->state == CON_STATE_STANDBY) { |
2305 | mutex_lock(&con->mutex); | ||
2306 | dout("clear_standby %p and ++connect_seq\n", con); | 2466 | dout("clear_standby %p and ++connect_seq\n", con); |
2467 | con->state = CON_STATE_PREOPEN; | ||
2307 | con->connect_seq++; | 2468 | con->connect_seq++; |
2308 | WARN_ON(test_bit(WRITE_PENDING, &con->state)); | 2469 | WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags)); |
2309 | WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); | 2470 | WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)); |
2310 | mutex_unlock(&con->mutex); | ||
2311 | } | 2471 | } |
2312 | } | 2472 | } |
2313 | 2473 | ||
@@ -2316,21 +2476,24 @@ static void clear_standby(struct ceph_connection *con) | |||
2316 | */ | 2476 | */ |
2317 | void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | 2477 | void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) |
2318 | { | 2478 | { |
2319 | if (test_bit(CLOSED, &con->state)) { | ||
2320 | dout("con_send %p closed, dropping %p\n", con, msg); | ||
2321 | ceph_msg_put(msg); | ||
2322 | return; | ||
2323 | } | ||
2324 | |||
2325 | /* set src+dst */ | 2479 | /* set src+dst */ |
2326 | msg->hdr.src = con->msgr->inst.name; | 2480 | msg->hdr.src = con->msgr->inst.name; |
2327 | |||
2328 | BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); | 2481 | BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); |
2329 | |||
2330 | msg->needs_out_seq = true; | 2482 | msg->needs_out_seq = true; |
2331 | 2483 | ||
2332 | /* queue */ | ||
2333 | mutex_lock(&con->mutex); | 2484 | mutex_lock(&con->mutex); |
2485 | |||
2486 | if (con->state == CON_STATE_CLOSED) { | ||
2487 | dout("con_send %p closed, dropping %p\n", con, msg); | ||
2488 | ceph_msg_put(msg); | ||
2489 | mutex_unlock(&con->mutex); | ||
2490 | return; | ||
2491 | } | ||
2492 | |||
2493 | BUG_ON(msg->con != NULL); | ||
2494 | msg->con = con->ops->get(con); | ||
2495 | BUG_ON(msg->con == NULL); | ||
2496 | |||
2334 | BUG_ON(!list_empty(&msg->list_head)); | 2497 | BUG_ON(!list_empty(&msg->list_head)); |
2335 | list_add_tail(&msg->list_head, &con->out_queue); | 2498 | list_add_tail(&msg->list_head, &con->out_queue); |
2336 | dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, | 2499 | dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, |
@@ -2339,12 +2502,13 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
2339 | le32_to_cpu(msg->hdr.front_len), | 2502 | le32_to_cpu(msg->hdr.front_len), |
2340 | le32_to_cpu(msg->hdr.middle_len), | 2503 | le32_to_cpu(msg->hdr.middle_len), |
2341 | le32_to_cpu(msg->hdr.data_len)); | 2504 | le32_to_cpu(msg->hdr.data_len)); |
2505 | |||
2506 | clear_standby(con); | ||
2342 | mutex_unlock(&con->mutex); | 2507 | mutex_unlock(&con->mutex); |
2343 | 2508 | ||
2344 | /* if there wasn't anything waiting to send before, queue | 2509 | /* if there wasn't anything waiting to send before, queue |
2345 | * new work */ | 2510 | * new work */ |
2346 | clear_standby(con); | 2511 | if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) |
2347 | if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) | ||
2348 | queue_con(con); | 2512 | queue_con(con); |
2349 | } | 2513 | } |
2350 | EXPORT_SYMBOL(ceph_con_send); | 2514 | EXPORT_SYMBOL(ceph_con_send); |
@@ -2352,24 +2516,34 @@ EXPORT_SYMBOL(ceph_con_send); | |||
2352 | /* | 2516 | /* |
2353 | * Revoke a message that was previously queued for send | 2517 | * Revoke a message that was previously queued for send |
2354 | */ | 2518 | */ |
2355 | void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) | 2519 | void ceph_msg_revoke(struct ceph_msg *msg) |
2356 | { | 2520 | { |
2521 | struct ceph_connection *con = msg->con; | ||
2522 | |||
2523 | if (!con) | ||
2524 | return; /* Message not in our possession */ | ||
2525 | |||
2357 | mutex_lock(&con->mutex); | 2526 | mutex_lock(&con->mutex); |
2358 | if (!list_empty(&msg->list_head)) { | 2527 | if (!list_empty(&msg->list_head)) { |
2359 | dout("con_revoke %p msg %p - was on queue\n", con, msg); | 2528 | dout("%s %p msg %p - was on queue\n", __func__, con, msg); |
2360 | list_del_init(&msg->list_head); | 2529 | list_del_init(&msg->list_head); |
2361 | ceph_msg_put(msg); | 2530 | BUG_ON(msg->con == NULL); |
2531 | msg->con->ops->put(msg->con); | ||
2532 | msg->con = NULL; | ||
2362 | msg->hdr.seq = 0; | 2533 | msg->hdr.seq = 0; |
2534 | |||
2535 | ceph_msg_put(msg); | ||
2363 | } | 2536 | } |
2364 | if (con->out_msg == msg) { | 2537 | if (con->out_msg == msg) { |
2365 | dout("con_revoke %p msg %p - was sending\n", con, msg); | 2538 | dout("%s %p msg %p - was sending\n", __func__, con, msg); |
2366 | con->out_msg = NULL; | 2539 | con->out_msg = NULL; |
2367 | if (con->out_kvec_is_msg) { | 2540 | if (con->out_kvec_is_msg) { |
2368 | con->out_skip = con->out_kvec_bytes; | 2541 | con->out_skip = con->out_kvec_bytes; |
2369 | con->out_kvec_is_msg = false; | 2542 | con->out_kvec_is_msg = false; |
2370 | } | 2543 | } |
2371 | ceph_msg_put(msg); | ||
2372 | msg->hdr.seq = 0; | 2544 | msg->hdr.seq = 0; |
2545 | |||
2546 | ceph_msg_put(msg); | ||
2373 | } | 2547 | } |
2374 | mutex_unlock(&con->mutex); | 2548 | mutex_unlock(&con->mutex); |
2375 | } | 2549 | } |
@@ -2377,17 +2551,27 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) | |||
2377 | /* | 2551 | /* |
2378 | * Revoke a message that we may be reading data into | 2552 | * Revoke a message that we may be reading data into |
2379 | */ | 2553 | */ |
2380 | void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) | 2554 | void ceph_msg_revoke_incoming(struct ceph_msg *msg) |
2381 | { | 2555 | { |
2556 | struct ceph_connection *con; | ||
2557 | |||
2558 | BUG_ON(msg == NULL); | ||
2559 | if (!msg->con) { | ||
2560 | dout("%s msg %p null con\n", __func__, msg); | ||
2561 | |||
2562 | return; /* Message not in our possession */ | ||
2563 | } | ||
2564 | |||
2565 | con = msg->con; | ||
2382 | mutex_lock(&con->mutex); | 2566 | mutex_lock(&con->mutex); |
2383 | if (con->in_msg && con->in_msg == msg) { | 2567 | if (con->in_msg == msg) { |
2384 | unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); | 2568 | unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); |
2385 | unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); | 2569 | unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); |
2386 | unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); | 2570 | unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); |
2387 | 2571 | ||
2388 | /* skip rest of message */ | 2572 | /* skip rest of message */ |
2389 | dout("con_revoke_pages %p msg %p revoked\n", con, msg); | 2573 | dout("%s %p msg %p revoked\n", __func__, con, msg); |
2390 | con->in_base_pos = con->in_base_pos - | 2574 | con->in_base_pos = con->in_base_pos - |
2391 | sizeof(struct ceph_msg_header) - | 2575 | sizeof(struct ceph_msg_header) - |
2392 | front_len - | 2576 | front_len - |
2393 | middle_len - | 2577 | middle_len - |
@@ -2398,8 +2582,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) | |||
2398 | con->in_tag = CEPH_MSGR_TAG_READY; | 2582 | con->in_tag = CEPH_MSGR_TAG_READY; |
2399 | con->in_seq++; | 2583 | con->in_seq++; |
2400 | } else { | 2584 | } else { |
2401 | dout("con_revoke_pages %p msg %p pages %p no-op\n", | 2585 | dout("%s %p in_msg %p msg %p no-op\n", |
2402 | con, con->in_msg, msg); | 2586 | __func__, con, con->in_msg, msg); |
2403 | } | 2587 | } |
2404 | mutex_unlock(&con->mutex); | 2588 | mutex_unlock(&con->mutex); |
2405 | } | 2589 | } |
@@ -2410,9 +2594,11 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) | |||
2410 | void ceph_con_keepalive(struct ceph_connection *con) | 2594 | void ceph_con_keepalive(struct ceph_connection *con) |
2411 | { | 2595 | { |
2412 | dout("con_keepalive %p\n", con); | 2596 | dout("con_keepalive %p\n", con); |
2597 | mutex_lock(&con->mutex); | ||
2413 | clear_standby(con); | 2598 | clear_standby(con); |
2414 | if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && | 2599 | mutex_unlock(&con->mutex); |
2415 | test_and_set_bit(WRITE_PENDING, &con->state) == 0) | 2600 | if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 && |
2601 | test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) | ||
2416 | queue_con(con); | 2602 | queue_con(con); |
2417 | } | 2603 | } |
2418 | EXPORT_SYMBOL(ceph_con_keepalive); | 2604 | EXPORT_SYMBOL(ceph_con_keepalive); |
@@ -2431,6 +2617,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, | |||
2431 | if (m == NULL) | 2617 | if (m == NULL) |
2432 | goto out; | 2618 | goto out; |
2433 | kref_init(&m->kref); | 2619 | kref_init(&m->kref); |
2620 | |||
2621 | m->con = NULL; | ||
2434 | INIT_LIST_HEAD(&m->list_head); | 2622 | INIT_LIST_HEAD(&m->list_head); |
2435 | 2623 | ||
2436 | m->hdr.tid = 0; | 2624 | m->hdr.tid = 0; |
@@ -2526,46 +2714,77 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) | |||
2526 | } | 2714 | } |
2527 | 2715 | ||
2528 | /* | 2716 | /* |
2529 | * Generic message allocator, for incoming messages. | 2717 | * Allocate a message for receiving an incoming message on a |
2718 | * connection, and save the result in con->in_msg. Uses the | ||
2719 | * connection's private alloc_msg op if available. | ||
2720 | * | ||
2721 | * Returns 0 on success, or a negative error code. | ||
2722 | * | ||
2723 | * On success, if we set *skip = 1: | ||
2724 | * - the next message should be skipped and ignored. | ||
2725 | * - con->in_msg == NULL | ||
2726 | * or if we set *skip = 0: | ||
2727 | * - con->in_msg is non-null. | ||
2728 | * On error (ENOMEM, EAGAIN, ...), | ||
2729 | * - con->in_msg == NULL | ||
2530 | */ | 2730 | */ |
2531 | static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, | 2731 | static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) |
2532 | struct ceph_msg_header *hdr, | ||
2533 | int *skip) | ||
2534 | { | 2732 | { |
2733 | struct ceph_msg_header *hdr = &con->in_hdr; | ||
2535 | int type = le16_to_cpu(hdr->type); | 2734 | int type = le16_to_cpu(hdr->type); |
2536 | int front_len = le32_to_cpu(hdr->front_len); | 2735 | int front_len = le32_to_cpu(hdr->front_len); |
2537 | int middle_len = le32_to_cpu(hdr->middle_len); | 2736 | int middle_len = le32_to_cpu(hdr->middle_len); |
2538 | struct ceph_msg *msg = NULL; | 2737 | int ret = 0; |
2539 | int ret; | 2738 | |
2739 | BUG_ON(con->in_msg != NULL); | ||
2540 | 2740 | ||
2541 | if (con->ops->alloc_msg) { | 2741 | if (con->ops->alloc_msg) { |
2742 | struct ceph_msg *msg; | ||
2743 | |||
2542 | mutex_unlock(&con->mutex); | 2744 | mutex_unlock(&con->mutex); |
2543 | msg = con->ops->alloc_msg(con, hdr, skip); | 2745 | msg = con->ops->alloc_msg(con, hdr, skip); |
2544 | mutex_lock(&con->mutex); | 2746 | mutex_lock(&con->mutex); |
2545 | if (!msg || *skip) | 2747 | if (con->state != CON_STATE_OPEN) { |
2546 | return NULL; | 2748 | ceph_msg_put(msg); |
2749 | return -EAGAIN; | ||
2750 | } | ||
2751 | con->in_msg = msg; | ||
2752 | if (con->in_msg) { | ||
2753 | con->in_msg->con = con->ops->get(con); | ||
2754 | BUG_ON(con->in_msg->con == NULL); | ||
2755 | } | ||
2756 | if (*skip) { | ||
2757 | con->in_msg = NULL; | ||
2758 | return 0; | ||
2759 | } | ||
2760 | if (!con->in_msg) { | ||
2761 | con->error_msg = | ||
2762 | "error allocating memory for incoming message"; | ||
2763 | return -ENOMEM; | ||
2764 | } | ||
2547 | } | 2765 | } |
2548 | if (!msg) { | 2766 | if (!con->in_msg) { |
2549 | *skip = 0; | 2767 | con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false); |
2550 | msg = ceph_msg_new(type, front_len, GFP_NOFS, false); | 2768 | if (!con->in_msg) { |
2551 | if (!msg) { | ||
2552 | pr_err("unable to allocate msg type %d len %d\n", | 2769 | pr_err("unable to allocate msg type %d len %d\n", |
2553 | type, front_len); | 2770 | type, front_len); |
2554 | return NULL; | 2771 | return -ENOMEM; |
2555 | } | 2772 | } |
2556 | msg->page_alignment = le16_to_cpu(hdr->data_off); | 2773 | con->in_msg->con = con->ops->get(con); |
2774 | BUG_ON(con->in_msg->con == NULL); | ||
2775 | con->in_msg->page_alignment = le16_to_cpu(hdr->data_off); | ||
2557 | } | 2776 | } |
2558 | memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); | 2777 | memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); |
2559 | 2778 | ||
2560 | if (middle_len && !msg->middle) { | 2779 | if (middle_len && !con->in_msg->middle) { |
2561 | ret = ceph_alloc_middle(con, msg); | 2780 | ret = ceph_alloc_middle(con, con->in_msg); |
2562 | if (ret < 0) { | 2781 | if (ret < 0) { |
2563 | ceph_msg_put(msg); | 2782 | ceph_msg_put(con->in_msg); |
2564 | return NULL; | 2783 | con->in_msg = NULL; |
2565 | } | 2784 | } |
2566 | } | 2785 | } |
2567 | 2786 | ||
2568 | return msg; | 2787 | return ret; |
2569 | } | 2788 | } |
2570 | 2789 | ||
2571 | 2790 | ||