aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/messenger.h
diff options
context:
space:
mode:
author Sage Weil <sage@newdream.net> 2009-10-06 14:31:13 -0400
committer Sage Weil <sage@newdream.net> 2009-10-06 14:31:13 -0400
commit 31b8006e1d79e127a776c9414e3e0b5f9508047e (patch)
tree 9c56f678f8dce4e25690461b078409d657731f77 /fs/ceph/messenger.h
parent 963b61eb041e8850807d95f8d7a4c6a454c45000 (diff)
ceph: messenger library
A generic message passing library is used to communicate with all other components in the Ceph file system. The messenger library provides ordered, reliable delivery of messages between two nodes in the system. This implementation is based on TCP. Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/messenger.h')
-rw-r--r-- fs/ceph/messenger.h 243
1 files changed, 243 insertions, 0 deletions
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
new file mode 100644
index 000000000000..dcd98b64dca9
--- /dev/null
+++ b/fs/ceph/messenger.h
@@ -0,0 +1,243 @@
1#ifndef __FS_CEPH_MESSENGER_H
2#define __FS_CEPH_MESSENGER_H
3
4#include <linux/mutex.h>
5#include <linux/net.h>
6#include <linux/radix-tree.h>
7#include <linux/uio.h>
8#include <linux/version.h>
9#include <linux/workqueue.h>
10
11#include "types.h"
12#include "buffer.h"
13
/*
 * Forward declarations: both structures are defined in full further down
 * in this header, but the callback table below needs to name them first.
 */
14struct ceph_msg;
15struct ceph_connection;
16
/* Declared here only; the definition lives outside this header. */
17extern struct workqueue_struct *ceph_msgr_wq; /* receive work queue */
18
19/*
 * Callbacks that Ceph defines for handling connection events.  A
 * connection refers to its callback table through ceph_connection.ops.
 */
22struct ceph_connection_operations {
 /* get/put pair taking a connection.  NOTE(review): presumably
  * reference-count hooks for the object that owns the connection --
  * confirm against the implementations. */
23 struct ceph_connection *(*get)(struct ceph_connection *);
24 void (*put)(struct ceph_connection *);
25
26 /* handle an incoming message 'm' that arrived on 'con' */
27 void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
28
29 /* protocol version mismatch */
30 void (*bad_proto) (struct ceph_connection *con);
31
32 /* there was some error on the socket (disconnect, whatever) */
33 void (*fault) (struct ceph_connection *con);
34
35 /* a remote host has terminated a message exchange session, and
  * messages we sent (or they tried to send us) may be lost. */
37 void (*peer_reset) (struct ceph_connection *con);
38
 /* NOTE(review): presumably asked to provide the ceph_msg that will
  * receive the incoming message described by 'hdr' -- confirm. */
39 struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
40 struct ceph_msg_header *hdr);
 /* NOTE(review): presumably asked to provide msg->middle for an
  * incoming message; the int return looks like an error code --
  * confirm. */
41 int (*alloc_middle) (struct ceph_connection *con,
42 struct ceph_msg *msg);
43 /* an incoming message has a data payload; tell me what pages I
  * should read the data into.  NOTE(review): the meaning of 'want'
  * (bytes vs. pages) is not visible here -- confirm. */
45 int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
46 int want);
47};
48
/* Maps an entity name type code 't' to a string (per its signature);
 * defined outside this header. */
49extern const char *ceph_name_type_str(int t);
50
51/*
 * Expands to TWO comma-separated expressions -- the type string and the
 * numeric id of entity name 'n' -- meant to be dropped into a printf-style
 * argument list.  'n' is evaluated twice, so pass a side-effect-free
 * expression.
 *
 * The original note says: use format string %s%d.
 * NOTE(review): the second expression is the result of le64_to_cpu(),
 * which presumably is a 64-bit value; if so the numeric conversion should
 * be a 64-bit one (e.g. %llu) rather than %d -- confirm against callers.
 */
52#define ENTITY_NAME(n) ceph_name_type_str((n).type), le64_to_cpu((n).num)
53
/*
 * Per-messenger state: the local identity plus a sequence counter.
 * Each ceph_connection points back at its messenger via ->msgr.
 */
54struct ceph_messenger {
55 struct ceph_entity_inst inst; /* my name+address */
56 struct page *zero_page; /* used in certain error cases */
57
 /* NOTE(review): presumably "skip CRC calculation" when true -- confirm */
58 bool nocrc;
59
60 /*
  * the global_seq counts connections I (attempt to) initiate
  * in order to disambiguate certain connect race conditions.
  * NOTE(review): global_seq_lock presumably serializes updates to
  * global_seq -- confirm.
  */
64 u32 global_seq;
65 spinlock_t global_seq_lock;
66};
67
68/*
 * A single message.  It contains a header (src, dest, message type, etc.),
 * a footer (crc values, mainly), a "front" message body, and possibly a
 * data payload (stored in some number of pages).
 */
