aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/messenger.c82
-rw-r--r--net/ceph/osd_client.c34
-rw-r--r--net/ceph/osdmap.c13
3 files changed, 99 insertions, 30 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index e15a82ccc05f..78b55f49de7c 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -76,7 +76,8 @@ const char *ceph_pr_addr(const struct sockaddr_storage *ss)
76 break; 76 break;
77 77
78 default: 78 default:
79 sprintf(s, "(unknown sockaddr family %d)", (int)ss->ss_family); 79 snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)",
80 (int)ss->ss_family);
80 } 81 }
81 82
82 return s; 83 return s;
@@ -598,7 +599,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
598 * Connection negotiation. 599 * Connection negotiation.
599 */ 600 */
600 601
601static void prepare_connect_authorizer(struct ceph_connection *con) 602static int prepare_connect_authorizer(struct ceph_connection *con)
602{ 603{
603 void *auth_buf; 604 void *auth_buf;
604 int auth_len = 0; 605 int auth_len = 0;
@@ -612,13 +613,20 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
612 con->auth_retry); 613 con->auth_retry);
613 mutex_lock(&con->mutex); 614 mutex_lock(&con->mutex);
614 615
616 if (test_bit(CLOSED, &con->state) ||
617 test_bit(OPENING, &con->state))
618 return -EAGAIN;
619
615 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); 620 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
616 con->out_connect.authorizer_len = cpu_to_le32(auth_len); 621 con->out_connect.authorizer_len = cpu_to_le32(auth_len);
617 622
618 con->out_kvec[con->out_kvec_left].iov_base = auth_buf; 623 if (auth_len) {
619 con->out_kvec[con->out_kvec_left].iov_len = auth_len; 624 con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
620 con->out_kvec_left++; 625 con->out_kvec[con->out_kvec_left].iov_len = auth_len;
621 con->out_kvec_bytes += auth_len; 626 con->out_kvec_left++;
627 con->out_kvec_bytes += auth_len;
628 }
629 return 0;
622} 630}
623 631
624/* 632/*
@@ -640,9 +648,9 @@ static void prepare_write_banner(struct ceph_messenger *msgr,
640 set_bit(WRITE_PENDING, &con->state); 648 set_bit(WRITE_PENDING, &con->state);
641} 649}
642 650
643static void prepare_write_connect(struct ceph_messenger *msgr, 651static int prepare_write_connect(struct ceph_messenger *msgr,
644 struct ceph_connection *con, 652 struct ceph_connection *con,
645 int after_banner) 653 int after_banner)
646{ 654{
647 unsigned global_seq = get_global_seq(con->msgr, 0); 655 unsigned global_seq = get_global_seq(con->msgr, 0);
648 int proto; 656 int proto;
@@ -683,7 +691,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
683 con->out_more = 0; 691 con->out_more = 0;
684 set_bit(WRITE_PENDING, &con->state); 692 set_bit(WRITE_PENDING, &con->state);
685 693
686 prepare_connect_authorizer(con); 694 return prepare_connect_authorizer(con);
687} 695}
688 696
689 697
@@ -1065,8 +1073,10 @@ static void addr_set_port(struct sockaddr_storage *ss, int p)
1065 switch (ss->ss_family) { 1073 switch (ss->ss_family) {
1066 case AF_INET: 1074 case AF_INET:
1067 ((struct sockaddr_in *)ss)->sin_port = htons(p); 1075 ((struct sockaddr_in *)ss)->sin_port = htons(p);
1076 break;
1068 case AF_INET6: 1077 case AF_INET6:
1069 ((struct sockaddr_in6 *)ss)->sin6_port = htons(p); 1078 ((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
1079 break;
1070 } 1080 }
1071} 1081}
1072 1082
@@ -1216,6 +1226,7 @@ static int process_connect(struct ceph_connection *con)
1216 u64 sup_feat = con->msgr->supported_features; 1226 u64 sup_feat = con->msgr->supported_features;
1217 u64 req_feat = con->msgr->required_features; 1227 u64 req_feat = con->msgr->required_features;
1218 u64 server_feat = le64_to_cpu(con->in_reply.features); 1228 u64 server_feat = le64_to_cpu(con->in_reply.features);
1229 int ret;
1219 1230
1220 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1231 dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
1221 1232
@@ -1250,7 +1261,9 @@ static int process_connect(struct ceph_connection *con)
1250 return -1; 1261 return -1;
1251 } 1262 }
1252 con->auth_retry = 1; 1263 con->auth_retry = 1;
1253 prepare_write_connect(con->msgr, con, 0); 1264 ret = prepare_write_connect(con->msgr, con, 0);
1265 if (ret < 0)
1266 return ret;
1254 prepare_read_connect(con); 1267 prepare_read_connect(con);
1255 break; 1268 break;
1256 1269
@@ -1277,6 +1290,9 @@ static int process_connect(struct ceph_connection *con)
1277 if (con->ops->peer_reset) 1290 if (con->ops->peer_reset)
1278 con->ops->peer_reset(con); 1291 con->ops->peer_reset(con);
1279 mutex_lock(&con->mutex); 1292 mutex_lock(&con->mutex);
1293 if (test_bit(CLOSED, &con->state) ||
1294 test_bit(OPENING, &con->state))
1295 return -EAGAIN;
1280 break; 1296 break;
1281 1297
1282 case CEPH_MSGR_TAG_RETRY_SESSION: 1298 case CEPH_MSGR_TAG_RETRY_SESSION:
@@ -1341,7 +1357,9 @@ static int process_connect(struct ceph_connection *con)
1341 * to WAIT. This shouldn't happen if we are the 1357 * to WAIT. This shouldn't happen if we are the
1342 * client. 1358 * client.
1343 */ 1359 */
1344 pr_err("process_connect peer connecting WAIT\n"); 1360 pr_err("process_connect got WAIT as client\n");
1361 con->error_msg = "protocol error, got WAIT as client";
1362 return -1;
1345 1363
1346 default: 1364 default:
1347 pr_err("connect protocol error, will retry\n"); 1365 pr_err("connect protocol error, will retry\n");
@@ -1810,6 +1828,17 @@ static int try_read(struct ceph_connection *con)
1810more: 1828more:
1811 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1829 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
1812 con->in_base_pos); 1830 con->in_base_pos);
1831
1832 /*
1833 * process_connect and process_message drop and re-take
1834 * con->mutex. make sure we handle a racing close or reopen.
1835 */
1836 if (test_bit(CLOSED, &con->state) ||
1837 test_bit(OPENING, &con->state)) {
1838 ret = -EAGAIN;
1839 goto out;
1840 }
1841
1813 if (test_bit(CONNECTING, &con->state)) { 1842 if (test_bit(CONNECTING, &con->state)) {
1814 if (!test_bit(NEGOTIATING, &con->state)) { 1843 if (!test_bit(NEGOTIATING, &con->state)) {
1815 dout("try_read connecting\n"); 1844 dout("try_read connecting\n");
@@ -1938,8 +1967,10 @@ static void con_work(struct work_struct *work)
1938{ 1967{
1939 struct ceph_connection *con = container_of(work, struct ceph_connection, 1968 struct ceph_connection *con = container_of(work, struct ceph_connection,
1940 work.work); 1969 work.work);
1970 int ret;
1941 1971
1942 mutex_lock(&con->mutex); 1972 mutex_lock(&con->mutex);
1973restart:
1943 if (test_and_clear_bit(BACKOFF, &con->state)) { 1974 if (test_and_clear_bit(BACKOFF, &con->state)) {
1944 dout("con_work %p backing off\n", con); 1975 dout("con_work %p backing off\n", con);
1945 if (queue_delayed_work(ceph_msgr_wq, &con->work, 1976 if (queue_delayed_work(ceph_msgr_wq, &con->work,
@@ -1969,18 +2000,31 @@ static void con_work(struct work_struct *work)
1969 con_close_socket(con); 2000 con_close_socket(con);
1970 } 2001 }
1971 2002
1972 if (test_and_clear_bit(SOCK_CLOSED, &con->state) || 2003 if (test_and_clear_bit(SOCK_CLOSED, &con->state))
1973 try_read(con) < 0 || 2004 goto fault;
1974 try_write(con) < 0) { 2005
1975 mutex_unlock(&con->mutex); 2006 ret = try_read(con);
1976 ceph_fault(con); /* error/fault path */ 2007 if (ret == -EAGAIN)
1977 goto done_unlocked; 2008 goto restart;
1978 } 2009 if (ret < 0)
2010 goto fault;
2011
2012 ret = try_write(con);
2013 if (ret == -EAGAIN)
2014 goto restart;
2015 if (ret < 0)
2016 goto fault;
1979 2017
1980done: 2018done:
1981 mutex_unlock(&con->mutex); 2019 mutex_unlock(&con->mutex);
1982done_unlocked: 2020done_unlocked:
1983 con->ops->put(con); 2021 con->ops->put(con);
2022 return;
2023
2024fault:
2025 mutex_unlock(&con->mutex);
2026 ceph_fault(con); /* error/fault path */
2027 goto done_unlocked;
1984} 2028}
1985 2029
1986 2030
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 6b5dda1cb5df..9cb627a4073a 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -124,7 +124,7 @@ static void calc_layout(struct ceph_osd_client *osdc,
124 ceph_calc_raw_layout(osdc, layout, vino.snap, off, 124 ceph_calc_raw_layout(osdc, layout, vino.snap, off,
125 plen, &bno, req, op); 125 plen, &bno, req, op);
126 126
127 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno); 127 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
128 req->r_oid_len = strlen(req->r_oid); 128 req->r_oid_len = strlen(req->r_oid);
129} 129}
130 130
@@ -1144,6 +1144,13 @@ static void handle_osds_timeout(struct work_struct *work)
1144 round_jiffies_relative(delay)); 1144 round_jiffies_relative(delay));
1145} 1145}
1146 1146
1147static void complete_request(struct ceph_osd_request *req)
1148{
1149 if (req->r_safe_callback)
1150 req->r_safe_callback(req, NULL);
1151 complete_all(&req->r_safe_completion); /* fsync waiter */
1152}
1153
1147/* 1154/*
1148 * handle osd op reply. either call the callback if it is specified, 1155 * handle osd op reply. either call the callback if it is specified,
1149 * or do the completion to wake up the waiting thread. 1156 * or do the completion to wake up the waiting thread.
@@ -1226,11 +1233,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1226 else 1233 else
1227 complete_all(&req->r_completion); 1234 complete_all(&req->r_completion);
1228 1235
1229 if (flags & CEPH_OSD_FLAG_ONDISK) { 1236 if (flags & CEPH_OSD_FLAG_ONDISK)
1230 if (req->r_safe_callback) 1237 complete_request(req);
1231 req->r_safe_callback(req, msg);
1232 complete_all(&req->r_safe_completion); /* fsync waiter */
1233 }
1234 1238
1235done: 1239done:
1236 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1240 dout("req=%p req->r_linger=%d\n", req, req->r_linger);
@@ -1421,6 +1425,15 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1421done: 1425done:
1422 downgrade_write(&osdc->map_sem); 1426 downgrade_write(&osdc->map_sem);
1423 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1427 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1428
1429 /*
1430 * subscribe to subsequent osdmap updates if full to ensure
1431 * we find out when we are no longer full and stop returning
1432 * ENOSPC.
1433 */
1434 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1435 ceph_monc_request_next_osdmap(&osdc->client->monc);
1436
1424 send_queued(osdc); 1437 send_queued(osdc);
1425 up_read(&osdc->map_sem); 1438 up_read(&osdc->map_sem);
1426 wake_up_all(&osdc->client->auth_wq); 1439 wake_up_all(&osdc->client->auth_wq);
@@ -1677,8 +1690,14 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1677 */ 1690 */
1678 if (req->r_sent == 0) { 1691 if (req->r_sent == 0) {
1679 rc = __map_request(osdc, req); 1692 rc = __map_request(osdc, req);
1680 if (rc < 0) 1693 if (rc < 0) {
1694 if (nofail) {
1695 dout("osdc_start_request failed map, "
1696 " will retry %lld\n", req->r_tid);
1697 rc = 0;
1698 }
1681 goto out_unlock; 1699 goto out_unlock;
1700 }
1682 if (req->r_osd == NULL) { 1701 if (req->r_osd == NULL) {
1683 dout("send_request %p no up osds in pg\n", req); 1702 dout("send_request %p no up osds in pg\n", req);
1684 ceph_monc_request_next_osdmap(&osdc->client->monc); 1703 ceph_monc_request_next_osdmap(&osdc->client->monc);
@@ -1717,6 +1736,7 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1717 __cancel_request(req); 1736 __cancel_request(req);
1718 __unregister_request(osdc, req); 1737 __unregister_request(osdc, req);
1719 mutex_unlock(&osdc->request_mutex); 1738 mutex_unlock(&osdc->request_mutex);
1739 complete_request(req);
1720 dout("wait_request tid %llu canceled/timed out\n", req->r_tid); 1740 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1721 return rc; 1741 return rc;
1722 } 1742 }
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 71603ac3dff5..e97c3588c3ec 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -765,7 +765,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
765 } 765 }
766 766
767 map->epoch++; 767 map->epoch++;
768 map->modified = map->modified; 768 map->modified = modified;
769 if (newcrush) { 769 if (newcrush) {
770 if (map->crush) 770 if (map->crush)
771 crush_destroy(map->crush); 771 crush_destroy(map->crush);
@@ -830,15 +830,20 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
830 map->osd_addr[osd] = addr; 830 map->osd_addr[osd] = addr;
831 } 831 }
832 832
833 /* new_down */ 833 /* new_state */
834 ceph_decode_32_safe(p, end, len, bad); 834 ceph_decode_32_safe(p, end, len, bad);
835 while (len--) { 835 while (len--) {
836 u32 osd; 836 u32 osd;
837 u8 xorstate;
837 ceph_decode_32_safe(p, end, osd, bad); 838 ceph_decode_32_safe(p, end, osd, bad);
839 xorstate = **(u8 **)p;
838 (*p)++; /* clean flag */ 840 (*p)++; /* clean flag */
839 pr_info("osd%d down\n", osd); 841 if (xorstate == 0)
842 xorstate = CEPH_OSD_UP;
843 if (xorstate & CEPH_OSD_UP)
844 pr_info("osd%d down\n", osd);
840 if (osd < map->max_osd) 845 if (osd < map->max_osd)
841 map->osd_state[osd] &= ~CEPH_OSD_UP; 846 map->osd_state[osd] ^= xorstate;
842 } 847 }
843 848
844 /* new_weight */ 849 /* new_weight */