aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/osd_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/osd_client.c')
-rw-r--r--fs/ceph/osd_client.c105
1 files changed, 41 insertions, 64 deletions
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 3514f71ff85f..d25b4add85b4 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -16,7 +16,7 @@
16#define OSD_OP_FRONT_LEN 4096 16#define OSD_OP_FRONT_LEN 4096
17#define OSD_OPREPLY_FRONT_LEN 512 17#define OSD_OPREPLY_FRONT_LEN 512
18 18
19const static struct ceph_connection_operations osd_con_ops; 19static const struct ceph_connection_operations osd_con_ops;
20static int __kick_requests(struct ceph_osd_client *osdc, 20static int __kick_requests(struct ceph_osd_client *osdc,
21 struct ceph_osd *kickosd); 21 struct ceph_osd *kickosd);
22 22
@@ -147,7 +147,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
147 req = kzalloc(sizeof(*req), GFP_NOFS); 147 req = kzalloc(sizeof(*req), GFP_NOFS);
148 } 148 }
149 if (req == NULL) 149 if (req == NULL)
150 return ERR_PTR(-ENOMEM); 150 return NULL;
151 151
152 req->r_osdc = osdc; 152 req->r_osdc = osdc;
153 req->r_mempool = use_mempool; 153 req->r_mempool = use_mempool;
@@ -164,10 +164,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
164 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 164 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
165 else 165 else
166 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 166 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
167 OSD_OPREPLY_FRONT_LEN, 0, 0, NULL); 167 OSD_OPREPLY_FRONT_LEN, GFP_NOFS);
168 if (IS_ERR(msg)) { 168 if (!msg) {
169 ceph_osdc_put_request(req); 169 ceph_osdc_put_request(req);
170 return ERR_PTR(PTR_ERR(msg)); 170 return NULL;
171 } 171 }
172 req->r_reply = msg; 172 req->r_reply = msg;
173 173
@@ -178,10 +178,10 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
178 if (use_mempool) 178 if (use_mempool)
179 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 179 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
180 else 180 else
181 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL); 181 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, GFP_NOFS);
182 if (IS_ERR(msg)) { 182 if (!msg) {
183 ceph_osdc_put_request(req); 183 ceph_osdc_put_request(req);
184 return ERR_PTR(PTR_ERR(msg)); 184 return NULL;
185 } 185 }
186 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); 186 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
187 memset(msg->front.iov_base, 0, msg->front.iov_len); 187 memset(msg->front.iov_base, 0, msg->front.iov_len);
@@ -361,8 +361,13 @@ static void put_osd(struct ceph_osd *osd)
361{ 361{
362 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 362 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
363 atomic_read(&osd->o_ref) - 1); 363 atomic_read(&osd->o_ref) - 1);
364 if (atomic_dec_and_test(&osd->o_ref)) 364 if (atomic_dec_and_test(&osd->o_ref)) {
365 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
366
367 if (osd->o_authorizer)
368 ac->ops->destroy_authorizer(ac, osd->o_authorizer);
365 kfree(osd); 369 kfree(osd);
370 }
366} 371}
367 372
368/* 373/*
@@ -715,7 +720,7 @@ static void handle_timeout(struct work_struct *work)
715 * should mark the osd as failed and we should find out about 720 * should mark the osd as failed and we should find out about
716 * it from an updated osd map. 721 * it from an updated osd map.
717 */ 722 */
718 while (!list_empty(&osdc->req_lru)) { 723 while (timeout && !list_empty(&osdc->req_lru)) {
719 req = list_entry(osdc->req_lru.next, struct ceph_osd_request, 724 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
720 r_req_lru_item); 725 r_req_lru_item);
721 726
@@ -1078,6 +1083,7 @@ done:
1078 if (newmap) 1083 if (newmap)
1079 kick_requests(osdc, NULL); 1084 kick_requests(osdc, NULL);
1080 up_read(&osdc->map_sem); 1085 up_read(&osdc->map_sem);
1086 wake_up(&osdc->client->auth_wq);
1081 return; 1087 return;
1082 1088
1083bad: 1089bad:
@@ -1087,45 +1093,6 @@ bad:
1087 return; 1093 return;
1088} 1094}
1089 1095
1090
1091/*
1092 * A read request prepares specific pages that data is to be read into.
1093 * When a message is being read off the wire, we call prepare_pages to
1094 * find those pages.
1095 * 0 = success, -1 failure.
1096 */
1097static int __prepare_pages(struct ceph_connection *con,
1098 struct ceph_msg_header *hdr,
1099 struct ceph_osd_request *req,
1100 u64 tid,
1101 struct ceph_msg *m)
1102{
1103 struct ceph_osd *osd = con->private;
1104 struct ceph_osd_client *osdc;
1105 int ret = -1;
1106 int data_len = le32_to_cpu(hdr->data_len);
1107 unsigned data_off = le16_to_cpu(hdr->data_off);
1108
1109 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1110
1111 if (!osd)
1112 return -1;
1113
1114 osdc = osd->o_osdc;
1115
1116 dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
1117 tid, req->r_num_pages, want);
1118 if (unlikely(req->r_num_pages < want))
1119 goto out;
1120 m->pages = req->r_pages;
1121 m->nr_pages = req->r_num_pages;
1122 ret = 0; /* success */
1123out:
1124 BUG_ON(ret < 0 || m->nr_pages < want);
1125
1126 return ret;
1127}
1128
1129/* 1096/*
1130 * Register request, send initial attempt. 1097 * Register request, send initial attempt.
1131 */ 1098 */
@@ -1252,11 +1219,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1252 if (!osdc->req_mempool) 1219 if (!osdc->req_mempool)
1253 goto out; 1220 goto out;
1254 1221
1255 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true); 1222 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1223 "osd_op");
1256 if (err < 0) 1224 if (err < 0)
1257 goto out_mempool; 1225 goto out_mempool;
1258 err = ceph_msgpool_init(&osdc->msgpool_op_reply, 1226 err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1259 OSD_OPREPLY_FRONT_LEN, 10, true); 1227 OSD_OPREPLY_FRONT_LEN, 10, true,
1228 "osd_op_reply");
1260 if (err < 0) 1229 if (err < 0)
1261 goto out_msgpool; 1230 goto out_msgpool;
1262 return 0; 1231 return 0;
@@ -1302,8 +1271,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1302 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1271 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1303 NULL, 0, truncate_seq, truncate_size, NULL, 1272 NULL, 0, truncate_seq, truncate_size, NULL,
1304 false, 1); 1273 false, 1);
1305 if (IS_ERR(req)) 1274 if (!req)
1306 return PTR_ERR(req); 1275 return -ENOMEM;
1307 1276
1308 /* it may be a short read due to an object boundary */ 1277 /* it may be a short read due to an object boundary */
1309 req->r_pages = pages; 1278 req->r_pages = pages;
@@ -1345,8 +1314,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1345 snapc, do_sync, 1314 snapc, do_sync,
1346 truncate_seq, truncate_size, mtime, 1315 truncate_seq, truncate_size, mtime,
1347 nofail, 1); 1316 nofail, 1);
1348 if (IS_ERR(req)) 1317 if (!req)
1349 return PTR_ERR(req); 1318 return -ENOMEM;
1350 1319
1351 /* it may be a short write due to an object boundary */ 1320 /* it may be a short write due to an object boundary */
1352 req->r_pages = pages; 1321 req->r_pages = pages;
@@ -1394,7 +1363,8 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1394} 1363}
1395 1364
1396/* 1365/*
1397 * lookup and return message for incoming reply 1366 * lookup and return message for incoming reply. set up reply message
1367 * pages.
1398 */ 1368 */
1399static struct ceph_msg *get_reply(struct ceph_connection *con, 1369static struct ceph_msg *get_reply(struct ceph_connection *con,
1400 struct ceph_msg_header *hdr, 1370 struct ceph_msg_header *hdr,
@@ -1407,7 +1377,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1407 int front = le32_to_cpu(hdr->front_len); 1377 int front = le32_to_cpu(hdr->front_len);
1408 int data_len = le32_to_cpu(hdr->data_len); 1378 int data_len = le32_to_cpu(hdr->data_len);
1409 u64 tid; 1379 u64 tid;
1410 int err;
1411 1380
1412 tid = le64_to_cpu(hdr->tid); 1381 tid = le64_to_cpu(hdr->tid);
1413 mutex_lock(&osdc->request_mutex); 1382 mutex_lock(&osdc->request_mutex);
@@ -1425,13 +1394,14 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1425 req->r_reply, req->r_con_filling_msg); 1394 req->r_reply, req->r_con_filling_msg);
1426 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); 1395 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1427 ceph_con_put(req->r_con_filling_msg); 1396 ceph_con_put(req->r_con_filling_msg);
1397 req->r_con_filling_msg = NULL;
1428 } 1398 }
1429 1399
1430 if (front > req->r_reply->front.iov_len) { 1400 if (front > req->r_reply->front.iov_len) {
1431 pr_warning("get_reply front %d > preallocated %d\n", 1401 pr_warning("get_reply front %d > preallocated %d\n",
1432 front, (int)req->r_reply->front.iov_len); 1402 front, (int)req->r_reply->front.iov_len);
1433 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL); 1403 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
1434 if (IS_ERR(m)) 1404 if (!m)
1435 goto out; 1405 goto out;
1436 ceph_msg_put(req->r_reply); 1406 ceph_msg_put(req->r_reply);
1437 req->r_reply = m; 1407 req->r_reply = m;
@@ -1439,12 +1409,19 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1439 m = ceph_msg_get(req->r_reply); 1409 m = ceph_msg_get(req->r_reply);
1440 1410
1441 if (data_len > 0) { 1411 if (data_len > 0) {
1442 err = __prepare_pages(con, hdr, req, tid, m); 1412 unsigned data_off = le16_to_cpu(hdr->data_off);
1443 if (err < 0) { 1413 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1414
1415 if (unlikely(req->r_num_pages < want)) {
1416 pr_warning("tid %lld reply %d > expected %d pages\n",
1417 tid, want, m->nr_pages);
1444 *skip = 1; 1418 *skip = 1;
1445 ceph_msg_put(m); 1419 ceph_msg_put(m);
1446 m = ERR_PTR(err); 1420 m = NULL;
1421 goto out;
1447 } 1422 }
1423 m->pages = req->r_pages;
1424 m->nr_pages = req->r_num_pages;
1448 } 1425 }
1449 *skip = 0; 1426 *skip = 0;
1450 req->r_con_filling_msg = ceph_con_get(con); 1427 req->r_con_filling_msg = ceph_con_get(con);
@@ -1466,7 +1443,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1466 1443
1467 switch (type) { 1444 switch (type) {
1468 case CEPH_MSG_OSD_MAP: 1445 case CEPH_MSG_OSD_MAP:
1469 return ceph_msg_new(type, front, 0, 0, NULL); 1446 return ceph_msg_new(type, front, GFP_NOFS);
1470 case CEPH_MSG_OSD_OPREPLY: 1447 case CEPH_MSG_OSD_OPREPLY:
1471 return get_reply(con, hdr, skip); 1448 return get_reply(con, hdr, skip);
1472 default: 1449 default:
@@ -1552,7 +1529,7 @@ static int invalidate_authorizer(struct ceph_connection *con)
1552 return ceph_monc_validate_auth(&osdc->client->monc); 1529 return ceph_monc_validate_auth(&osdc->client->monc);
1553} 1530}
1554 1531
1555const static struct ceph_connection_operations osd_con_ops = { 1532static const struct ceph_connection_operations osd_con_ops = {
1556 .get = get_osd_con, 1533 .get = get_osd_con,
1557 .put = put_osd_con, 1534 .put = put_osd_con,
1558 .dispatch = dispatch, 1535 .dispatch = dispatch,