aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-12-22 14:24:33 -0500
committerSage Weil <sage@newdream.net>2009-12-23 11:17:22 -0500
commit6df058c025ce343052c5516b1d8a9a7e73cddd64 (patch)
tree58230bd258f71c2c3adf56a55d11ed39f404d12a
parent0cf90ab5b075821940873e73cdbfeb8edc3dabe8 (diff)
ceph: include transaction id in ceph_msg_header (protocol change)
Many (most?) message types include a transaction id. By including it in the fixed size header, we always have it available even when we are unable to allocate memory for the (larger, variable sized) message body. This will allow us to error out the appropriate request instead of (silently) dropping the reply. Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r--fs/ceph/caps.c16
-rw-r--r--fs/ceph/ceph_fs.h8
-rw-r--r--fs/ceph/mds_client.c5
-rw-r--r--fs/ceph/mon_client.c4
-rw-r--r--fs/ceph/msgr.h3
-rw-r--r--fs/ceph/osd_client.c9
-rw-r--r--fs/ceph/rados.h2
7 files changed, 20 insertions, 27 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 93c1afe3f0b..847ae64346f 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -922,14 +922,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
922 if (IS_ERR(msg)) 922 if (IS_ERR(msg))
923 return PTR_ERR(msg); 923 return PTR_ERR(msg);
924 924
925 fc = msg->front.iov_base; 925 msg->hdr.tid = cpu_to_le64(flush_tid);
926 926
927 fc = msg->front.iov_base;
927 memset(fc, 0, sizeof(*fc)); 928 memset(fc, 0, sizeof(*fc));
928 929
929 fc->cap_id = cpu_to_le64(cid); 930 fc->cap_id = cpu_to_le64(cid);
930 fc->op = cpu_to_le32(op); 931 fc->op = cpu_to_le32(op);
931 fc->seq = cpu_to_le32(seq); 932 fc->seq = cpu_to_le32(seq);
932 fc->client_tid = cpu_to_le64(flush_tid);
933 fc->issue_seq = cpu_to_le32(issue_seq); 933 fc->issue_seq = cpu_to_le32(issue_seq);
934 fc->migrate_seq = cpu_to_le32(mseq); 934 fc->migrate_seq = cpu_to_le32(mseq);
935 fc->caps = cpu_to_le32(caps); 935 fc->caps = cpu_to_le32(caps);
@@ -2329,7 +2329,7 @@ restart:
2329 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the 2329 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
2330 * MDS has been safely committed. 2330 * MDS has been safely committed.
2331 */ 2331 */
2332static void handle_cap_flush_ack(struct inode *inode, 2332static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
2333 struct ceph_mds_caps *m, 2333 struct ceph_mds_caps *m,
2334 struct ceph_mds_session *session, 2334 struct ceph_mds_session *session,
2335 struct ceph_cap *cap) 2335 struct ceph_cap *cap)
@@ -2340,7 +2340,6 @@ static void handle_cap_flush_ack(struct inode *inode,
2340 unsigned seq = le32_to_cpu(m->seq); 2340 unsigned seq = le32_to_cpu(m->seq);
2341 int dirty = le32_to_cpu(m->dirty); 2341 int dirty = le32_to_cpu(m->dirty);
2342 int cleaned = 0; 2342 int cleaned = 0;
2343 u64 flush_tid = le64_to_cpu(m->client_tid);
2344 int drop = 0; 2343 int drop = 0;
2345 int i; 2344 int i;
2346 2345
@@ -2396,13 +2395,12 @@ out:
2396 * 2395 *
2397 * Caller hold s_mutex. 2396 * Caller hold s_mutex.
2398 */ 2397 */
2399static void handle_cap_flushsnap_ack(struct inode *inode, 2398static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
2400 struct ceph_mds_caps *m, 2399 struct ceph_mds_caps *m,
2401 struct ceph_mds_session *session) 2400 struct ceph_mds_session *session)
2402{ 2401{
2403 struct ceph_inode_info *ci = ceph_inode(inode); 2402 struct ceph_inode_info *ci = ceph_inode(inode);
2404 u64 follows = le64_to_cpu(m->snap_follows); 2403 u64 follows = le64_to_cpu(m->snap_follows);
2405 u64 flush_tid = le64_to_cpu(m->client_tid);
2406 struct ceph_cap_snap *capsnap; 2404 struct ceph_cap_snap *capsnap;
2407 int drop = 0; 2405 int drop = 0;
2408 2406
@@ -2587,12 +2585,14 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2587 struct ceph_vino vino; 2585 struct ceph_vino vino;
2588 u64 cap_id; 2586 u64 cap_id;
2589 u64 size, max_size; 2587 u64 size, max_size;
2588 u64 tid;
2590 int check_caps = 0; 2589 int check_caps = 0;
2591 int r; 2590 int r;
2592 2591
2593 dout("handle_caps from mds%d\n", mds); 2592 dout("handle_caps from mds%d\n", mds);
2594 2593
2595 /* decode */ 2594 /* decode */
2595 tid = le64_to_cpu(msg->hdr.tid);
2596 if (msg->front.iov_len < sizeof(*h)) 2596 if (msg->front.iov_len < sizeof(*h))
2597 goto bad; 2597 goto bad;
2598 h = msg->front.iov_base; 2598 h = msg->front.iov_base;
@@ -2621,7 +2621,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2621 /* these will work even if we don't have a cap yet */ 2621 /* these will work even if we don't have a cap yet */
2622 switch (op) { 2622 switch (op) {
2623 case CEPH_CAP_OP_FLUSHSNAP_ACK: 2623 case CEPH_CAP_OP_FLUSHSNAP_ACK:
2624 handle_cap_flushsnap_ack(inode, h, session); 2624 handle_cap_flushsnap_ack(inode, tid, h, session);
2625 goto done; 2625 goto done;
2626 2626
2627 case CEPH_CAP_OP_EXPORT: 2627 case CEPH_CAP_OP_EXPORT:
@@ -2662,7 +2662,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
2662 break; 2662 break;
2663 2663
2664 case CEPH_CAP_OP_FLUSH_ACK: 2664 case CEPH_CAP_OP_FLUSH_ACK:
2665 handle_cap_flush_ack(inode, h, session, cap); 2665 handle_cap_flush_ack(inode, tid, h, session, cap);
2666 break; 2666 break;
2667 2667
2668 case CEPH_CAP_OP_TRUNC: 2668 case CEPH_CAP_OP_TRUNC:
diff --git a/fs/ceph/ceph_fs.h b/fs/ceph/ceph_fs.h
index e2fd0247827..e87dfa6ec8e 100644
--- a/fs/ceph/ceph_fs.h
+++ b/fs/ceph/ceph_fs.h
@@ -35,7 +35,7 @@
35 * internal cluster protocols separately from the public, 35 * internal cluster protocols separately from the public,
36 * client-facing protocol. 36 * client-facing protocol.
37 */ 37 */
38#define CEPH_OSD_PROTOCOL 7 /* cluster internal */ 38#define CEPH_OSD_PROTOCOL 8 /* cluster internal */
39#define CEPH_MDS_PROTOCOL 9 /* cluster internal */ 39#define CEPH_MDS_PROTOCOL 9 /* cluster internal */
40#define CEPH_MON_PROTOCOL 5 /* cluster internal */ 40#define CEPH_MON_PROTOCOL 5 /* cluster internal */
41#define CEPH_OSDC_PROTOCOL 22 /* server/client */ 41#define CEPH_OSDC_PROTOCOL 22 /* server/client */
@@ -136,7 +136,6 @@ struct ceph_mon_request_header {
136struct ceph_mon_statfs { 136struct ceph_mon_statfs {
137 struct ceph_mon_request_header monhdr; 137 struct ceph_mon_request_header monhdr;
138 struct ceph_fsid fsid; 138 struct ceph_fsid fsid;
139 __le64 tid;
140} __attribute__ ((packed)); 139} __attribute__ ((packed));
141 140
142struct ceph_statfs { 141struct ceph_statfs {
@@ -146,7 +145,6 @@ struct ceph_statfs {
146 145
147struct ceph_mon_statfs_reply { 146struct ceph_mon_statfs_reply {
148 struct ceph_fsid fsid; 147 struct ceph_fsid fsid;
149 __le64 tid;
150 __le64 version; 148 __le64 version;
151 struct ceph_statfs st; 149 struct ceph_statfs st;
152} __attribute__ ((packed)); 150} __attribute__ ((packed));
@@ -333,7 +331,7 @@ union ceph_mds_request_args {
333#define CEPH_MDS_FLAG_WANT_DENTRY 2 /* want dentry in reply */ 331#define CEPH_MDS_FLAG_WANT_DENTRY 2 /* want dentry in reply */
334 332
335struct ceph_mds_request_head { 333struct ceph_mds_request_head {
336 __le64 tid, oldest_client_tid; 334 __le64 oldest_client_tid;
337 __le32 mdsmap_epoch; /* on client */ 335 __le32 mdsmap_epoch; /* on client */
338 __le32 flags; /* CEPH_MDS_FLAG_* */ 336 __le32 flags; /* CEPH_MDS_FLAG_* */
339 __u8 num_retry, num_fwd; /* count retry, fwd attempts */ 337 __u8 num_retry, num_fwd; /* count retry, fwd attempts */
@@ -356,7 +354,6 @@ struct ceph_mds_request_release {
356 354
357/* client reply */ 355/* client reply */
358struct ceph_mds_reply_head { 356struct ceph_mds_reply_head {
359 __le64 tid;
360 __le32 op; 357 __le32 op;
361 __le32 result; 358 __le32 result;
362 __le32 mdsmap_epoch; 359 __le32 mdsmap_epoch;
@@ -542,7 +539,6 @@ struct ceph_mds_caps {
542 __le32 migrate_seq; 539 __le32 migrate_seq;
543 __le64 snap_follows; 540 __le64 snap_follows;
544 __le32 snap_trace_len; 541 __le32 snap_trace_len;
545 __le64 client_tid; /* for FLUSH(SNAP) -> FLUSH(SNAP)_ACK */
546 542
547 /* authlock */ 543 /* authlock */
548 __le32 uid, gid, mode; 544 __le32 uid, gid, mode;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 63ca3b1ad45..ec884e2845d 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1339,6 +1339,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1339 if (IS_ERR(msg)) 1339 if (IS_ERR(msg))
1340 goto out_free2; 1340 goto out_free2;
1341 1341
1342 msg->hdr.tid = cpu_to_le64(req->r_tid);
1343
1342 head = msg->front.iov_base; 1344 head = msg->front.iov_base;
1343 p = msg->front.iov_base + sizeof(*head); 1345 p = msg->front.iov_base + sizeof(*head);
1344 end = msg->front.iov_base + msg->front.iov_len; 1346 end = msg->front.iov_base + msg->front.iov_len;
@@ -1431,7 +1433,6 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
1431 req->r_request = msg; 1433 req->r_request = msg;
1432 1434
1433 rhead = msg->front.iov_base; 1435 rhead = msg->front.iov_base;
1434 rhead->tid = cpu_to_le64(req->r_tid);
1435 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 1436 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1436 if (req->r_got_unsafe) 1437 if (req->r_got_unsafe)
1437 flags |= CEPH_MDS_FLAG_REPLAY; 1438 flags |= CEPH_MDS_FLAG_REPLAY;
@@ -1664,7 +1665,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1664 } 1665 }
1665 1666
1666 /* get request, session */ 1667 /* get request, session */
1667 tid = le64_to_cpu(head->tid); 1668 tid = le64_to_cpu(msg->hdr.tid);
1668 mutex_lock(&mdsc->mutex); 1669 mutex_lock(&mdsc->mutex);
1669 req = __lookup_request(mdsc, tid); 1670 req = __lookup_request(mdsc, tid);
1670 if (!req) { 1671 if (!req) {
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 775a9c029c5..bb94006fc68 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -349,7 +349,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
349 349
350 if (msg->front.iov_len != sizeof(*reply)) 350 if (msg->front.iov_len != sizeof(*reply))
351 goto bad; 351 goto bad;
352 tid = le64_to_cpu(reply->tid); 352 tid = le64_to_cpu(msg->hdr.tid);
353 dout("handle_statfs_reply %p tid %llu\n", msg, tid); 353 dout("handle_statfs_reply %p tid %llu\n", msg, tid);
354 354
355 mutex_lock(&monc->mutex); 355 mutex_lock(&monc->mutex);
@@ -382,12 +382,12 @@ static int send_statfs(struct ceph_mon_client *monc,
382 if (IS_ERR(msg)) 382 if (IS_ERR(msg))
383 return PTR_ERR(msg); 383 return PTR_ERR(msg);
384 req->request = msg; 384 req->request = msg;
385 msg->hdr.tid = cpu_to_le64(req->tid);
385 h = msg->front.iov_base; 386 h = msg->front.iov_base;
386 h->monhdr.have_version = 0; 387 h->monhdr.have_version = 0;
387 h->monhdr.session_mon = cpu_to_le16(-1); 388 h->monhdr.session_mon = cpu_to_le16(-1);
388 h->monhdr.session_mon_tid = 0; 389 h->monhdr.session_mon_tid = 0;
389 h->fsid = monc->monmap->fsid; 390 h->fsid = monc->monmap->fsid;
390 h->tid = cpu_to_le64(req->tid);
391 ceph_con_send(monc->con, msg); 391 ceph_con_send(monc->con, msg);
392 return 0; 392 return 0;
393} 393}
diff --git a/fs/ceph/msgr.h b/fs/ceph/msgr.h
index c758e8f8f71..e46d8b806de 100644
--- a/fs/ceph/msgr.h
+++ b/fs/ceph/msgr.h
@@ -21,7 +21,7 @@
21 * whenever the wire protocol changes. try to keep this string length 21 * whenever the wire protocol changes. try to keep this string length
22 * constant. 22 * constant.
23 */ 23 */
24#define CEPH_BANNER "ceph v024" 24#define CEPH_BANNER "ceph v025"
25#define CEPH_BANNER_MAX_LEN 30 25#define CEPH_BANNER_MAX_LEN 30
26 26
27 27
@@ -132,6 +132,7 @@ struct ceph_msg_connect_reply {
132 */ 132 */
133struct ceph_msg_header { 133struct ceph_msg_header {
134 __le64 seq; /* message seq# for this session */ 134 __le64 seq; /* message seq# for this session */
135 __le64 tid; /* transaction id */
135 __le16 type; /* message type */ 136 __le16 type; /* message type */
136 __le16 priority; /* priority. higher value == higher priority */ 137 __le16 priority; /* priority. higher value == higher priority */
137 __le16 version; /* version of message encoding */ 138 __le16 version; /* version of message encoding */
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 374f0013956..a0aac436d5d 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -439,11 +439,9 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
439static void register_request(struct ceph_osd_client *osdc, 439static void register_request(struct ceph_osd_client *osdc,
440 struct ceph_osd_request *req) 440 struct ceph_osd_request *req)
441{ 441{
442 struct ceph_osd_request_head *head = req->r_request->front.iov_base;
443
444 mutex_lock(&osdc->request_mutex); 442 mutex_lock(&osdc->request_mutex);
445 req->r_tid = ++osdc->last_tid; 443 req->r_tid = ++osdc->last_tid;
446 head->tid = cpu_to_le64(req->r_tid); 444 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
447 445
448 dout("register_request %p tid %lld\n", req, req->r_tid); 446 dout("register_request %p tid %lld\n", req, req->r_tid);
449 __insert_request(osdc, req); 447 __insert_request(osdc, req);
@@ -702,9 +700,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
702 u64 tid; 700 u64 tid;
703 int numops, object_len, flags; 701 int numops, object_len, flags;
704 702
703 tid = le64_to_cpu(msg->hdr.tid);
705 if (msg->front.iov_len < sizeof(*rhead)) 704 if (msg->front.iov_len < sizeof(*rhead))
706 goto bad; 705 goto bad;
707 tid = le64_to_cpu(rhead->tid);
708 numops = le32_to_cpu(rhead->num_ops); 706 numops = le32_to_cpu(rhead->num_ops);
709 object_len = le32_to_cpu(rhead->object_len); 707 object_len = le32_to_cpu(rhead->object_len);
710 if (msg->front.iov_len != sizeof(*rhead) + object_len + 708 if (msg->front.iov_len != sizeof(*rhead) + object_len +
@@ -1002,7 +1000,6 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
1002{ 1000{
1003 struct ceph_osd *osd = con->private; 1001 struct ceph_osd *osd = con->private;
1004 struct ceph_osd_client *osdc; 1002 struct ceph_osd_client *osdc;
1005 struct ceph_osd_reply_head *rhead = m->front.iov_base;
1006 struct ceph_osd_request *req; 1003 struct ceph_osd_request *req;
1007 u64 tid; 1004 u64 tid;
1008 int ret = -1; 1005 int ret = -1;
@@ -1016,7 +1013,7 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
1016 if (unlikely(type != CEPH_MSG_OSD_OPREPLY)) 1013 if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
1017 return -1; /* hmm! */ 1014 return -1; /* hmm! */
1018 1015
1019 tid = le64_to_cpu(rhead->tid); 1016 tid = le64_to_cpu(m->hdr.tid);
1020 mutex_lock(&osdc->request_mutex); 1017 mutex_lock(&osdc->request_mutex);
1021 req = __lookup_request(osdc, tid); 1018 req = __lookup_request(osdc, tid);
1022 if (!req) { 1019 if (!req) {
diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h
index 12bfb2f7c27..c5614d4ae34 100644
--- a/fs/ceph/rados.h
+++ b/fs/ceph/rados.h
@@ -331,7 +331,6 @@ struct ceph_osd_op {
331 * ceph_osd_op object operations. 331 * ceph_osd_op object operations.
332 */ 332 */
333struct ceph_osd_request_head { 333struct ceph_osd_request_head {
334 __le64 tid; /* transaction id */
335 __le32 client_inc; /* client incarnation */ 334 __le32 client_inc; /* client incarnation */
336 struct ceph_object_layout layout; /* pgid */ 335 struct ceph_object_layout layout; /* pgid */
337 __le32 osdmap_epoch; /* client's osdmap epoch */ 336 __le32 osdmap_epoch; /* client's osdmap epoch */
@@ -352,7 +351,6 @@ struct ceph_osd_request_head {
352} __attribute__ ((packed)); 351} __attribute__ ((packed));
353 352
354struct ceph_osd_reply_head { 353struct ceph_osd_reply_head {
355 __le64 tid; /* transaction id */
356 __le32 client_inc; /* client incarnation */ 354 __le32 client_inc; /* client incarnation */
357 __le32 flags; 355 __le32 flags;
358 struct ceph_object_layout layout; 356 struct ceph_object_layout layout;