73struct ceph_msg {
74 struct ceph_msg_header hdr; /* header */
75 struct ceph_msg_footer footer; /* footer */
76 struct kvec front; /* unaligned blobs of message */
 /* separate buffer for a "middle" section; NOTE(review): presumably
  * NULL when the message has none -- confirm */
77 struct ceph_buffer *middle;
78 struct page **pages; /* data payload. NOT OWNER. */
79 unsigned nr_pages; /* size of page array */
 /* NOTE(review): presumably links the message into a connection's
  * out_queue / out_sent lists -- confirm */
80 struct list_head list_head;
 /* reference count; ceph_msg_get() increments it */
81 atomic_t nref;
 /* NOTE(review): presumably records how 'front' was allocated so it
  * can be released the matching way -- confirm */
82 bool front_is_vmalloc;
83 bool more_to_follow;
 /* NOTE(review): presumably the allocated capacity of 'front' --
  * confirm */
84 int front_max;
85
 /* NOTE(review): presumably the pool this message came from, if any
  * -- confirm */
86 struct ceph_msgpool *pool;
87};
88
/*
 * Cursor into a message's data payload.  struct ceph_connection keeps one
 * for the message being sent (out_msg_pos) and one for the message being
 * received (in_msg_pos).
 */
89struct ceph_msg_pos {
90 int page, page_pos; /* which page; offset in page */
91 int data_pos; /* offset in data payload */
92 int did_page_crc; /* true if we've calculated crc for current page */
93};
94
95/*
 * ceph connection fault delay defaults, for exponential backoff.
 * Both are expressed as multiples of HZ: the base is HZ/2 and the cap is
 * 300 * HZ.  NOTE(review): with HZ being the kernel's ticks-per-second
 * constant that is half a second and five minutes -- confirm.
 */
96#define BASE_DELAY_INTERVAL (HZ/2)
97#define MAX_DELAY_INTERVAL (5 * 60 * HZ)
98
99/*
 * ceph_connection state bit flags
 *
 * These are the flags referred to by ceph_connection.state (an unsigned
 * long).  The values are small consecutive integers rather than masks;
 * NOTE(review): presumably bit numbers for the kernel bit operations --
 * confirm.  Values 7 and 9 are not assigned.
 *
 * QUEUED and BUSY are used together to ensure that only a single
 * thread is currently opening, reading or writing data to the socket.
 */
105#define LOSSYTX 0 /* we can close channel or drop messages on errors */
106#define LOSSYRX 1 /* peer may reset/drop messages */
107#define CONNECTING 2
108#define KEEPALIVE_PENDING 3
109#define WRITE_PENDING 4 /* we have data ready to send */
110#define QUEUED 5 /* there is work queued on this connection */
111#define BUSY 6 /* work is being done */
112#define STANDBY 8 /* no outgoing messages, socket closed. we keep
113 * the ceph_connection around to maintain shared
114 * state with the peer. */
115#define CLOSED 10 /* we've closed the connection */
116#define SOCK_CLOSED 11 /* socket state changed to closed */
117#define REGISTERED 12 /* connection appears in con_tree */
118#define OPENING 13 /* open connection w/ (possibly new) peer */
119#define DEAD 14 /* dead, about to kfree */
120
121/*
 * A single connection with another host.
 *
 * We maintain a queue of outgoing messages, and some session state to
 * ensure that we can preserve the lossless, ordered delivery of
 * messages in the case of a TCP disconnect.
 *
 * The members fall into groups: identity/ownership, peer addressing and
 * sequence numbers, the outgoing queue, connection-negotiation scratch
 * space, per-message send and receive scratch space, and the deferred
 * work item that drives the connection.
 */
