aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2012-07-31 17:35:28 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2012-07-31 17:35:28 -0400
commitcc8362b1f6d724e46f515121d442779924b19fec (patch)
tree86fb5c3767e538ec9ded57dd7b3ce5d69dcde691 /net
parent2e3ee613480563a6d5c01b57d342e65cc58c06df (diff)
parent1fe5e9932156f6122c3b1ff6ba7541c27c86718c (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Sage Weil: "Lots of stuff this time around: - lots of cleanup and refactoring in the libceph messenger code, and many hard to hit races and bugs closed as a result. - lots of cleanup and refactoring in the rbd code from Alex Elder, mostly in preparation for the layering functionality that will be coming in 3.7. - some misc rbd cleanups from Josh Durgin that are finally going upstream - support for CRUSH tunables (used by newer clusters to improve the data placement) - some cleanup in our use of d_parent that Al brought up a while back - a random collection of fixes across the tree There is another patch coming that fixes up our ->atomic_open() behavior, but I'm going to hammer on it a bit more before sending it." Fix up conflicts due to commits that were already committed earlier in drivers/block/rbd.c, net/ceph/{messenger.c, osd_client.c} * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (132 commits) rbd: create rbd_refresh_helper() rbd: return obj version in __rbd_refresh_header() rbd: fixes in rbd_header_from_disk() rbd: always pass ops array to rbd_req_sync_op() rbd: pass null version pointer in add_snap() rbd: make rbd_create_rw_ops() return a pointer rbd: have __rbd_add_snap_dev() return a pointer libceph: recheck con state after allocating incoming message libceph: change ceph_con_in_msg_alloc convention to be less weird libceph: avoid dropping con mutex before fault libceph: verify state after retaking con lock after dispatch libceph: revoke mon_client messages on session restart libceph: fix handling of immediate socket connect failure ceph: update MAINTAINERS file libceph: be less chatty about stray replies libceph: clear all flags on con_close libceph: clean up con flags libceph: replace connection state bits with states libceph: drop unnecessary CLOSED check in socket state change callback libceph: close socket directly from ceph_con_close() ...
Diffstat (limited to 'net')
-rw-r--r--net/ceph/ceph_common.c25
-rw-r--r--net/ceph/crush/mapper.c13
-rw-r--r--net/ceph/messenger.c925
-rw-r--r--net/ceph/mon_client.c76
-rw-r--r--net/ceph/msgpool.c7
-rw-r--r--net/ceph/osd_client.c77
-rw-r--r--net/ceph/osdmap.c59
7 files changed, 737 insertions, 445 deletions
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index ba4323bce0e9..69e38db28e5f 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -17,6 +17,7 @@
17#include <linux/string.h> 17#include <linux/string.h>
18 18
19 19
20#include <linux/ceph/ceph_features.h>
20#include <linux/ceph/libceph.h> 21#include <linux/ceph/libceph.h>
21#include <linux/ceph/debugfs.h> 22#include <linux/ceph/debugfs.h>
22#include <linux/ceph/decode.h> 23#include <linux/ceph/decode.h>
@@ -460,27 +461,23 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
460 client->auth_err = 0; 461 client->auth_err = 0;
461 462
462 client->extra_mon_dispatch = NULL; 463 client->extra_mon_dispatch = NULL;
463 client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT | 464 client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
464 supported_features; 465 supported_features;
465 client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT | 466 client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT |
466 required_features; 467 required_features;
467 468
468 /* msgr */ 469 /* msgr */
469 if (ceph_test_opt(client, MYIP)) 470 if (ceph_test_opt(client, MYIP))
470 myaddr = &client->options->my_addr; 471 myaddr = &client->options->my_addr;
471 client->msgr = ceph_messenger_create(myaddr, 472 ceph_messenger_init(&client->msgr, myaddr,
472 client->supported_features, 473 client->supported_features,
473 client->required_features); 474 client->required_features,
474 if (IS_ERR(client->msgr)) { 475 ceph_test_opt(client, NOCRC));
475 err = PTR_ERR(client->msgr);
476 goto fail;
477 }
478 client->msgr->nocrc = ceph_test_opt(client, NOCRC);
479 476
480 /* subsystems */ 477 /* subsystems */
481 err = ceph_monc_init(&client->monc, client); 478 err = ceph_monc_init(&client->monc, client);
482 if (err < 0) 479 if (err < 0)
483 goto fail_msgr; 480 goto fail;
484 err = ceph_osdc_init(&client->osdc, client); 481 err = ceph_osdc_init(&client->osdc, client);
485 if (err < 0) 482 if (err < 0)
486 goto fail_monc; 483 goto fail_monc;
@@ -489,8 +486,6 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
489 486
490fail_monc: 487fail_monc:
491 ceph_monc_stop(&client->monc); 488 ceph_monc_stop(&client->monc);
492fail_msgr:
493 ceph_messenger_destroy(client->msgr);
494fail: 489fail:
495 kfree(client); 490 kfree(client);
496 return ERR_PTR(err); 491 return ERR_PTR(err);
@@ -501,6 +496,8 @@ void ceph_destroy_client(struct ceph_client *client)
501{ 496{
502 dout("destroy_client %p\n", client); 497 dout("destroy_client %p\n", client);
503 498
499 atomic_set(&client->msgr.stopping, 1);
500
504 /* unmount */ 501 /* unmount */
505 ceph_osdc_stop(&client->osdc); 502 ceph_osdc_stop(&client->osdc);
506 503
@@ -508,8 +505,6 @@ void ceph_destroy_client(struct ceph_client *client)
508 505
509 ceph_debugfs_client_cleanup(client); 506 ceph_debugfs_client_cleanup(client);
510 507
511 ceph_messenger_destroy(client->msgr);
512
513 ceph_destroy_options(client->options); 508 ceph_destroy_options(client->options);
514 509
515 kfree(client); 510 kfree(client);
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index d7edc24333b8..35fce755ce10 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -306,7 +306,6 @@ static int crush_choose(const struct crush_map *map,
306 int item = 0; 306 int item = 0;
307 int itemtype; 307 int itemtype;
308 int collide, reject; 308 int collide, reject;
309 const unsigned int orig_tries = 5; /* attempts before we fall back to search */
310 309
311 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "", 310 dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
312 bucket->id, x, outpos, numrep); 311 bucket->id, x, outpos, numrep);
@@ -351,8 +350,9 @@ static int crush_choose(const struct crush_map *map,
351 reject = 1; 350 reject = 1;
352 goto reject; 351 goto reject;
353 } 352 }
354 if (flocal >= (in->size>>1) && 353 if (map->choose_local_fallback_tries > 0 &&
355 flocal > orig_tries) 354 flocal >= (in->size>>1) &&
355 flocal > map->choose_local_fallback_tries)
356 item = bucket_perm_choose(in, x, r); 356 item = bucket_perm_choose(in, x, r);
357 else 357 else
358 item = crush_bucket_choose(in, x, r); 358 item = crush_bucket_choose(in, x, r);
@@ -422,13 +422,14 @@ reject:
422 ftotal++; 422 ftotal++;
423 flocal++; 423 flocal++;
424 424
425 if (collide && flocal < 3) 425 if (collide && flocal <= map->choose_local_tries)
426 /* retry locally a few times */ 426 /* retry locally a few times */
427 retry_bucket = 1; 427 retry_bucket = 1;
428 else if (flocal <= in->size + orig_tries) 428 else if (map->choose_local_fallback_tries > 0 &&
429 flocal <= in->size + map->choose_local_fallback_tries)
429 /* exhaustive bucket search */ 430 /* exhaustive bucket search */
430 retry_bucket = 1; 431 retry_bucket = 1;
431 else if (ftotal < 20) 432 else if (ftotal <= map->choose_total_tries)
432 /* then retry descent */ 433 /* then retry descent */
433 retry_descent = 1; 434 retry_descent = 1;
434 else 435 else
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 10255e81be79..b9796750034a 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -29,6 +29,74 @@
29 * the sender. 29 * the sender.
30 */ 30 */
31 31
32/*
33 * We track the state of the socket on a given connection using
34 * values defined below. The transition to a new socket state is
35 * handled by a function which verifies we aren't coming from an
36 * unexpected state.
37 *
38 * --------
39 * | NEW* | transient initial state
40 * --------
41 * | con_sock_state_init()
42 * v
43 * ----------
44 * | CLOSED | initialized, but no socket (and no
45 * ---------- TCP connection)
46 * ^ \
47 * | \ con_sock_state_connecting()
48 * | ----------------------
49 * | \
50 * + con_sock_state_closed() \
51 * |+--------------------------- \
52 * | \ \ \
53 * | ----------- \ \
54 * | | CLOSING | socket event; \ \
55 * | ----------- await close \ \
56 * | ^ \ |
57 * | | \ |
58 * | + con_sock_state_closing() \ |
59 * | / \ | |
60 * | / --------------- | |
61 * | / \ v v
62 * | / --------------
63 * | / -----------------| CONNECTING | socket created, TCP
64 * | | / -------------- connect initiated
65 * | | | con_sock_state_connected()
66 * | | v
67 * -------------
68 * | CONNECTED | TCP connection established
69 * -------------
70 *
71 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
72 */
73
74#define CON_SOCK_STATE_NEW 0 /* -> CLOSED */
75#define CON_SOCK_STATE_CLOSED 1 /* -> CONNECTING */
76#define CON_SOCK_STATE_CONNECTING 2 /* -> CONNECTED or -> CLOSING */
77#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */
78#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */
79
80/*
81 * connection states
82 */
83#define CON_STATE_CLOSED 1 /* -> PREOPEN */
84#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */
85#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */
86#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */
87#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */
88#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */
89
90/*
91 * ceph_connection flag bits
92 */
93#define CON_FLAG_LOSSYTX 0 /* we can close channel or drop
94 * messages on errors */
95#define CON_FLAG_KEEPALIVE_PENDING 1 /* we need to send a keepalive */
96#define CON_FLAG_WRITE_PENDING 2 /* we have data ready to send */
97#define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */
98#define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */
99
32/* static tag bytes (protocol control messages) */ 100/* static tag bytes (protocol control messages) */
33static char tag_msg = CEPH_MSGR_TAG_MSG; 101static char tag_msg = CEPH_MSGR_TAG_MSG;
34static char tag_ack = CEPH_MSGR_TAG_ACK; 102static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -147,72 +215,130 @@ void ceph_msgr_flush(void)
147} 215}
148EXPORT_SYMBOL(ceph_msgr_flush); 216EXPORT_SYMBOL(ceph_msgr_flush);
149 217
218/* Connection socket state transition functions */
219
220static void con_sock_state_init(struct ceph_connection *con)
221{
222 int old_state;
223
224 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
225 if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
226 printk("%s: unexpected old state %d\n", __func__, old_state);
227 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
228 CON_SOCK_STATE_CLOSED);
229}
230
231static void con_sock_state_connecting(struct ceph_connection *con)
232{
233 int old_state;
234
235 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
236 if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
237 printk("%s: unexpected old state %d\n", __func__, old_state);
238 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
239 CON_SOCK_STATE_CONNECTING);
240}
241
242static void con_sock_state_connected(struct ceph_connection *con)
243{
244 int old_state;
245
246 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
247 if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
248 printk("%s: unexpected old state %d\n", __func__, old_state);
249 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
250 CON_SOCK_STATE_CONNECTED);
251}
252
253static void con_sock_state_closing(struct ceph_connection *con)
254{
255 int old_state;
256
257 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
258 if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
259 old_state != CON_SOCK_STATE_CONNECTED &&
260 old_state != CON_SOCK_STATE_CLOSING))
261 printk("%s: unexpected old state %d\n", __func__, old_state);
262 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
263 CON_SOCK_STATE_CLOSING);
264}
265
266static void con_sock_state_closed(struct ceph_connection *con)
267{
268 int old_state;
269
270 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
271 if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
272 old_state != CON_SOCK_STATE_CLOSING &&
273 old_state != CON_SOCK_STATE_CONNECTING &&
274 old_state != CON_SOCK_STATE_CLOSED))
275 printk("%s: unexpected old state %d\n", __func__, old_state);
276 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
277 CON_SOCK_STATE_CLOSED);
278}
150 279
151/* 280/*
152 * socket callback functions 281 * socket callback functions
153 */ 282 */
154 283
155/* data available on socket, or listen socket received a connect */ 284/* data available on socket, or listen socket received a connect */
156static void ceph_data_ready(struct sock *sk, int count_unused) 285static void ceph_sock_data_ready(struct sock *sk, int count_unused)
157{ 286{
158 struct ceph_connection *con = sk->sk_user_data; 287 struct ceph_connection *con = sk->sk_user_data;
288 if (atomic_read(&con->msgr->stopping)) {
289 return;
290 }
159 291
160 if (sk->sk_state != TCP_CLOSE_WAIT) { 292 if (sk->sk_state != TCP_CLOSE_WAIT) {
161 dout("ceph_data_ready on %p state = %lu, queueing work\n", 293 dout("%s on %p state = %lu, queueing work\n", __func__,
162 con, con->state); 294 con, con->state);
163 queue_con(con); 295 queue_con(con);
164 } 296 }
165} 297}
166 298
167/* socket has buffer space for writing */ 299/* socket has buffer space for writing */
168static void ceph_write_space(struct sock *sk) 300static void ceph_sock_write_space(struct sock *sk)
169{ 301{
170 struct ceph_connection *con = sk->sk_user_data; 302 struct ceph_connection *con = sk->sk_user_data;
171 303
172 /* only queue to workqueue if there is data we want to write, 304 /* only queue to workqueue if there is data we want to write,
173 * and there is sufficient space in the socket buffer to accept 305 * and there is sufficient space in the socket buffer to accept
174 * more data. clear SOCK_NOSPACE so that ceph_write_space() 306 * more data. clear SOCK_NOSPACE so that ceph_sock_write_space()
175 * doesn't get called again until try_write() fills the socket 307 * doesn't get called again until try_write() fills the socket
176 * buffer. See net/ipv4/tcp_input.c:tcp_check_space() 308 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
177 * and net/core/stream.c:sk_stream_write_space(). 309 * and net/core/stream.c:sk_stream_write_space().
178 */ 310 */
179 if (test_bit(WRITE_PENDING, &con->state)) { 311 if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
180 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { 312 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
181 dout("ceph_write_space %p queueing write work\n", con); 313 dout("%s %p queueing write work\n", __func__, con);
182 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 314 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
183 queue_con(con); 315 queue_con(con);
184 } 316 }
185 } else { 317 } else {
186 dout("ceph_write_space %p nothing to write\n", con); 318 dout("%s %p nothing to write\n", __func__, con);
187 } 319 }
188} 320}
189 321
190/* socket's state has changed */ 322/* socket's state has changed */
191static void ceph_state_change(struct sock *sk) 323static void ceph_sock_state_change(struct sock *sk)
192{ 324{
193 struct ceph_connection *con = sk->sk_user_data; 325 struct ceph_connection *con = sk->sk_user_data;
194 326
195 dout("ceph_state_change %p state = %lu sk_state = %u\n", 327 dout("%s %p state = %lu sk_state = %u\n", __func__,
196 con, con->state, sk->sk_state); 328 con, con->state, sk->sk_state);
197 329
198 if (test_bit(CLOSED, &con->state))
199 return;
200
201 switch (sk->sk_state) { 330 switch (sk->sk_state) {
202 case TCP_CLOSE: 331 case TCP_CLOSE:
203 dout("ceph_state_change TCP_CLOSE\n"); 332 dout("%s TCP_CLOSE\n", __func__);
204 case TCP_CLOSE_WAIT: 333 case TCP_CLOSE_WAIT:
205 dout("ceph_state_change TCP_CLOSE_WAIT\n"); 334 dout("%s TCP_CLOSE_WAIT\n", __func__);
206 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { 335 con_sock_state_closing(con);
207 if (test_bit(CONNECTING, &con->state)) 336 set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
208 con->error_msg = "connection failed"; 337 queue_con(con);
209 else
210 con->error_msg = "socket closed";
211 queue_con(con);
212 }
213 break; 338 break;
214 case TCP_ESTABLISHED: 339 case TCP_ESTABLISHED:
215 dout("ceph_state_change TCP_ESTABLISHED\n"); 340 dout("%s TCP_ESTABLISHED\n", __func__);
341 con_sock_state_connected(con);
216 queue_con(con); 342 queue_con(con);
217 break; 343 break;
218 default: /* Everything else is uninteresting */ 344 default: /* Everything else is uninteresting */
@@ -228,9 +354,9 @@ static void set_sock_callbacks(struct socket *sock,
228{ 354{
229 struct sock *sk = sock->sk; 355 struct sock *sk = sock->sk;
230 sk->sk_user_data = con; 356 sk->sk_user_data = con;
231 sk->sk_data_ready = ceph_data_ready; 357 sk->sk_data_ready = ceph_sock_data_ready;
232 sk->sk_write_space = ceph_write_space; 358 sk->sk_write_space = ceph_sock_write_space;
233 sk->sk_state_change = ceph_state_change; 359 sk->sk_state_change = ceph_sock_state_change;
234} 360}
235 361
236 362
@@ -262,6 +388,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
262 388
263 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); 389 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
264 390
391 con_sock_state_connecting(con);
265 ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), 392 ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
266 O_NONBLOCK); 393 O_NONBLOCK);
267 if (ret == -EINPROGRESS) { 394 if (ret == -EINPROGRESS) {
@@ -277,7 +404,6 @@ static int ceph_tcp_connect(struct ceph_connection *con)
277 return ret; 404 return ret;
278 } 405 }
279 con->sock = sock; 406 con->sock = sock;
280
281 return 0; 407 return 0;
282} 408}
283 409
@@ -333,16 +459,24 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
333 */ 459 */
334static int con_close_socket(struct ceph_connection *con) 460static int con_close_socket(struct ceph_connection *con)
335{ 461{
336 int rc; 462 int rc = 0;
337 463
338 dout("con_close_socket on %p sock %p\n", con, con->sock); 464 dout("con_close_socket on %p sock %p\n", con, con->sock);
339 if (!con->sock) 465 if (con->sock) {
340 return 0; 466 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
341 set_bit(SOCK_CLOSED, &con->state); 467 sock_release(con->sock);
342 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); 468 con->sock = NULL;
343 sock_release(con->sock); 469 }
344 con->sock = NULL; 470
345 clear_bit(SOCK_CLOSED, &con->state); 471 /*
472 * Forcibly clear the SOCK_CLOSED flag. It gets set
473 * independent of the connection mutex, and we could have
474 * received a socket close event before we had the chance to
475 * shut the socket down.
476 */
477 clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
478
479 con_sock_state_closed(con);
346 return rc; 480 return rc;
347} 481}
348 482
@@ -353,6 +487,10 @@ static int con_close_socket(struct ceph_connection *con)
353static void ceph_msg_remove(struct ceph_msg *msg) 487static void ceph_msg_remove(struct ceph_msg *msg)
354{ 488{
355 list_del_init(&msg->list_head); 489 list_del_init(&msg->list_head);
490 BUG_ON(msg->con == NULL);
491 msg->con->ops->put(msg->con);
492 msg->con = NULL;
493
356 ceph_msg_put(msg); 494 ceph_msg_put(msg);
357} 495}
358static void ceph_msg_remove_list(struct list_head *head) 496static void ceph_msg_remove_list(struct list_head *head)
@@ -372,8 +510,11 @@ static void reset_connection(struct ceph_connection *con)
372 ceph_msg_remove_list(&con->out_sent); 510 ceph_msg_remove_list(&con->out_sent);
373 511
374 if (con->in_msg) { 512 if (con->in_msg) {
513 BUG_ON(con->in_msg->con != con);
514 con->in_msg->con = NULL;
375 ceph_msg_put(con->in_msg); 515 ceph_msg_put(con->in_msg);
376 con->in_msg = NULL; 516 con->in_msg = NULL;
517 con->ops->put(con);
377 } 518 }
378 519
379 con->connect_seq = 0; 520 con->connect_seq = 0;
@@ -391,32 +532,44 @@ static void reset_connection(struct ceph_connection *con)
391 */ 532 */
392void ceph_con_close(struct ceph_connection *con) 533void ceph_con_close(struct ceph_connection *con)
393{ 534{
535 mutex_lock(&con->mutex);
394 dout("con_close %p peer %s\n", con, 536 dout("con_close %p peer %s\n", con,
395 ceph_pr_addr(&con->peer_addr.in_addr)); 537 ceph_pr_addr(&con->peer_addr.in_addr));
396 set_bit(CLOSED, &con->state); /* in case there's queued work */ 538 con->state = CON_STATE_CLOSED;
397 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ 539
398 clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ 540 clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
399 clear_bit(KEEPALIVE_PENDING, &con->state); 541 clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
400 clear_bit(WRITE_PENDING, &con->state); 542 clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
401 mutex_lock(&con->mutex); 543 clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
544 clear_bit(CON_FLAG_BACKOFF, &con->flags);
545
402 reset_connection(con); 546 reset_connection(con);
403 con->peer_global_seq = 0; 547 con->peer_global_seq = 0;
404 cancel_delayed_work(&con->work); 548 cancel_delayed_work(&con->work);
549 con_close_socket(con);
405 mutex_unlock(&con->mutex); 550 mutex_unlock(&con->mutex);
406 queue_con(con);
407} 551}
408EXPORT_SYMBOL(ceph_con_close); 552EXPORT_SYMBOL(ceph_con_close);
409 553
410/* 554/*
411 * Reopen a closed connection, with a new peer address. 555 * Reopen a closed connection, with a new peer address.
412 */ 556 */
413void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) 557void ceph_con_open(struct ceph_connection *con,
558 __u8 entity_type, __u64 entity_num,
559 struct ceph_entity_addr *addr)
414{ 560{
561 mutex_lock(&con->mutex);
415 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); 562 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
416 set_bit(OPENING, &con->state); 563
417 clear_bit(CLOSED, &con->state); 564 BUG_ON(con->state != CON_STATE_CLOSED);
565 con->state = CON_STATE_PREOPEN;
566
567 con->peer_name.type = (__u8) entity_type;
568 con->peer_name.num = cpu_to_le64(entity_num);
569
418 memcpy(&con->peer_addr, addr, sizeof(*addr)); 570 memcpy(&con->peer_addr, addr, sizeof(*addr));
419 con->delay = 0; /* reset backoff memory */ 571 con->delay = 0; /* reset backoff memory */
572 mutex_unlock(&con->mutex);
420 queue_con(con); 573 queue_con(con);
421} 574}
422EXPORT_SYMBOL(ceph_con_open); 575EXPORT_SYMBOL(ceph_con_open);
@@ -430,42 +583,26 @@ bool ceph_con_opened(struct ceph_connection *con)
430} 583}
431 584
432/* 585/*
433 * generic get/put
434 */
435struct ceph_connection *ceph_con_get(struct ceph_connection *con)
436{
437 int nref = __atomic_add_unless(&con->nref, 1, 0);
438
439 dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
440
441 return nref ? con : NULL;
442}
443
444void ceph_con_put(struct ceph_connection *con)
445{
446 int nref = atomic_dec_return(&con->nref);
447
448 BUG_ON(nref < 0);
449 if (nref == 0) {
450 BUG_ON(con->sock);
451 kfree(con);
452 }
453 dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
454}
455
456/*
457 * initialize a new connection. 586 * initialize a new connection.
458 */ 587 */
459void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) 588void ceph_con_init(struct ceph_connection *con, void *private,
589 const struct ceph_connection_operations *ops,
590 struct ceph_messenger *msgr)
460{ 591{
461 dout("con_init %p\n", con); 592 dout("con_init %p\n", con);
462 memset(con, 0, sizeof(*con)); 593 memset(con, 0, sizeof(*con));
463 atomic_set(&con->nref, 1); 594 con->private = private;
595 con->ops = ops;
464 con->msgr = msgr; 596 con->msgr = msgr;
597
598 con_sock_state_init(con);
599
465 mutex_init(&con->mutex); 600 mutex_init(&con->mutex);
466 INIT_LIST_HEAD(&con->out_queue); 601 INIT_LIST_HEAD(&con->out_queue);
467 INIT_LIST_HEAD(&con->out_sent); 602 INIT_LIST_HEAD(&con->out_sent);
468 INIT_DELAYED_WORK(&con->work, con_work); 603 INIT_DELAYED_WORK(&con->work, con_work);
604
605 con->state = CON_STATE_CLOSED;
469} 606}
470EXPORT_SYMBOL(ceph_con_init); 607EXPORT_SYMBOL(ceph_con_init);
471 608
@@ -486,14 +623,14 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
486 return ret; 623 return ret;
487} 624}
488 625
489static void ceph_con_out_kvec_reset(struct ceph_connection *con) 626static void con_out_kvec_reset(struct ceph_connection *con)
490{ 627{
491 con->out_kvec_left = 0; 628 con->out_kvec_left = 0;
492 con->out_kvec_bytes = 0; 629 con->out_kvec_bytes = 0;
493 con->out_kvec_cur = &con->out_kvec[0]; 630 con->out_kvec_cur = &con->out_kvec[0];
494} 631}
495 632
496static void ceph_con_out_kvec_add(struct ceph_connection *con, 633static void con_out_kvec_add(struct ceph_connection *con,
497 size_t size, void *data) 634 size_t size, void *data)
498{ 635{
499 int index; 636 int index;
@@ -507,6 +644,53 @@ static void ceph_con_out_kvec_add(struct ceph_connection *con,
507 con->out_kvec_bytes += size; 644 con->out_kvec_bytes += size;
508} 645}
509 646
647#ifdef CONFIG_BLOCK
648static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
649{
650 if (!bio) {
651 *iter = NULL;
652 *seg = 0;
653 return;
654 }
655 *iter = bio;
656 *seg = bio->bi_idx;
657}
658
659static void iter_bio_next(struct bio **bio_iter, int *seg)
660{
661 if (*bio_iter == NULL)
662 return;
663
664 BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
665
666 (*seg)++;
667 if (*seg == (*bio_iter)->bi_vcnt)
668 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
669}
670#endif
671
672static void prepare_write_message_data(struct ceph_connection *con)
673{
674 struct ceph_msg *msg = con->out_msg;
675
676 BUG_ON(!msg);
677 BUG_ON(!msg->hdr.data_len);
678
679 /* initialize page iterator */
680 con->out_msg_pos.page = 0;
681 if (msg->pages)
682 con->out_msg_pos.page_pos = msg->page_alignment;
683 else
684 con->out_msg_pos.page_pos = 0;
685#ifdef CONFIG_BLOCK
686 if (msg->bio)
687 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
688#endif
689 con->out_msg_pos.data_pos = 0;
690 con->out_msg_pos.did_page_crc = false;
691 con->out_more = 1; /* data + footer will follow */
692}
693
510/* 694/*
511 * Prepare footer for currently outgoing message, and finish things 695 * Prepare footer for currently outgoing message, and finish things
512 * off. Assumes out_kvec* are already valid.. we just add on to the end. 696 * off. Assumes out_kvec* are already valid.. we just add on to the end.
@@ -516,6 +700,8 @@ static void prepare_write_message_footer(struct ceph_connection *con)
516 struct ceph_msg *m = con->out_msg; 700 struct ceph_msg *m = con->out_msg;
517 int v = con->out_kvec_left; 701 int v = con->out_kvec_left;
518 702
703 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
704
519 dout("prepare_write_message_footer %p\n", con); 705 dout("prepare_write_message_footer %p\n", con);
520 con->out_kvec_is_msg = true; 706 con->out_kvec_is_msg = true;
521 con->out_kvec[v].iov_base = &m->footer; 707 con->out_kvec[v].iov_base = &m->footer;
@@ -534,7 +720,7 @@ static void prepare_write_message(struct ceph_connection *con)
534 struct ceph_msg *m; 720 struct ceph_msg *m;
535 u32 crc; 721 u32 crc;
536 722
537 ceph_con_out_kvec_reset(con); 723 con_out_kvec_reset(con);
538 con->out_kvec_is_msg = true; 724 con->out_kvec_is_msg = true;
539 con->out_msg_done = false; 725 con->out_msg_done = false;
540 726
@@ -542,14 +728,16 @@ static void prepare_write_message(struct ceph_connection *con)
542 * TCP packet that's a good thing. */ 728 * TCP packet that's a good thing. */
543 if (con->in_seq > con->in_seq_acked) { 729 if (con->in_seq > con->in_seq_acked) {
544 con->in_seq_acked = con->in_seq; 730 con->in_seq_acked = con->in_seq;
545 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 731 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
546 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 732 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
547 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), 733 con_out_kvec_add(con, sizeof (con->out_temp_ack),
548 &con->out_temp_ack); 734 &con->out_temp_ack);
549 } 735 }
550 736
737 BUG_ON(list_empty(&con->out_queue));
551 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); 738 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
552 con->out_msg = m; 739 con->out_msg = m;
740 BUG_ON(m->con != con);
553 741
554 /* put message on sent list */ 742 /* put message on sent list */
555 ceph_msg_get(m); 743 ceph_msg_get(m);
@@ -576,18 +764,18 @@ static void prepare_write_message(struct ceph_connection *con)
576 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 764 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
577 765
578 /* tag + hdr + front + middle */ 766 /* tag + hdr + front + middle */
579 ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); 767 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
580 ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); 768 con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
581 ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); 769 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
582 770
583 if (m->middle) 771 if (m->middle)
584 ceph_con_out_kvec_add(con, m->middle->vec.iov_len, 772 con_out_kvec_add(con, m->middle->vec.iov_len,
585 m->middle->vec.iov_base); 773 m->middle->vec.iov_base);
586 774
587 /* fill in crc (except data pages), footer */ 775 /* fill in crc (except data pages), footer */
588 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); 776 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
589 con->out_msg->hdr.crc = cpu_to_le32(crc); 777 con->out_msg->hdr.crc = cpu_to_le32(crc);
590 con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; 778 con->out_msg->footer.flags = 0;
591 779
592 crc = crc32c(0, m->front.iov_base, m->front.iov_len); 780 crc = crc32c(0, m->front.iov_base, m->front.iov_len);
593 con->out_msg->footer.front_crc = cpu_to_le32(crc); 781 con->out_msg->footer.front_crc = cpu_to_le32(crc);
@@ -597,28 +785,19 @@ static void prepare_write_message(struct ceph_connection *con)
597 con->out_msg->footer.middle_crc = cpu_to_le32(crc); 785 con->out_msg->footer.middle_crc = cpu_to_le32(crc);
598 } else 786 } else
599 con->out_msg->footer.middle_crc = 0; 787 con->out_msg->footer.middle_crc = 0;
600 con->out_msg->footer.data_crc = 0; 788 dout("%s front_crc %u middle_crc %u\n", __func__,
601 dout("prepare_write_message front_crc %u data_crc %u\n",
602 le32_to_cpu(con->out_msg->footer.front_crc), 789 le32_to_cpu(con->out_msg->footer.front_crc),
603 le32_to_cpu(con->out_msg->footer.middle_crc)); 790 le32_to_cpu(con->out_msg->footer.middle_crc));
604 791
605 /* is there a data payload? */ 792 /* is there a data payload? */
606 if (le32_to_cpu(m->hdr.data_len) > 0) { 793 con->out_msg->footer.data_crc = 0;
607 /* initialize page iterator */ 794 if (m->hdr.data_len)
608 con->out_msg_pos.page = 0; 795 prepare_write_message_data(con);
609 if (m->pages) 796 else
610 con->out_msg_pos.page_pos = m->page_alignment;
611 else
612 con->out_msg_pos.page_pos = 0;
613 con->out_msg_pos.data_pos = 0;
614 con->out_msg_pos.did_page_crc = false;
615 con->out_more = 1; /* data + footer will follow */
616 } else {
617 /* no, queue up footer too and be done */ 797 /* no, queue up footer too and be done */
618 prepare_write_message_footer(con); 798 prepare_write_message_footer(con);
619 }
620 799
621 set_bit(WRITE_PENDING, &con->state); 800 set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
622} 801}
623 802
624/* 803/*
@@ -630,16 +809,16 @@ static void prepare_write_ack(struct ceph_connection *con)
630 con->in_seq_acked, con->in_seq); 809 con->in_seq_acked, con->in_seq);
631 con->in_seq_acked = con->in_seq; 810 con->in_seq_acked = con->in_seq;
632 811
633 ceph_con_out_kvec_reset(con); 812 con_out_kvec_reset(con);
634 813
635 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 814 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
636 815
637 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 816 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
638 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), 817 con_out_kvec_add(con, sizeof (con->out_temp_ack),
639 &con->out_temp_ack); 818 &con->out_temp_ack);
640 819
641 con->out_more = 1; /* more will follow.. eventually.. */ 820 con->out_more = 1; /* more will follow.. eventually.. */
642 set_bit(WRITE_PENDING, &con->state); 821 set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
643} 822}
644 823
645/* 824/*
@@ -648,9 +827,9 @@ static void prepare_write_ack(struct ceph_connection *con)
648static void prepare_write_keepalive(struct ceph_connection *con) 827static void prepare_write_keepalive(struct ceph_connection *con)
649{ 828{
650 dout("prepare_write_keepalive %p\n", con); 829 dout("prepare_write_keepalive %p\n", con);
651 ceph_con_out_kvec_reset(con); 830 con_out_kvec_reset(con);
652 ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); 831 con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
653 set_bit(WRITE_PENDING, &con->state); 832 set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
654} 833}
655 834
656/* 835/*
@@ -665,27 +844,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
665 if (!con->ops->get_authorizer) { 844 if (!con->ops->get_authorizer) {
666 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; 845 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
667 con->out_connect.authorizer_len = 0; 846 con->out_connect.authorizer_len = 0;
668
669 return NULL; 847 return NULL;
670 } 848 }
671 849
672 /* Can't hold the mutex while getting authorizer */ 850 /* Can't hold the mutex while getting authorizer */
673
674 mutex_unlock(&con->mutex); 851 mutex_unlock(&con->mutex);
675
676 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); 852 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
677
678 mutex_lock(&con->mutex); 853 mutex_lock(&con->mutex);
679 854
680 if (IS_ERR(auth)) 855 if (IS_ERR(auth))
681 return auth; 856 return auth;
682 if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) 857 if (con->state != CON_STATE_NEGOTIATING)
683 return ERR_PTR(-EAGAIN); 858 return ERR_PTR(-EAGAIN);
684 859
685 con->auth_reply_buf = auth->authorizer_reply_buf; 860 con->auth_reply_buf = auth->authorizer_reply_buf;
686 con->auth_reply_buf_len = auth->authorizer_reply_buf_len; 861 con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
687
688
689 return auth; 862 return auth;
690} 863}
691 864
@@ -694,12 +867,12 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
694 */ 867 */
695static void prepare_write_banner(struct ceph_connection *con) 868static void prepare_write_banner(struct ceph_connection *con)
696{ 869{
697 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); 870 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
698 ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), 871 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
699 &con->msgr->my_enc_addr); 872 &con->msgr->my_enc_addr);
700 873
701 con->out_more = 0; 874 con->out_more = 0;
702 set_bit(WRITE_PENDING, &con->state); 875 set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
703} 876}
704 877
705static int prepare_write_connect(struct ceph_connection *con) 878static int prepare_write_connect(struct ceph_connection *con)
@@ -742,14 +915,15 @@ static int prepare_write_connect(struct ceph_connection *con)
742 con->out_connect.authorizer_len = auth ? 915 con->out_connect.authorizer_len = auth ?
743 cpu_to_le32(auth->authorizer_buf_len) : 0; 916 cpu_to_le32(auth->authorizer_buf_len) : 0;
744 917
745 ceph_con_out_kvec_add(con, sizeof (con->out_connect), 918 con_out_kvec_reset(con);
919 con_out_kvec_add(con, sizeof (con->out_connect),
746 &con->out_connect); 920 &con->out_connect);
747 if (auth && auth->authorizer_buf_len) 921 if (auth && auth->authorizer_buf_len)
748 ceph_con_out_kvec_add(con, auth->authorizer_buf_len, 922 con_out_kvec_add(con, auth->authorizer_buf_len,
749 auth->authorizer_buf); 923 auth->authorizer_buf);
750 924
751 con->out_more = 0; 925 con->out_more = 0;
752 set_bit(WRITE_PENDING, &con->state); 926 set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
753 927
754 return 0; 928 return 0;
755} 929}
@@ -797,30 +971,34 @@ out:
797 return ret; /* done! */ 971 return ret; /* done! */
798} 972}
799 973
800#ifdef CONFIG_BLOCK 974static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
801static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) 975 size_t len, size_t sent, bool in_trail)
802{ 976{
803 if (!bio) { 977 struct ceph_msg *msg = con->out_msg;
804 *iter = NULL;
805 *seg = 0;
806 return;
807 }
808 *iter = bio;
809 *seg = bio->bi_idx;
810}
811 978
812static void iter_bio_next(struct bio **bio_iter, int *seg) 979 BUG_ON(!msg);
813{ 980 BUG_ON(!sent);
814 if (*bio_iter == NULL)
815 return;
816 981
817 BUG_ON(*seg >= (*bio_iter)->bi_vcnt); 982 con->out_msg_pos.data_pos += sent;
983 con->out_msg_pos.page_pos += sent;
984 if (sent < len)
985 return;
818 986
819 (*seg)++; 987 BUG_ON(sent != len);
820 if (*seg == (*bio_iter)->bi_vcnt) 988 con->out_msg_pos.page_pos = 0;
821 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); 989 con->out_msg_pos.page++;
822} 990 con->out_msg_pos.did_page_crc = false;
991 if (in_trail)
992 list_move_tail(&page->lru,
993 &msg->trail->head);
994 else if (msg->pagelist)
995 list_move_tail(&page->lru,
996 &msg->pagelist->head);
997#ifdef CONFIG_BLOCK
998 else if (msg->bio)
999 iter_bio_next(&msg->bio_iter, &msg->bio_seg);
823#endif 1000#endif
1001}
824 1002
825/* 1003/*
826 * Write as much message data payload as we can. If we finish, queue 1004 * Write as much message data payload as we can. If we finish, queue
@@ -837,41 +1015,36 @@ static int write_partial_msg_pages(struct ceph_connection *con)
837 bool do_datacrc = !con->msgr->nocrc; 1015 bool do_datacrc = !con->msgr->nocrc;
838 int ret; 1016 int ret;
839 int total_max_write; 1017 int total_max_write;
840 int in_trail = 0; 1018 bool in_trail = false;
841 size_t trail_len = (msg->trail ? msg->trail->length : 0); 1019 const size_t trail_len = (msg->trail ? msg->trail->length : 0);
1020 const size_t trail_off = data_len - trail_len;
842 1021
843 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 1022 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
844 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 1023 con, msg, con->out_msg_pos.page, msg->nr_pages,
845 con->out_msg_pos.page_pos); 1024 con->out_msg_pos.page_pos);
846 1025
847#ifdef CONFIG_BLOCK 1026 /*
848 if (msg->bio && !msg->bio_iter) 1027 * Iterate through each page that contains data to be
849 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); 1028 * written, and send as much as possible for each.
850#endif 1029 *
851 1030 * If we are calculating the data crc (the default), we will
1031 * need to map the page. If we have no pages, they have
1032 * been revoked, so use the zero page.
1033 */
852 while (data_len > con->out_msg_pos.data_pos) { 1034 while (data_len > con->out_msg_pos.data_pos) {
853 struct page *page = NULL; 1035 struct page *page = NULL;
854 int max_write = PAGE_SIZE; 1036 int max_write = PAGE_SIZE;
855 int bio_offset = 0; 1037 int bio_offset = 0;
856 1038
857 total_max_write = data_len - trail_len - 1039 in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
858 con->out_msg_pos.data_pos; 1040 if (!in_trail)
859 1041 total_max_write = trail_off - con->out_msg_pos.data_pos;
860 /*
861 * if we are calculating the data crc (the default), we need
862 * to map the page. if our pages[] has been revoked, use the
863 * zero page.
864 */
865
866 /* have we reached the trail part of the data? */
867 if (con->out_msg_pos.data_pos >= data_len - trail_len) {
868 in_trail = 1;
869 1042
1043 if (in_trail) {
870 total_max_write = data_len - con->out_msg_pos.data_pos; 1044 total_max_write = data_len - con->out_msg_pos.data_pos;
871 1045
872 page = list_first_entry(&msg->trail->head, 1046 page = list_first_entry(&msg->trail->head,
873 struct page, lru); 1047 struct page, lru);
874 max_write = PAGE_SIZE;
875 } else if (msg->pages) { 1048 } else if (msg->pages) {
876 page = msg->pages[con->out_msg_pos.page]; 1049 page = msg->pages[con->out_msg_pos.page];
877 } else if (msg->pagelist) { 1050 } else if (msg->pagelist) {
@@ -894,15 +1067,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
894 1067
895 if (do_datacrc && !con->out_msg_pos.did_page_crc) { 1068 if (do_datacrc && !con->out_msg_pos.did_page_crc) {
896 void *base; 1069 void *base;
897 u32 crc; 1070 u32 crc = le32_to_cpu(msg->footer.data_crc);
898 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
899 char *kaddr; 1071 char *kaddr;
900 1072
901 kaddr = kmap(page); 1073 kaddr = kmap(page);
902 BUG_ON(kaddr == NULL); 1074 BUG_ON(kaddr == NULL);
903 base = kaddr + con->out_msg_pos.page_pos + bio_offset; 1075 base = kaddr + con->out_msg_pos.page_pos + bio_offset;
904 crc = crc32c(tmpcrc, base, len); 1076 crc = crc32c(crc, base, len);
905 con->out_msg->footer.data_crc = cpu_to_le32(crc); 1077 msg->footer.data_crc = cpu_to_le32(crc);
906 con->out_msg_pos.did_page_crc = true; 1078 con->out_msg_pos.did_page_crc = true;
907 } 1079 }
908 ret = ceph_tcp_sendpage(con->sock, page, 1080 ret = ceph_tcp_sendpage(con->sock, page,
@@ -915,31 +1087,15 @@ static int write_partial_msg_pages(struct ceph_connection *con)
915 if (ret <= 0) 1087 if (ret <= 0)
916 goto out; 1088 goto out;
917 1089
918 con->out_msg_pos.data_pos += ret; 1090 out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
919 con->out_msg_pos.page_pos += ret;
920 if (ret == len) {
921 con->out_msg_pos.page_pos = 0;
922 con->out_msg_pos.page++;
923 con->out_msg_pos.did_page_crc = false;
924 if (in_trail)
925 list_move_tail(&page->lru,
926 &msg->trail->head);
927 else if (msg->pagelist)
928 list_move_tail(&page->lru,
929 &msg->pagelist->head);
930#ifdef CONFIG_BLOCK
931 else if (msg->bio)
932 iter_bio_next(&msg->bio_iter, &msg->bio_seg);
933#endif
934 }
935 } 1091 }
936 1092
937 dout("write_partial_msg_pages %p msg %p done\n", con, msg); 1093 dout("write_partial_msg_pages %p msg %p done\n", con, msg);
938 1094
939 /* prepare and queue up footer, too */ 1095 /* prepare and queue up footer, too */
940 if (!do_datacrc) 1096 if (!do_datacrc)
941 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 1097 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
942 ceph_con_out_kvec_reset(con); 1098 con_out_kvec_reset(con);
943 prepare_write_message_footer(con); 1099 prepare_write_message_footer(con);
944 ret = 1; 1100 ret = 1;
945out: 1101out:
@@ -1351,20 +1507,14 @@ static int process_banner(struct ceph_connection *con)
1351 ceph_pr_addr(&con->msgr->inst.addr.in_addr)); 1507 ceph_pr_addr(&con->msgr->inst.addr.in_addr));
1352 } 1508 }
1353 1509
1354 set_bit(NEGOTIATING, &con->state);
1355 prepare_read_connect(con);
1356 return 0; 1510 return 0;
1357} 1511}
1358 1512
1359static void fail_protocol(struct ceph_connection *con) 1513static void fail_protocol(struct ceph_connection *con)
1360{ 1514{
1361 reset_connection(con); 1515 reset_connection(con);
1362 set_bit(CLOSED, &con->state); /* in case there's queued work */ 1516 BUG_ON(con->state != CON_STATE_NEGOTIATING);
1363 1517 con->state = CON_STATE_CLOSED;
1364 mutex_unlock(&con->mutex);
1365 if (con->ops->bad_proto)
1366 con->ops->bad_proto(con);
1367 mutex_lock(&con->mutex);
1368} 1518}
1369 1519
1370static int process_connect(struct ceph_connection *con) 1520static int process_connect(struct ceph_connection *con)
@@ -1407,7 +1557,6 @@ static int process_connect(struct ceph_connection *con)
1407 return -1; 1557 return -1;
1408 } 1558 }
1409 con->auth_retry = 1; 1559 con->auth_retry = 1;
1410 ceph_con_out_kvec_reset(con);
1411 ret = prepare_write_connect(con); 1560 ret = prepare_write_connect(con);
1412 if (ret < 0) 1561 if (ret < 0)
1413 return ret; 1562 return ret;
@@ -1428,7 +1577,6 @@ static int process_connect(struct ceph_connection *con)
1428 ENTITY_NAME(con->peer_name), 1577 ENTITY_NAME(con->peer_name),
1429 ceph_pr_addr(&con->peer_addr.in_addr)); 1578 ceph_pr_addr(&con->peer_addr.in_addr));
1430 reset_connection(con); 1579 reset_connection(con);
1431 ceph_con_out_kvec_reset(con);
1432 ret = prepare_write_connect(con); 1580 ret = prepare_write_connect(con);
1433 if (ret < 0) 1581 if (ret < 0)
1434 return ret; 1582 return ret;
@@ -1440,8 +1588,7 @@ static int process_connect(struct ceph_connection *con)
1440 if (con->ops->peer_reset) 1588 if (con->ops->peer_reset)
1441 con->ops->peer_reset(con); 1589 con->ops->peer_reset(con);
1442 mutex_lock(&con->mutex); 1590 mutex_lock(&con->mutex);
1443 if (test_bit(CLOSED, &con->state) || 1591 if (con->state != CON_STATE_NEGOTIATING)
1444 test_bit(OPENING, &con->state))
1445 return -EAGAIN; 1592 return -EAGAIN;
1446 break; 1593 break;
1447 1594
@@ -1454,7 +1601,6 @@ static int process_connect(struct ceph_connection *con)
1454 le32_to_cpu(con->out_connect.connect_seq), 1601 le32_to_cpu(con->out_connect.connect_seq),
1455 le32_to_cpu(con->in_reply.connect_seq)); 1602 le32_to_cpu(con->in_reply.connect_seq));
1456 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); 1603 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
1457 ceph_con_out_kvec_reset(con);
1458 ret = prepare_write_connect(con); 1604 ret = prepare_write_connect(con);
1459 if (ret < 0) 1605 if (ret < 0)
1460 return ret; 1606 return ret;
@@ -1471,7 +1617,6 @@ static int process_connect(struct ceph_connection *con)
1471 le32_to_cpu(con->in_reply.global_seq)); 1617 le32_to_cpu(con->in_reply.global_seq));
1472 get_global_seq(con->msgr, 1618 get_global_seq(con->msgr,
1473 le32_to_cpu(con->in_reply.global_seq)); 1619 le32_to_cpu(con->in_reply.global_seq));
1474 ceph_con_out_kvec_reset(con);
1475 ret = prepare_write_connect(con); 1620 ret = prepare_write_connect(con);
1476 if (ret < 0) 1621 if (ret < 0)
1477 return ret; 1622 return ret;
@@ -1489,7 +1634,10 @@ static int process_connect(struct ceph_connection *con)
1489 fail_protocol(con); 1634 fail_protocol(con);
1490 return -1; 1635 return -1;
1491 } 1636 }
1492 clear_bit(CONNECTING, &con->state); 1637
1638 BUG_ON(con->state != CON_STATE_NEGOTIATING);
1639 con->state = CON_STATE_OPEN;
1640
1493 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1641 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1494 con->connect_seq++; 1642 con->connect_seq++;
1495 con->peer_features = server_feat; 1643 con->peer_features = server_feat;
@@ -1501,7 +1649,9 @@ static int process_connect(struct ceph_connection *con)
1501 le32_to_cpu(con->in_reply.connect_seq)); 1649 le32_to_cpu(con->in_reply.connect_seq));
1502 1650
1503 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 1651 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
1504 set_bit(LOSSYTX, &con->state); 1652 set_bit(CON_FLAG_LOSSYTX, &con->flags);
1653
1654 con->delay = 0; /* reset backoff memory */
1505 1655
1506 prepare_read_tag(con); 1656 prepare_read_tag(con);
1507 break; 1657 break;
@@ -1587,10 +1737,7 @@ static int read_partial_message_section(struct ceph_connection *con,
1587 return 1; 1737 return 1;
1588} 1738}
1589 1739
1590static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1740static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
1591 struct ceph_msg_header *hdr,
1592 int *skip);
1593
1594 1741
1595static int read_partial_message_pages(struct ceph_connection *con, 1742static int read_partial_message_pages(struct ceph_connection *con,
1596 struct page **pages, 1743 struct page **pages,
@@ -1633,9 +1780,6 @@ static int read_partial_message_bio(struct ceph_connection *con,
1633 void *p; 1780 void *p;
1634 int ret, left; 1781 int ret, left;
1635 1782
1636 if (IS_ERR(bv))
1637 return PTR_ERR(bv);
1638
1639 left = min((int)(data_len - con->in_msg_pos.data_pos), 1783 left = min((int)(data_len - con->in_msg_pos.data_pos),
1640 (int)(bv->bv_len - con->in_msg_pos.page_pos)); 1784 (int)(bv->bv_len - con->in_msg_pos.page_pos));
1641 1785
@@ -1672,7 +1816,6 @@ static int read_partial_message(struct ceph_connection *con)
1672 int ret; 1816 int ret;
1673 unsigned int front_len, middle_len, data_len; 1817 unsigned int front_len, middle_len, data_len;
1674 bool do_datacrc = !con->msgr->nocrc; 1818 bool do_datacrc = !con->msgr->nocrc;
1675 int skip;
1676 u64 seq; 1819 u64 seq;
1677 u32 crc; 1820 u32 crc;
1678 1821
@@ -1723,10 +1866,13 @@ static int read_partial_message(struct ceph_connection *con)
1723 1866
1724 /* allocate message? */ 1867 /* allocate message? */
1725 if (!con->in_msg) { 1868 if (!con->in_msg) {
1869 int skip = 0;
1870
1726 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 1871 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1727 con->in_hdr.front_len, con->in_hdr.data_len); 1872 con->in_hdr.front_len, con->in_hdr.data_len);
1728 skip = 0; 1873 ret = ceph_con_in_msg_alloc(con, &skip);
1729 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); 1874 if (ret < 0)
1875 return ret;
1730 if (skip) { 1876 if (skip) {
1731 /* skip this message */ 1877 /* skip this message */
1732 dout("alloc_msg said skip message\n"); 1878 dout("alloc_msg said skip message\n");
@@ -1737,11 +1883,9 @@ static int read_partial_message(struct ceph_connection *con)
1737 con->in_seq++; 1883 con->in_seq++;
1738 return 0; 1884 return 0;
1739 } 1885 }
1740 if (!con->in_msg) { 1886
1741 con->error_msg = 1887 BUG_ON(!con->in_msg);
1742 "error allocating memory for incoming message"; 1888 BUG_ON(con->in_msg->con != con);
1743 return -ENOMEM;
1744 }
1745 m = con->in_msg; 1889 m = con->in_msg;
1746 m->front.iov_len = 0; /* haven't read it yet */ 1890 m->front.iov_len = 0; /* haven't read it yet */
1747 if (m->middle) 1891 if (m->middle)
@@ -1753,6 +1897,11 @@ static int read_partial_message(struct ceph_connection *con)
1753 else 1897 else
1754 con->in_msg_pos.page_pos = 0; 1898 con->in_msg_pos.page_pos = 0;
1755 con->in_msg_pos.data_pos = 0; 1899 con->in_msg_pos.data_pos = 0;
1900
1901#ifdef CONFIG_BLOCK
1902 if (m->bio)
1903 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1904#endif
1756 } 1905 }
1757 1906
1758 /* front */ 1907 /* front */
@@ -1769,10 +1918,6 @@ static int read_partial_message(struct ceph_connection *con)
1769 if (ret <= 0) 1918 if (ret <= 0)
1770 return ret; 1919 return ret;
1771 } 1920 }
1772#ifdef CONFIG_BLOCK
1773 if (m->bio && !m->bio_iter)
1774 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1775#endif
1776 1921
1777 /* (page) data */ 1922 /* (page) data */
1778 while (con->in_msg_pos.data_pos < data_len) { 1923 while (con->in_msg_pos.data_pos < data_len) {
@@ -1783,7 +1928,7 @@ static int read_partial_message(struct ceph_connection *con)
1783 return ret; 1928 return ret;
1784#ifdef CONFIG_BLOCK 1929#ifdef CONFIG_BLOCK
1785 } else if (m->bio) { 1930 } else if (m->bio) {
1786 1931 BUG_ON(!m->bio_iter);
1787 ret = read_partial_message_bio(con, 1932 ret = read_partial_message_bio(con,
1788 &m->bio_iter, &m->bio_seg, 1933 &m->bio_iter, &m->bio_seg,
1789 data_len, do_datacrc); 1934 data_len, do_datacrc);
@@ -1837,8 +1982,11 @@ static void process_message(struct ceph_connection *con)
1837{ 1982{
1838 struct ceph_msg *msg; 1983 struct ceph_msg *msg;
1839 1984
1985 BUG_ON(con->in_msg->con != con);
1986 con->in_msg->con = NULL;
1840 msg = con->in_msg; 1987 msg = con->in_msg;
1841 con->in_msg = NULL; 1988 con->in_msg = NULL;
1989 con->ops->put(con);
1842 1990
1843 /* if first message, set peer_name */ 1991 /* if first message, set peer_name */
1844 if (con->peer_name.type == 0) 1992 if (con->peer_name.type == 0)
@@ -1858,7 +2006,6 @@ static void process_message(struct ceph_connection *con)
1858 con->ops->dispatch(con, msg); 2006 con->ops->dispatch(con, msg);
1859 2007
1860 mutex_lock(&con->mutex); 2008 mutex_lock(&con->mutex);
1861 prepare_read_tag(con);
1862} 2009}
1863 2010
1864 2011
@@ -1870,22 +2017,19 @@ static int try_write(struct ceph_connection *con)
1870{ 2017{
1871 int ret = 1; 2018 int ret = 1;
1872 2019
1873 dout("try_write start %p state %lu nref %d\n", con, con->state, 2020 dout("try_write start %p state %lu\n", con, con->state);
1874 atomic_read(&con->nref));
1875 2021
1876more: 2022more:
1877 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 2023 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1878 2024
1879 /* open the socket first? */ 2025 /* open the socket first? */
1880 if (con->sock == NULL) { 2026 if (con->state == CON_STATE_PREOPEN) {
1881 ceph_con_out_kvec_reset(con); 2027 BUG_ON(con->sock);
2028 con->state = CON_STATE_CONNECTING;
2029
2030 con_out_kvec_reset(con);
1882 prepare_write_banner(con); 2031 prepare_write_banner(con);
1883 ret = prepare_write_connect(con);
1884 if (ret < 0)
1885 goto out;
1886 prepare_read_banner(con); 2032 prepare_read_banner(con);
1887 set_bit(CONNECTING, &con->state);
1888 clear_bit(NEGOTIATING, &con->state);
1889 2033
1890 BUG_ON(con->in_msg); 2034 BUG_ON(con->in_msg);
1891 con->in_tag = CEPH_MSGR_TAG_READY; 2035 con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1932,7 +2076,7 @@ more_kvec:
1932 } 2076 }
1933 2077
1934do_next: 2078do_next:
1935 if (!test_bit(CONNECTING, &con->state)) { 2079 if (con->state == CON_STATE_OPEN) {
1936 /* is anything else pending? */ 2080 /* is anything else pending? */
1937 if (!list_empty(&con->out_queue)) { 2081 if (!list_empty(&con->out_queue)) {
1938 prepare_write_message(con); 2082 prepare_write_message(con);
@@ -1942,14 +2086,15 @@ do_next:
1942 prepare_write_ack(con); 2086 prepare_write_ack(con);
1943 goto more; 2087 goto more;
1944 } 2088 }
1945 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { 2089 if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING,
2090 &con->flags)) {
1946 prepare_write_keepalive(con); 2091 prepare_write_keepalive(con);
1947 goto more; 2092 goto more;
1948 } 2093 }
1949 } 2094 }
1950 2095
1951 /* Nothing to do! */ 2096 /* Nothing to do! */
1952 clear_bit(WRITE_PENDING, &con->state); 2097 clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
1953 dout("try_write nothing else to write.\n"); 2098 dout("try_write nothing else to write.\n");
1954 ret = 0; 2099 ret = 0;
1955out: 2100out:
@@ -1966,38 +2111,42 @@ static int try_read(struct ceph_connection *con)
1966{ 2111{
1967 int ret = -1; 2112 int ret = -1;
1968 2113
1969 if (!con->sock) 2114more:
1970 return 0; 2115 dout("try_read start on %p state %lu\n", con, con->state);
1971 2116 if (con->state != CON_STATE_CONNECTING &&
1972 if (test_bit(STANDBY, &con->state)) 2117 con->state != CON_STATE_NEGOTIATING &&
2118 con->state != CON_STATE_OPEN)
1973 return 0; 2119 return 0;
1974 2120
1975 dout("try_read start on %p\n", con); 2121 BUG_ON(!con->sock);
1976 2122
1977more:
1978 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 2123 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
1979 con->in_base_pos); 2124 con->in_base_pos);
1980 2125
1981 /* 2126 if (con->state == CON_STATE_CONNECTING) {
1982 * process_connect and process_message drop and re-take 2127 dout("try_read connecting\n");
1983 * con->mutex. make sure we handle a racing close or reopen. 2128 ret = read_partial_banner(con);
1984 */ 2129 if (ret <= 0)
1985 if (test_bit(CLOSED, &con->state) || 2130 goto out;
1986 test_bit(OPENING, &con->state)) { 2131 ret = process_banner(con);
1987 ret = -EAGAIN; 2132 if (ret < 0)
2133 goto out;
2134
2135 BUG_ON(con->state != CON_STATE_CONNECTING);
2136 con->state = CON_STATE_NEGOTIATING;
2137
2138 /* Banner is good, exchange connection info */
2139 ret = prepare_write_connect(con);
2140 if (ret < 0)
2141 goto out;
2142 prepare_read_connect(con);
2143
2144 /* Send connection info before awaiting response */
1988 goto out; 2145 goto out;
1989 } 2146 }
1990 2147
1991 if (test_bit(CONNECTING, &con->state)) { 2148 if (con->state == CON_STATE_NEGOTIATING) {
1992 if (!test_bit(NEGOTIATING, &con->state)) { 2149 dout("try_read negotiating\n");
1993 dout("try_read connecting\n");
1994 ret = read_partial_banner(con);
1995 if (ret <= 0)
1996 goto out;
1997 ret = process_banner(con);
1998 if (ret < 0)
1999 goto out;
2000 }
2001 ret = read_partial_connect(con); 2150 ret = read_partial_connect(con);
2002 if (ret <= 0) 2151 if (ret <= 0)
2003 goto out; 2152 goto out;
@@ -2007,6 +2156,8 @@ more:
2007 goto more; 2156 goto more;
2008 } 2157 }
2009 2158
2159 BUG_ON(con->state != CON_STATE_OPEN);
2160
2010 if (con->in_base_pos < 0) { 2161 if (con->in_base_pos < 0) {
2011 /* 2162 /*
2012 * skipping + discarding content. 2163 * skipping + discarding content.
@@ -2040,7 +2191,8 @@ more:
2040 prepare_read_ack(con); 2191 prepare_read_ack(con);
2041 break; 2192 break;
2042 case CEPH_MSGR_TAG_CLOSE: 2193 case CEPH_MSGR_TAG_CLOSE:
2043 set_bit(CLOSED, &con->state); /* fixme */ 2194 con_close_socket(con);
2195 con->state = CON_STATE_CLOSED;
2044 goto out; 2196 goto out;
2045 default: 2197 default:
2046 goto bad_tag; 2198 goto bad_tag;
@@ -2063,6 +2215,8 @@ more:
2063 if (con->in_tag == CEPH_MSGR_TAG_READY) 2215 if (con->in_tag == CEPH_MSGR_TAG_READY)
2064 goto more; 2216 goto more;
2065 process_message(con); 2217 process_message(con);
2218 if (con->state == CON_STATE_OPEN)
2219 prepare_read_tag(con);
2066 goto more; 2220 goto more;
2067 } 2221 }
2068 if (con->in_tag == CEPH_MSGR_TAG_ACK) { 2222 if (con->in_tag == CEPH_MSGR_TAG_ACK) {
@@ -2091,12 +2245,6 @@ bad_tag:
2091 */ 2245 */
2092static void queue_con(struct ceph_connection *con) 2246static void queue_con(struct ceph_connection *con)
2093{ 2247{
2094 if (test_bit(DEAD, &con->state)) {
2095 dout("queue_con %p ignoring: DEAD\n",
2096 con);
2097 return;
2098 }
2099
2100 if (!con->ops->get(con)) { 2248 if (!con->ops->get(con)) {
2101 dout("queue_con %p ref count 0\n", con); 2249 dout("queue_con %p ref count 0\n", con);
2102 return; 2250 return;
@@ -2121,7 +2269,26 @@ static void con_work(struct work_struct *work)
2121 2269
2122 mutex_lock(&con->mutex); 2270 mutex_lock(&con->mutex);
2123restart: 2271restart:
2124 if (test_and_clear_bit(BACKOFF, &con->state)) { 2272 if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
2273 switch (con->state) {
2274 case CON_STATE_CONNECTING:
2275 con->error_msg = "connection failed";
2276 break;
2277 case CON_STATE_NEGOTIATING:
2278 con->error_msg = "negotiation failed";
2279 break;
2280 case CON_STATE_OPEN:
2281 con->error_msg = "socket closed";
2282 break;
2283 default:
2284 dout("unrecognized con state %d\n", (int)con->state);
2285 con->error_msg = "unrecognized con state";
2286 BUG();
2287 }
2288 goto fault;
2289 }
2290
2291 if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
2125 dout("con_work %p backing off\n", con); 2292 dout("con_work %p backing off\n", con);
2126 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2293 if (queue_delayed_work(ceph_msgr_wq, &con->work,
2127 round_jiffies_relative(con->delay))) { 2294 round_jiffies_relative(con->delay))) {
@@ -2135,35 +2302,35 @@ restart:
2135 } 2302 }
2136 } 2303 }
2137 2304
2138 if (test_bit(STANDBY, &con->state)) { 2305 if (con->state == CON_STATE_STANDBY) {
2139 dout("con_work %p STANDBY\n", con); 2306 dout("con_work %p STANDBY\n", con);
2140 goto done; 2307 goto done;
2141 } 2308 }
2142 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 2309 if (con->state == CON_STATE_CLOSED) {
2143 dout("con_work CLOSED\n"); 2310 dout("con_work %p CLOSED\n", con);
2144 con_close_socket(con); 2311 BUG_ON(con->sock);
2145 goto done; 2312 goto done;
2146 } 2313 }
2147 if (test_and_clear_bit(OPENING, &con->state)) { 2314 if (con->state == CON_STATE_PREOPEN) {
2148 /* reopen w/ new peer */
2149 dout("con_work OPENING\n"); 2315 dout("con_work OPENING\n");
2150 con_close_socket(con); 2316 BUG_ON(con->sock);
2151 } 2317 }
2152 2318
2153 if (test_and_clear_bit(SOCK_CLOSED, &con->state))
2154 goto fault;
2155
2156 ret = try_read(con); 2319 ret = try_read(con);
2157 if (ret == -EAGAIN) 2320 if (ret == -EAGAIN)
2158 goto restart; 2321 goto restart;
2159 if (ret < 0) 2322 if (ret < 0) {
2323 con->error_msg = "socket error on read";
2160 goto fault; 2324 goto fault;
2325 }
2161 2326
2162 ret = try_write(con); 2327 ret = try_write(con);
2163 if (ret == -EAGAIN) 2328 if (ret == -EAGAIN)
2164 goto restart; 2329 goto restart;
2165 if (ret < 0) 2330 if (ret < 0) {
2331 con->error_msg = "socket error on write";
2166 goto fault; 2332 goto fault;
2333 }
2167 2334
2168done: 2335done:
2169 mutex_unlock(&con->mutex); 2336 mutex_unlock(&con->mutex);
@@ -2172,7 +2339,6 @@ done_unlocked:
2172 return; 2339 return;
2173 2340
2174fault: 2341fault:
2175 mutex_unlock(&con->mutex);
2176 ceph_fault(con); /* error/fault path */ 2342 ceph_fault(con); /* error/fault path */
2177 goto done_unlocked; 2343 goto done_unlocked;
2178} 2344}
@@ -2183,26 +2349,31 @@ fault:
2183 * exponential backoff 2349 * exponential backoff
2184 */ 2350 */
2185static void ceph_fault(struct ceph_connection *con) 2351static void ceph_fault(struct ceph_connection *con)
2352 __releases(con->mutex)
2186{ 2353{
2187 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2354 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2188 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2355 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
2189 dout("fault %p state %lu to peer %s\n", 2356 dout("fault %p state %lu to peer %s\n",
2190 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2357 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
2191 2358
2192 if (test_bit(LOSSYTX, &con->state)) { 2359 BUG_ON(con->state != CON_STATE_CONNECTING &&
2193 dout("fault on LOSSYTX channel\n"); 2360 con->state != CON_STATE_NEGOTIATING &&
2194 goto out; 2361 con->state != CON_STATE_OPEN);
2195 }
2196
2197 mutex_lock(&con->mutex);
2198 if (test_bit(CLOSED, &con->state))
2199 goto out_unlock;
2200 2362
2201 con_close_socket(con); 2363 con_close_socket(con);
2202 2364
2365 if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) {
2366 dout("fault on LOSSYTX channel, marking CLOSED\n");
2367 con->state = CON_STATE_CLOSED;
2368 goto out_unlock;
2369 }
2370
2203 if (con->in_msg) { 2371 if (con->in_msg) {
2372 BUG_ON(con->in_msg->con != con);
2373 con->in_msg->con = NULL;
2204 ceph_msg_put(con->in_msg); 2374 ceph_msg_put(con->in_msg);
2205 con->in_msg = NULL; 2375 con->in_msg = NULL;
2376 con->ops->put(con);
2206 } 2377 }
2207 2378
2208 /* Requeue anything that hasn't been acked */ 2379 /* Requeue anything that hasn't been acked */
@@ -2211,12 +2382,13 @@ static void ceph_fault(struct ceph_connection *con)
2211 /* If there are no messages queued or keepalive pending, place 2382 /* If there are no messages queued or keepalive pending, place
2212 * the connection in a STANDBY state */ 2383 * the connection in a STANDBY state */
2213 if (list_empty(&con->out_queue) && 2384 if (list_empty(&con->out_queue) &&
2214 !test_bit(KEEPALIVE_PENDING, &con->state)) { 2385 !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) {
2215 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 2386 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2216 clear_bit(WRITE_PENDING, &con->state); 2387 clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
2217 set_bit(STANDBY, &con->state); 2388 con->state = CON_STATE_STANDBY;
2218 } else { 2389 } else {
2219 /* retry after a delay. */ 2390 /* retry after a delay. */
2391 con->state = CON_STATE_PREOPEN;
2220 if (con->delay == 0) 2392 if (con->delay == 0)
2221 con->delay = BASE_DELAY_INTERVAL; 2393 con->delay = BASE_DELAY_INTERVAL;
2222 else if (con->delay < MAX_DELAY_INTERVAL) 2394 else if (con->delay < MAX_DELAY_INTERVAL)
@@ -2237,13 +2409,12 @@ static void ceph_fault(struct ceph_connection *con)
2237 * that when con_work restarts we schedule the 2409 * that when con_work restarts we schedule the
2238 * delay then. 2410 * delay then.
2239 */ 2411 */
2240 set_bit(BACKOFF, &con->state); 2412 set_bit(CON_FLAG_BACKOFF, &con->flags);
2241 } 2413 }
2242 } 2414 }
2243 2415
2244out_unlock: 2416out_unlock:
2245 mutex_unlock(&con->mutex); 2417 mutex_unlock(&con->mutex);
2246out:
2247 /* 2418 /*
2248 * in case we faulted due to authentication, invalidate our 2419 * in case we faulted due to authentication, invalidate our
2249 * current tickets so that we can get new ones. 2420 * current tickets so that we can get new ones.
@@ -2260,18 +2431,14 @@ out:
2260 2431
2261 2432
2262/* 2433/*
2263 * create a new messenger instance 2434 * initialize a new messenger instance
2264 */ 2435 */
2265struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, 2436void ceph_messenger_init(struct ceph_messenger *msgr,
2266 u32 supported_features, 2437 struct ceph_entity_addr *myaddr,
2267 u32 required_features) 2438 u32 supported_features,
2439 u32 required_features,
2440 bool nocrc)
2268{ 2441{
2269 struct ceph_messenger *msgr;
2270
2271 msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
2272 if (msgr == NULL)
2273 return ERR_PTR(-ENOMEM);
2274
2275 msgr->supported_features = supported_features; 2442 msgr->supported_features = supported_features;
2276 msgr->required_features = required_features; 2443 msgr->required_features = required_features;
2277 2444
@@ -2284,30 +2451,23 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
2284 msgr->inst.addr.type = 0; 2451 msgr->inst.addr.type = 0;
2285 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); 2452 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
2286 encode_my_addr(msgr); 2453 encode_my_addr(msgr);
2454 msgr->nocrc = nocrc;
2287 2455
2288 dout("messenger_create %p\n", msgr); 2456 atomic_set(&msgr->stopping, 0);
2289 return msgr;
2290}
2291EXPORT_SYMBOL(ceph_messenger_create);
2292 2457
2293void ceph_messenger_destroy(struct ceph_messenger *msgr) 2458 dout("%s %p\n", __func__, msgr);
2294{
2295 dout("destroy %p\n", msgr);
2296 kfree(msgr);
2297 dout("destroyed messenger %p\n", msgr);
2298} 2459}
2299EXPORT_SYMBOL(ceph_messenger_destroy); 2460EXPORT_SYMBOL(ceph_messenger_init);
2300 2461
2301static void clear_standby(struct ceph_connection *con) 2462static void clear_standby(struct ceph_connection *con)
2302{ 2463{
2303 /* come back from STANDBY? */ 2464 /* come back from STANDBY? */
2304 if (test_and_clear_bit(STANDBY, &con->state)) { 2465 if (con->state == CON_STATE_STANDBY) {
2305 mutex_lock(&con->mutex);
2306 dout("clear_standby %p and ++connect_seq\n", con); 2466 dout("clear_standby %p and ++connect_seq\n", con);
2467 con->state = CON_STATE_PREOPEN;
2307 con->connect_seq++; 2468 con->connect_seq++;
2308 WARN_ON(test_bit(WRITE_PENDING, &con->state)); 2469 WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags));
2309 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); 2470 WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags));
2310 mutex_unlock(&con->mutex);
2311 } 2471 }
2312} 2472}
2313 2473
@@ -2316,21 +2476,24 @@ static void clear_standby(struct ceph_connection *con)
2316 */ 2476 */
2317void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) 2477void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2318{ 2478{
2319 if (test_bit(CLOSED, &con->state)) {
2320 dout("con_send %p closed, dropping %p\n", con, msg);
2321 ceph_msg_put(msg);
2322 return;
2323 }
2324
2325 /* set src+dst */ 2479 /* set src+dst */
2326 msg->hdr.src = con->msgr->inst.name; 2480 msg->hdr.src = con->msgr->inst.name;
2327
2328 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 2481 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
2329
2330 msg->needs_out_seq = true; 2482 msg->needs_out_seq = true;
2331 2483
2332 /* queue */
2333 mutex_lock(&con->mutex); 2484 mutex_lock(&con->mutex);
2485
2486 if (con->state == CON_STATE_CLOSED) {
2487 dout("con_send %p closed, dropping %p\n", con, msg);
2488 ceph_msg_put(msg);
2489 mutex_unlock(&con->mutex);
2490 return;
2491 }
2492
2493 BUG_ON(msg->con != NULL);
2494 msg->con = con->ops->get(con);
2495 BUG_ON(msg->con == NULL);
2496
2334 BUG_ON(!list_empty(&msg->list_head)); 2497 BUG_ON(!list_empty(&msg->list_head));
2335 list_add_tail(&msg->list_head, &con->out_queue); 2498 list_add_tail(&msg->list_head, &con->out_queue);
2336 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 2499 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -2339,12 +2502,13 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2339 le32_to_cpu(msg->hdr.front_len), 2502 le32_to_cpu(msg->hdr.front_len),
2340 le32_to_cpu(msg->hdr.middle_len), 2503 le32_to_cpu(msg->hdr.middle_len),
2341 le32_to_cpu(msg->hdr.data_len)); 2504 le32_to_cpu(msg->hdr.data_len));
2505
2506 clear_standby(con);
2342 mutex_unlock(&con->mutex); 2507 mutex_unlock(&con->mutex);
2343 2508
2344 /* if there wasn't anything waiting to send before, queue 2509 /* if there wasn't anything waiting to send before, queue
2345 * new work */ 2510 * new work */
2346 clear_standby(con); 2511 if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
2347 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
2348 queue_con(con); 2512 queue_con(con);
2349} 2513}
2350EXPORT_SYMBOL(ceph_con_send); 2514EXPORT_SYMBOL(ceph_con_send);
@@ -2352,24 +2516,34 @@ EXPORT_SYMBOL(ceph_con_send);
2352/* 2516/*
2353 * Revoke a message that was previously queued for send 2517 * Revoke a message that was previously queued for send
2354 */ 2518 */
2355void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) 2519void ceph_msg_revoke(struct ceph_msg *msg)
2356{ 2520{
2521 struct ceph_connection *con = msg->con;
2522
2523 if (!con)
2524 return; /* Message not in our possession */
2525
2357 mutex_lock(&con->mutex); 2526 mutex_lock(&con->mutex);
2358 if (!list_empty(&msg->list_head)) { 2527 if (!list_empty(&msg->list_head)) {
2359 dout("con_revoke %p msg %p - was on queue\n", con, msg); 2528 dout("%s %p msg %p - was on queue\n", __func__, con, msg);
2360 list_del_init(&msg->list_head); 2529 list_del_init(&msg->list_head);
2361 ceph_msg_put(msg); 2530 BUG_ON(msg->con == NULL);
2531 msg->con->ops->put(msg->con);
2532 msg->con = NULL;
2362 msg->hdr.seq = 0; 2533 msg->hdr.seq = 0;
2534
2535 ceph_msg_put(msg);
2363 } 2536 }
2364 if (con->out_msg == msg) { 2537 if (con->out_msg == msg) {
2365 dout("con_revoke %p msg %p - was sending\n", con, msg); 2538 dout("%s %p msg %p - was sending\n", __func__, con, msg);
2366 con->out_msg = NULL; 2539 con->out_msg = NULL;
2367 if (con->out_kvec_is_msg) { 2540 if (con->out_kvec_is_msg) {
2368 con->out_skip = con->out_kvec_bytes; 2541 con->out_skip = con->out_kvec_bytes;
2369 con->out_kvec_is_msg = false; 2542 con->out_kvec_is_msg = false;
2370 } 2543 }
2371 ceph_msg_put(msg);
2372 msg->hdr.seq = 0; 2544 msg->hdr.seq = 0;
2545
2546 ceph_msg_put(msg);
2373 } 2547 }
2374 mutex_unlock(&con->mutex); 2548 mutex_unlock(&con->mutex);
2375} 2549}
@@ -2377,17 +2551,27 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
2377/* 2551/*
2378 * Revoke a message that we may be reading data into 2552 * Revoke a message that we may be reading data into
2379 */ 2553 */
2380void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) 2554void ceph_msg_revoke_incoming(struct ceph_msg *msg)
2381{ 2555{
2556 struct ceph_connection *con;
2557
2558 BUG_ON(msg == NULL);
2559 if (!msg->con) {
2560 dout("%s msg %p null con\n", __func__, msg);
2561
2562 return; /* Message not in our possession */
2563 }
2564
2565 con = msg->con;
2382 mutex_lock(&con->mutex); 2566 mutex_lock(&con->mutex);
2383 if (con->in_msg && con->in_msg == msg) { 2567 if (con->in_msg == msg) {
2384 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); 2568 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
2385 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); 2569 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
2386 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); 2570 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
2387 2571
2388 /* skip rest of message */ 2572 /* skip rest of message */
2389 dout("con_revoke_pages %p msg %p revoked\n", con, msg); 2573 dout("%s %p msg %p revoked\n", __func__, con, msg);
2390 con->in_base_pos = con->in_base_pos - 2574 con->in_base_pos = con->in_base_pos -
2391 sizeof(struct ceph_msg_header) - 2575 sizeof(struct ceph_msg_header) -
2392 front_len - 2576 front_len -
2393 middle_len - 2577 middle_len -
@@ -2398,8 +2582,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
2398 con->in_tag = CEPH_MSGR_TAG_READY; 2582 con->in_tag = CEPH_MSGR_TAG_READY;
2399 con->in_seq++; 2583 con->in_seq++;
2400 } else { 2584 } else {
2401 dout("con_revoke_pages %p msg %p pages %p no-op\n", 2585 dout("%s %p in_msg %p msg %p no-op\n",
2402 con, con->in_msg, msg); 2586 __func__, con, con->in_msg, msg);
2403 } 2587 }
2404 mutex_unlock(&con->mutex); 2588 mutex_unlock(&con->mutex);
2405} 2589}
@@ -2410,9 +2594,11 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
2410void ceph_con_keepalive(struct ceph_connection *con) 2594void ceph_con_keepalive(struct ceph_connection *con)
2411{ 2595{
2412 dout("con_keepalive %p\n", con); 2596 dout("con_keepalive %p\n", con);
2597 mutex_lock(&con->mutex);
2413 clear_standby(con); 2598 clear_standby(con);
2414 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && 2599 mutex_unlock(&con->mutex);
2415 test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2600 if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 &&
2601 test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
2416 queue_con(con); 2602 queue_con(con);
2417} 2603}
2418EXPORT_SYMBOL(ceph_con_keepalive); 2604EXPORT_SYMBOL(ceph_con_keepalive);
@@ -2431,6 +2617,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
2431 if (m == NULL) 2617 if (m == NULL)
2432 goto out; 2618 goto out;
2433 kref_init(&m->kref); 2619 kref_init(&m->kref);
2620
2621 m->con = NULL;
2434 INIT_LIST_HEAD(&m->list_head); 2622 INIT_LIST_HEAD(&m->list_head);
2435 2623
2436 m->hdr.tid = 0; 2624 m->hdr.tid = 0;
@@ -2526,46 +2714,77 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
2526} 2714}
2527 2715
2528/* 2716/*
2529 * Generic message allocator, for incoming messages. 2717 * Allocate a message for receiving an incoming message on a
2718 * connection, and save the result in con->in_msg. Uses the
2719 * connection's private alloc_msg op if available.
2720 *
2721 * Returns 0 on success, or a negative error code.
2722 *
2723 * On success, if we set *skip = 1:
2724 * - the next message should be skipped and ignored.
2725 * - con->in_msg == NULL
2726 * or if we set *skip = 0:
2727 * - con->in_msg is non-null.
2728 * On error (ENOMEM, EAGAIN, ...),
2729 * - con->in_msg == NULL
2530 */ 2730 */
2531static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 2731static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
2532 struct ceph_msg_header *hdr,
2533 int *skip)
2534{ 2732{
2733 struct ceph_msg_header *hdr = &con->in_hdr;
2535 int type = le16_to_cpu(hdr->type); 2734 int type = le16_to_cpu(hdr->type);
2536 int front_len = le32_to_cpu(hdr->front_len); 2735 int front_len = le32_to_cpu(hdr->front_len);
2537 int middle_len = le32_to_cpu(hdr->middle_len); 2736 int middle_len = le32_to_cpu(hdr->middle_len);
2538 struct ceph_msg *msg = NULL; 2737 int ret = 0;
2539 int ret; 2738
2739 BUG_ON(con->in_msg != NULL);
2540 2740
2541 if (con->ops->alloc_msg) { 2741 if (con->ops->alloc_msg) {
2742 struct ceph_msg *msg;
2743
2542 mutex_unlock(&con->mutex); 2744 mutex_unlock(&con->mutex);
2543 msg = con->ops->alloc_msg(con, hdr, skip); 2745 msg = con->ops->alloc_msg(con, hdr, skip);
2544 mutex_lock(&con->mutex); 2746 mutex_lock(&con->mutex);
2545 if (!msg || *skip) 2747 if (con->state != CON_STATE_OPEN) {
2546 return NULL; 2748 ceph_msg_put(msg);
2749 return -EAGAIN;
2750 }
2751 con->in_msg = msg;
2752 if (con->in_msg) {
2753 con->in_msg->con = con->ops->get(con);
2754 BUG_ON(con->in_msg->con == NULL);
2755 }
2756 if (*skip) {
2757 con->in_msg = NULL;
2758 return 0;
2759 }
2760 if (!con->in_msg) {
2761 con->error_msg =
2762 "error allocating memory for incoming message";
2763 return -ENOMEM;
2764 }
2547 } 2765 }
2548 if (!msg) { 2766 if (!con->in_msg) {
2549 *skip = 0; 2767 con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
2550 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 2768 if (!con->in_msg) {
2551 if (!msg) {
2552 pr_err("unable to allocate msg type %d len %d\n", 2769 pr_err("unable to allocate msg type %d len %d\n",
2553 type, front_len); 2770 type, front_len);
2554 return NULL; 2771 return -ENOMEM;
2555 } 2772 }
2556 msg->page_alignment = le16_to_cpu(hdr->data_off); 2773 con->in_msg->con = con->ops->get(con);
2774 BUG_ON(con->in_msg->con == NULL);
2775 con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
2557 } 2776 }
2558 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2777 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
2559 2778
2560 if (middle_len && !msg->middle) { 2779 if (middle_len && !con->in_msg->middle) {
2561 ret = ceph_alloc_middle(con, msg); 2780 ret = ceph_alloc_middle(con, con->in_msg);
2562 if (ret < 0) { 2781 if (ret < 0) {
2563 ceph_msg_put(msg); 2782 ceph_msg_put(con->in_msg);
2564 return NULL; 2783 con->in_msg = NULL;
2565 } 2784 }
2566 } 2785 }
2567 2786
2568 return msg; 2787 return ret;
2569} 2788}
2570 2789
2571 2790
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index d0649a9655be..105d533b55f3 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
106 monc->pending_auth = 1; 106 monc->pending_auth = 1;
107 monc->m_auth->front.iov_len = len; 107 monc->m_auth->front.iov_len = len;
108 monc->m_auth->hdr.front_len = cpu_to_le32(len); 108 monc->m_auth->hdr.front_len = cpu_to_le32(len);
109 ceph_con_revoke(monc->con, monc->m_auth); 109 ceph_msg_revoke(monc->m_auth);
110 ceph_msg_get(monc->m_auth); /* keep our ref */ 110 ceph_msg_get(monc->m_auth); /* keep our ref */
111 ceph_con_send(monc->con, monc->m_auth); 111 ceph_con_send(&monc->con, monc->m_auth);
112} 112}
113 113
114/* 114/*
@@ -117,8 +117,11 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
117static void __close_session(struct ceph_mon_client *monc) 117static void __close_session(struct ceph_mon_client *monc)
118{ 118{
119 dout("__close_session closing mon%d\n", monc->cur_mon); 119 dout("__close_session closing mon%d\n", monc->cur_mon);
120 ceph_con_revoke(monc->con, monc->m_auth); 120 ceph_msg_revoke(monc->m_auth);
121 ceph_con_close(monc->con); 121 ceph_msg_revoke_incoming(monc->m_auth_reply);
122 ceph_msg_revoke(monc->m_subscribe);
123 ceph_msg_revoke_incoming(monc->m_subscribe_ack);
124 ceph_con_close(&monc->con);
122 monc->cur_mon = -1; 125 monc->cur_mon = -1;
123 monc->pending_auth = 0; 126 monc->pending_auth = 0;
124 ceph_auth_reset(monc->auth); 127 ceph_auth_reset(monc->auth);
@@ -142,9 +145,8 @@ static int __open_session(struct ceph_mon_client *monc)
142 monc->want_next_osdmap = !!monc->want_next_osdmap; 145 monc->want_next_osdmap = !!monc->want_next_osdmap;
143 146
144 dout("open_session mon%d opening\n", monc->cur_mon); 147 dout("open_session mon%d opening\n", monc->cur_mon);
145 monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON; 148 ceph_con_open(&monc->con,
146 monc->con->peer_name.num = cpu_to_le64(monc->cur_mon); 149 CEPH_ENTITY_TYPE_MON, monc->cur_mon,
147 ceph_con_open(monc->con,
148 &monc->monmap->mon_inst[monc->cur_mon].addr); 150 &monc->monmap->mon_inst[monc->cur_mon].addr);
149 151
150 /* initiatiate authentication handshake */ 152 /* initiatiate authentication handshake */
@@ -226,8 +228,8 @@ static void __send_subscribe(struct ceph_mon_client *monc)
226 228
227 msg->front.iov_len = p - msg->front.iov_base; 229 msg->front.iov_len = p - msg->front.iov_base;
228 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 230 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
229 ceph_con_revoke(monc->con, msg); 231 ceph_msg_revoke(msg);
230 ceph_con_send(monc->con, ceph_msg_get(msg)); 232 ceph_con_send(&monc->con, ceph_msg_get(msg));
231 233
232 monc->sub_sent = jiffies | 1; /* never 0 */ 234 monc->sub_sent = jiffies | 1; /* never 0 */
233 } 235 }
@@ -247,7 +249,7 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
247 if (monc->hunting) { 249 if (monc->hunting) {
248 pr_info("mon%d %s session established\n", 250 pr_info("mon%d %s session established\n",
249 monc->cur_mon, 251 monc->cur_mon,
250 ceph_pr_addr(&monc->con->peer_addr.in_addr)); 252 ceph_pr_addr(&monc->con.peer_addr.in_addr));
251 monc->hunting = false; 253 monc->hunting = false;
252 } 254 }
253 dout("handle_subscribe_ack after %d seconds\n", seconds); 255 dout("handle_subscribe_ack after %d seconds\n", seconds);
@@ -439,6 +441,7 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
439 m = NULL; 441 m = NULL;
440 } else { 442 } else {
441 dout("get_generic_reply %lld got %p\n", tid, req->reply); 443 dout("get_generic_reply %lld got %p\n", tid, req->reply);
444 *skip = 0;
442 m = ceph_msg_get(req->reply); 445 m = ceph_msg_get(req->reply);
443 /* 446 /*
444 * we don't need to track the connection reading into 447 * we don't need to track the connection reading into
@@ -461,7 +464,7 @@ static int do_generic_request(struct ceph_mon_client *monc,
461 req->request->hdr.tid = cpu_to_le64(req->tid); 464 req->request->hdr.tid = cpu_to_le64(req->tid);
462 __insert_generic_request(monc, req); 465 __insert_generic_request(monc, req);
463 monc->num_generic_requests++; 466 monc->num_generic_requests++;
464 ceph_con_send(monc->con, ceph_msg_get(req->request)); 467 ceph_con_send(&monc->con, ceph_msg_get(req->request));
465 mutex_unlock(&monc->mutex); 468 mutex_unlock(&monc->mutex);
466 469
467 err = wait_for_completion_interruptible(&req->completion); 470 err = wait_for_completion_interruptible(&req->completion);
@@ -684,8 +687,9 @@ static void __resend_generic_request(struct ceph_mon_client *monc)
684 687
685 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 688 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
686 req = rb_entry(p, struct ceph_mon_generic_request, node); 689 req = rb_entry(p, struct ceph_mon_generic_request, node);
687 ceph_con_revoke(monc->con, req->request); 690 ceph_msg_revoke(req->request);
688 ceph_con_send(monc->con, ceph_msg_get(req->request)); 691 ceph_msg_revoke_incoming(req->reply);
692 ceph_con_send(&monc->con, ceph_msg_get(req->request));
689 } 693 }
690} 694}
691 695
@@ -705,7 +709,7 @@ static void delayed_work(struct work_struct *work)
705 __close_session(monc); 709 __close_session(monc);
706 __open_session(monc); /* continue hunting */ 710 __open_session(monc); /* continue hunting */
707 } else { 711 } else {
708 ceph_con_keepalive(monc->con); 712 ceph_con_keepalive(&monc->con);
709 713
710 __validate_auth(monc); 714 __validate_auth(monc);
711 715
@@ -760,19 +764,12 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
760 goto out; 764 goto out;
761 765
762 /* connection */ 766 /* connection */
763 monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
764 if (!monc->con)
765 goto out_monmap;
766 ceph_con_init(monc->client->msgr, monc->con);
767 monc->con->private = monc;
768 monc->con->ops = &mon_con_ops;
769
770 /* authentication */ 767 /* authentication */
771 monc->auth = ceph_auth_init(cl->options->name, 768 monc->auth = ceph_auth_init(cl->options->name,
772 cl->options->key); 769 cl->options->key);
773 if (IS_ERR(monc->auth)) { 770 if (IS_ERR(monc->auth)) {
774 err = PTR_ERR(monc->auth); 771 err = PTR_ERR(monc->auth);
775 goto out_con; 772 goto out_monmap;
776 } 773 }
777 monc->auth->want_keys = 774 monc->auth->want_keys =
778 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 775 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
@@ -801,6 +798,9 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
801 if (!monc->m_auth) 798 if (!monc->m_auth)
802 goto out_auth_reply; 799 goto out_auth_reply;
803 800
801 ceph_con_init(&monc->con, monc, &mon_con_ops,
802 &monc->client->msgr);
803
804 monc->cur_mon = -1; 804 monc->cur_mon = -1;
805 monc->hunting = true; 805 monc->hunting = true;
806 monc->sub_renew_after = jiffies; 806 monc->sub_renew_after = jiffies;
@@ -824,8 +824,6 @@ out_subscribe_ack:
824 ceph_msg_put(monc->m_subscribe_ack); 824 ceph_msg_put(monc->m_subscribe_ack);
825out_auth: 825out_auth:
826 ceph_auth_destroy(monc->auth); 826 ceph_auth_destroy(monc->auth);
827out_con:
828 monc->con->ops->put(monc->con);
829out_monmap: 827out_monmap:
830 kfree(monc->monmap); 828 kfree(monc->monmap);
831out: 829out:
@@ -841,10 +839,6 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
841 mutex_lock(&monc->mutex); 839 mutex_lock(&monc->mutex);
842 __close_session(monc); 840 __close_session(monc);
843 841
844 monc->con->private = NULL;
845 monc->con->ops->put(monc->con);
846 monc->con = NULL;
847
848 mutex_unlock(&monc->mutex); 842 mutex_unlock(&monc->mutex);
849 843
850 /* 844 /*
@@ -888,8 +882,8 @@ static void handle_auth_reply(struct ceph_mon_client *monc,
888 } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) { 882 } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
889 dout("authenticated, starting session\n"); 883 dout("authenticated, starting session\n");
890 884
891 monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 885 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
892 monc->client->msgr->inst.name.num = 886 monc->client->msgr.inst.name.num =
893 cpu_to_le64(monc->auth->global_id); 887 cpu_to_le64(monc->auth->global_id);
894 888
895 __send_subscribe(monc); 889 __send_subscribe(monc);
@@ -1000,6 +994,8 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1000 case CEPH_MSG_MDS_MAP: 994 case CEPH_MSG_MDS_MAP:
1001 case CEPH_MSG_OSD_MAP: 995 case CEPH_MSG_OSD_MAP:
1002 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 996 m = ceph_msg_new(type, front_len, GFP_NOFS, false);
997 if (!m)
998 return NULL; /* ENOMEM--return skip == 0 */
1003 break; 999 break;
1004 } 1000 }
1005 1001
@@ -1029,7 +1025,7 @@ static void mon_fault(struct ceph_connection *con)
1029 if (!monc->hunting) 1025 if (!monc->hunting)
1030 pr_info("mon%d %s session lost, " 1026 pr_info("mon%d %s session lost, "
1031 "hunting for new mon\n", monc->cur_mon, 1027 "hunting for new mon\n", monc->cur_mon,
1032 ceph_pr_addr(&monc->con->peer_addr.in_addr)); 1028 ceph_pr_addr(&monc->con.peer_addr.in_addr));
1033 1029
1034 __close_session(monc); 1030 __close_session(monc);
1035 if (!monc->hunting) { 1031 if (!monc->hunting) {
@@ -1044,9 +1040,23 @@ out:
1044 mutex_unlock(&monc->mutex); 1040 mutex_unlock(&monc->mutex);
1045} 1041}
1046 1042
1043/*
1044 * We can ignore refcounting on the connection struct, as all references
1045 * will come from the messenger workqueue, which is drained prior to
1046 * mon_client destruction.
1047 */
1048static struct ceph_connection *con_get(struct ceph_connection *con)
1049{
1050 return con;
1051}
1052
1053static void con_put(struct ceph_connection *con)
1054{
1055}
1056
1047static const struct ceph_connection_operations mon_con_ops = { 1057static const struct ceph_connection_operations mon_con_ops = {
1048 .get = ceph_con_get, 1058 .get = con_get,
1049 .put = ceph_con_put, 1059 .put = con_put,
1050 .dispatch = dispatch, 1060 .dispatch = dispatch,
1051 .fault = mon_fault, 1061 .fault = mon_fault,
1052 .alloc_msg = mon_alloc_msg, 1062 .alloc_msg = mon_alloc_msg,
diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c
index 11d5f4196a73..ddec1c10ac80 100644
--- a/net/ceph/msgpool.c
+++ b/net/ceph/msgpool.c
@@ -12,7 +12,7 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
12 struct ceph_msgpool *pool = arg; 12 struct ceph_msgpool *pool = arg;
13 struct ceph_msg *msg; 13 struct ceph_msg *msg;
14 14
15 msg = ceph_msg_new(0, pool->front_len, gfp_mask, true); 15 msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true);
16 if (!msg) { 16 if (!msg) {
17 dout("msgpool_alloc %s failed\n", pool->name); 17 dout("msgpool_alloc %s failed\n", pool->name);
18 } else { 18 } else {
@@ -32,10 +32,11 @@ static void msgpool_free(void *element, void *arg)
32 ceph_msg_put(msg); 32 ceph_msg_put(msg);
33} 33}
34 34
35int ceph_msgpool_init(struct ceph_msgpool *pool, 35int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
36 int front_len, int size, bool blocking, const char *name) 36 int front_len, int size, bool blocking, const char *name)
37{ 37{
38 dout("msgpool %s init\n", name); 38 dout("msgpool %s init\n", name);
39 pool->type = type;
39 pool->front_len = front_len; 40 pool->front_len = front_len;
40 pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); 41 pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool);
41 if (!pool->pool) 42 if (!pool->pool)
@@ -61,7 +62,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
61 WARN_ON(1); 62 WARN_ON(1);
62 63
63 /* try to alloc a fresh message */ 64 /* try to alloc a fresh message */
64 return ceph_msg_new(0, front_len, GFP_NOFS, false); 65 return ceph_msg_new(pool->type, front_len, GFP_NOFS, false);
65 } 66 }
66 67
67 msg = mempool_alloc(pool->pool, GFP_NOFS); 68 msg = mempool_alloc(pool->pool, GFP_NOFS);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index ca59e66c9787..42119c05e82c 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref)
140 if (req->r_request) 140 if (req->r_request)
141 ceph_msg_put(req->r_request); 141 ceph_msg_put(req->r_request);
142 if (req->r_con_filling_msg) { 142 if (req->r_con_filling_msg) {
143 dout("release_request revoking pages %p from con %p\n", 143 dout("%s revoking pages %p from con %p\n", __func__,
144 req->r_pages, req->r_con_filling_msg); 144 req->r_pages, req->r_con_filling_msg);
145 ceph_con_revoke_message(req->r_con_filling_msg, 145 ceph_msg_revoke_incoming(req->r_reply);
146 req->r_reply);
147 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 146 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
148 } 147 }
149 if (req->r_reply) 148 if (req->r_reply)
@@ -214,10 +213,13 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
214 kref_init(&req->r_kref); 213 kref_init(&req->r_kref);
215 init_completion(&req->r_completion); 214 init_completion(&req->r_completion);
216 init_completion(&req->r_safe_completion); 215 init_completion(&req->r_safe_completion);
216 rb_init_node(&req->r_node);
217 INIT_LIST_HEAD(&req->r_unsafe_item); 217 INIT_LIST_HEAD(&req->r_unsafe_item);
218 INIT_LIST_HEAD(&req->r_linger_item); 218 INIT_LIST_HEAD(&req->r_linger_item);
219 INIT_LIST_HEAD(&req->r_linger_osd); 219 INIT_LIST_HEAD(&req->r_linger_osd);
220 INIT_LIST_HEAD(&req->r_req_lru_item); 220 INIT_LIST_HEAD(&req->r_req_lru_item);
221 INIT_LIST_HEAD(&req->r_osd_item);
222
221 req->r_flags = flags; 223 req->r_flags = flags;
222 224
223 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 225 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -243,6 +245,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
243 } 245 }
244 ceph_pagelist_init(req->r_trail); 246 ceph_pagelist_init(req->r_trail);
245 } 247 }
248
246 /* create request message; allow space for oid */ 249 /* create request message; allow space for oid */
247 msg_size += MAX_OBJ_NAME_SIZE; 250 msg_size += MAX_OBJ_NAME_SIZE;
248 if (snapc) 251 if (snapc)
@@ -256,7 +259,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
256 return NULL; 259 return NULL;
257 } 260 }
258 261
259 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
260 memset(msg->front.iov_base, 0, msg->front.iov_len); 262 memset(msg->front.iov_base, 0, msg->front.iov_len);
261 263
262 req->r_request = msg; 264 req->r_request = msg;
@@ -624,7 +626,7 @@ static void osd_reset(struct ceph_connection *con)
624/* 626/*
625 * Track open sessions with osds. 627 * Track open sessions with osds.
626 */ 628 */
627static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) 629static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
628{ 630{
629 struct ceph_osd *osd; 631 struct ceph_osd *osd;
630 632
@@ -634,15 +636,13 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
634 636
635 atomic_set(&osd->o_ref, 1); 637 atomic_set(&osd->o_ref, 1);
636 osd->o_osdc = osdc; 638 osd->o_osdc = osdc;
639 osd->o_osd = onum;
637 INIT_LIST_HEAD(&osd->o_requests); 640 INIT_LIST_HEAD(&osd->o_requests);
638 INIT_LIST_HEAD(&osd->o_linger_requests); 641 INIT_LIST_HEAD(&osd->o_linger_requests);
639 INIT_LIST_HEAD(&osd->o_osd_lru); 642 INIT_LIST_HEAD(&osd->o_osd_lru);
640 osd->o_incarnation = 1; 643 osd->o_incarnation = 1;
641 644
642 ceph_con_init(osdc->client->msgr, &osd->o_con); 645 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
643 osd->o_con.private = osd;
644 osd->o_con.ops = &osd_con_ops;
645 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
646 646
647 INIT_LIST_HEAD(&osd->o_keepalive_item); 647 INIT_LIST_HEAD(&osd->o_keepalive_item);
648 return osd; 648 return osd;
@@ -688,7 +688,7 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
688 688
689static void remove_all_osds(struct ceph_osd_client *osdc) 689static void remove_all_osds(struct ceph_osd_client *osdc)
690{ 690{
691 dout("__remove_old_osds %p\n", osdc); 691 dout("%s %p\n", __func__, osdc);
692 mutex_lock(&osdc->request_mutex); 692 mutex_lock(&osdc->request_mutex);
693 while (!RB_EMPTY_ROOT(&osdc->osds)) { 693 while (!RB_EMPTY_ROOT(&osdc->osds)) {
694 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 694 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
@@ -752,7 +752,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
752 ret = -EAGAIN; 752 ret = -EAGAIN;
753 } else { 753 } else {
754 ceph_con_close(&osd->o_con); 754 ceph_con_close(&osd->o_con);
755 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]); 755 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
756 &osdc->osdmap->osd_addr[osd->o_osd]);
756 osd->o_incarnation++; 757 osd->o_incarnation++;
757 } 758 }
758 return ret; 759 return ret;
@@ -853,7 +854,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
853 854
854 if (req->r_osd) { 855 if (req->r_osd) {
855 /* make sure the original request isn't in flight. */ 856 /* make sure the original request isn't in flight. */
856 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 857 ceph_msg_revoke(req->r_request);
857 858
858 list_del_init(&req->r_osd_item); 859 list_del_init(&req->r_osd_item);
859 if (list_empty(&req->r_osd->o_requests) && 860 if (list_empty(&req->r_osd->o_requests) &&
@@ -880,7 +881,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
880static void __cancel_request(struct ceph_osd_request *req) 881static void __cancel_request(struct ceph_osd_request *req)
881{ 882{
882 if (req->r_sent && req->r_osd) { 883 if (req->r_sent && req->r_osd) {
883 ceph_con_revoke(&req->r_osd->o_con, req->r_request); 884 ceph_msg_revoke(req->r_request);
884 req->r_sent = 0; 885 req->r_sent = 0;
885 } 886 }
886} 887}
@@ -890,7 +891,9 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
890{ 891{
891 dout("__register_linger_request %p\n", req); 892 dout("__register_linger_request %p\n", req);
892 list_add_tail(&req->r_linger_item, &osdc->req_linger); 893 list_add_tail(&req->r_linger_item, &osdc->req_linger);
893 list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests); 894 if (req->r_osd)
895 list_add_tail(&req->r_linger_osd,
896 &req->r_osd->o_linger_requests);
894} 897}
895 898
896static void __unregister_linger_request(struct ceph_osd_client *osdc, 899static void __unregister_linger_request(struct ceph_osd_client *osdc,
@@ -998,18 +1001,18 @@ static int __map_request(struct ceph_osd_client *osdc,
998 req->r_osd = __lookup_osd(osdc, o); 1001 req->r_osd = __lookup_osd(osdc, o);
999 if (!req->r_osd && o >= 0) { 1002 if (!req->r_osd && o >= 0) {
1000 err = -ENOMEM; 1003 err = -ENOMEM;
1001 req->r_osd = create_osd(osdc); 1004 req->r_osd = create_osd(osdc, o);
1002 if (!req->r_osd) { 1005 if (!req->r_osd) {
1003 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1006 list_move(&req->r_req_lru_item, &osdc->req_notarget);
1004 goto out; 1007 goto out;
1005 } 1008 }
1006 1009
1007 dout("map_request osd %p is osd%d\n", req->r_osd, o); 1010 dout("map_request osd %p is osd%d\n", req->r_osd, o);
1008 req->r_osd->o_osd = o;
1009 req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
1010 __insert_osd(osdc, req->r_osd); 1011 __insert_osd(osdc, req->r_osd);
1011 1012
1012 ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]); 1013 ceph_con_open(&req->r_osd->o_con,
1014 CEPH_ENTITY_TYPE_OSD, o,
1015 &osdc->osdmap->osd_addr[o]);
1013 } 1016 }
1014 1017
1015 if (req->r_osd) { 1018 if (req->r_osd) {
@@ -1304,8 +1307,9 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1304 1307
1305 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1308 dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
1306 mutex_lock(&osdc->request_mutex); 1309 mutex_lock(&osdc->request_mutex);
1307 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { 1310 for (p = rb_first(&osdc->requests); p; ) {
1308 req = rb_entry(p, struct ceph_osd_request, r_node); 1311 req = rb_entry(p, struct ceph_osd_request, r_node);
1312 p = rb_next(p);
1309 err = __map_request(osdc, req, force_resend); 1313 err = __map_request(osdc, req, force_resend);
1310 if (err < 0) 1314 if (err < 0)
1311 continue; /* error */ 1315 continue; /* error */
@@ -1313,10 +1317,23 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1313 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1317 dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1314 needmap++; /* request a newer map */ 1318 needmap++; /* request a newer map */
1315 } else if (err > 0) { 1319 } else if (err > 0) {
1316 dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, 1320 if (!req->r_linger) {
1317 req->r_osd ? req->r_osd->o_osd : -1); 1321 dout("%p tid %llu requeued on osd%d\n", req,
1318 if (!req->r_linger) 1322 req->r_tid,
1323 req->r_osd ? req->r_osd->o_osd : -1);
1319 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1324 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1325 }
1326 }
1327 if (req->r_linger && list_empty(&req->r_linger_item)) {
1328 /*
1329 * register as a linger so that we will
1330 * re-submit below and get a new tid
1331 */
1332 dout("%p tid %llu restart on osd%d\n",
1333 req, req->r_tid,
1334 req->r_osd ? req->r_osd->o_osd : -1);
1335 __register_linger_request(osdc, req);
1336 __unregister_request(osdc, req);
1320 } 1337 }
1321 } 1338 }
1322 1339
@@ -1391,7 +1408,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1391 epoch, maplen); 1408 epoch, maplen);
1392 newmap = osdmap_apply_incremental(&p, next, 1409 newmap = osdmap_apply_incremental(&p, next,
1393 osdc->osdmap, 1410 osdc->osdmap,
1394 osdc->client->msgr); 1411 &osdc->client->msgr);
1395 if (IS_ERR(newmap)) { 1412 if (IS_ERR(newmap)) {
1396 err = PTR_ERR(newmap); 1413 err = PTR_ERR(newmap);
1397 goto bad; 1414 goto bad;
@@ -1839,11 +1856,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1839 if (!osdc->req_mempool) 1856 if (!osdc->req_mempool)
1840 goto out; 1857 goto out;
1841 1858
1842 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true, 1859 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
1860 OSD_OP_FRONT_LEN, 10, true,
1843 "osd_op"); 1861 "osd_op");
1844 if (err < 0) 1862 if (err < 0)
1845 goto out_mempool; 1863 goto out_mempool;
1846 err = ceph_msgpool_init(&osdc->msgpool_op_reply, 1864 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
1847 OSD_OPREPLY_FRONT_LEN, 10, true, 1865 OSD_OPREPLY_FRONT_LEN, 10, true,
1848 "osd_op_reply"); 1866 "osd_op_reply");
1849 if (err < 0) 1867 if (err < 0)
@@ -2019,15 +2037,15 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
2019 if (!req) { 2037 if (!req) {
2020 *skip = 1; 2038 *skip = 1;
2021 m = NULL; 2039 m = NULL;
2022 pr_info("get_reply unknown tid %llu from osd%d\n", tid, 2040 dout("get_reply unknown tid %llu from osd%d\n", tid,
2023 osd->o_osd); 2041 osd->o_osd);
2024 goto out; 2042 goto out;
2025 } 2043 }
2026 2044
2027 if (req->r_con_filling_msg) { 2045 if (req->r_con_filling_msg) {
2028 dout("get_reply revoking msg %p from old con %p\n", 2046 dout("%s revoking msg %p from old con %p\n", __func__,
2029 req->r_reply, req->r_con_filling_msg); 2047 req->r_reply, req->r_con_filling_msg);
2030 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); 2048 ceph_msg_revoke_incoming(req->r_reply);
2031 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 2049 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
2032 req->r_con_filling_msg = NULL; 2050 req->r_con_filling_msg = NULL;
2033 } 2051 }
@@ -2080,6 +2098,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2080 int type = le16_to_cpu(hdr->type); 2098 int type = le16_to_cpu(hdr->type);
2081 int front = le32_to_cpu(hdr->front_len); 2099 int front = le32_to_cpu(hdr->front_len);
2082 2100
2101 *skip = 0;
2083 switch (type) { 2102 switch (type) {
2084 case CEPH_MSG_OSD_MAP: 2103 case CEPH_MSG_OSD_MAP:
2085 case CEPH_MSG_WATCH_NOTIFY: 2104 case CEPH_MSG_WATCH_NOTIFY:
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 81e3b84a77ef..3124b71a8883 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -135,6 +135,21 @@ bad:
135 return -EINVAL; 135 return -EINVAL;
136} 136}
137 137
138static int skip_name_map(void **p, void *end)
139{
140 int len;
141 ceph_decode_32_safe(p, end, len ,bad);
142 while (len--) {
143 int strlen;
144 *p += sizeof(u32);
145 ceph_decode_32_safe(p, end, strlen, bad);
146 *p += strlen;
147}
148 return 0;
149bad:
150 return -EINVAL;
151}
152
138static struct crush_map *crush_decode(void *pbyval, void *end) 153static struct crush_map *crush_decode(void *pbyval, void *end)
139{ 154{
140 struct crush_map *c; 155 struct crush_map *c;
@@ -143,6 +158,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
143 void **p = &pbyval; 158 void **p = &pbyval;
144 void *start = pbyval; 159 void *start = pbyval;
145 u32 magic; 160 u32 magic;
161 u32 num_name_maps;
146 162
147 dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p)); 163 dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
148 164
@@ -150,6 +166,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
150 if (c == NULL) 166 if (c == NULL)
151 return ERR_PTR(-ENOMEM); 167 return ERR_PTR(-ENOMEM);
152 168
169 /* set tunables to default values */
170 c->choose_local_tries = 2;
171 c->choose_local_fallback_tries = 5;
172 c->choose_total_tries = 19;
173
153 ceph_decode_need(p, end, 4*sizeof(u32), bad); 174 ceph_decode_need(p, end, 4*sizeof(u32), bad);
154 magic = ceph_decode_32(p); 175 magic = ceph_decode_32(p);
155 if (magic != CRUSH_MAGIC) { 176 if (magic != CRUSH_MAGIC) {
@@ -297,7 +318,25 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
297 } 318 }
298 319
299 /* ignore trailing name maps. */ 320 /* ignore trailing name maps. */
321 for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) {
322 err = skip_name_map(p, end);
323 if (err < 0)
324 goto done;
325 }
326
327 /* tunables */
328 ceph_decode_need(p, end, 3*sizeof(u32), done);
329 c->choose_local_tries = ceph_decode_32(p);
330 c->choose_local_fallback_tries = ceph_decode_32(p);
331 c->choose_total_tries = ceph_decode_32(p);
332 dout("crush decode tunable choose_local_tries = %d",
333 c->choose_local_tries);
334 dout("crush decode tunable choose_local_fallback_tries = %d",
335 c->choose_local_fallback_tries);
336 dout("crush decode tunable choose_total_tries = %d",
337 c->choose_total_tries);
300 338
339done:
301 dout("crush_decode success\n"); 340 dout("crush_decode success\n");
302 return c; 341 return c;
303 342
@@ -488,15 +527,16 @@ static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
488 ceph_decode_32_safe(p, end, pool, bad); 527 ceph_decode_32_safe(p, end, pool, bad);
489 ceph_decode_32_safe(p, end, len, bad); 528 ceph_decode_32_safe(p, end, len, bad);
490 dout(" pool %d len %d\n", pool, len); 529 dout(" pool %d len %d\n", pool, len);
530 ceph_decode_need(p, end, len, bad);
491 pi = __lookup_pg_pool(&map->pg_pools, pool); 531 pi = __lookup_pg_pool(&map->pg_pools, pool);
492 if (pi) { 532 if (pi) {
533 char *name = kstrndup(*p, len, GFP_NOFS);
534
535 if (!name)
536 return -ENOMEM;
493 kfree(pi->name); 537 kfree(pi->name);
494 pi->name = kmalloc(len + 1, GFP_NOFS); 538 pi->name = name;
495 if (pi->name) { 539 dout(" name is %s\n", pi->name);
496 memcpy(pi->name, *p, len);
497 pi->name[len] = '\0';
498 dout(" name is %s\n", pi->name);
499 }
500 } 540 }
501 *p += len; 541 *p += len;
502 } 542 }
@@ -666,6 +706,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
666 ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad); 706 ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad);
667 ceph_decode_copy(p, &pgid, sizeof(pgid)); 707 ceph_decode_copy(p, &pgid, sizeof(pgid));
668 n = ceph_decode_32(p); 708 n = ceph_decode_32(p);
709 err = -EINVAL;
710 if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
711 goto bad;
669 ceph_decode_need(p, end, n * sizeof(u32), bad); 712 ceph_decode_need(p, end, n * sizeof(u32), bad);
670 err = -ENOMEM; 713 err = -ENOMEM;
671 pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS); 714 pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
@@ -889,6 +932,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
889 (void) __remove_pg_mapping(&map->pg_temp, pgid); 932 (void) __remove_pg_mapping(&map->pg_temp, pgid);
890 933
891 /* insert */ 934 /* insert */
935 if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) {
936 err = -EINVAL;
937 goto bad;
938 }
892 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS); 939 pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
893 if (!pg) { 940 if (!pg) {
894 err = -ENOMEM; 941 err = -ENOMEM;