aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorSage Weil <sage@newdream.net>2009-10-06 14:31:13 -0400
committerSage Weil <sage@newdream.net>2009-10-06 14:31:13 -0400
commit31b8006e1d79e127a776c9414e3e0b5f9508047e (patch)
tree9c56f678f8dce4e25690461b078409d657731f77 /fs
parent963b61eb041e8850807d95f8d7a4c6a454c45000 (diff)
ceph: messenger library
A generic message passing library is used to communicate with all other components in the Ceph file system. The messenger library provides ordered, reliable delivery of messages between two nodes in the system. This implementation is based on TCP. Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/decode.h136
-rw-r--r--fs/ceph/messenger.c2019
-rw-r--r--fs/ceph/messenger.h243
3 files changed, 2398 insertions, 0 deletions
diff --git a/fs/ceph/decode.h b/fs/ceph/decode.h
new file mode 100644
index 000000000000..fc2769df062d
--- /dev/null
+++ b/fs/ceph/decode.h
@@ -0,0 +1,136 @@
1#ifndef __CEPH_DECODE_H
2#define __CEPH_DECODE_H
3
4#include <asm/unaligned.h>
5
6/*
7 * in all cases,
8 * void **p pointer to position pointer
9 * void *end pointer to end of buffer (last byte + 1)
10 */
11
/*
 * Bounds check input: jump to label @bad unless at least @n more bytes
 * lie between the current position *(p) and @end.
 */
#define ceph_decode_need(p, end, n, bad)		\
	do {						\
		if (unlikely((end) < *(p) + (n)))	\
			goto bad;			\
	} while (0)
20
/*
 * Unchecked little-endian decoders.
 *
 * Each macro reads one fixed-width value from the current position
 * *(p), stores it in the lvalue @v and advances *(p) past it.  No
 * bounds checking is done here; the caller must already know that
 * enough bytes remain.
 */
#define ceph_decode_64(p, v)				\
	do {						\
		v = get_unaligned_le64(*(p));		\
		*(p) += sizeof(u64);			\
	} while (0)
#define ceph_decode_32(p, v)				\
	do {						\
		v = get_unaligned_le32(*(p));		\
		*(p) += sizeof(u32);			\
	} while (0)
#define ceph_decode_16(p, v)				\
	do {						\
		v = get_unaligned_le16(*(p));		\
		*(p) += sizeof(u16);			\
	} while (0)
/*
 * Single byte.  The argument is parenthesized before being
 * dereferenced so that any pointer expression, not just a plain
 * identifier, can be passed as @p.
 */
#define ceph_decode_8(p, v)				\
	do {						\
		v = *(u8 *)*(p);			\
		(*(p))++;				\
	} while (0)
41
/*
 * Copy @n raw bytes from the current position into @pv, then step the
 * position pointer *(p) forward by @n.  No bounds check is performed.
 */
#define ceph_decode_copy(p, pv, n)	\
	do {				\
		memcpy(pv, *(p), n);	\
		*(p) += n;		\
	} while (0)
47
/*
 * Bounds-checked decoders: verify that the value fits before @end
 * (jumping to label @bad if it does not), then decode it exactly as
 * the unchecked variant would.
 */
#define ceph_decode_64_safe(p, end, v, bad)			\
	do {							\
		ceph_decode_need(p, end, sizeof(u64), bad);	\
		ceph_decode_64(p, v);				\
	} while (0)

#define ceph_decode_32_safe(p, end, v, bad)			\
	do {							\
		ceph_decode_need(p, end, sizeof(u32), bad);	\
		ceph_decode_32(p, v);				\
	} while (0)

#define ceph_decode_16_safe(p, end, v, bad)			\
	do {							\
		ceph_decode_need(p, end, sizeof(u16), bad);	\
		ceph_decode_16(p, v);				\
	} while (0)

/* bounds-checked raw copy of @n bytes into @pv */
#define ceph_decode_copy_safe(p, end, pv, n, bad)		\
	do {							\
		ceph_decode_need(p, end, n, bad);		\
		ceph_decode_copy(p, pv, n);			\
	} while (0)
70
/*
 * struct ceph_timespec <-> struct timespec
 *
 * The ceph side (@tv) holds two little-endian 32-bit fields; the
 * other side (@ts) holds them in CPU byte order.
 */
#define ceph_decode_timespec(ts, tv)					\
	do {								\
		(ts)->tv_nsec = le32_to_cpu((tv)->tv_nsec);		\
		(ts)->tv_sec = le32_to_cpu((tv)->tv_sec);		\
	} while (0)

#define ceph_encode_timespec(tv, ts)					\
	do {								\
		(tv)->tv_nsec = cpu_to_le32((ts)->tv_nsec);		\
		(tv)->tv_sec = cpu_to_le32((ts)->tv_sec);		\
	} while (0)
84
85
/*
 * encoders
 *
 * Each macro stores @v at the current position *(p) in little-endian
 * byte order (a single raw byte for ceph_encode_8) and advances *(p)
 * past it.  No bounds checking is done; the caller must guarantee that
 * the buffer has room.
 */
#define ceph_encode_64(p, v)					\
	do {							\
		put_unaligned_le64(v, (__le64 *)*(p));		\
		*(p) += sizeof(u64);				\
	} while (0)
#define ceph_encode_32(p, v)					\
	do {							\
		put_unaligned_le32(v, (__le32 *)*(p));		\
		*(p) += sizeof(u32);				\
	} while (0)
/* the value and the destination are both arguments of the store helper */
#define ceph_encode_16(p, v)					\
	do {							\
		put_unaligned_le16(v, (__le16 *)*(p));		\
		*(p) += sizeof(u16);				\
	} while (0)
#define ceph_encode_8(p, v)					\
	do {							\
		*(u8 *)*(p) = v;				\
		(*(p))++;					\
	} while (0)