128struct ceph_connection {
 /* NOTE(review): presumably an opaque pointer for the connection's
  * owner -- confirm */
129 void *private;
130 atomic_t nref;
131
 /* callback table used to report events on this connection */
132 const struct ceph_connection_operations *ops;
133
134 struct ceph_messenger *msgr;
135 struct socket *sock;
136 unsigned long state; /* connection state (see flags above) */
137 const char *error_msg; /* error message, if any */
138
139 struct ceph_entity_addr peer_addr; /* peer address */
140 struct ceph_entity_name peer_name; /* peer name */
 /* NOTE(review): presumably our own address as reported by the peer
  * -- confirm */
141 struct ceph_entity_addr peer_addr_for_me;
142 u32 connect_seq; /* identify the most recent connection
143 attempt for this connection, client */
144 u32 peer_global_seq; /* peer's global seq for this connection */
145
146 /* out queue */
 /* NOTE(review): out_mutex presumably guards the out_* members --
  * confirm */
147 struct mutex out_mutex;
148 struct list_head out_queue;
149 struct list_head out_sent; /* sending or sent but unacked */
150 u64 out_seq; /* last message queued for send */
151 u64 out_seq_sent; /* last message sent */
152 bool out_keepalive_pending;
153
154 u64 in_seq, in_seq_acked; /* last message received, acked */
155
156 /* connection negotiation temps */
157 char in_banner[CEPH_BANNER_MAX_LEN];
 /* anonymous union: the outgoing pair and the incoming pair share the
  * same storage, so only one of the two pairs is meaningful at a time */
158 union {
159 struct { /* outgoing connection */
160 struct ceph_msg_connect out_connect;
161 struct ceph_msg_connect_reply in_reply;
162 };
163 struct { /* incoming */
164 struct ceph_msg_connect in_connect;
165 struct ceph_msg_connect_reply out_reply;
166 };
167 };
168 struct ceph_entity_addr actual_peer_addr;
169
170 /* message out temps */
171 struct ceph_msg *out_msg; /* sending message (== tail of
172 out_sent) */
173 struct ceph_msg_pos out_msg_pos;
174
 /* out_kvec is a fixed array of 8 entries; out_kvec_cur points at the
  * current entry */
175 struct kvec out_kvec[8], /* sending header/footer data */
176 *out_kvec_cur;
177 int out_kvec_left; /* kvec's left in out_kvec */
178 int out_skip; /* skip this many bytes */
179 int out_kvec_bytes; /* total bytes left */
180 bool out_kvec_is_msg; /* kvec refers to out_msg */
181 int out_more; /* there is more data after the kvecs */
182 __le64 out_temp_ack; /* for writing an ack */
183
184 /* message in temps */
185 struct ceph_msg_header in_hdr;
186 struct ceph_msg *in_msg;
187 struct ceph_msg_pos in_msg_pos;
188 u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */
189
190 char in_tag; /* protocol control byte */
191 int in_base_pos; /* bytes read */
192 __le64 in_temp_ack; /* for reading an ack */
193
194 struct delayed_work work; /* send|recv work */
 /* NOTE(review): presumably kept between BASE_DELAY_INTERVAL and
  * MAX_DELAY_INTERVAL by the backoff logic -- confirm */
195 unsigned long delay; /* current delay interval */
196};
197
198
/*
 * Public messenger entry points.  Only the prototypes are visible in this
 * header; the behavioural notes below are inferred from names and
 * signatures and should be confirmed against the implementation.
 */

/* Address helpers.  NOTE(review): pr_addr() presumably formats 'ss' as
 * text; ceph_parse_ips() presumably parses the text in [c, end) into at
 * most 'max_count' entries of 'addr' and reports how many via '*count'. */
199extern const char *pr_addr(const struct sockaddr_storage *ss);
200extern int ceph_parse_ips(const char *c, const char *end,
201 struct ceph_entity_addr *addr,
202 int max_count, int *count);
203
204
/* Subsystem-wide init/exit pair. */
205extern int ceph_msgr_init(void);
206extern void ceph_msgr_exit(void);
207
/* Create/destroy pair for a struct ceph_messenger. */
208extern struct ceph_messenger *ceph_messenger_create(
209 struct ceph_entity_addr *myaddr);
210extern void ceph_messenger_destroy(struct ceph_messenger *);
211
/* Connection lifecycle, message submission and reference handling.
 * NOTE(review): ceph_con_revoke() presumably withdraws a message that
 * was handed to ceph_con_send() -- confirm. */
212extern void ceph_con_init(struct ceph_messenger *msgr,
213 struct ceph_connection *con);
214extern void ceph_con_shutdown(struct ceph_connection *con);
215extern void ceph_con_open(struct ceph_connection *con,
216 struct ceph_entity_addr *addr);
217extern void ceph_con_close(struct ceph_connection *con);
218extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
219extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
220extern void ceph_con_keepalive(struct ceph_connection *con);
221extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
222extern void ceph_con_put(struct ceph_connection *con);
223
/* Message allocation and release.  NOTE(review): how page_len, page_off
 * and pages relate to ceph_msg.pages / nr_pages is not visible here. */
224extern struct ceph_msg *ceph_msg_new(int type, int front_len,
225 int page_len, int page_off,
226 struct page **pages);
227extern void ceph_msg_kfree(struct ceph_msg *m);
228
/* These two have the same signatures as the alloc_msg and alloc_middle
 * members of struct ceph_connection_operations, so they are
 * type-compatible with those callback slots. */
229extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
230 struct ceph_msg_header *hdr);
231extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg);
232
233
234static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
235{
236 dout("ceph_msg_get %p %d -> %d\n", msg, atomic_read(&msg->nref),
237 atomic_read(&msg->nref)+1);
238 atomic_inc(&msg->nref);
239 return msg;
240}
/* Counterpart to ceph_msg_get().  NOTE(review): presumably drops one
 * reference on 'msg' and releases it when the last one goes -- confirm
 * against the implementation. */
241extern void ceph_msg_put(struct ceph_msg *msg);
242
243#endif