aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c136
1 files changed, 68 insertions, 68 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 159aa8bef9e7..5ccf87ed8d68 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -506,6 +506,7 @@ static void reset_connection(struct ceph_connection *con)
506{ 506{
507 /* reset connection, out_queue, msg_ and connect_seq */ 507 /* reset connection, out_queue, msg_ and connect_seq */
508 /* discard existing out_queue and msg_seq */ 508 /* discard existing out_queue and msg_seq */
509 dout("reset_connection %p\n", con);
509 ceph_msg_remove_list(&con->out_queue); 510 ceph_msg_remove_list(&con->out_queue);
510 ceph_msg_remove_list(&con->out_sent); 511 ceph_msg_remove_list(&con->out_sent);
511 512
@@ -561,7 +562,7 @@ void ceph_con_open(struct ceph_connection *con,
561 mutex_lock(&con->mutex); 562 mutex_lock(&con->mutex);
562 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); 563 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
563 564
564 BUG_ON(con->state != CON_STATE_CLOSED); 565 WARN_ON(con->state != CON_STATE_CLOSED);
565 con->state = CON_STATE_PREOPEN; 566 con->state = CON_STATE_PREOPEN;
566 567
567 con->peer_name.type = (__u8) entity_type; 568 con->peer_name.type = (__u8) entity_type;
@@ -1506,13 +1507,6 @@ static int process_banner(struct ceph_connection *con)
1506 return 0; 1507 return 0;
1507} 1508}
1508 1509
1509static void fail_protocol(struct ceph_connection *con)
1510{
1511 reset_connection(con);
1512 BUG_ON(con->state != CON_STATE_NEGOTIATING);
1513 con->state = CON_STATE_CLOSED;
1514}
1515
1516static int process_connect(struct ceph_connection *con) 1510static int process_connect(struct ceph_connection *con)
1517{ 1511{
1518 u64 sup_feat = con->msgr->supported_features; 1512 u64 sup_feat = con->msgr->supported_features;
@@ -1530,7 +1524,7 @@ static int process_connect(struct ceph_connection *con)
1530 ceph_pr_addr(&con->peer_addr.in_addr), 1524 ceph_pr_addr(&con->peer_addr.in_addr),
1531 sup_feat, server_feat, server_feat & ~sup_feat); 1525 sup_feat, server_feat, server_feat & ~sup_feat);
1532 con->error_msg = "missing required protocol features"; 1526 con->error_msg = "missing required protocol features";
1533 fail_protocol(con); 1527 reset_connection(con);
1534 return -1; 1528 return -1;
1535 1529
1536 case CEPH_MSGR_TAG_BADPROTOVER: 1530 case CEPH_MSGR_TAG_BADPROTOVER:
@@ -1541,7 +1535,7 @@ static int process_connect(struct ceph_connection *con)
1541 le32_to_cpu(con->out_connect.protocol_version), 1535 le32_to_cpu(con->out_connect.protocol_version),
1542 le32_to_cpu(con->in_reply.protocol_version)); 1536 le32_to_cpu(con->in_reply.protocol_version));
1543 con->error_msg = "protocol version mismatch"; 1537 con->error_msg = "protocol version mismatch";
1544 fail_protocol(con); 1538 reset_connection(con);
1545 return -1; 1539 return -1;
1546 1540
1547 case CEPH_MSGR_TAG_BADAUTHORIZER: 1541 case CEPH_MSGR_TAG_BADAUTHORIZER:
@@ -1631,11 +1625,11 @@ static int process_connect(struct ceph_connection *con)
1631 ceph_pr_addr(&con->peer_addr.in_addr), 1625 ceph_pr_addr(&con->peer_addr.in_addr),
1632 req_feat, server_feat, req_feat & ~server_feat); 1626 req_feat, server_feat, req_feat & ~server_feat);
1633 con->error_msg = "missing required protocol features"; 1627 con->error_msg = "missing required protocol features";
1634 fail_protocol(con); 1628 reset_connection(con);
1635 return -1; 1629 return -1;
1636 } 1630 }
1637 1631
1638 BUG_ON(con->state != CON_STATE_NEGOTIATING); 1632 WARN_ON(con->state != CON_STATE_NEGOTIATING);
1639 con->state = CON_STATE_OPEN; 1633 con->state = CON_STATE_OPEN;
1640 1634
1641 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1635 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
@@ -2132,7 +2126,6 @@ more:
2132 if (ret < 0) 2126 if (ret < 0)
2133 goto out; 2127 goto out;
2134 2128
2135 BUG_ON(con->state != CON_STATE_CONNECTING);
2136 con->state = CON_STATE_NEGOTIATING; 2129 con->state = CON_STATE_NEGOTIATING;
2137 2130
2138 /* 2131 /*
@@ -2160,7 +2153,7 @@ more:
2160 goto more; 2153 goto more;
2161 } 2154 }
2162 2155
2163 BUG_ON(con->state != CON_STATE_OPEN); 2156 WARN_ON(con->state != CON_STATE_OPEN);
2164 2157
2165 if (con->in_base_pos < 0) { 2158 if (con->in_base_pos < 0) {
2166 /* 2159 /*
@@ -2244,22 +2237,62 @@ bad_tag:
2244 2237
2245 2238
2246/* 2239/*
2247 * Atomically queue work on a connection. Bump @con reference to 2240 * Atomically queue work on a connection after the specified delay.
2248 * avoid races with connection teardown. 2241 * Bump @con reference to avoid races with connection teardown.
2242 * Returns 0 if work was queued, or an error code otherwise.
2249 */ 2243 */
2250static void queue_con(struct ceph_connection *con) 2244static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
2251{ 2245{
2252 if (!con->ops->get(con)) { 2246 if (!con->ops->get(con)) {
2253 dout("queue_con %p ref count 0\n", con); 2247 dout("%s %p ref count 0\n", __func__, con);
2254 return; 2248
2249 return -ENOENT;
2255 } 2250 }
2256 2251
2257 if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { 2252 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
2258 dout("queue_con %p - already queued\n", con); 2253 dout("%s %p - already queued\n", __func__, con);
2259 con->ops->put(con); 2254 con->ops->put(con);
2260 } else { 2255
2261 dout("queue_con %p\n", con); 2256 return -EBUSY;
2257 }
2258
2259 dout("%s %p %lu\n", __func__, con, delay);
2260
2261 return 0;
2262}
2263
2264static void queue_con(struct ceph_connection *con)
2265{
2266 (void) queue_con_delay(con, 0);
2267}
2268
2269static bool con_sock_closed(struct ceph_connection *con)
2270{
2271 if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
2272 return false;
2273
2274#define CASE(x) \
2275 case CON_STATE_ ## x: \
2276 con->error_msg = "socket closed (con state " #x ")"; \
2277 break;
2278
2279 switch (con->state) {
2280 CASE(CLOSED);
2281 CASE(PREOPEN);
2282 CASE(CONNECTING);
2283 CASE(NEGOTIATING);
2284 CASE(OPEN);
2285 CASE(STANDBY);
2286 default:
2287 pr_warning("%s con %p unrecognized state %lu\n",
2288 __func__, con, con->state);
2289 con->error_msg = "unrecognized con state";
2290 BUG();
2291 break;
2262 } 2292 }
2293#undef CASE
2294
2295 return true;
2263} 2296}
2264 2297
2265/* 2298/*
@@ -2273,37 +2306,19 @@ static void con_work(struct work_struct *work)
2273 2306
2274 mutex_lock(&con->mutex); 2307 mutex_lock(&con->mutex);
2275restart: 2308restart:
2276 if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) { 2309 if (con_sock_closed(con))
2277 switch (con->state) {
2278 case CON_STATE_CONNECTING:
2279 con->error_msg = "connection failed";
2280 break;
2281 case CON_STATE_NEGOTIATING:
2282 con->error_msg = "negotiation failed";
2283 break;
2284 case CON_STATE_OPEN:
2285 con->error_msg = "socket closed";
2286 break;
2287 default:
2288 dout("unrecognized con state %d\n", (int)con->state);
2289 con->error_msg = "unrecognized con state";
2290 BUG();
2291 }
2292 goto fault; 2310 goto fault;
2293 }
2294 2311
2295 if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { 2312 if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
2296 dout("con_work %p backing off\n", con); 2313 dout("con_work %p backing off\n", con);
2297 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2314 ret = queue_con_delay(con, round_jiffies_relative(con->delay));
2298 round_jiffies_relative(con->delay))) { 2315 if (ret) {
2299 dout("con_work %p backoff %lu\n", con, con->delay);
2300 mutex_unlock(&con->mutex);
2301 return;
2302 } else {
2303 con->ops->put(con);
2304 dout("con_work %p FAILED to back off %lu\n", con, 2316 dout("con_work %p FAILED to back off %lu\n", con,
2305 con->delay); 2317 con->delay);
2318 BUG_ON(ret == -ENOENT);
2319 set_bit(CON_FLAG_BACKOFF, &con->flags);
2306 } 2320 }
2321 goto done;
2307 } 2322 }
2308 2323
2309 if (con->state == CON_STATE_STANDBY) { 2324 if (con->state == CON_STATE_STANDBY) {
@@ -2355,12 +2370,12 @@ fault:
2355static void ceph_fault(struct ceph_connection *con) 2370static void ceph_fault(struct ceph_connection *con)
2356 __releases(con->mutex) 2371 __releases(con->mutex)
2357{ 2372{
2358 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2373 pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2359 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2374 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
2360 dout("fault %p state %lu to peer %s\n", 2375 dout("fault %p state %lu to peer %s\n",
2361 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2376 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
2362 2377
2363 BUG_ON(con->state != CON_STATE_CONNECTING && 2378 WARN_ON(con->state != CON_STATE_CONNECTING &&
2364 con->state != CON_STATE_NEGOTIATING && 2379 con->state != CON_STATE_NEGOTIATING &&
2365 con->state != CON_STATE_OPEN); 2380 con->state != CON_STATE_OPEN);
2366 2381
@@ -2397,24 +2412,8 @@ static void ceph_fault(struct ceph_connection *con)
2397 con->delay = BASE_DELAY_INTERVAL; 2412 con->delay = BASE_DELAY_INTERVAL;
2398 else if (con->delay < MAX_DELAY_INTERVAL) 2413 else if (con->delay < MAX_DELAY_INTERVAL)
2399 con->delay *= 2; 2414 con->delay *= 2;
2400 con->ops->get(con); 2415 set_bit(CON_FLAG_BACKOFF, &con->flags);
2401 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2416 queue_con(con);
2402 round_jiffies_relative(con->delay))) {
2403 dout("fault queued %p delay %lu\n", con, con->delay);
2404 } else {
2405 con->ops->put(con);
2406 dout("fault failed to queue %p delay %lu, backoff\n",
2407 con, con->delay);
2408 /*
2409 * In many cases we see a socket state change
2410 * while con_work is running and end up
2411 * queuing (non-delayed) work, such that we
2412 * can't backoff with a delay. Set a flag so
2413 * that when con_work restarts we schedule the
2414 * delay then.
2415 */
2416 set_bit(CON_FLAG_BACKOFF, &con->flags);
2417 }
2418 } 2417 }
2419 2418
2420out_unlock: 2419out_unlock:
@@ -2749,7 +2748,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
2749 msg = con->ops->alloc_msg(con, hdr, skip); 2748 msg = con->ops->alloc_msg(con, hdr, skip);
2750 mutex_lock(&con->mutex); 2749 mutex_lock(&con->mutex);
2751 if (con->state != CON_STATE_OPEN) { 2750 if (con->state != CON_STATE_OPEN) {
2752 ceph_msg_put(msg); 2751 if (msg)
2752 ceph_msg_put(msg);
2753 return -EAGAIN; 2753 return -EAGAIN;
2754 } 2754 }
2755 con->in_msg = msg; 2755 con->in_msg = msg;