109
110/*
111 * filepath, string encoders
112 */
113static inline void ceph_encode_filepath(void **p, void *end,
114 u64 ino, const char *path)
115{
116 u32 len = path ? strlen(path) : 0;
117 BUG_ON(*p + sizeof(ino) + sizeof(len) + len > end);
118 ceph_encode_64(p, ino);
119 ceph_encode_32(p, len);
120 if (len)
121 memcpy(*p, path, len);
122 *p += len;
123}
124
125static inline void ceph_encode_string(void **p, void *end,
126 const char *s, u32 len)
127{
128 BUG_ON(*p + sizeof(len) + len > end);
129 ceph_encode_32(p, len);
130 if (len)
131 memcpy(*p, s, len);
132 *p += len;
133}
134
135
136#endif
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
new file mode 100644
index 000000000000..63f7f1359385
--- /dev/null
+++ b/fs/ceph/messenger.c
@@ -0,0 +1,2019 @@
1#include "ceph_debug.h"
2
3#include <linux/crc32c.h>
4#include <linux/ctype.h>
5#include <linux/highmem.h>
6#include <linux/inet.h>
7#include <linux/kthread.h>
8#include <linux/net.h>
9#include <linux/socket.h>
10#include <linux/string.h>
11#include <net/tcp.h>
12
13#include "super.h"
14#include "messenger.h"
15
16/*
17 * Ceph uses the messenger to exchange ceph_msg messages with other
18 * hosts in the system. The messenger provides ordered and reliable
19 * delivery. We tolerate TCP disconnects by reconnecting (with
20 * exponential backoff) in the case of a fault (disconnection, bad
21 * crc, protocol error). Acks allow sent messages to be discarded by
22 * the sender.
23 */
24
25/* static tag bytes (protocol control messages) */
26static char tag_msg = CEPH_MSGR_TAG_MSG;
27static char tag_ack = CEPH_MSGR_TAG_ACK;
28static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
29
30
31static void queue_con(struct ceph_connection *con);
32static void con_work(struct work_struct *);
33static void ceph_fault(struct ceph_connection *con);
34
35const char *ceph_name_type_str(int t)
36{
37 switch (t) {
38 case CEPH_ENTITY_TYPE_MON: return "mon";
39 case CEPH_ENTITY_TYPE_MDS: return "mds";
40 case CEPH_ENTITY_TYPE_OSD: return "osd";
41 case CEPH_ENTITY_TYPE_CLIENT: return "client";
42 case CEPH_ENTITY_TYPE_ADMIN: return "admin";
43 default: return "???";
44 }
45}
46
47/*
48 * nicely render a sockaddr as a string.
49 */
50#define MAX_ADDR_STR 20
51static char addr_str[MAX_ADDR_STR][40];
52static DEFINE_SPINLOCK(addr_str_lock);
53static int last_addr_str;
54
55const char *pr_addr(const struct sockaddr_storage *ss)
56{
57 int i;
58 char *s;
59 struct sockaddr_in *in4 = (void *)ss;
60 unsigned char *quad = (void *)&in4->sin_addr.s_addr;
61 struct sockaddr_in6 *in6 = (void *)ss;
62
63 spin_lock(&addr_str_lock);
64 i = last_addr_str++;
65 if (last_addr_str == MAX_ADDR_STR)
66 last_addr_str = 0;
67 spin_unlock(&addr_str_lock);
68 s = addr_str[i];
69
70 switch (ss->ss_family) {
71 case AF_INET:
72 sprintf(s, "%u.%u.%u.%u:%u",
73 (unsigned int)quad[0],
74 (unsigned int)quad[1],
75 (unsigned int)quad[2],
76 (unsigned int)quad[3],
77 (unsigned int)ntohs(in4->sin_port));
78 break;
79
80 case AF_INET6:
81 sprintf(s, "%04x:%04x:%04x:%04x:%04x:%04x:%04x:%04x:%u",
82 in6->sin6_addr.s6_addr16[0],
83 in6->sin6_addr.s6_addr16[1],
84 in6->sin6_addr.s6_addr16[2],
85 in6->sin6_addr.s6_addr16[3],
86 in6->sin6_addr.s6_addr16[4],
87 in6->sin6_addr.s6_addr16[5],
88 in6->sin6_addr.s6_addr16[6],
89 in6->sin6_addr.s6_addr16[7],
90 (unsigned int)ntohs(in6->sin6_port));
91 break;
92
93 default:
94 sprintf(s, "(unknown sockaddr family %d)", (int)ss->ss_family);
95 }
96
97 return s;
98}
99
100/*
101 * work queue for all reading and writing to/from the socket.
102 */
103struct workqueue_struct *ceph_msgr_wq;
104
105int __init ceph_msgr_init(void)
106{
107 ceph_msgr_wq = create_workqueue("ceph-msgr");
108 if (IS_ERR(ceph_msgr_wq)) {
109 int ret = PTR_ERR(ceph_msgr_wq);
110 pr_err("msgr_init failed to create workqueue: %d\n", ret);
111 ceph_msgr_wq = NULL;
112 return ret;
113 }
114 return 0;
115}
116
117void ceph_msgr_exit(void)
118{
119 destroy_workqueue(ceph_msgr_wq);
120}
121
122/*
123 * socket callback functions
124 */
125
126/* data available on socket, or listen socket received a connect */
127static void ceph_data_ready(struct sock *sk, int count_unused)
128{
129 struct ceph_connection *con =
130 (struct ceph_connection *)sk->sk_user_data;
131 if (sk->sk_state != TCP_CLOSE_WAIT) {
132 dout("ceph_data_ready on %p state = %lu, queueing work\n",
133 con, con->state);
134 queue_con(con);
135 }
136}
137
138/* socket has buffer space for writing */
139static void ceph_write_space(struct sock *sk)
140{
141 struct ceph_connection *con =
142 (struct ceph_connection *)sk->sk_user_data;
143
144 /* only queue to workqueue if there is data we want to write. */
145 if (test_bit(WRITE_PENDING, &con->state)) {
146 dout("ceph_write_space %p queueing write work\n", con);
147 queue_con(con);
148 } else {
149 dout("ceph_write_space %p nothing to write\n", con);
150 }
151
152 /* since we have our own write_space, clear the SOCK_NOSPACE flag */
153 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
154}
155
156/* socket's state has changed */
157static void ceph_state_change(struct sock *sk)
158{
159 struct ceph_connection *con =
160 (struct ceph_connection *)sk->sk_user_data;
161
162 dout("ceph_state_change %p state = %lu sk_state = %u\n",
163 con, con->state, sk->sk_state);
164
165 if (test_bit(CLOSED, &con->state))
166 return;
167
168 switch (sk->sk_state) {
169 case TCP_CLOSE:
170 dout("ceph_state_change TCP_CLOSE\n");
171 case TCP_CLOSE_WAIT:
172 dout("ceph_state_change TCP_CLOSE_WAIT\n");
173 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
174 if (test_bit(CONNECTING, &con->state))
175 con->error_msg = "connection failed";
176 else
177 con->error_msg = "socket closed";
178 queue_con(con);
179 }
180 break;
181 case TCP_ESTABLISHED:
182 dout("ceph_state_change TCP_ESTABLISHED\n");
183 queue_con(con);
184 break;
185 }
186}
187
188/*
189 * set up socket callbacks
190 */
191static void set_sock_callbacks(struct socket *sock,
192 struct ceph_connection *con)
193{
194 struct sock *sk = sock->sk;
195 sk->sk_user_data = (void *)con;
196 sk->sk_data_ready = ceph_data_ready;
197 sk->sk_write_space = ceph_write_space;
198 sk->sk_state_change = ceph_state_change;
199}
200
201
202/*
203 * socket helpers
204 */
205
206/*
207 * initiate connection to a remote socket.
208 */
209static struct socket *ceph_tcp_connect(struct ceph_connection *con)
210{
211 struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.in_addr;
212 struct socket *sock;
213 int ret;
214
215 BUG_ON(con->sock);
216 ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
217 if (ret)
218 return ERR_PTR(ret);
219 con->sock = sock;
220 sock->sk->sk_allocation = GFP_NOFS;
221
222 set_sock_callbacks(sock, con);
223
224 dout("connect %s\n", pr_addr(&con->peer_addr.in_addr));
225
226 ret = sock->ops->connect(sock, paddr, sizeof(*paddr), O_NONBLOCK);
227 if (ret == -EINPROGRESS) {
228 dout("connect %s EINPROGRESS sk_state = %u\n",
229 pr_addr(&con->peer_addr.in_addr),
230 sock->sk->sk_state);
231 ret = 0;
232 }
233 if (ret < 0) {
234 pr_err("connect %s error %d\n",
235 pr_addr(&con->peer_addr.in_addr), ret);
236 sock_release(sock);
237 con->sock = NULL;
238 con->error_msg = "connect error";
239 }
240
241 if (ret < 0)
242 return ERR_PTR(ret);
243 return sock;
244}
245
246static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
247{
248 struct kvec iov = {buf, len};
249 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
250
251 return kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
252}
253
254/*
255 * write something. @more is true if caller will be sending more data
256 * shortly.
257 */
258static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
259 size_t kvlen, size_t len, int more)
260{
261 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
262
263 if (more)
264 msg.msg_flags |= MSG_MORE;
265 else
266 msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */
267
268 return kernel_sendmsg(sock, &msg, iov, kvlen, len);
269}
270
271
272/*
273 * Shutdown/close the socket for the given connection.
274 */
275static int con_close_socket(struct ceph_connection *con)
276{
277 int rc;
278
279 dout("con_close_socket on %p sock %p\n", con, con->sock);
280 if (!con->sock)
281 return 0;
282 set_bit(SOCK_CLOSED, &con->state);
283 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
284 sock_release(con->sock);
285 con->sock = NULL;
286 clear_bit(SOCK_CLOSED, &con->state);
287 return rc;
288}
289
290/*
291 * Reset a connection. Discard all incoming and outgoing messages
292 * and clear *_seq state.
293 */
294static void ceph_msg_remove(struct ceph_msg *msg)
295{
296 list_del_init(&msg->list_head);
297 ceph_msg_put(msg);
298}
299static void ceph_msg_remove_list(struct list_head *head)
300{
301 while (!list_empty(head)) {
302 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
303 list_head);
304 ceph_msg_remove(msg);
305 }
306}
307
308static void reset_connection(struct ceph_connection *con)
309{
310 /* reset connection, out_queue, msg_ and connect_seq */
311 /* discard existing out_queue and msg_seq */
312 mutex_lock(&con->out_mutex);
313 ceph_msg_remove_list(&con->out_queue);
314 ceph_msg_remove_list(&con->out_sent);
315
316 con->connect_seq = 0;
317 con->out_seq = 0;
318 con->out_msg = NULL;
319 con->in_seq = 0;
320 mutex_unlock(&con->out_mutex);
321}
322
323/*
324 * mark a peer down. drop any open connections.
325 */
326void ceph_con_close(struct ceph_connection *con)
327{
328 dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
329 set_bit(CLOSED, &con->state); /* in case there's queued work */
330 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
331 reset_connection(con);
332 queue_con(con);
333}
334
335/*
336 * clean up connection state
337 */
338void ceph_con_shutdown(struct ceph_connection *con)
339{
340 dout("con_shutdown %p\n", con);
341 reset_connection(con);
342 set_bit(DEAD, &con->state);
343 con_close_socket(con); /* silently ignore errors */
344}
345
346/*
347 * Reopen a closed connection, with a new peer address.
348 */
349void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
350{
351 dout("con_open %p %s\n", con, pr_addr(&addr->in_addr));
352 set_bit(OPENING, &con->state);
353 clear_bit(CLOSED, &con->state);
354 memcpy(&con->peer_addr, addr, sizeof(*addr));
355 queue_con(con);
356}
357
358/*
359 * generic get/put
360 */
361struct ceph_connection *ceph_con_get(struct ceph_connection *con)
362{
363 dout("con_get %p nref = %d -> %d\n", con,
364 atomic_read(&con->nref), atomic_read(&con->nref) + 1);
365 if (atomic_inc_not_zero(&con->nref))
366 return con;
367 return NULL;
368}
369
370void ceph_con_put(struct ceph_connection *con)
371{
372 dout("con_put %p nref = %d -> %d\n", con,
373 atomic_read(&con->nref), atomic_read(&con->nref) - 1);
374 BUG_ON(atomic_read(&con->nref) == 0);
375 if (atomic_dec_and_test(&con->nref)) {
376 ceph_con_shutdown(con);
377 kfree(con);
378 }
379}
380
381/*
382 * initialize a new connection.
383 */
384void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
385{
386 dout("con_init %p\n", con);
387 memset(con, 0, sizeof(*con));
388 atomic_set(&con->nref, 1);
389 con->msgr = msgr;
390 mutex_init(&con->out_mutex);
391 INIT_LIST_HEAD(&con->out_queue);
392 INIT_LIST_HEAD(&con->out_sent);
393 INIT_DELAYED_WORK(&con->work, con_work);
394}
395
396
397/*
398 * We maintain a global counter to order connection attempts. Get
399 * a unique seq greater than @gt.
400 */
401static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
402{
403 u32 ret;
404
405 spin_lock(&msgr->global_seq_lock);
406 if (msgr->global_seq < gt)
407 msgr->global_seq = gt;
408 ret = ++msgr->global_seq;
409 spin_unlock(&msgr->global_seq_lock);
410 return ret;
411}
412
413
414/*
415 * Prepare footer for currently outgoing message, and finish things
416 * off. Assumes out_kvec* are already valid.. we just add on to the end.
417 */
418static void prepare_write_message_footer(struct ceph_connection *con, int v)
419{
420 struct ceph_msg *m = con->out_msg;
421
422 dout("prepare_write_message_footer %p\n", con);
423 con->out_kvec_is_msg = true;
424 con->out_kvec[v].iov_base = &m->footer;
425 con->out_kvec[v].iov_len = sizeof(m->footer);
426 con->out_kvec_bytes += sizeof(m->footer);
427 con->out_kvec_left++;
428 con->out_more = m->more_to_follow;
429 con->out_msg = NULL; /* we're done with this one */
430}
431
432/*
433 * Prepare headers for the next outgoing message.
434 */
435static void prepare_write_message(struct ceph_connection *con)
436{
437 struct ceph_msg *m;
438 int v = 0;
439
440 con->out_kvec_bytes = 0;
441 con->out_kvec_is_msg = true;
442
443 /* Sneak an ack in there first? If we can get it into the same
444 * TCP packet that's a good thing. */
445 if (con->in_seq > con->in_seq_acked) {
446 con->in_seq_acked = con->in_seq;
447 con->out_kvec[v].iov_base = &tag_ack;
448 con->out_kvec[v++].iov_len = 1;
449 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
450 con->out_kvec[v].iov_base = &con->out_temp_ack;
451 con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
452 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
453 }
454
455 /* move message to sending/sent list */
456 m = list_first_entry(&con->out_queue,
457 struct ceph_msg, list_head);
458 list_move_tail(&m->list_head, &con->out_sent);
459 con->out_msg = m; /* we don't bother taking a reference here. */
460
461 m->hdr.seq = cpu_to_le64(++con->out_seq);
462
463 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
464 m, con->out_seq, le16_to_cpu(m->hdr.type),
465 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
466 le32_to_cpu(m->hdr.data_len),
467 m->nr_pages);
468 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
469
470 /* tag + hdr + front + middle */
471 con->out_kvec[v].iov_base = &tag_msg;
472 con->out_kvec[v++].iov_len = 1;
473 con->out_kvec[v].iov_base = &m->hdr;
474 con->out_kvec[v++].iov_len = sizeof(m->hdr);
475 con->out_kvec[v++] = m->front;
476 if (m->middle)
477 con->out_kvec[v++] = m->middle->vec;
478 con->out_kvec_left = v;
479 con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
480 (m->middle ? m->middle->vec.iov_len : 0);
481 con->out_kvec_cur = con->out_kvec;
482
483 /* fill in crc (except data pages), footer */
484 con->out_msg->hdr.crc =
485 cpu_to_le32(crc32c(0, (void *)&m->hdr,
486 sizeof(m->hdr) - sizeof(m->hdr.crc)));
487 con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
488 con->out_msg->footer.front_crc =
489 cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
490 if (m->middle)
491 con->out_msg->footer.middle_crc =
492 cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
493 m->middle->vec.iov_len));
494 else
495 con->out_msg->footer.middle_crc = 0;
496 con->out_msg->footer.data_crc = 0;
497 dout("prepare_write_message front_crc %u data_crc %u\n",
498 le32_to_cpu(con->out_msg->footer.front_crc),
499 le32_to_cpu(con->out_msg->footer.middle_crc));
500
501 /* is there a data payload? */
502 if (le32_to_cpu(m->hdr.data_len) > 0) {
503 /* initialize page iterator */
504 con->out_msg_pos.page = 0;
505 con->out_msg_pos.page_pos =
506 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
507 con->out_msg_pos.data_pos = 0;
508 con->out_msg_pos.did_page_crc = 0;
509 con->out_more = 1; /* data + footer will follow */
510 } else {
511 /* no, queue up footer too and be done */
512 prepare_write_message_footer(con, v);
513 }
514
515 set_bit(WRITE_PENDING, &con->state);
516}
517
518/*
519 * Prepare an ack.
520 */
521static void prepare_write_ack(struct ceph_connection *con)
522{
523 dout("prepare_write_ack %p %llu -> %llu\n", con,
524 con->in_seq_acked, con->in_seq);
525 con->in_seq_acked = con->in_seq;
526
527 con->out_kvec[0].iov_base = &tag_ack;
528 con->out_kvec[0].iov_len = 1;
529 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
530 con->out_kvec[1].iov_base = &con->out_temp_ack;
531 con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
532 con->out_kvec_left = 2;
533 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
534 con->out_kvec_cur = con->out_kvec;
535 con->out_more = 1; /* more will follow.. eventually.. */
536 set_bit(WRITE_PENDING, &con->state);
537}
538
539/*
540 * Prepare to write keepalive byte.
541 */
542static void prepare_write_keepalive(struct ceph_connection *con)
543{
544 dout("prepare_write_keepalive %p\n", con);
545 con->out_kvec[0].iov_base = &tag_keepalive;
546 con->out_kvec[0].iov_len = 1;
547 con->out_kvec_left = 1;
548 con->out_kvec_bytes = 1;
549 con->out_kvec_cur = con->out_kvec;
550 set_bit(WRITE_PENDING, &con->state);
551}
552
553/*
554 * Connection negotiation.
555 */
556
557/*
558 * We connected to a peer and are saying hello.
559 */
560static void prepare_write_connect(struct ceph_messenger *msgr,
561 struct ceph_connection *con)
562{
563 int len = strlen(CEPH_BANNER);
564 unsigned global_seq = get_global_seq(con->msgr, 0);
565 int proto;
566
567 switch (con->peer_name.type) {
568 case CEPH_ENTITY_TYPE_MON:
569 proto = CEPH_MONC_PROTOCOL;
570 break;
571 case CEPH_ENTITY_TYPE_OSD:
572 proto = CEPH_OSDC_PROTOCOL;
573 break;
574 case CEPH_ENTITY_TYPE_MDS:
575 proto = CEPH_MDSC_PROTOCOL;
576 break;
577 default:
578 BUG();
579 }
580
581 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
582 con->connect_seq, global_seq, proto);
583 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
584 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
585 con->out_connect.global_seq = cpu_to_le32(global_seq);
586 con->out_connect.protocol_version = cpu_to_le32(proto);
587 con->out_connect.flags = 0;
588 if (test_bit(LOSSYTX, &con->state))
589 con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY;
590
591 con->out_kvec[0].iov_base = CEPH_BANNER;
592 con->out_kvec[0].iov_len = len;
593 con->out_kvec[1].iov_base = &msgr->inst.addr;
594 con->out_kvec[1].iov_len = sizeof(msgr->inst.addr);
595 con->out_kvec[2].iov_base = &con->out_connect;
596 con->out_kvec[2].iov_len = sizeof(con->out_connect);
597 con->out_kvec_left = 3;
598 con->out_kvec_bytes = len + sizeof(msgr->inst.addr) +
599 sizeof(con->out_connect);
600 con->out_kvec_cur = con->out_kvec;
601 con->out_more = 0;
602 set_bit(WRITE_PENDING, &con->state);
603}
604
605static void prepare_write_connect_retry(struct ceph_messenger *msgr,
606 struct ceph_connection *con)
607{
608 dout("prepare_write_connect_retry %p\n", con);
609 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
610 con->out_connect.global_seq =
611 cpu_to_le32(get_global_seq(con->msgr, 0));
612
613 con->out_kvec[0].iov_base = &con->out_connect;
614 con->out_kvec[0].iov_len = sizeof(con->out_connect);
615 con->out_kvec_left = 1;
616 con->out_kvec_bytes = sizeof(con->out_connect);
617 con->out_kvec_cur = con->out_kvec;
618 con->out_more = 0;
619 set_bit(WRITE_PENDING, &con->state);
620}
621
622
623/*
624 * write as much of pending kvecs to the socket as we can.
625 * 1 -> done
626 * 0 -> socket full, but more to do
627 * <0 -> error
628 */
629static int write_partial_kvec(struct ceph_connection *con)
630{
631 int ret;
632
633 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
634 while (con->out_kvec_bytes > 0) {
635 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
636 con->out_kvec_left, con->out_kvec_bytes,
637 con->out_more);
638 if (ret <= 0)
639 goto out;
640 con->out_kvec_bytes -= ret;
641 if (con->out_kvec_bytes == 0)
642 break; /* done */
643 while (ret > 0) {
644 if (ret >= con->out_kvec_cur->iov_len) {
645 ret -= con->out_kvec_cur->iov_len;
646 con->out_kvec_cur++;
647 con->out_kvec_left--;
648 } else {
649 con->out_kvec_cur->iov_len -= ret;
650 con->out_kvec_cur->iov_base += ret;
651 ret = 0;
652 break;
653 }
654 }
655 }
656 con->out_kvec_left = 0;
657 con->out_kvec_is_msg = false;
658 ret = 1;
659out:
660 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
661 con->out_kvec_bytes, con->out_kvec_left, ret);
662 return ret; /* done! */
663}
664
665/*
666 * Write as much message data payload as we can. If we finish, queue
667 * up the footer.
668 * 1 -> done, footer is now queued in out_kvec[].
669 * 0 -> socket full, but more to do
670 * <0 -> error
671 */
672static int write_partial_msg_pages(struct ceph_connection *con)
673{
674 struct ceph_msg *msg = con->out_msg;
675 unsigned data_len = le32_to_cpu(msg->hdr.data_len);
676 size_t len;
677 int crc = con->msgr->nocrc;
678 int ret;
679
680 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
681 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
682 con->out_msg_pos.page_pos);
683
684 while (con->out_msg_pos.page < con->out_msg->nr_pages) {
685 struct page *page = NULL;
686 void *kaddr = NULL;
687
688 /*
689 * if we are calculating the data crc (the default), we need
690 * to map the page. if our pages[] has been revoked, use the
691 * zero page.
692 */
693 if (msg->pages) {
694 page = msg->pages[con->out_msg_pos.page];
695 if (crc)
696 kaddr = kmap(page);
697 } else {
698 page = con->msgr->zero_page;
699 if (crc)
700 kaddr = page_address(con->msgr->zero_page);
701 }
702 len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
703 (int)(data_len - con->out_msg_pos.data_pos));
704 if (crc && !con->out_msg_pos.did_page_crc) {
705 void *base = kaddr + con->out_msg_pos.page_pos;
706 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
707
708 BUG_ON(kaddr == NULL);
709 con->out_msg->footer.data_crc =
710 cpu_to_le32(crc32c(tmpcrc, base, len));
711 con->out_msg_pos.did_page_crc = 1;
712 }
713
714 ret = kernel_sendpage(con->sock, page,
715 con->out_msg_pos.page_pos, len,
716 MSG_DONTWAIT | MSG_NOSIGNAL |
717 MSG_MORE);
718
719 if (crc && msg->pages)
720 kunmap(page);
721
722 if (ret <= 0)
723 goto out;
724
725 con->out_msg_pos.data_pos += ret;
726 con->out_msg_pos.page_pos += ret;
727 if (ret == len) {
728 con->out_msg_pos.page_pos = 0;
729 con->out_msg_pos.page++;
730 con->out_msg_pos.did_page_crc = 0;
731 }
732 }
733
734 dout("write_partial_msg_pages %p msg %p done\n", con, msg);
735
736 /* prepare and queue up footer, too */
737 if (!crc)
738 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
739 con->out_kvec_bytes = 0;
740 con->out_kvec_left = 0;
741 con->out_kvec_cur = con->out_kvec;
742 prepare_write_message_footer(con, 0);
743 ret = 1;
744out:
745 return ret;
746}
747
748/*
749 * write some zeros
750 */
751static int write_partial_skip(struct ceph_connection *con)
752{
753 int ret;
754
755 while (con->out_skip > 0) {
756 struct kvec iov = {
757 .iov_base = page_address(con->msgr->zero_page),
758 .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
759 };
760
761 ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
762 if (ret <= 0)
763 goto out;
764 con->out_skip -= ret;
765 }
766 ret = 1;
767out:
768 return ret;
769}
770
771/*
772 * Prepare to read connection handshake, or an ack.
773 */
774static void prepare_read_connect(struct ceph_connection *con)
775{
776 dout("prepare_read_connect %p\n", con);
777 con->in_base_pos = 0;
778}
779
780static void prepare_read_ack(struct ceph_connection *con)
781{
782 dout("prepare_read_ack %p\n", con);
783 con->in_base_pos = 0;
784}
785
786static void prepare_read_tag(struct ceph_connection *con)
787{
788 dout("prepare_read_tag %p\n", con);
789 con->in_base_pos = 0;
790 con->in_tag = CEPH_MSGR_TAG_READY;
791}
792
793/*
794 * Prepare to read a message.
795 */
796static int prepare_read_message(struct ceph_connection *con)
797{
798 dout("prepare_read_message %p\n", con);
799 BUG_ON(con->in_msg != NULL);
800 con->in_base_pos = 0;
801 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
802 return 0;
803}
804
805
806static int read_partial(struct ceph_connection *con,
807 int *to, int size, void *object)
808{
809 *to += size;
810 while (con->in_base_pos < *to) {
811 int left = *to - con->in_base_pos;
812 int have = size - left;
813 int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
814 if (ret <= 0)
815 return ret;
816 con->in_base_pos += ret;
817 }
818 return 1;
819}
820
821
822/*
823 * Read all or part of the connect-side handshake on a new connection
824 */
825static int read_partial_connect(struct ceph_connection *con)
826{
827 int ret, to = 0;
828
829 dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
830
831 /* peer's banner */
832 ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
833 if (ret <= 0)
834 goto out;
835 ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
836 &con->actual_peer_addr);
837 if (ret <= 0)
838 goto out;
839 ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
840 &con->peer_addr_for_me);
841 if (ret <= 0)
842 goto out;
843 ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
844 if (ret <= 0)
845 goto out;
846
847 dout("read_partial_connect %p connect_seq = %u, global_seq = %u\n",
848 con, le32_to_cpu(con->in_reply.connect_seq),
849 le32_to_cpu(con->in_reply.global_seq));
850out:
851 return ret;
852}
853
854/*
855 * Verify the hello banner looks okay.
856 */
857static int verify_hello(struct ceph_connection *con)
858{
859 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
860 pr_err("connect to/from %s has bad banner\n",
861 pr_addr(&con->peer_addr.in_addr));
862 con->error_msg = "protocol error, bad banner";
863 return -1;
864 }
865 return 0;
866}
867
868static bool addr_is_blank(struct sockaddr_storage *ss)
869{
870 switch (ss->ss_family) {
871 case AF_INET:
872 return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
873 case AF_INET6:
874 return
875 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
876 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
877 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
878 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
879 }
880 return false;
881}
882
/*
 * Return the port of an IPv4/IPv6 socket address in host byte order,
 * or 0 for any other address family.
 *
 * The stored port is in network byte order; converting it here makes
 * the result suitable for handing straight back to addr_set_port(),
 * which takes a host-order port.
 */
static int addr_port(struct sockaddr_storage *ss)
{
	switch (ss->ss_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)ss)->sin_port);
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)ss)->sin6_port);
	}
	return 0;
}
893
/*
 * Store host-order port @p into an IPv4/IPv6 socket address (in
 * network byte order).  Other address families are left untouched.
 */
static void addr_set_port(struct sockaddr_storage *ss, int p)
{
	switch (ss->ss_family) {
	case AF_INET:
		((struct sockaddr_in *)ss)->sin_port = htons(p);
		break;
	case AF_INET6:
		((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
		break;
	}
}
903
904/*
905 * Parse an ip[:port] list into an addr array. Use the default
906 * monitor port if a port isn't specified.
907 */
908int ceph_parse_ips(const char *c, const char *end,
909 struct ceph_entity_addr *addr,
910 int max_count, int *count)
911{
912 int i;
913 const char *p = c;
914
915 dout("parse_ips on '%.*s'\n", (int)(end-c), c);
916 for (i = 0; i < max_count; i++) {
917 const char *ipend;
918 struct sockaddr_storage *ss = &addr[i].in_addr;
919 struct sockaddr_in *in4 = (void *)ss;
920 struct sockaddr_in6 *in6 = (void *)ss;
921 int port;
922
923 memset(ss, 0, sizeof(*ss));
924 if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr,
925 ',', &ipend)) {
926 ss->ss_family = AF_INET;
927 } else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
928 ',', &ipend)) {
929 ss->ss_family = AF_INET6;
930 } else {
931 goto bad;
932 }
933 p = ipend;
934
935 /* port? */
936 if (p < end && *p == ':') {
937 port = 0;
938 p++;
939 while (p < end && *p >= '0' && *p <= '9') {
940 port = (port * 10) + (*p - '0');
941 p++;
942 }
943 if (port > 65535 || port == 0)
944 goto bad;
945 } else {
946 port = CEPH_MON_PORT;
947 }
948
949 addr_set_port(ss, port);
950
951 dout("parse_ips got %s\n", pr_addr(ss));
952
953 if (p == end)
954 break;
955 if (*p != ',')
956 goto bad;
957 p++;
958 }
959
960 if (p != end)
961 goto bad;
962
963 if (count)
964 *count = i + 1;
965 return 0;
966
967bad:
968 pr_err("parse_ips bad ip '%s'\n", c);
969 return -EINVAL;
970}
971
972static int process_connect(struct ceph_connection *con)
973{
974 dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
975
976 if (verify_hello(con) < 0)
977 return -1;
978
979 /*
980 * Make sure the other end is who we wanted. note that the other
981 * end may not yet know their ip address, so if it's 0.0.0.0, give
982 * them the benefit of the doubt.
983 */
984 if (!ceph_entity_addr_is_local(&con->peer_addr,
985 &con->actual_peer_addr) &&
986 !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
987 con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
988 pr_err("wrong peer, want %s/%d, "
989 "got %s/%d, wtf\n",
990 pr_addr(&con->peer_addr.in_addr),
991 con->peer_addr.nonce,
992 pr_addr(&con->actual_peer_addr.in_addr),
993 con->actual_peer_addr.nonce);
994 con->error_msg = "protocol error, wrong peer";
995 return -1;
996 }
997
998 /*
999 * did we learn our address?
1000 */
1001 if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
1002 int port = addr_port(&con->msgr->inst.addr.in_addr);
1003
1004 memcpy(&con->msgr->inst.addr.in_addr,
1005 &con->peer_addr_for_me.in_addr,
1006 sizeof(con->peer_addr_for_me.in_addr));
1007 addr_set_port(&con->msgr->inst.addr.in_addr, port);
1008 dout("process_connect learned my addr is %s\n",
1009 pr_addr(&con->msgr->inst.addr.in_addr));
1010 }
1011
1012 switch (con->in_reply.tag) {
1013 case CEPH_MSGR_TAG_BADPROTOVER:
1014 dout("process_connect got BADPROTOVER my %d != their %d\n",
1015 le32_to_cpu(con->out_connect.protocol_version),
1016 le32_to_cpu(con->in_reply.protocol_version));
1017 pr_err("%s%lld %s protocol version mismatch,"
1018 " my %d != server's %d\n",
1019 ENTITY_NAME(con->peer_name),
1020 pr_addr(&con->peer_addr.in_addr),
1021 le32_to_cpu(con->out_connect.protocol_version),
1022 le32_to_cpu(con->in_reply.protocol_version));
1023 con->error_msg = "protocol version mismatch";
1024 if (con->ops->bad_proto)
1025 con->ops->bad_proto(con);
1026 reset_connection(con);
1027 set_bit(CLOSED, &con->state); /* in case there's queued work */
1028 return -1;
1029
1030
1031 case CEPH_MSGR_TAG_RESETSESSION:
1032 /*
1033 * If we connected with a large connect_seq but the peer
1034 * has no record of a session with us (no connection, or
1035 * connect_seq == 0), they will send RESETSESION to indicate
1036 * that they must have reset their session, and may have
1037 * dropped messages.
1038 */
1039 dout("process_connect got RESET peer seq %u\n",
1040 le32_to_cpu(con->in_connect.connect_seq));
1041 pr_err("%s%lld %s connection reset\n",
1042 ENTITY_NAME(con->peer_name),
1043 pr_addr(&con->peer_addr.in_addr));
1044 reset_connection(con);
1045 prepare_write_connect_retry(con->msgr, con);
1046 prepare_read_connect(con);
1047
1048 /* Tell ceph about it. */
1049 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
1050 if (con->ops->peer_reset)
1051 con->ops->peer_reset(con);
1052 break;
1053
1054 case CEPH_MSGR_TAG_RETRY_SESSION:
1055 /*
1056 * If we sent a smaller connect_seq than the peer has, try
1057 * again with a larger value.
1058 */
1059 dout("process_connect got RETRY my seq = %u, peer_seq = %u\n",
1060 le32_to_cpu(con->out_connect.connect_seq),
1061 le32_to_cpu(con->in_connect.connect_seq));
1062 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
1063 prepare_write_connect_retry(con->msgr, con);
1064 prepare_read_connect(con);
1065 break;
1066
1067 case CEPH_MSGR_TAG_RETRY_GLOBAL:
1068 /*
1069 * If we sent a smaller global_seq than the peer has, try
1070 * again with a larger value.
1071 */
1072 dout("process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n",
1073 con->peer_global_seq,
1074 le32_to_cpu(con->in_connect.global_seq));
1075 get_global_seq(con->msgr,
1076 le32_to_cpu(con->in_connect.global_seq));
1077 prepare_write_connect_retry(con->msgr, con);
1078 prepare_read_connect(con);
1079 break;
1080
1081 case CEPH_MSGR_TAG_READY:
1082 clear_bit(CONNECTING, &con->state);
1083 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
1084 set_bit(LOSSYRX, &con->state);
1085 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1086 con->connect_seq++;
1087 dout("process_connect got READY gseq %d cseq %d (%d)\n",
1088 con->peer_global_seq,
1089 le32_to_cpu(con->in_reply.connect_seq),
1090 con->connect_seq);
1091 WARN_ON(con->connect_seq !=
1092 le32_to_cpu(con->in_reply.connect_seq));
1093
1094 con->delay = 0; /* reset backoff memory */
1095 prepare_read_tag(con);
1096 break;
1097
1098 case CEPH_MSGR_TAG_WAIT:
1099 /*
1100 * If there is a connection race (we are opening
1101 * connections to each other), one of us may just have
1102 * to WAIT. This shouldn't happen if we are the
1103 * client.
1104 */
1105 pr_err("process_connect peer connecting WAIT\n");
1106
1107 default:
1108 pr_err("connect protocol error, will retry\n");
1109 con->error_msg = "protocol error, garbage tag during connect";
1110 return -1;
1111 }
1112 return 0;
1113}
1114
1115
1116/*
1117 * read (part of) an ack
1118 */
1119static int read_partial_ack(struct ceph_connection *con)
1120{
1121 int to = 0;
1122
1123 return read_partial(con, &to, sizeof(con->in_temp_ack),
1124 &con->in_temp_ack);
1125}
1126
1127
1128/*
1129 * We can finally discard anything that's been acked.
1130 */
1131static void process_ack(struct ceph_connection *con)
1132{
1133 struct ceph_msg *m;
1134 u64 ack = le64_to_cpu(con->in_temp_ack);
1135 u64 seq;
1136
1137 mutex_lock(&con->out_mutex);
1138 while (!list_empty(&con->out_sent)) {
1139 m = list_first_entry(&con->out_sent, struct ceph_msg,
1140 list_head);
1141 seq = le64_to_cpu(m->hdr.seq);
1142 if (seq > ack)
1143 break;
1144 dout("got ack for seq %llu type %d at %p\n", seq,
1145 le16_to_cpu(m->hdr.type), m);
1146 ceph_msg_remove(m);
1147 }
1148 mutex_unlock(&con->out_mutex);
1149 prepare_read_tag(con);
1150}
1151
1152
1153
1154
1155
1156
1157/*
1158 * read (part of) a message.
1159 */
1160static int read_partial_message(struct ceph_connection *con)
1161{
1162 struct ceph_msg *m = con->in_msg;
1163 void *p;
1164 int ret;
1165 int to, want, left;
1166 unsigned front_len, middle_len, data_len, data_off;
1167 int datacrc = con->msgr->nocrc;
1168
1169 dout("read_partial_message con %p msg %p\n", con, m);
1170
1171 /* header */
1172 while (con->in_base_pos < sizeof(con->in_hdr)) {
1173 left = sizeof(con->in_hdr) - con->in_base_pos;
1174 ret = ceph_tcp_recvmsg(con->sock,
1175 (char *)&con->in_hdr + con->in_base_pos,
1176 left);
1177 if (ret <= 0)
1178 return ret;
1179 con->in_base_pos += ret;
1180 if (con->in_base_pos == sizeof(con->in_hdr)) {
1181 u32 crc = crc32c(0, (void *)&con->in_hdr,
1182 sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
1183 if (crc != le32_to_cpu(con->in_hdr.crc)) {
1184 pr_err("read_partial_message bad hdr "
1185 " crc %u != expected %u\n",
1186 crc, con->in_hdr.crc);
1187 return -EBADMSG;
1188 }
1189 }
1190 }
1191
1192 front_len = le32_to_cpu(con->in_hdr.front_len);
1193 if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1194 return -EIO;
1195 middle_len = le32_to_cpu(con->in_hdr.middle_len);
1196 if (middle_len > CEPH_MSG_MAX_DATA_LEN)
1197 return -EIO;
1198 data_len = le32_to_cpu(con->in_hdr.data_len);
1199 if (data_len > CEPH_MSG_MAX_DATA_LEN)
1200 return -EIO;
1201
1202 /* allocate message? */
1203 if (!con->in_msg) {
1204 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
1205 con->in_hdr.front_len, con->in_hdr.data_len);
1206 con->in_msg = con->ops->alloc_msg(con, &con->in_hdr);
1207 if (!con->in_msg) {
1208 /* skip this message */
1209 dout("alloc_msg returned NULL, skipping message\n");
1210 con->in_base_pos = -front_len - middle_len - data_len -
1211 sizeof(m->footer);
1212 con->in_tag = CEPH_MSGR_TAG_READY;
1213 return 0;
1214 }
1215 if (IS_ERR(con->in_msg)) {
1216 ret = PTR_ERR(con->in_msg);
1217 con->in_msg = NULL;
1218 con->error_msg = "out of memory for incoming message";
1219 return ret;
1220 }
1221 m = con->in_msg;
1222 m->front.iov_len = 0; /* haven't read it yet */
1223 memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
1224 }
1225
1226 /* front */
1227 while (m->front.iov_len < front_len) {
1228 BUG_ON(m->front.iov_base == NULL);
1229 left = front_len - m->front.iov_len;
1230 ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
1231 m->front.iov_len, left);
1232 if (ret <= 0)
1233 return ret;
1234 m->front.iov_len += ret;
1235 if (m->front.iov_len == front_len)
1236 con->in_front_crc = crc32c(0, m->front.iov_base,
1237 m->front.iov_len);
1238 }
1239
1240 /* middle */
1241 while (middle_len > 0 && (!m->middle ||
1242 m->middle->vec.iov_len < middle_len)) {
1243 if (m->middle == NULL) {
1244 ret = -EOPNOTSUPP;
1245 if (con->ops->alloc_middle)
1246 ret = con->ops->alloc_middle(con, m);
1247 if (ret < 0) {
1248 dout("alloc_middle failed, skipping payload\n");
1249 con->in_base_pos = -middle_len - data_len
1250 - sizeof(m->footer);
1251 ceph_msg_put(con->in_msg);
1252 con->in_msg = NULL;
1253 con->in_tag = CEPH_MSGR_TAG_READY;
1254 return 0;
1255 }
1256 m->middle->vec.iov_len = 0;
1257 }
1258 left = middle_len - m->middle->vec.iov_len;
1259 ret = ceph_tcp_recvmsg(con->sock,
1260 (char *)m->middle->vec.iov_base +
1261 m->middle->vec.iov_len, left);
1262 if (ret <= 0)
1263 return ret;
1264 m->middle->vec.iov_len += ret;
1265 if (m->middle->vec.iov_len == middle_len)
1266 con->in_middle_crc = crc32c(0, m->middle->vec.iov_base,
1267 m->middle->vec.iov_len);
1268 }
1269
1270 /* (page) data */
1271 data_off = le16_to_cpu(m->hdr.data_off);
1272 if (data_len == 0)
1273 goto no_data;
1274
1275 if (m->nr_pages == 0) {
1276 con->in_msg_pos.page = 0;
1277 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
1278 con->in_msg_pos.data_pos = 0;
1279 /* find pages for data payload */
1280 want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1281 ret = -1;
1282 if (con->ops->prepare_pages)
1283 ret = con->ops->prepare_pages(con, m, want);
1284 if (ret < 0) {
1285 dout("%p prepare_pages failed, skipping payload\n", m);
1286 con->in_base_pos = -data_len - sizeof(m->footer);
1287 ceph_msg_put(con->in_msg);
1288 con->in_msg = NULL;
1289 con->in_tag = CEPH_MSGR_TAG_READY;
1290 return 0;
1291 }
1292 BUG_ON(m->nr_pages < want);
1293 }
1294 while (con->in_msg_pos.data_pos < data_len) {
1295 left = min((int)(data_len - con->in_msg_pos.data_pos),
1296 (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
1297 BUG_ON(m->pages == NULL);
1298 p = kmap(m->pages[con->in_msg_pos.page]);
1299 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1300 left);
1301 if (ret > 0 && datacrc)
1302 con->in_data_crc =
1303 crc32c(con->in_data_crc,
1304 p + con->in_msg_pos.page_pos, ret);
1305 kunmap(m->pages[con->in_msg_pos.page]);
1306 if (ret <= 0)
1307 return ret;
1308 con->in_msg_pos.data_pos += ret;
1309 con->in_msg_pos.page_pos += ret;
1310 if (con->in_msg_pos.page_pos == PAGE_SIZE) {
1311 con->in_msg_pos.page_pos = 0;
1312 con->in_msg_pos.page++;
1313 }
1314 }
1315
1316no_data:
1317 /* footer */
1318 to = sizeof(m->hdr) + sizeof(m->footer);
1319 while (con->in_base_pos < to) {
1320 left = to - con->in_base_pos;
1321 ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
1322 (con->in_base_pos - sizeof(m->hdr)),
1323 left);
1324 if (ret <= 0)
1325 return ret;
1326 con->in_base_pos += ret;
1327 }
1328 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1329 m, front_len, m->footer.front_crc, middle_len,
1330 m->footer.middle_crc, data_len, m->footer.data_crc);
1331
1332 /* crc ok? */
1333 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1334 pr_err("read_partial_message %p front crc %u != exp. %u\n",
1335 m, con->in_front_crc, m->footer.front_crc);
1336 return -EBADMSG;
1337 }
1338 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1339 pr_err("read_partial_message %p middle crc %u != exp %u\n",
1340 m, con->in_middle_crc, m->footer.middle_crc);
1341 return -EBADMSG;
1342 }
1343 if (datacrc &&
1344 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1345 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1346 pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1347 con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1348 return -EBADMSG;
1349 }
1350
1351 return 1; /* done! */
1352}
1353
1354/*
1355 * Process message. This happens in the worker thread. The callback should
1356 * be careful not to do anything that waits on other incoming messages or it
1357 * may deadlock.
1358 */
1359static void process_message(struct ceph_connection *con)
1360{
1361 struct ceph_msg *msg = con->in_msg;
1362
1363 con->in_msg = NULL;
1364
1365 /* if first message, set peer_name */
1366 if (con->peer_name.type == 0)
1367 con->peer_name = msg->hdr.src.name;
1368
1369 mutex_lock(&con->out_mutex);
1370 con->in_seq++;
1371 mutex_unlock(&con->out_mutex);
1372
1373 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
1374 msg, le64_to_cpu(msg->hdr.seq),
1375 ENTITY_NAME(msg->hdr.src.name),
1376 le16_to_cpu(msg->hdr.type),
1377 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1378 le32_to_cpu(msg->hdr.front_len),
1379 le32_to_cpu(msg->hdr.data_len),
1380 con->in_front_crc, con->in_middle_crc, con->in_data_crc);
1381 con->ops->dispatch(con, msg);
1382 prepare_read_tag(con);
1383}
1384
1385
1386/*
1387 * Write something to the socket. Called in a worker thread when the
1388 * socket appears to be writeable and we have something ready to send.
1389 */
1390static int try_write(struct ceph_connection *con)
1391{
1392 struct ceph_messenger *msgr = con->msgr;
1393 int ret = 1;
1394
1395 dout("try_write start %p state %lu nref %d\n", con, con->state,
1396 atomic_read(&con->nref));
1397
1398 mutex_lock(&con->out_mutex);
1399more:
1400 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1401
1402 /* open the socket first? */
1403 if (con->sock == NULL) {
1404 /*
1405 * if we were STANDBY and are reconnecting _this_
1406 * connection, bump connect_seq now. Always bump
1407 * global_seq.
1408 */
1409 if (test_and_clear_bit(STANDBY, &con->state))
1410 con->connect_seq++;
1411
1412 prepare_write_connect(msgr, con);
1413 prepare_read_connect(con);
1414 set_bit(CONNECTING, &con->state);
1415
1416 con->in_tag = CEPH_MSGR_TAG_READY;
1417 dout("try_write initiating connect on %p new state %lu\n",
1418 con, con->state);
1419 con->sock = ceph_tcp_connect(con);
1420 if (IS_ERR(con->sock)) {
1421 con->sock = NULL;
1422 con->error_msg = "connect error";
1423 ret = -1;
1424 goto out;
1425 }
1426 }
1427
1428more_kvec:
1429 /* kvec data queued? */
1430 if (con->out_skip) {
1431 ret = write_partial_skip(con);
1432 if (ret <= 0)
1433 goto done;
1434 if (ret < 0) {
1435 dout("try_write write_partial_skip err %d\n", ret);
1436 goto done;
1437 }
1438 }
1439 if (con->out_kvec_left) {
1440 ret = write_partial_kvec(con);
1441 if (ret <= 0)
1442 goto done;
1443 if (ret < 0) {
1444 dout("try_write write_partial_kvec err %d\n", ret);
1445 goto done;
1446 }
1447 }
1448
1449 /* msg pages? */
1450 if (con->out_msg) {
1451 ret = write_partial_msg_pages(con);
1452 if (ret == 1)
1453 goto more_kvec; /* we need to send the footer, too! */
1454 if (ret == 0)
1455 goto done;
1456 if (ret < 0) {
1457 dout("try_write write_partial_msg_pages err %d\n",
1458 ret);
1459 goto done;
1460 }
1461 }
1462
1463 if (!test_bit(CONNECTING, &con->state)) {
1464 /* is anything else pending? */
1465 if (!list_empty(&con->out_queue)) {
1466 prepare_write_message(con);
1467 goto more;
1468 }
1469 if (con->in_seq > con->in_seq_acked) {
1470 prepare_write_ack(con);
1471 goto more;
1472 }
1473 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
1474 prepare_write_keepalive(con);
1475 goto more;
1476 }
1477 }
1478
1479 /* Nothing to do! */
1480 clear_bit(WRITE_PENDING, &con->state);
1481 dout("try_write nothing else to write.\n");
1482done:
1483 ret = 0;
1484out:
1485 mutex_unlock(&con->out_mutex);
1486 dout("try_write done on %p\n", con);
1487 return ret;
1488}
1489
1490
1491
1492/*
1493 * Read what we can from the socket.
1494 */
/*
 * try_read() drives the receive side of a connection as a small state
 * machine keyed on con->state (CONNECTING), con->in_base_pos (negative
 * while discarding bytes) and con->in_tag (what is being read now).
 *
 * Returns 0 when there is nothing more to do for now -- including when
 * there is no socket, the connection is STANDBY, or a read returned
 * <= 0 -- and a negative value when the connection should be faulted
 * (handshake failure, bad crc, oversized message, unknown tag).
 *
 * NOTE(review): apart from -EBADMSG/-EIO from read_partial_message(),
 * negative results of the read helpers end up at "done" and are
 * reported to the caller as 0.
 */
1495static int try_read(struct ceph_connection *con)
1496{
1497 struct ceph_messenger *msgr;
1498 int ret = -1;
1499
1500 if (!con->sock)
1501 return 0;
1502
1503 if (test_bit(STANDBY, &con->state))
1504 return 0;
1505
1506 dout("try_read start on %p\n", con);
/* msgr is assigned here but not used again in this function */
1507 msgr = con->msgr;
1508
1509more:
1510 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
1511 con->in_base_pos);
/* still handshaking: read the connect reply and act on it */
1512 if (test_bit(CONNECTING, &con->state)) {
1513 dout("try_read connecting\n");
1514 ret = read_partial_connect(con);
1515 if (ret <= 0)
1516 goto done;
1517 if (process_connect(con) < 0) {
1518 ret = -1;
1519 goto out;
1520 }
1521 goto more;
1522 }
1523
/* a negative in_base_pos is the number of bytes still to be thrown away */
1524 if (con->in_base_pos < 0) {
1525 /*
1526 * skipping + discarding content.
1527 *
1528 * FIXME: there must be a better way to do this!
1529 */
/* scratch buffer; static, so it is shared by every caller of try_read */
1530 static char buf[1024];
1531 int skip = min(1024, -con->in_base_pos);
1532 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
1533 ret = ceph_tcp_recvmsg(con->sock, buf, skip);
1534 if (ret <= 0)
1535 goto done;
1536 con->in_base_pos += ret;
1537 if (con->in_base_pos)
1538 goto more;
1539 }
/* TAG_READY means "between items": the next byte on the wire is a tag */
1540 if (con->in_tag == CEPH_MSGR_TAG_READY) {
1541 /*
1542 * what's next?
1543 */
1544 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
1545 if (ret <= 0)
1546 goto done;
1547 dout("try_read got tag %d\n", (int)con->in_tag);
1548 switch (con->in_tag) {
1549 case CEPH_MSGR_TAG_MSG:
1550 prepare_read_message(con);
1551 break;
1552 case CEPH_MSGR_TAG_ACK:
1553 prepare_read_ack(con);
1554 break;
1555 case CEPH_MSGR_TAG_CLOSE:
1556 set_bit(CLOSED, &con->state); /* fixme */
1557 goto done;
1558 default:
1559 goto bad_tag;
1560 }
1561 }
1562 if (con->in_tag == CEPH_MSGR_TAG_MSG) {
1563 ret = read_partial_message(con);
1564 if (ret <= 0) {
1565 switch (ret) {
1566 case -EBADMSG:
1567 con->error_msg = "bad crc";
/* a crc mismatch is reported to the caller as -EIO */
1568 ret = -EIO;
1569 goto out;
1570 case -EIO:
1571 con->error_msg = "io error";
1572 goto out;
1573 default:
1574 goto done;
1575 }
1576 }
/* read_partial_message() resets in_tag to READY when it skips a message */
1577 if (con->in_tag == CEPH_MSGR_TAG_READY)
1578 goto more;
1579 process_message(con);
1580 goto more;
1581 }
1582 if (con->in_tag == CEPH_MSGR_TAG_ACK) {
1583 ret = read_partial_ack(con);
1584 if (ret <= 0)
1585 goto done;
1586 process_ack(con);
1587 goto more;
1588 }
1589
/* "done" discards ret and reports success; "out" returns ret as it is */
1590done:
1591 ret = 0;
1592out:
1593 dout("try_read done on %p\n", con);
1594 return ret;
1595
1596bad_tag:
1597 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
1598 con->error_msg = "protocol error, garbage tag";
1599 ret = -1;
1600 goto out;
1601}
1602
1603
1604/*
1605 * Atomically queue work on a connection. Bump @con reference to
1606 * avoid races with connection teardown.
1607 *
1608 * There is some trickery going on with QUEUED and BUSY because we
1609 * only want a _single_ thread operating on each connection at any
1610 * point in time, but we want to use all available CPUs.
1611 *
1612 * The worker thread only proceeds if it can atomically set BUSY. It
1613 * clears QUEUED and does its thing. When it thinks it's done, it
1614 * clears BUSY, then rechecks QUEUED.. if it's set again, it loops
1615 * (tries again to set BUSY).
1616 *
1617 * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
1618 * try to queue work. If that fails (work is already queued, or BUSY)
1619 * we give up (work also already being done or is queued) but leave QUEUED
1620 * set so that the worker thread will loop if necessary.
1621 */
1622static void queue_con(struct ceph_connection *con)
1623{
1624 if (test_bit(DEAD, &con->state)) {
1625 dout("queue_con %p ignoring: DEAD\n",
1626 con);
1627 return;
1628 }
1629
1630 if (!con->ops->get(con)) {
1631 dout("queue_con %p ref count 0\n", con);
1632 return;
1633 }
1634
1635 set_bit(QUEUED, &con->state);
1636 if (test_bit(BUSY, &con->state)) {
1637 dout("queue_con %p - already BUSY\n", con);
1638 con->ops->put(con);
1639 } else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
1640 dout("queue_con %p - already queued\n", con);
1641 con->ops->put(con);
1642 } else {
1643 dout("queue_con %p\n", con);
1644 }
1645}
1646
1647/*
1648 * Do some work on a connection. Drop a connection ref when we're done.
1649 */
/*
 * Workqueue callback for a connection.  Only one instance may operate
 * on a connection at a time: that is enforced with the BUSY bit, while
 * QUEUED records that more work was requested in the meantime.
 *
 * Each pass handles a CLOSED or OPENING request, then reads and writes
 * as much as possible; any failure is handed to ceph_fault().  The
 * reference taken by whoever queued the work is dropped on exit.
 */
1650static void con_work(struct work_struct *work)
1651{
1652 struct ceph_connection *con = container_of(work, struct ceph_connection,
1653 work.work);
/* set once we faulted, so that we do not immediately loop again */
1654 int backoff = 0;
1655
1656more:
/* someone else is already working on this connection; just drop our ref */
1657 if (test_and_set_bit(BUSY, &con->state) != 0) {
1658 dout("con_work %p BUSY already set\n", con);
1659 goto out;
1660 }
1661 dout("con_work %p start, clearing QUEUED\n", con);
1662 clear_bit(QUEUED, &con->state);
1663
1664 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
1665 dout("con_work CLOSED\n");
1666 con_close_socket(con);
1667 goto done;
1668 }
1669 if (test_and_clear_bit(OPENING, &con->state)) {
1670 /* reopen w/ new peer */
1671 dout("con_work OPENING\n");
1672 con_close_socket(con);
1673 }
1674
/*
 * Short-circuit evaluation matters here: a pending SOCK_CLOSED skips
 * both try_read() and try_write(), and a failed read skips the write.
 */
1675 if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
1676 try_read(con) < 0 ||
1677 try_write(con) < 0) {
1678 backoff = 1;
1679 ceph_fault(con); /* error/fault path */
1680 }
1681
1682done:
1683 clear_bit(BUSY, &con->state);
1684 dout("con->state=%lu\n", con->state);
/* work was requested while we were BUSY: redo it here unless we faulted */
1685 if (test_bit(QUEUED, &con->state)) {
1686 if (!backoff) {
1687 dout("con_work %p QUEUED reset, looping\n", con);
1688 goto more;
1689 }
1690 dout("con_work %p QUEUED reset, but just faulted\n", con);
1691 clear_bit(QUEUED, &con->state);
1692 }
1693 dout("con_work %p done\n", con);
1694
1695out:
1696 con->ops->put(con);
1697}
1698
1699
1700/*
1701 * Generic error/fault handler. A retry mechanism is used with
1702 * exponential backoff
1703 */
1704static void ceph_fault(struct ceph_connection *con)
1705{
1706 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
1707 pr_addr(&con->peer_addr.in_addr), con->error_msg);
1708 dout("fault %p state %lu to peer %s\n",
1709 con, con->state, pr_addr(&con->peer_addr.in_addr));
1710
1711 if (test_bit(LOSSYTX, &con->state)) {
1712 dout("fault on LOSSYTX channel\n");
1713 goto out;
1714 }
1715
1716 clear_bit(BUSY, &con->state); /* to avoid an improbable race */
1717
1718 con_close_socket(con);
1719 con->in_msg = NULL;
1720
1721 /* If there are no messages in the queue, place the connection
1722 * in a STANDBY state (i.e., don't try to reconnect just yet). */
1723 mutex_lock(&con->out_mutex);
1724 if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
1725 dout("fault setting STANDBY\n");
1726 set_bit(STANDBY, &con->state);
1727 mutex_unlock(&con->out_mutex);
1728 goto out;
1729 }
1730
1731 /* Requeue anything that hasn't been acked, and retry after a
1732 * delay. */
1733 list_splice_init(&con->out_sent, &con->out_queue);
1734 mutex_unlock(&con->out_mutex);
1735
1736 if (con->delay == 0)
1737 con->delay = BASE_DELAY_INTERVAL;
1738 else if (con->delay < MAX_DELAY_INTERVAL)
1739 con->delay *= 2;
1740
1741 /* explicitly schedule work to try to reconnect again later. */
1742 dout("fault queueing %p delay %lu\n", con, con->delay);
1743 con->ops->get(con);
1744 if (queue_delayed_work(ceph_msgr_wq, &con->work,
1745 round_jiffies_relative(con->delay)) == 0)
1746 con->ops->put(con);
1747
1748out:
1749 if (con->ops->fault)
1750 con->ops->fault(con);
1751}
1752
1753
1754
1755/*
1756 * create a new messenger instance
1757 */
1758struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
1759{
1760 struct ceph_messenger *msgr;
1761
1762 msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
1763 if (msgr == NULL)
1764 return ERR_PTR(-ENOMEM);
1765
1766 spin_lock_init(&msgr->global_seq_lock);
1767
1768 /* the zero page is needed if a request is "canceled" while the message
1769 * is being written over the socket */
1770 msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
1771 if (!msgr->zero_page) {
1772 kfree(msgr);
1773 return ERR_PTR(-ENOMEM);
1774 }
1775 kmap(msgr->zero_page);
1776
1777 if (myaddr)
1778 msgr->inst.addr = *myaddr;
1779
1780 /* select a random nonce */
1781 get_random_bytes(&msgr->inst.addr.nonce,
1782 sizeof(msgr->inst.addr.nonce));
1783
1784 dout("messenger_create %p\n", msgr);
1785 return msgr;
1786}
1787
1788void ceph_messenger_destroy(struct ceph_messenger *msgr)
1789{
1790 dout("destroy %p\n", msgr);
1791 kunmap(msgr->zero_page);
1792 __free_page(msgr->zero_page);
1793 kfree(msgr);
1794 dout("destroyed messenger %p\n", msgr);
1795}
1796
1797/*
1798 * Queue up an outgoing message on the given connection.
1799 */
1800void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
1801{
1802 if (test_bit(CLOSED, &con->state)) {
1803 dout("con_send %p closed, dropping %p\n", con, msg);
1804 ceph_msg_put(msg);
1805 return;
1806 }
1807
1808 /* set src+dst */
1809 msg->hdr.src = con->msgr->inst;
1810 msg->hdr.orig_src = con->msgr->inst;
1811 msg->hdr.dst_erank = con->peer_addr.erank;
1812
1813 /* queue */
1814 mutex_lock(&con->out_mutex);
1815 BUG_ON(!list_empty(&msg->list_head));
1816 list_add_tail(&msg->list_head, &con->out_queue);
1817 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
1818 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
1819 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
1820 le32_to_cpu(msg->hdr.front_len),
1821 le32_to_cpu(msg->hdr.middle_len),
1822 le32_to_cpu(msg->hdr.data_len));
1823 mutex_unlock(&con->out_mutex);
1824
1825 /* if there wasn't anything waiting to send before, queue
1826 * new work */
1827 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
1828 queue_con(con);
1829}
1830
1831/*
1832 * Revoke a message that was previously queued for send
1833 */
1834void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
1835{
1836 mutex_lock(&con->out_mutex);
1837 if (!list_empty(&msg->list_head)) {
1838 dout("con_revoke %p msg %p\n", con, msg);
1839 list_del_init(&msg->list_head);
1840 ceph_msg_put(msg);
1841 msg->hdr.seq = 0;
1842 if (con->out_msg == msg)
1843 con->out_msg = NULL;
1844 if (con->out_kvec_is_msg) {
1845 con->out_skip = con->out_kvec_bytes;
1846 con->out_kvec_is_msg = false;
1847 }
1848 } else {
1849 dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
1850 }
1851 mutex_unlock(&con->out_mutex);
1852}
1853
1854/*
1855 * Queue a keepalive byte to ensure the tcp connection is alive.
1856 */
1857void ceph_con_keepalive(struct ceph_connection *con)
1858{
1859 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
1860 test_and_set_bit(WRITE_PENDING, &con->state) == 0)
1861 queue_con(con);
1862}
1863
1864
1865/*
1866 * construct a new message with given type, size
1867 * the new msg has a ref count of 1.
1868 */
1869struct ceph_msg *ceph_msg_new(int type, int front_len,
1870 int page_len, int page_off, struct page **pages)
1871{
1872 struct ceph_msg *m;
1873
1874 m = kmalloc(sizeof(*m), GFP_NOFS);
1875 if (m == NULL)
1876 goto out;
1877 atomic_set(&m->nref, 1);
1878 INIT_LIST_HEAD(&m->list_head);
1879
1880 m->hdr.type = cpu_to_le16(type);
1881 m->hdr.front_len = cpu_to_le32(front_len);
1882 m->hdr.middle_len = 0;
1883 m->hdr.data_len = cpu_to_le32(page_len);
1884 m->hdr.data_off = cpu_to_le16(page_off);
1885 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
1886 m->footer.front_crc = 0;
1887 m->footer.middle_crc = 0;
1888 m->footer.data_crc = 0;
1889 m->front_max = front_len;
1890 m->front_is_vmalloc = false;
1891 m->more_to_follow = false;
1892 m->pool = NULL;
1893
1894 /* front */
1895 if (front_len) {
1896 if (front_len > PAGE_CACHE_SIZE) {
1897 m->front.iov_base = __vmalloc(front_len, GFP_NOFS,
1898 PAGE_KERNEL);
1899 m->front_is_vmalloc = true;
1900 } else {
1901 m->front.iov_base = kmalloc(front_len, GFP_NOFS);
1902 }
1903 if (m->front.iov_base == NULL) {
1904 pr_err("msg_new can't allocate %d bytes\n",
1905 front_len);
1906 goto out2;
1907 }
1908 } else {
1909 m->front.iov_base = NULL;
1910 }
1911 m->front.iov_len = front_len;
1912
1913 /* middle */
1914 m->middle = NULL;
1915
1916 /* data */
1917 m->nr_pages = calc_pages_for(page_off, page_len);
1918 m->pages = pages;
1919
1920 dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
1921 m->nr_pages);
1922 return m;
1923
1924out2:
1925 ceph_msg_put(m);
1926out:
1927 pr_err("msg_new can't create type %d len %d\n", type, front_len);
1928 return ERR_PTR(-ENOMEM);
1929}
1930
1931/*
1932 * Generic message allocator, for incoming messages.
1933 */
1934struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
1935 struct ceph_msg_header *hdr)
1936{
1937 int type = le16_to_cpu(hdr->type);
1938 int front_len = le32_to_cpu(hdr->front_len);
1939 struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);
1940
1941 if (!msg) {
1942 pr_err("unable to allocate msg type %d len %d\n",
1943 type, front_len);
1944 return ERR_PTR(-ENOMEM);
1945 }
1946 return msg;
1947}
1948
1949/*
1950 * Allocate "middle" portion of a message, if it is needed and wasn't
1951 * allocated by alloc_msg. This allows us to read a small fixed-size
1952 * per-type header in the front and then gracefully fail (i.e.,
1953 * propagate the error to the caller based on info in the front) when
1954 * the middle is too large.
1955 */
1956int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
1957{
1958 int type = le16_to_cpu(msg->hdr.type);
1959 int middle_len = le32_to_cpu(msg->hdr.middle_len);
1960
1961 dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
1962 ceph_msg_type_name(type), middle_len);
1963 BUG_ON(!middle_len);
1964 BUG_ON(msg->middle);
1965
1966 msg->middle = ceph_buffer_new_alloc(middle_len, GFP_NOFS);
1967 if (!msg->middle)
1968 return -ENOMEM;
1969 return 0;
1970}
1971
1972
1973/*
1974 * Free a generically kmalloc'd message.
1975 */
1976void ceph_msg_kfree(struct ceph_msg *m)
1977{
1978 dout("msg_kfree %p\n", m);
1979 if (m->front_is_vmalloc)
1980 vfree(m->front.iov_base);
1981 else
1982 kfree(m->front.iov_base);
1983 kfree(m);
1984}
1985
1986/*
1987 * Drop a msg ref. Destroy as needed.
1988 */
1989void ceph_msg_put(struct ceph_msg *m)
1990{
1991 dout("ceph_msg_put %p %d -> %d\n", m, atomic_read(&m->nref),
1992 atomic_read(&m->nref)-1);
1993 if (atomic_read(&m->nref) <= 0) {
1994 pr_err("bad ceph_msg_put on %p %llu %d=%s %d+%d\n",
1995 m, le64_to_cpu(m->hdr.seq),
1996 le16_to_cpu(m->hdr.type),
1997 ceph_msg_type_name(le16_to_cpu(m->hdr.type)),
1998 le32_to_cpu(m->hdr.front_len),
1999 le32_to_cpu(m->hdr.data_len));
2000 WARN_ON(1);
2001 }
2002 if (atomic_dec_and_test(&m->nref)) {
2003 dout("ceph_msg_put last one on %p\n", m);
2004 WARN_ON(!list_empty(&m->list_head));
2005
2006 /* drop middle, data, if any */
2007 if (m->middle) {
2008 ceph_buffer_put(m->middle);
2009 m->middle = NULL;
2010 }
2011 m->nr_pages = 0;
2012 m->pages = NULL;
2013
2014 if (m->pool)
2015 ceph_msgpool_put(m->pool, m);
2016 else
2017 ceph_msg_kfree(m);
2018 }
2019}
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
new file mode 100644
index 000000000000..dcd98b64dca9
--- /dev/null
+++ b/fs/ceph/messenger.h
@@ -0,0 +1,243 @@
1#ifndef __FS_CEPH_MESSENGER_H
2#define __FS_CEPH_MESSENGER_H
3
4#include <linux/mutex.h>
5#include <linux/net.h>
6#include <linux/radix-tree.h>
7#include <linux/uio.h>
8#include <linux/version.h>
9#include <linux/workqueue.h>
10
11#include "types.h"
12#include "buffer.h"
13
/* forward declarations; both structures are defined further down */
struct ceph_connection;
struct ceph_msg;

/* receive work queue */
extern struct workqueue_struct *ceph_msgr_wq;
18
/*
 * Callbacks supplied by the user of a connection so that connection
 * events can be handed back to it.
 */
struct ceph_connection_operations {
	/*
	 * get/put hooks for the connection.
	 * NOTE(review): presumably reference counting -- confirm with users.
	 */
	struct ceph_connection *(*get)(struct ceph_connection *);
	void (*put)(struct ceph_connection *);

	/* handle an incoming message. */
	void (*dispatch)(struct ceph_connection *con, struct ceph_msg *m);

	/* protocol version mismatch */
	void (*bad_proto)(struct ceph_connection *con);

	/* there was some error on the socket (disconnect, whatever) */
	void (*fault)(struct ceph_connection *con);

	/*
	 * a remote host has terminated a message exchange session, and
	 * messages we sent (or they tried to send us) may be lost.
	 */
	void (*peer_reset)(struct ceph_connection *con);

	/*
	 * allocation hooks; ceph_alloc_msg() and ceph_alloc_middle(),
	 * declared in this header, have matching signatures.
	 */
	struct ceph_msg *(*alloc_msg)(struct ceph_connection *con,
				      struct ceph_msg_header *hdr);
	int (*alloc_middle)(struct ceph_connection *con,
			    struct ceph_msg *msg);

	/*
	 * an incoming message has a data payload; tell me what pages I
	 * should read the data into.
	 */
	int (*prepare_pages)(struct ceph_connection *con, struct ceph_msg *m,
			     int want);
};
48
extern const char *ceph_name_type_str(int t);

/*
 * Expands to TWO printf-style arguments for an entity name: the type as
 * a string and the number converted from little-endian.
 *
 * Use format string %s%lld -- the number goes through le64_to_cpu() and
 * is therefore a 64-bit value, so a plain %d does not match it.
 */
#define ENTITY_NAME(n) ceph_name_type_str((n).type), le64_to_cpu((n).num)
53
54struct ceph_messenger {
55 struct ceph_entity_inst inst; /* my name+address */
56 struct page *zero_page; /* used in certain error cases */
57
58 bool nocrc;
59
60 /*
61 * the global_seq counts connections i (attempt to) initiate
62 * in order to disambiguate certain connect race conditions.
63 */
64 u32 global_seq;
65 spinlock_t global_seq_lock;
66};
67
68/*
69 * a single message. it contains a header (src, dest, message type, etc.),
70 * footer (crc values, mainly), a "front" message body, and possibly a
71 * data payload (stored in some number of pages).
72 */
73struct ceph_msg {
74 struct ceph_msg_header hdr; /* header */
75 struct ceph_msg_footer footer; /* footer */
76 struct kvec front; /* unaligned blobs of message */
77 struct ceph_buffer *middle;
78 struct page **pages; /* data payload. NOT OWNER. */
79 unsigned nr_pages; /* size of page array */
80 struct list_head list_head;
81 atomic_t nref;
82 bool front_is_vmalloc;
83 bool more_to_follow;
84 int front_max;
85
86 struct ceph_msgpool *pool;
87};
88
/* Position within the data payload of a message. */
struct ceph_msg_pos {
	int page;		/* which page */
	int page_pos;		/* offset in page */
	int data_pos;		/* offset in data payload */
	int did_page_crc;	/* true if we've calculated crc for current page */
};
94
/*
 * Defaults for the exponential backoff used after a connection fault,
 * expressed in jiffies.
 */
#define BASE_DELAY_INTERVAL	(HZ/2)		/* half a second */
#define MAX_DELAY_INTERVAL	(5 * 60 * HZ)	/* five minutes */
98
/*
 * Bit numbers for the ceph_connection 'state' word.
 *
 * QUEUED and BUSY work as a pair: together they make sure that no more
 * than one thread at a time is opening, reading from or writing to the
 * socket.
 *
 * Bits 7 and 9 are not assigned here.
 */
#define LOSSYTX			0  /* we can close channel or drop messages on errors */
#define LOSSYRX			1  /* peer may reset/drop messages */
#define CONNECTING		2
#define KEEPALIVE_PENDING	3
#define WRITE_PENDING		4  /* we have data ready to send */
#define QUEUED			5  /* there is work queued on this connection */
#define BUSY			6  /* work is being done */
#define STANDBY			8  /* no outgoing messages, socket closed.  we keep
				    * the ceph_connection around to maintain shared
				    * state with the peer. */
#define CLOSED			10 /* we've closed the connection */
#define SOCK_CLOSED		11 /* socket state changed to closed */
#define REGISTERED		12 /* connection appears in con_tree */
#define OPENING			13 /* open connection w/ (possibly new) peer */
#define DEAD			14 /* dead, about to kfree */
120
121/*
122 * A single connection with another host.
123 *
124 * We maintain a queue of outgoing messages, and some session state to
125 * ensure that we can preserve the lossless, ordered delivery of
126 * messages in the case of a TCP disconnect.
127 */
128struct ceph_connection {
129 void *private;
130 atomic_t nref;
131
132 const struct ceph_connection_operations *ops;
133
134 struct ceph_messenger *msgr;
135 struct socket *sock;
136 unsigned long state; /* connection state (see flags above) */
137 const char *error_msg; /* error message, if any */
138
139 struct ceph_entity_addr peer_addr; /* peer address */
140 struct ceph_entity_name peer_name; /* peer name */
141 struct ceph_entity_addr peer_addr_for_me;
142 u32 connect_seq; /* identify the most recent connection
143 attempt for this connection, client */
144 u32 peer_global_seq; /* peer's global seq for this connection */
145
146 /* out queue */
147 struct mutex out_mutex;
148 struct list_head out_queue;
149 struct list_head out_sent; /* sending or sent but unacked */
150 u64 out_seq; /* last message queued for send */
151 u64 out_seq_sent; /* last message sent */
152 bool out_keepalive_pending;
153
154 u64 in_seq, in_seq_acked; /* last message received, acked */
155
156 /* connection negotiation temps */
157 char in_banner[CEPH_BANNER_MAX_LEN];
158 union {
159 struct { /* outgoing connection */
160 struct ceph_msg_connect out_connect;
161 struct ceph_msg_connect_reply in_reply;
162 };
163 struct { /* incoming */
164 struct ceph_msg_connect in_connect;
165 struct ceph_msg_connect_reply out_reply;
166 };
167 };
168 struct ceph_entity_addr actual_peer_addr;
169
170 /* message out temps */
171 struct ceph_msg *out_msg; /* sending message (== tail of
172 out_sent) */
173 struct ceph_msg_pos out_msg_pos;
174
175 struct kvec out_kvec[8], /* sending header/footer data */
176 *out_kvec_cur;
177 int out_kvec_left; /* kvec's left in out_kvec */
178 int out_skip; /* skip this many bytes */
179 int out_kvec_bytes; /* total bytes left */
180 bool out_kvec_is_msg; /* kvec refers to out_msg */
181 int out_more; /* there is more data after the kvecs */
182 __le64 out_temp_ack; /* for writing an ack */
183
184 /* message in temps */
185 struct ceph_msg_header in_hdr;
186 struct ceph_msg *in_msg;
187 struct ceph_msg_pos in_msg_pos;
188 u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */
189
190 char in_tag; /* protocol control byte */
191 int in_base_pos; /* bytes read */
192 __le64 in_temp_ack; /* for reading an ack */
193
194 struct delayed_work work; /* send|recv work */
195 unsigned long delay; /* current delay interval */
196};
197
198
/* address helpers */
extern const char *pr_addr(const struct sockaddr_storage *ss);
extern int ceph_parse_ips(const char *c, const char *end,
			  struct ceph_entity_addr *addr,
			  int max_count, int *count);

/* messenger subsystem init/exit */
extern int ceph_msgr_init(void);
extern void ceph_msgr_exit(void);

/* struct ceph_messenger create/destroy */
extern struct ceph_messenger *
ceph_messenger_create(struct ceph_entity_addr *myaddr);
extern void ceph_messenger_destroy(struct ceph_messenger *);

/* struct ceph_connection operations */
extern void ceph_con_init(struct ceph_messenger *msgr,
			  struct ceph_connection *con);
extern void ceph_con_shutdown(struct ceph_connection *con);
extern void ceph_con_open(struct ceph_connection *con,
			  struct ceph_entity_addr *addr);
extern void ceph_con_close(struct ceph_connection *con);
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con);
extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
extern void ceph_con_put(struct ceph_connection *con);

/* struct ceph_msg allocation and freeing */
extern struct ceph_msg *ceph_msg_new(int type, int front_len,
				     int page_len, int page_off,
				     struct page **pages);
extern void ceph_msg_kfree(struct ceph_msg *m);

/* signatures match the alloc_msg/alloc_middle connection callbacks */
extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
				       struct ceph_msg_header *hdr);
extern int ceph_alloc_middle(struct ceph_connection *con,
			     struct ceph_msg *msg);
232
233
234static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
235{
236 dout("ceph_msg_get %p %d -> %d\n", msg, atomic_read(&msg->nref),
237 atomic_read(&msg->nref)+1);
238 atomic_inc(&msg->nref);
239 return msg;
240}
241extern void ceph_msg_put(struct ceph_msg *msg);
242
243#endif