aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/cluster')
-rw-r--r--fs/ocfs2/cluster/Makefile4
-rw-r--r--fs/ocfs2/cluster/endian.h30
-rw-r--r--fs/ocfs2/cluster/heartbeat.c1797
-rw-r--r--fs/ocfs2/cluster/heartbeat.h82
-rw-r--r--fs/ocfs2/cluster/masklog.c166
-rw-r--r--fs/ocfs2/cluster/masklog.h275
-rw-r--r--fs/ocfs2/cluster/nodemanager.c791
-rw-r--r--fs/ocfs2/cluster/nodemanager.h64
-rw-r--r--fs/ocfs2/cluster/ocfs2_heartbeat.h37
-rw-r--r--fs/ocfs2/cluster/ocfs2_nodemanager.h39
-rw-r--r--fs/ocfs2/cluster/quorum.c315
-rw-r--r--fs/ocfs2/cluster/quorum.h36
-rw-r--r--fs/ocfs2/cluster/sys.c124
-rw-r--r--fs/ocfs2/cluster/sys.h33
-rw-r--r--fs/ocfs2/cluster/tcp.c1829
-rw-r--r--fs/ocfs2/cluster/tcp.h113
-rw-r--r--fs/ocfs2/cluster/tcp_internal.h174
-rw-r--r--fs/ocfs2/cluster/ver.c42
-rw-r--r--fs/ocfs2/cluster/ver.h31
19 files changed, 5982 insertions, 0 deletions
diff --git a/fs/ocfs2/cluster/Makefile b/fs/ocfs2/cluster/Makefile
new file mode 100644
index 000000000000..cdd162f13650
--- /dev/null
+++ b/fs/ocfs2/cluster/Makefile
@@ -0,0 +1,4 @@
1obj-$(CONFIG_OCFS2_FS) += ocfs2_nodemanager.o
2
3ocfs2_nodemanager-objs := heartbeat.o masklog.o sys.o nodemanager.o \
4 quorum.o tcp.o ver.o
diff --git a/fs/ocfs2/cluster/endian.h b/fs/ocfs2/cluster/endian.h
new file mode 100644
index 000000000000..2df9082f4e35
--- /dev/null
+++ b/fs/ocfs2/cluster/endian.h
@@ -0,0 +1,30 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * Copyright (C) 2005 Oracle. All rights reserved.
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public
17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA.
20 */
21
22#ifndef OCFS2_CLUSTER_ENDIAN_H
23#define OCFS2_CLUSTER_ENDIAN_H
24
25static inline void be32_add_cpu(__be32 *var, u32 val)
26{
27 *var = cpu_to_be32(be32_to_cpu(*var) + val);
28}
29
30#endif /* OCFS2_CLUSTER_ENDIAN_H */
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c
new file mode 100644
index 000000000000..7307ba528913
--- /dev/null
+++ b/fs/ocfs2/cluster/heartbeat.c
@@ -0,0 +1,1797 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * Copyright (C) 2004, 2005 Oracle. All rights reserved.
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public
17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA.
20 */
21
22#include <linux/kernel.h>
23#include <linux/sched.h>
24#include <linux/jiffies.h>
25#include <linux/module.h>
26#include <linux/fs.h>
27#include <linux/bio.h>
28#include <linux/blkdev.h>
29#include <linux/delay.h>
30#include <linux/file.h>
31#include <linux/kthread.h>
32#include <linux/configfs.h>
33#include <linux/random.h>
34#include <linux/crc32.h>
35#include <linux/time.h>
36
37#include "heartbeat.h"
38#include "tcp.h"
39#include "nodemanager.h"
40#include "quorum.h"
41
42#include "masklog.h"
43
44
45/*
46 * The first heartbeat pass had one global thread that would serialize all hb
47 * callback calls. This global serializing sem should only be removed once
48 * we've made sure that all callees can deal with being called concurrently
49 * from multiple hb region threads.
50 */
51static DECLARE_RWSEM(o2hb_callback_sem);
52
53/*
54 * multiple hb threads are watching multiple regions. A node is live
55 * whenever any of the threads sees activity from the node in its region.
56 */
57static spinlock_t o2hb_live_lock = SPIN_LOCK_UNLOCKED;
58static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
59static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
60static LIST_HEAD(o2hb_node_events);
61static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
62
63static LIST_HEAD(o2hb_all_regions);
64
65static struct o2hb_callback {
66 struct list_head list;
67} o2hb_callbacks[O2HB_NUM_CB];
68
69static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
70
71#define O2HB_DEFAULT_BLOCK_BITS 9
72
73unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
74
75/* Only sets a new threshold if there are no active regions.
76 *
77 * No locking or otherwise interesting code is required for reading
78 * o2hb_dead_threshold as it can't change once regions are active and
79 * it's not interesting to anyone until then anyway. */
80static void o2hb_dead_threshold_set(unsigned int threshold)
81{
82 if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
83 spin_lock(&o2hb_live_lock);
84 if (list_empty(&o2hb_all_regions))
85 o2hb_dead_threshold = threshold;
86 spin_unlock(&o2hb_live_lock);
87 }
88}
89
90struct o2hb_node_event {
91 struct list_head hn_item;
92 enum o2hb_callback_type hn_event_type;
93 struct o2nm_node *hn_node;
94 int hn_node_num;
95};
96
97struct o2hb_disk_slot {
98 struct o2hb_disk_heartbeat_block *ds_raw_block;
99 u8 ds_node_num;
100 u64 ds_last_time;
101 u64 ds_last_generation;
102 u16 ds_equal_samples;
103 u16 ds_changed_samples;
104 struct list_head ds_live_item;
105};
106
107/* each thread owns a region.. when we're asked to tear down the region
108 * we ask the thread to stop, who cleans up the region */
109struct o2hb_region {
110 struct config_item hr_item;
111
112 struct list_head hr_all_item;
113 unsigned hr_unclean_stop:1;
114
115 /* protected by the hr_callback_sem */
116 struct task_struct *hr_task;
117
118 unsigned int hr_blocks;
119 unsigned long long hr_start_block;
120
121 unsigned int hr_block_bits;
122 unsigned int hr_block_bytes;
123
124 unsigned int hr_slots_per_page;
125 unsigned int hr_num_pages;
126
127 struct page **hr_slot_data;
128 struct block_device *hr_bdev;
129 struct o2hb_disk_slot *hr_slots;
130
131 /* let the person setting up hb wait for it to return until it
132 * has reached a 'steady' state. This will be fixed when we have
133 * a more complete api that doesn't lead to this sort of fragility. */
134 atomic_t hr_steady_iterations;
135
136 char hr_dev_name[BDEVNAME_SIZE];
137
138 unsigned int hr_timeout_ms;
139
140 /* randomized as the region goes up and down so that a node
141 * recognizes a node going up and down in one iteration */
142 u64 hr_generation;
143
144 struct work_struct hr_write_timeout_work;
145 unsigned long hr_last_timeout_start;
146
147 /* Used during o2hb_check_slot to hold a copy of the block
148 * being checked because we temporarily have to zero out the
149 * crc field. */
150 struct o2hb_disk_heartbeat_block *hr_tmp_block;
151};
152
153struct o2hb_bio_wait_ctxt {
154 atomic_t wc_num_reqs;
155 struct completion wc_io_complete;
156};
157
158static void o2hb_write_timeout(void *arg)
159{
160 struct o2hb_region *reg = arg;
161
162 mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
163 "milliseconds\n", reg->hr_dev_name,
164 jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
165 o2quo_disk_timeout();
166}
167
168static void o2hb_arm_write_timeout(struct o2hb_region *reg)
169{
170 mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);
171
172 cancel_delayed_work(&reg->hr_write_timeout_work);
173 reg->hr_last_timeout_start = jiffies;
174 schedule_delayed_work(&reg->hr_write_timeout_work,
175 msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
176}
177
178static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
179{
180 cancel_delayed_work(&reg->hr_write_timeout_work);
181 flush_scheduled_work();
182}
183
184static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
185 unsigned int num_ios)
186{
187 atomic_set(&wc->wc_num_reqs, num_ios);
188 init_completion(&wc->wc_io_complete);
189}
190
191/* Used in error paths too */
192static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
193 unsigned int num)
194{
195 /* sadly atomic_sub_and_test() isn't available on all platforms. The
196 * good news is that the fast path only completes one at a time */
197 while(num--) {
198 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
199 BUG_ON(num > 0);
200 complete(&wc->wc_io_complete);
201 }
202 }
203}
204
205static void o2hb_wait_on_io(struct o2hb_region *reg,
206 struct o2hb_bio_wait_ctxt *wc)
207{
208 struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
209
210 blk_run_address_space(mapping);
211
212 wait_for_completion(&wc->wc_io_complete);
213}
214
215static int o2hb_bio_end_io(struct bio *bio,
216 unsigned int bytes_done,
217 int error)
218{
219 struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
220
221 if (error)
222 mlog(ML_ERROR, "IO Error %d\n", error);
223
224 if (bio->bi_size)
225 return 1;
226
227 o2hb_bio_wait_dec(wc, 1);
228 return 0;
229}
230
231/* Setup a Bio to cover I/O against num_slots slots starting at
232 * start_slot. */
233static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
234 struct o2hb_bio_wait_ctxt *wc,
235 unsigned int start_slot,
236 unsigned int num_slots)
237{
238 int i, nr_vecs, len, first_page, last_page;
239 unsigned int vec_len, vec_start;
240 unsigned int bits = reg->hr_block_bits;
241 unsigned int spp = reg->hr_slots_per_page;
242 struct bio *bio;
243 struct page *page;
244
245 nr_vecs = (num_slots + spp - 1) / spp;
246
247 /* Testing has shown this allocation to take long enough under
248 * GFP_KERNEL that the local node can get fenced. It would be
249 * nicest if we could pre-allocate these bios and avoid this
250 * all together. */
251 bio = bio_alloc(GFP_ATOMIC, nr_vecs);
252 if (!bio) {
253 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
254 bio = ERR_PTR(-ENOMEM);
255 goto bail;
256 }
257
258 /* Must put everything in 512 byte sectors for the bio... */
259 bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);
260 bio->bi_bdev = reg->hr_bdev;
261 bio->bi_private = wc;
262 bio->bi_end_io = o2hb_bio_end_io;
263
264 first_page = start_slot / spp;
265 last_page = first_page + nr_vecs;
266 vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;
267 for(i = first_page; i < last_page; i++) {
268 page = reg->hr_slot_data[i];
269
270 vec_len = PAGE_CACHE_SIZE;
271 /* last page might be short */
272 if (((i + 1) * spp) > (start_slot + num_slots))
273 vec_len = ((num_slots + start_slot) % spp) << bits;
274 vec_len -= vec_start;
275
276 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
277 i, vec_len, vec_start);
278
279 len = bio_add_page(bio, page, vec_len, vec_start);
280 if (len != vec_len) {
281 bio_put(bio);
282 bio = ERR_PTR(-EIO);
283
284 mlog(ML_ERROR, "Error adding page to bio i = %d, "
285 "vec_len = %u, len = %d\n, start = %u\n",
286 i, vec_len, len, vec_start);
287 goto bail;
288 }
289
290 vec_start = 0;
291 }
292
293bail:
294 return bio;
295}
296
297/*
298 * Compute the maximum number of sectors the bdev can handle in one bio,
299 * as a power of two.
300 *
301 * Stolen from oracleasm, thanks Joel!
302 */
303static int compute_max_sectors(struct block_device *bdev)
304{
305 int max_pages, max_sectors, pow_two_sectors;
306
307 struct request_queue *q;
308
309 q = bdev_get_queue(bdev);
310 max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
311 if (max_pages > BIO_MAX_PAGES)
312 max_pages = BIO_MAX_PAGES;
313 if (max_pages > q->max_phys_segments)
314 max_pages = q->max_phys_segments;
315 if (max_pages > q->max_hw_segments)
316 max_pages = q->max_hw_segments;
317 max_pages--; /* Handle I/Os that straddle a page */
318
319 max_sectors = max_pages << (PAGE_SHIFT - 9);
320
321 /* Why is fls() 1-based???? */
322 pow_two_sectors = 1 << (fls(max_sectors) - 1);
323
324 return pow_two_sectors;
325}
326
327static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
328 unsigned int num_slots,
329 unsigned int *num_bios,
330 unsigned int *slots_per_bio)
331{
332 unsigned int max_sectors, io_sectors;
333
334 max_sectors = compute_max_sectors(reg->hr_bdev);
335
336 io_sectors = num_slots << (reg->hr_block_bits - 9);
337
338 *num_bios = (io_sectors + max_sectors - 1) / max_sectors;
339 *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);
340
341 mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
342 "device can handle %u sectors of I/O\n", io_sectors, num_slots,
343 max_sectors);
344 mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
345 *num_bios, *slots_per_bio);
346}
347
348static int o2hb_read_slots(struct o2hb_region *reg,
349 unsigned int max_slots)
350{
351 unsigned int num_bios, slots_per_bio, start_slot, num_slots;
352 int i, status;
353 struct o2hb_bio_wait_ctxt wc;
354 struct bio **bios;
355 struct bio *bio;
356
357 o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio);
358
359 bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL);
360 if (!bios) {
361 status = -ENOMEM;
362 mlog_errno(status);
363 return status;
364 }
365
366 o2hb_bio_wait_init(&wc, num_bios);
367
368 num_slots = slots_per_bio;
369 for(i = 0; i < num_bios; i++) {
370 start_slot = i * slots_per_bio;
371
372 /* adjust num_slots at last bio */
373 if (max_slots < (start_slot + num_slots))
374 num_slots = max_slots - start_slot;
375
376 bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
377 if (IS_ERR(bio)) {
378 o2hb_bio_wait_dec(&wc, num_bios - i);
379
380 status = PTR_ERR(bio);
381 mlog_errno(status);
382 goto bail_and_wait;
383 }
384 bios[i] = bio;
385
386 submit_bio(READ, bio);
387 }
388
389 status = 0;
390
391bail_and_wait:
392 o2hb_wait_on_io(reg, &wc);
393
394 if (bios) {
395 for(i = 0; i < num_bios; i++)
396 if (bios[i])
397 bio_put(bios[i]);
398 kfree(bios);
399 }
400
401 return status;
402}
403
404static int o2hb_issue_node_write(struct o2hb_region *reg,
405 struct bio **write_bio,
406 struct o2hb_bio_wait_ctxt *write_wc)
407{
408 int status;
409 unsigned int slot;
410 struct bio *bio;
411
412 o2hb_bio_wait_init(write_wc, 1);
413
414 slot = o2nm_this_node();
415
416 bio = o2hb_setup_one_bio(reg, write_wc, slot, 1);
417 if (IS_ERR(bio)) {
418 status = PTR_ERR(bio);
419 mlog_errno(status);
420 goto bail;
421 }
422
423 submit_bio(WRITE, bio);
424
425 *write_bio = bio;
426 status = 0;
427bail:
428 return status;
429}
430
431static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
432 struct o2hb_disk_heartbeat_block *hb_block)
433{
434 __le32 old_cksum;
435 u32 ret;
436
437 /* We want to compute the block crc with a 0 value in the
438 * hb_cksum field. Save it off here and replace after the
439 * crc. */
440 old_cksum = hb_block->hb_cksum;
441 hb_block->hb_cksum = 0;
442
443 ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
444
445 hb_block->hb_cksum = old_cksum;
446
447 return ret;
448}
449
450static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
451{
452 mlog(ML_ERROR, "Dump slot information: seq = 0x%"MLFx64", node = %u, "
453 "cksum = 0x%x, generation 0x%"MLFx64"\n",
454 le64_to_cpu(hb_block->hb_seq), hb_block->hb_node,
455 le32_to_cpu(hb_block->hb_cksum),
456 le64_to_cpu(hb_block->hb_generation));
457}
458
459static int o2hb_verify_crc(struct o2hb_region *reg,
460 struct o2hb_disk_heartbeat_block *hb_block)
461{
462 u32 read, computed;
463
464 read = le32_to_cpu(hb_block->hb_cksum);
465 computed = o2hb_compute_block_crc_le(reg, hb_block);
466
467 return read == computed;
468}
469
470/* We want to make sure that nobody is heartbeating on top of us --
471 * this will help detect an invalid configuration. */
472static int o2hb_check_last_timestamp(struct o2hb_region *reg)
473{
474 int node_num, ret;
475 struct o2hb_disk_slot *slot;
476 struct o2hb_disk_heartbeat_block *hb_block;
477
478 node_num = o2nm_this_node();
479
480 ret = 1;
481 slot = &reg->hr_slots[node_num];
482 /* Don't check on our 1st timestamp */
483 if (slot->ds_last_time) {
484 hb_block = slot->ds_raw_block;
485
486 if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
487 ret = 0;
488 }
489
490 return ret;
491}
492
493static inline void o2hb_prepare_block(struct o2hb_region *reg,
494 u64 generation)
495{
496 int node_num;
497 u64 cputime;
498 struct o2hb_disk_slot *slot;
499 struct o2hb_disk_heartbeat_block *hb_block;
500
501 node_num = o2nm_this_node();
502 slot = &reg->hr_slots[node_num];
503
504 hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
505 memset(hb_block, 0, reg->hr_block_bytes);
506 /* TODO: time stuff */
507 cputime = CURRENT_TIME.tv_sec;
508 if (!cputime)
509 cputime = 1;
510
511 hb_block->hb_seq = cpu_to_le64(cputime);
512 hb_block->hb_node = node_num;
513 hb_block->hb_generation = cpu_to_le64(generation);
514
515 /* This step must always happen last! */
516 hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
517 hb_block));
518
519 mlog(ML_HB_BIO, "our node generation = 0x%"MLFx64", cksum = 0x%x\n",
520 cpu_to_le64(generation), le32_to_cpu(hb_block->hb_cksum));
521}
522
523static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
524 struct o2nm_node *node,
525 int idx)
526{
527 struct list_head *iter;
528 struct o2hb_callback_func *f;
529
530 list_for_each(iter, &hbcall->list) {
531 f = list_entry(iter, struct o2hb_callback_func, hc_item);
532 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
533 (f->hc_func)(node, idx, f->hc_data);
534 }
535}
536
537/* Will run the list in order until we process the passed event */
538static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
539{
540 int empty;
541 struct o2hb_callback *hbcall;
542 struct o2hb_node_event *event;
543
544 spin_lock(&o2hb_live_lock);
545 empty = list_empty(&queued_event->hn_item);
546 spin_unlock(&o2hb_live_lock);
547 if (empty)
548 return;
549
550 /* Holding callback sem assures we don't alter the callback
551 * lists when doing this, and serializes ourselves with other
552 * processes wanting callbacks. */
553 down_write(&o2hb_callback_sem);
554
555 spin_lock(&o2hb_live_lock);
556 while (!list_empty(&o2hb_node_events)
557 && !list_empty(&queued_event->hn_item)) {
558 event = list_entry(o2hb_node_events.next,
559 struct o2hb_node_event,
560 hn_item);
561 list_del_init(&event->hn_item);
562 spin_unlock(&o2hb_live_lock);
563
564 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
565 event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
566 event->hn_node_num);
567
568 hbcall = hbcall_from_type(event->hn_event_type);
569
570 /* We should *never* have gotten on to the list with a
571 * bad type... This isn't something that we should try
572 * to recover from. */
573 BUG_ON(IS_ERR(hbcall));
574
575 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
576
577 spin_lock(&o2hb_live_lock);
578 }
579 spin_unlock(&o2hb_live_lock);
580
581 up_write(&o2hb_callback_sem);
582}
583
584static void o2hb_queue_node_event(struct o2hb_node_event *event,
585 enum o2hb_callback_type type,
586 struct o2nm_node *node,
587 int node_num)
588{
589 assert_spin_locked(&o2hb_live_lock);
590
591 event->hn_event_type = type;
592 event->hn_node = node;
593 event->hn_node_num = node_num;
594
595 mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
596 type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
597
598 list_add_tail(&event->hn_item, &o2hb_node_events);
599}
600
601static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
602{
603 struct o2hb_node_event event =
604 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
605 struct o2nm_node *node;
606
607 node = o2nm_get_node_by_num(slot->ds_node_num);
608 if (!node)
609 return;
610
611 spin_lock(&o2hb_live_lock);
612 if (!list_empty(&slot->ds_live_item)) {
613 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
614 slot->ds_node_num);
615
616 list_del_init(&slot->ds_live_item);
617
618 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
619 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
620
621 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
622 slot->ds_node_num);
623 }
624 }
625 spin_unlock(&o2hb_live_lock);
626
627 o2hb_run_event_list(&event);
628
629 o2nm_node_put(node);
630}
631
632static int o2hb_check_slot(struct o2hb_region *reg,
633 struct o2hb_disk_slot *slot)
634{
635 int changed = 0, gen_changed = 0;
636 struct o2hb_node_event event =
637 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
638 struct o2nm_node *node;
639 struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
640 u64 cputime;
641
642 memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
643
644 /* Is this correct? Do we assume that the node doesn't exist
645 * if we're not configured for him? */
646 node = o2nm_get_node_by_num(slot->ds_node_num);
647 if (!node)
648 return 0;
649
650 if (!o2hb_verify_crc(reg, hb_block)) {
651 /* all paths from here will drop o2hb_live_lock for
652 * us. */
653 spin_lock(&o2hb_live_lock);
654
655 /* Don't print an error on the console in this case -
656 * a freshly formatted heartbeat area will not have a
657 * crc set on it. */
658 if (list_empty(&slot->ds_live_item))
659 goto out;
660
661 /* The node is live but pushed out a bad crc. We
662 * consider it a transient miss but don't populate any
663 * other values as they may be junk. */
664 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
665 slot->ds_node_num, reg->hr_dev_name);
666 o2hb_dump_slot(hb_block);
667
668 slot->ds_equal_samples++;
669 goto fire_callbacks;
670 }
671
672 /* we don't care if these wrap.. the state transitions below
673 * clear at the right places */
674 cputime = le64_to_cpu(hb_block->hb_seq);
675 if (slot->ds_last_time != cputime)
676 slot->ds_changed_samples++;
677 else
678 slot->ds_equal_samples++;
679 slot->ds_last_time = cputime;
680
681 /* The node changed heartbeat generations. We assume this to
682 * mean it dropped off but came back before we timed out. We
683 * want to consider it down for the time being but don't want
684 * to lose any changed_samples state we might build up to
685 * considering it live again. */
686 if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
687 gen_changed = 1;
688 slot->ds_equal_samples = 0;
689 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%"MLFx64" "
690 "to 0x%"MLFx64")\n", slot->ds_node_num,
691 slot->ds_last_generation,
692 le64_to_cpu(hb_block->hb_generation));
693 }
694
695 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
696
697 mlog(ML_HEARTBEAT, "Slot %d gen 0x%"MLFx64" cksum 0x%x "
698 "seq %"MLFu64" last %"MLFu64" changed %u equal %u\n",
699 slot->ds_node_num, slot->ds_last_generation,
700 le32_to_cpu(hb_block->hb_cksum), le64_to_cpu(hb_block->hb_seq),
701 slot->ds_last_time, slot->ds_changed_samples,
702 slot->ds_equal_samples);
703
704 spin_lock(&o2hb_live_lock);
705
706fire_callbacks:
707 /* dead nodes only come to life after some number of
708 * changes at any time during their dead time */
709 if (list_empty(&slot->ds_live_item) &&
710 slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
711 mlog(ML_HEARTBEAT, "Node %d (id 0x%"MLFx64") joined my "
712 "region\n", slot->ds_node_num, slot->ds_last_generation);
713
714 /* first on the list generates a callback */
715 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
716 set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
717
718 o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
719 slot->ds_node_num);
720
721 changed = 1;
722 }
723
724 list_add_tail(&slot->ds_live_item,
725 &o2hb_live_slots[slot->ds_node_num]);
726
727 slot->ds_equal_samples = 0;
728 goto out;
729 }
730
731 /* if the list is dead, we're done.. */
732 if (list_empty(&slot->ds_live_item))
733 goto out;
734
735 /* live nodes only go dead after enough consequtive missed
736 * samples.. reset the missed counter whenever we see
737 * activity */
738 if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
739 mlog(ML_HEARTBEAT, "Node %d left my region\n",
740 slot->ds_node_num);
741
742 /* last off the live_slot generates a callback */
743 list_del_init(&slot->ds_live_item);
744 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
745 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
746
747 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
748 slot->ds_node_num);
749
750 changed = 1;
751 }
752
753 /* We don't clear this because the node is still
754 * actually writing new blocks. */
755 if (!gen_changed)
756 slot->ds_changed_samples = 0;
757 goto out;
758 }
759 if (slot->ds_changed_samples) {
760 slot->ds_changed_samples = 0;
761 slot->ds_equal_samples = 0;
762 }
763out:
764 spin_unlock(&o2hb_live_lock);
765
766 o2hb_run_event_list(&event);
767
768 o2nm_node_put(node);
769 return changed;
770}
771
772/* This could be faster if we just implmented a find_last_bit, but I
773 * don't think the circumstances warrant it. */
774static int o2hb_highest_node(unsigned long *nodes,
775 int numbits)
776{
777 int highest, node;
778
779 highest = numbits;
780 node = -1;
781 while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
782 if (node >= numbits)
783 break;
784
785 highest = node;
786 }
787
788 return highest;
789}
790
791static void o2hb_do_disk_heartbeat(struct o2hb_region *reg)
792{
793 int i, ret, highest_node, change = 0;
794 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
795 struct bio *write_bio;
796 struct o2hb_bio_wait_ctxt write_wc;
797
798 if (o2nm_configured_node_map(configured_nodes, sizeof(configured_nodes)))
799 return;
800
801 highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
802 if (highest_node >= O2NM_MAX_NODES) {
803 mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
804 return;
805 }
806
807 /* No sense in reading the slots of nodes that don't exist
808 * yet. Of course, if the node definitions have holes in them
809 * then we're reading an empty slot anyway... Consider this
810 * best-effort. */
811 ret = o2hb_read_slots(reg, highest_node + 1);
812 if (ret < 0) {
813 mlog_errno(ret);
814 return;
815 }
816
817 /* With an up to date view of the slots, we can check that no
818 * other node has been improperly configured to heartbeat in
819 * our slot. */
820 if (!o2hb_check_last_timestamp(reg))
821 mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
822 "in our slot!\n", reg->hr_dev_name);
823
824 /* fill in the proper info for our next heartbeat */
825 o2hb_prepare_block(reg, reg->hr_generation);
826
827 /* And fire off the write. Note that we don't wait on this I/O
828 * until later. */
829 ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
830 if (ret < 0) {
831 mlog_errno(ret);
832 return;
833 }
834
835 i = -1;
836 while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
837
838 change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
839 }
840
841 /*
842 * We have to be sure we've advertised ourselves on disk
843 * before we can go to steady state. This ensures that
844 * people we find in our steady state have seen us.
845 */
846 o2hb_wait_on_io(reg, &write_wc);
847 bio_put(write_bio);
848 o2hb_arm_write_timeout(reg);
849
850 /* let the person who launched us know when things are steady */
851 if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
852 if (atomic_dec_and_test(&reg->hr_steady_iterations))
853 wake_up(&o2hb_steady_queue);
854 }
855}
856
857/* Subtract b from a, storing the result in a. a *must* have a larger
858 * value than b. */
859static void o2hb_tv_subtract(struct timeval *a,
860 struct timeval *b)
861{
862 /* just return 0 when a is after b */
863 if (a->tv_sec < b->tv_sec ||
864 (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
865 a->tv_sec = 0;
866 a->tv_usec = 0;
867 return;
868 }
869
870 a->tv_sec -= b->tv_sec;
871 a->tv_usec -= b->tv_usec;
872 while ( a->tv_usec < 0 ) {
873 a->tv_sec--;
874 a->tv_usec += 1000000;
875 }
876}
877
878static unsigned int o2hb_elapsed_msecs(struct timeval *start,
879 struct timeval *end)
880{
881 struct timeval res = *end;
882
883 o2hb_tv_subtract(&res, start);
884
885 return res.tv_sec * 1000 + res.tv_usec / 1000;
886}
887
888/*
889 * we ride the region ref that the region dir holds. before the region
890 * dir is removed and drops it ref it will wait to tear down this
891 * thread.
892 */
893static int o2hb_thread(void *data)
894{
895 int i, ret;
896 struct o2hb_region *reg = data;
897 struct bio *write_bio;
898 struct o2hb_bio_wait_ctxt write_wc;
899 struct timeval before_hb, after_hb;
900 unsigned int elapsed_msec;
901
902 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
903
904 set_user_nice(current, -20);
905
906 while (!kthread_should_stop() && !reg->hr_unclean_stop) {
907 /* We track the time spent inside
908 * o2hb_do_disk_heartbeat so that we avoid more then
909 * hr_timeout_ms between disk writes. On busy systems
910 * this should result in a heartbeat which is less
911 * likely to time itself out. */
912 do_gettimeofday(&before_hb);
913
914 o2hb_do_disk_heartbeat(reg);
915
916 do_gettimeofday(&after_hb);
917 elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
918
919 mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
920 before_hb.tv_sec, before_hb.tv_usec,
921 after_hb.tv_sec, after_hb.tv_usec, elapsed_msec);
922
923 if (elapsed_msec < reg->hr_timeout_ms) {
924 /* the kthread api has blocked signals for us so no
925 * need to record the return value. */
926 msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
927 }
928 }
929
930 o2hb_disarm_write_timeout(reg);
931
932 /* unclean stop is only used in very bad situation */
933 for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
934 o2hb_shutdown_slot(&reg->hr_slots[i]);
935
936 /* Explicit down notification - avoid forcing the other nodes
937 * to timeout on this region when we could just as easily
938 * write a clear generation - thus indicating to them that
939 * this node has left this region.
940 *
941 * XXX: Should we skip this on unclean_stop? */
942 o2hb_prepare_block(reg, 0);
943 ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
944 if (ret == 0) {
945 o2hb_wait_on_io(reg, &write_wc);
946 bio_put(write_bio);
947 } else {
948 mlog_errno(ret);
949 }
950
951 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");
952
953 return 0;
954}
955
956void o2hb_init(void)
957{
958 int i;
959
960 for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
961 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
962
963 for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
964 INIT_LIST_HEAD(&o2hb_live_slots[i]);
965
966 INIT_LIST_HEAD(&o2hb_node_events);
967
968 memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
969}
970
971/* if we're already in a callback then we're already serialized by the sem */
972static void o2hb_fill_node_map_from_callback(unsigned long *map,
973 unsigned bytes)
974{
975 BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
976
977 memcpy(map, &o2hb_live_node_bitmap, bytes);
978}
979
980/*
981 * get a map of all nodes that are heartbeating in any regions
982 */
983void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
984{
985 /* callers want to serialize this map and callbacks so that they
986 * can trust that they don't miss nodes coming to the party */
987 down_read(&o2hb_callback_sem);
988 spin_lock(&o2hb_live_lock);
989 o2hb_fill_node_map_from_callback(map, bytes);
990 spin_unlock(&o2hb_live_lock);
991 up_read(&o2hb_callback_sem);
992}
993EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
994
995/*
996 * heartbeat configfs bits. The heartbeat set is a default set under
997 * the cluster set in nodemanager.c.
998 */
999
1000static struct o2hb_region *to_o2hb_region(struct config_item *item)
1001{
1002 return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1003}
1004
1005/* drop_item only drops its ref after killing the thread, nothing should
1006 * be using the region anymore. this has to clean up any state that
1007 * attributes might have built up. */
1008static void o2hb_region_release(struct config_item *item)
1009{
1010 int i;
1011 struct page *page;
1012 struct o2hb_region *reg = to_o2hb_region(item);
1013
1014 if (reg->hr_tmp_block)
1015 kfree(reg->hr_tmp_block);
1016
1017 if (reg->hr_slot_data) {
1018 for (i = 0; i < reg->hr_num_pages; i++) {
1019 page = reg->hr_slot_data[i];
1020 if (page)
1021 __free_page(page);
1022 }
1023 kfree(reg->hr_slot_data);
1024 }
1025
1026 if (reg->hr_bdev)
1027 blkdev_put(reg->hr_bdev);
1028
1029 if (reg->hr_slots)
1030 kfree(reg->hr_slots);
1031
1032 spin_lock(&o2hb_live_lock);
1033 list_del(&reg->hr_all_item);
1034 spin_unlock(&o2hb_live_lock);
1035
1036 kfree(reg);
1037}
1038
1039static int o2hb_read_block_input(struct o2hb_region *reg,
1040 const char *page,
1041 size_t count,
1042 unsigned long *ret_bytes,
1043 unsigned int *ret_bits)
1044{
1045 unsigned long bytes;
1046 char *p = (char *)page;
1047
1048 bytes = simple_strtoul(p, &p, 0);
1049 if (!p || (*p && (*p != '\n')))
1050 return -EINVAL;
1051
1052 /* Heartbeat and fs min / max block sizes are the same. */
1053 if (bytes > 4096 || bytes < 512)
1054 return -ERANGE;
1055 if (hweight16(bytes) != 1)
1056 return -EINVAL;
1057
1058 if (ret_bytes)
1059 *ret_bytes = bytes;
1060 if (ret_bits)
1061 *ret_bits = ffs(bytes) - 1;
1062
1063 return 0;
1064}
1065
1066static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
1067 char *page)
1068{
1069 return sprintf(page, "%u\n", reg->hr_block_bytes);
1070}
1071
1072static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
1073 const char *page,
1074 size_t count)
1075{
1076 int status;
1077 unsigned long block_bytes;
1078 unsigned int block_bits;
1079
1080 if (reg->hr_bdev)
1081 return -EINVAL;
1082
1083 status = o2hb_read_block_input(reg, page, count,
1084 &block_bytes, &block_bits);
1085 if (status)
1086 return status;
1087
1088 reg->hr_block_bytes = (unsigned int)block_bytes;
1089 reg->hr_block_bits = block_bits;
1090
1091 return count;
1092}
1093
1094static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
1095 char *page)
1096{
1097 return sprintf(page, "%llu\n", reg->hr_start_block);
1098}
1099
1100static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
1101 const char *page,
1102 size_t count)
1103{
1104 unsigned long long tmp;
1105 char *p = (char *)page;
1106
1107 if (reg->hr_bdev)
1108 return -EINVAL;
1109
1110 tmp = simple_strtoull(p, &p, 0);
1111 if (!p || (*p && (*p != '\n')))
1112 return -EINVAL;
1113
1114 reg->hr_start_block = tmp;
1115
1116 return count;
1117}
1118
1119static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
1120 char *page)
1121{
1122 return sprintf(page, "%d\n", reg->hr_blocks);
1123}
1124
1125static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
1126 const char *page,
1127 size_t count)
1128{
1129 unsigned long tmp;
1130 char *p = (char *)page;
1131
1132 if (reg->hr_bdev)
1133 return -EINVAL;
1134
1135 tmp = simple_strtoul(p, &p, 0);
1136 if (!p || (*p && (*p != '\n')))
1137 return -EINVAL;
1138
1139 if (tmp > O2NM_MAX_NODES || tmp == 0)
1140 return -ERANGE;
1141
1142 reg->hr_blocks = (unsigned int)tmp;
1143
1144 return count;
1145}
1146
1147static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
1148 char *page)
1149{
1150 unsigned int ret = 0;
1151
1152 if (reg->hr_bdev)
1153 ret = sprintf(page, "%s\n", reg->hr_dev_name);
1154
1155 return ret;
1156}
1157
1158static void o2hb_init_region_params(struct o2hb_region *reg)
1159{
1160 reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
1161 reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1162
1163 mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1164 reg->hr_start_block, reg->hr_blocks);
1165 mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1166 reg->hr_block_bytes, reg->hr_block_bits);
1167 mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1168 mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1169}
1170
1171static int o2hb_map_slot_data(struct o2hb_region *reg)
1172{
1173 int i, j;
1174 unsigned int last_slot;
1175 unsigned int spp = reg->hr_slots_per_page;
1176 struct page *page;
1177 char *raw;
1178 struct o2hb_disk_slot *slot;
1179
1180 reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1181 if (reg->hr_tmp_block == NULL) {
1182 mlog_errno(-ENOMEM);
1183 return -ENOMEM;
1184 }
1185
1186 reg->hr_slots = kcalloc(reg->hr_blocks,
1187 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1188 if (reg->hr_slots == NULL) {
1189 mlog_errno(-ENOMEM);
1190 return -ENOMEM;
1191 }
1192
1193 for(i = 0; i < reg->hr_blocks; i++) {
1194 slot = &reg->hr_slots[i];
1195 slot->ds_node_num = i;
1196 INIT_LIST_HEAD(&slot->ds_live_item);
1197 slot->ds_raw_block = NULL;
1198 }
1199
1200 reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1201 mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1202 "at %u blocks per page\n",
1203 reg->hr_num_pages, reg->hr_blocks, spp);
1204
1205 reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1206 GFP_KERNEL);
1207 if (!reg->hr_slot_data) {
1208 mlog_errno(-ENOMEM);
1209 return -ENOMEM;
1210 }
1211
1212 for(i = 0; i < reg->hr_num_pages; i++) {
1213 page = alloc_page(GFP_KERNEL);
1214 if (!page) {
1215 mlog_errno(-ENOMEM);
1216 return -ENOMEM;
1217 }
1218
1219 reg->hr_slot_data[i] = page;
1220
1221 last_slot = i * spp;
1222 raw = page_address(page);
1223 for (j = 0;
1224 (j < spp) && ((j + last_slot) < reg->hr_blocks);
1225 j++) {
1226 BUG_ON((j + last_slot) >= reg->hr_blocks);
1227
1228 slot = &reg->hr_slots[j + last_slot];
1229 slot->ds_raw_block =
1230 (struct o2hb_disk_heartbeat_block *) raw;
1231
1232 raw += reg->hr_block_bytes;
1233 }
1234 }
1235
1236 return 0;
1237}
1238
1239/* Read in all the slots available and populate the tracking
1240 * structures so that we can start with a baseline idea of what's
1241 * there. */
1242static int o2hb_populate_slot_data(struct o2hb_region *reg)
1243{
1244 int ret, i;
1245 struct o2hb_disk_slot *slot;
1246 struct o2hb_disk_heartbeat_block *hb_block;
1247
1248 mlog_entry_void();
1249
1250 ret = o2hb_read_slots(reg, reg->hr_blocks);
1251 if (ret) {
1252 mlog_errno(ret);
1253 goto out;
1254 }
1255
1256 /* We only want to get an idea of the values initially in each
1257 * slot, so we do no verification - o2hb_check_slot will
1258 * actually determine if each configured slot is valid and
1259 * whether any values have changed. */
1260 for(i = 0; i < reg->hr_blocks; i++) {
1261 slot = &reg->hr_slots[i];
1262 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1263
1264 /* Only fill the values that o2hb_check_slot uses to
1265 * determine changing slots */
1266 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1267 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1268 }
1269
1270out:
1271 mlog_exit(ret);
1272 return ret;
1273}
1274
1275/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1276static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
1277 const char *page,
1278 size_t count)
1279{
1280 long fd;
1281 int sectsize;
1282 char *p = (char *)page;
1283 struct file *filp = NULL;
1284 struct inode *inode = NULL;
1285 ssize_t ret = -EINVAL;
1286
1287 if (reg->hr_bdev)
1288 goto out;
1289
1290 /* We can't heartbeat without having had our node number
1291 * configured yet. */
1292 if (o2nm_this_node() == O2NM_MAX_NODES)
1293 goto out;
1294
1295 fd = simple_strtol(p, &p, 0);
1296 if (!p || (*p && (*p != '\n')))
1297 goto out;
1298
1299 if (fd < 0 || fd >= INT_MAX)
1300 goto out;
1301
1302 filp = fget(fd);
1303 if (filp == NULL)
1304 goto out;
1305
1306 if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1307 reg->hr_block_bytes == 0)
1308 goto out;
1309
1310 inode = igrab(filp->f_mapping->host);
1311 if (inode == NULL)
1312 goto out;
1313
1314 if (!S_ISBLK(inode->i_mode))
1315 goto out;
1316
1317 reg->hr_bdev = I_BDEV(filp->f_mapping->host);
1318 ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
1319 if (ret) {
1320 reg->hr_bdev = NULL;
1321 goto out;
1322 }
1323 inode = NULL;
1324
1325 bdevname(reg->hr_bdev, reg->hr_dev_name);
1326
1327 sectsize = bdev_hardsect_size(reg->hr_bdev);
1328 if (sectsize != reg->hr_block_bytes) {
1329 mlog(ML_ERROR,
1330 "blocksize %u incorrect for device, expected %d",
1331 reg->hr_block_bytes, sectsize);
1332 ret = -EINVAL;
1333 goto out;
1334 }
1335
1336 o2hb_init_region_params(reg);
1337
1338 /* Generation of zero is invalid */
1339 do {
1340 get_random_bytes(&reg->hr_generation,
1341 sizeof(reg->hr_generation));
1342 } while (reg->hr_generation == 0);
1343
1344 ret = o2hb_map_slot_data(reg);
1345 if (ret) {
1346 mlog_errno(ret);
1347 goto out;
1348 }
1349
1350 ret = o2hb_populate_slot_data(reg);
1351 if (ret) {
1352 mlog_errno(ret);
1353 goto out;
1354 }
1355
1356 INIT_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout, reg);
1357
1358 /*
1359 * A node is considered live after it has beat LIVE_THRESHOLD
1360 * times. We're not steady until we've given them a chance
1361 * _after_ our first read.
1362 */
1363 atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);
1364
1365 reg->hr_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1366 reg->hr_item.ci_name);
1367 if (IS_ERR(reg->hr_task)) {
1368 ret = PTR_ERR(reg->hr_task);
1369 mlog_errno(ret);
1370 reg->hr_task = NULL;
1371 goto out;
1372 }
1373
1374 ret = wait_event_interruptible(o2hb_steady_queue,
1375 atomic_read(&reg->hr_steady_iterations) == 0);
1376 if (ret) {
1377 kthread_stop(reg->hr_task);
1378 reg->hr_task = NULL;
1379 goto out;
1380 }
1381
1382 ret = count;
1383out:
1384 if (filp)
1385 fput(filp);
1386 if (inode)
1387 iput(inode);
1388 if (ret < 0) {
1389 if (reg->hr_bdev) {
1390 blkdev_put(reg->hr_bdev);
1391 reg->hr_bdev = NULL;
1392 }
1393 }
1394 return ret;
1395}
1396
1397struct o2hb_region_attribute {
1398 struct configfs_attribute attr;
1399 ssize_t (*show)(struct o2hb_region *, char *);
1400 ssize_t (*store)(struct o2hb_region *, const char *, size_t);
1401};
1402
1403static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
1404 .attr = { .ca_owner = THIS_MODULE,
1405 .ca_name = "block_bytes",
1406 .ca_mode = S_IRUGO | S_IWUSR },
1407 .show = o2hb_region_block_bytes_read,
1408 .store = o2hb_region_block_bytes_write,
1409};
1410
1411static struct o2hb_region_attribute o2hb_region_attr_start_block = {
1412 .attr = { .ca_owner = THIS_MODULE,
1413 .ca_name = "start_block",
1414 .ca_mode = S_IRUGO | S_IWUSR },
1415 .show = o2hb_region_start_block_read,
1416 .store = o2hb_region_start_block_write,
1417};
1418
1419static struct o2hb_region_attribute o2hb_region_attr_blocks = {
1420 .attr = { .ca_owner = THIS_MODULE,
1421 .ca_name = "blocks",
1422 .ca_mode = S_IRUGO | S_IWUSR },
1423 .show = o2hb_region_blocks_read,
1424 .store = o2hb_region_blocks_write,
1425};
1426
1427static struct o2hb_region_attribute o2hb_region_attr_dev = {
1428 .attr = { .ca_owner = THIS_MODULE,
1429 .ca_name = "dev",
1430 .ca_mode = S_IRUGO | S_IWUSR },
1431 .show = o2hb_region_dev_read,
1432 .store = o2hb_region_dev_write,
1433};
1434
1435static struct configfs_attribute *o2hb_region_attrs[] = {
1436 &o2hb_region_attr_block_bytes.attr,
1437 &o2hb_region_attr_start_block.attr,
1438 &o2hb_region_attr_blocks.attr,
1439 &o2hb_region_attr_dev.attr,
1440 NULL,
1441};
1442
1443static ssize_t o2hb_region_show(struct config_item *item,
1444 struct configfs_attribute *attr,
1445 char *page)
1446{
1447 struct o2hb_region *reg = to_o2hb_region(item);
1448 struct o2hb_region_attribute *o2hb_region_attr =
1449 container_of(attr, struct o2hb_region_attribute, attr);
1450 ssize_t ret = 0;
1451
1452 if (o2hb_region_attr->show)
1453 ret = o2hb_region_attr->show(reg, page);
1454 return ret;
1455}
1456
1457static ssize_t o2hb_region_store(struct config_item *item,
1458 struct configfs_attribute *attr,
1459 const char *page, size_t count)
1460{
1461 struct o2hb_region *reg = to_o2hb_region(item);
1462 struct o2hb_region_attribute *o2hb_region_attr =
1463 container_of(attr, struct o2hb_region_attribute, attr);
1464 ssize_t ret = -EINVAL;
1465
1466 if (o2hb_region_attr->store)
1467 ret = o2hb_region_attr->store(reg, page, count);
1468 return ret;
1469}
1470
1471static struct configfs_item_operations o2hb_region_item_ops = {
1472 .release = o2hb_region_release,
1473 .show_attribute = o2hb_region_show,
1474 .store_attribute = o2hb_region_store,
1475};
1476
1477static struct config_item_type o2hb_region_type = {
1478 .ct_item_ops = &o2hb_region_item_ops,
1479 .ct_attrs = o2hb_region_attrs,
1480 .ct_owner = THIS_MODULE,
1481};
1482
1483/* heartbeat set */
1484
1485struct o2hb_heartbeat_group {
1486 struct config_group hs_group;
1487 /* some stuff? */
1488};
1489
1490static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1491{
1492 return group ?
1493 container_of(group, struct o2hb_heartbeat_group, hs_group)
1494 : NULL;
1495}
1496
1497static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
1498 const char *name)
1499{
1500 struct o2hb_region *reg = NULL;
1501 struct config_item *ret = NULL;
1502
1503 reg = kcalloc(1, sizeof(struct o2hb_region), GFP_KERNEL);
1504 if (reg == NULL)
1505 goto out; /* ENOMEM */
1506
1507 config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
1508
1509 ret = &reg->hr_item;
1510
1511 spin_lock(&o2hb_live_lock);
1512 list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
1513 spin_unlock(&o2hb_live_lock);
1514out:
1515 if (ret == NULL)
1516 kfree(reg);
1517
1518 return ret;
1519}
1520
1521static void o2hb_heartbeat_group_drop_item(struct config_group *group,
1522 struct config_item *item)
1523{
1524 struct o2hb_region *reg = to_o2hb_region(item);
1525
1526 /* stop the thread when the user removes the region dir */
1527 if (reg->hr_task) {
1528 kthread_stop(reg->hr_task);
1529 reg->hr_task = NULL;
1530 }
1531
1532 config_item_put(item);
1533}
1534
1535struct o2hb_heartbeat_group_attribute {
1536 struct configfs_attribute attr;
1537 ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
1538 ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
1539};
1540
1541static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
1542 struct configfs_attribute *attr,
1543 char *page)
1544{
1545 struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1546 struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1547 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1548 ssize_t ret = 0;
1549
1550 if (o2hb_heartbeat_group_attr->show)
1551 ret = o2hb_heartbeat_group_attr->show(reg, page);
1552 return ret;
1553}
1554
1555static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
1556 struct configfs_attribute *attr,
1557 const char *page, size_t count)
1558{
1559 struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1560 struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1561 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1562 ssize_t ret = -EINVAL;
1563
1564 if (o2hb_heartbeat_group_attr->store)
1565 ret = o2hb_heartbeat_group_attr->store(reg, page, count);
1566 return ret;
1567}
1568
1569static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
1570 char *page)
1571{
1572 return sprintf(page, "%u\n", o2hb_dead_threshold);
1573}
1574
1575static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
1576 const char *page,
1577 size_t count)
1578{
1579 unsigned long tmp;
1580 char *p = (char *)page;
1581
1582 tmp = simple_strtoul(p, &p, 10);
1583 if (!p || (*p && (*p != '\n')))
1584 return -EINVAL;
1585
1586 /* this will validate ranges for us. */
1587 o2hb_dead_threshold_set((unsigned int) tmp);
1588
1589 return count;
1590}
1591
1592static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
1593 .attr = { .ca_owner = THIS_MODULE,
1594 .ca_name = "dead_threshold",
1595 .ca_mode = S_IRUGO | S_IWUSR },
1596 .show = o2hb_heartbeat_group_threshold_show,
1597 .store = o2hb_heartbeat_group_threshold_store,
1598};
1599
1600static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
1601 &o2hb_heartbeat_group_attr_threshold.attr,
1602 NULL,
1603};
1604
1605static struct configfs_item_operations o2hb_hearbeat_group_item_ops = {
1606 .show_attribute = o2hb_heartbeat_group_show,
1607 .store_attribute = o2hb_heartbeat_group_store,
1608};
1609
1610static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
1611 .make_item = o2hb_heartbeat_group_make_item,
1612 .drop_item = o2hb_heartbeat_group_drop_item,
1613};
1614
1615static struct config_item_type o2hb_heartbeat_group_type = {
1616 .ct_group_ops = &o2hb_heartbeat_group_group_ops,
1617 .ct_item_ops = &o2hb_hearbeat_group_item_ops,
1618 .ct_attrs = o2hb_heartbeat_group_attrs,
1619 .ct_owner = THIS_MODULE,
1620};
1621
1622/* this is just here to avoid touching group in heartbeat.h which the
1623 * entire damn world #includes */
1624struct config_group *o2hb_alloc_hb_set(void)
1625{
1626 struct o2hb_heartbeat_group *hs = NULL;
1627 struct config_group *ret = NULL;
1628
1629 hs = kcalloc(1, sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
1630 if (hs == NULL)
1631 goto out;
1632
1633 config_group_init_type_name(&hs->hs_group, "heartbeat",
1634 &o2hb_heartbeat_group_type);
1635
1636 ret = &hs->hs_group;
1637out:
1638 if (ret == NULL)
1639 kfree(hs);
1640 return ret;
1641}
1642
1643void o2hb_free_hb_set(struct config_group *group)
1644{
1645 struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
1646 kfree(hs);
1647}
1648
1649/* hb callback registration and issueing */
1650
1651static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
1652{
1653 if (type == O2HB_NUM_CB)
1654 return ERR_PTR(-EINVAL);
1655
1656 return &o2hb_callbacks[type];
1657}
1658
1659void o2hb_setup_callback(struct o2hb_callback_func *hc,
1660 enum o2hb_callback_type type,
1661 o2hb_cb_func *func,
1662 void *data,
1663 int priority)
1664{
1665 INIT_LIST_HEAD(&hc->hc_item);
1666 hc->hc_func = func;
1667 hc->hc_data = data;
1668 hc->hc_priority = priority;
1669 hc->hc_type = type;
1670 hc->hc_magic = O2HB_CB_MAGIC;
1671}
1672EXPORT_SYMBOL_GPL(o2hb_setup_callback);
1673
1674int o2hb_register_callback(struct o2hb_callback_func *hc)
1675{
1676 struct o2hb_callback_func *tmp;
1677 struct list_head *iter;
1678 struct o2hb_callback *hbcall;
1679 int ret;
1680
1681 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1682 BUG_ON(!list_empty(&hc->hc_item));
1683
1684 hbcall = hbcall_from_type(hc->hc_type);
1685 if (IS_ERR(hbcall)) {
1686 ret = PTR_ERR(hbcall);
1687 goto out;
1688 }
1689
1690 down_write(&o2hb_callback_sem);
1691
1692 list_for_each(iter, &hbcall->list) {
1693 tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
1694 if (hc->hc_priority < tmp->hc_priority) {
1695 list_add_tail(&hc->hc_item, iter);
1696 break;
1697 }
1698 }
1699 if (list_empty(&hc->hc_item))
1700 list_add_tail(&hc->hc_item, &hbcall->list);
1701
1702 up_write(&o2hb_callback_sem);
1703 ret = 0;
1704out:
1705 mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
1706 ret, __builtin_return_address(0), hc);
1707 return ret;
1708}
1709EXPORT_SYMBOL_GPL(o2hb_register_callback);
1710
1711int o2hb_unregister_callback(struct o2hb_callback_func *hc)
1712{
1713 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1714
1715 mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
1716 __builtin_return_address(0), hc);
1717
1718 if (list_empty(&hc->hc_item))
1719 return 0;
1720
1721 down_write(&o2hb_callback_sem);
1722
1723 list_del_init(&hc->hc_item);
1724
1725 up_write(&o2hb_callback_sem);
1726
1727 return 0;
1728}
1729EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
1730
1731int o2hb_check_node_heartbeating(u8 node_num)
1732{
1733 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1734
1735 o2hb_fill_node_map(testing_map, sizeof(testing_map));
1736 if (!test_bit(node_num, testing_map)) {
1737 mlog(ML_HEARTBEAT,
1738 "node (%u) does not have heartbeating enabled.\n",
1739 node_num);
1740 return 0;
1741 }
1742
1743 return 1;
1744}
1745EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
1746
1747int o2hb_check_node_heartbeating_from_callback(u8 node_num)
1748{
1749 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1750
1751 o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
1752 if (!test_bit(node_num, testing_map)) {
1753 mlog(ML_HEARTBEAT,
1754 "node (%u) does not have heartbeating enabled.\n",
1755 node_num);
1756 return 0;
1757 }
1758
1759 return 1;
1760}
1761EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
1762
1763/* Makes sure our local node is configured with a node number, and is
1764 * heartbeating. */
1765int o2hb_check_local_node_heartbeating(void)
1766{
1767 u8 node_num;
1768
1769 /* if this node was set then we have networking */
1770 node_num = o2nm_this_node();
1771 if (node_num == O2NM_MAX_NODES) {
1772 mlog(ML_HEARTBEAT, "this node has not been configured.\n");
1773 return 0;
1774 }
1775
1776 return o2hb_check_node_heartbeating(node_num);
1777}
1778EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
1779
1780/*
1781 * this is just a hack until we get the plumbing which flips file systems
1782 * read only and drops the hb ref instead of killing the node dead.
1783 */
1784void o2hb_stop_all_regions(void)
1785{
1786 struct o2hb_region *reg;
1787
1788 mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
1789
1790 spin_lock(&o2hb_live_lock);
1791
1792 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
1793 reg->hr_unclean_stop = 1;
1794
1795 spin_unlock(&o2hb_live_lock);
1796}
1797EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
diff --git a/fs/ocfs2/cluster/heartbeat.h b/fs/ocfs2/cluster/heartbeat.h
new file mode 100644
index 000000000000..cac6223206a9
--- /dev/null
+++ b/fs/ocfs2/cluster/heartbeat.h
@@ -0,0 +1,82 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * heartbeat.h
5 *
6 * Function prototypes
7 *
8 * Copyright (C) 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 *
25 */
26
27#ifndef O2CLUSTER_HEARTBEAT_H
28#define O2CLUSTER_HEARTBEAT_H
29
30#include "ocfs2_heartbeat.h"
31
32#define O2HB_REGION_TIMEOUT_MS 2000
33
34/* number of changes to be seen as live */
35#define O2HB_LIVE_THRESHOLD 2
36/* number of equal samples to be seen as dead */
37extern unsigned int o2hb_dead_threshold;
38#define O2HB_DEFAULT_DEAD_THRESHOLD 7
39/* Otherwise MAX_WRITE_TIMEOUT will be zero... */
40#define O2HB_MIN_DEAD_THRESHOLD 2
41#define O2HB_MAX_WRITE_TIMEOUT_MS (O2HB_REGION_TIMEOUT_MS * (o2hb_dead_threshold - 1))
42
43#define O2HB_CB_MAGIC 0x51d1e4ec
44
45/* callback stuff */
46enum o2hb_callback_type {
47 O2HB_NODE_DOWN_CB = 0,
48 O2HB_NODE_UP_CB,
49 O2HB_NUM_CB
50};
51
52struct o2nm_node;
53typedef void (o2hb_cb_func)(struct o2nm_node *, int, void *);
54
55struct o2hb_callback_func {
56 u32 hc_magic;
57 struct list_head hc_item;
58 o2hb_cb_func *hc_func;
59 void *hc_data;
60 int hc_priority;
61 enum o2hb_callback_type hc_type;
62};
63
64struct config_group *o2hb_alloc_hb_set(void);
65void o2hb_free_hb_set(struct config_group *group);
66
67void o2hb_setup_callback(struct o2hb_callback_func *hc,
68 enum o2hb_callback_type type,
69 o2hb_cb_func *func,
70 void *data,
71 int priority);
72int o2hb_register_callback(struct o2hb_callback_func *hc);
73int o2hb_unregister_callback(struct o2hb_callback_func *hc);
74void o2hb_fill_node_map(unsigned long *map,
75 unsigned bytes);
76void o2hb_init(void);
77int o2hb_check_node_heartbeating(u8 node_num);
78int o2hb_check_node_heartbeating_from_callback(u8 node_num);
79int o2hb_check_local_node_heartbeating(void);
80void o2hb_stop_all_regions(void);
81
82#endif /* O2CLUSTER_HEARTBEAT_H */
diff --git a/fs/ocfs2/cluster/masklog.c b/fs/ocfs2/cluster/masklog.c
new file mode 100644
index 000000000000..fd741cea5705
--- /dev/null
+++ b/fs/ocfs2/cluster/masklog.c
@@ -0,0 +1,166 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * Copyright (C) 2004, 2005 Oracle. All rights reserved.
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public
17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA.
20 */
21
22#include <linux/module.h>
23#include <linux/kernel.h>
24#include <linux/proc_fs.h>
25#include <linux/seq_file.h>
26#include <linux/string.h>
27#include <asm/uaccess.h>
28
29#include "masklog.h"
30
31struct mlog_bits mlog_and_bits = MLOG_BITS_RHS(MLOG_INITIAL_AND_MASK);
32EXPORT_SYMBOL_GPL(mlog_and_bits);
33struct mlog_bits mlog_not_bits = MLOG_BITS_RHS(MLOG_INITIAL_NOT_MASK);
34EXPORT_SYMBOL_GPL(mlog_not_bits);
35
36static ssize_t mlog_mask_show(u64 mask, char *buf)
37{
38 char *state;
39
40 if (__mlog_test_u64(mask, mlog_and_bits))
41 state = "allow";
42 else if (__mlog_test_u64(mask, mlog_not_bits))
43 state = "deny";
44 else
45 state = "off";
46
47 return snprintf(buf, PAGE_SIZE, "%s\n", state);
48}
49
50static ssize_t mlog_mask_store(u64 mask, const char *buf, size_t count)
51{
52 if (!strnicmp(buf, "allow", 5)) {
53 __mlog_set_u64(mask, mlog_and_bits);
54 __mlog_clear_u64(mask, mlog_not_bits);
55 } else if (!strnicmp(buf, "deny", 4)) {
56 __mlog_set_u64(mask, mlog_not_bits);
57 __mlog_clear_u64(mask, mlog_and_bits);
58 } else if (!strnicmp(buf, "off", 3)) {
59 __mlog_clear_u64(mask, mlog_not_bits);
60 __mlog_clear_u64(mask, mlog_and_bits);
61 } else
62 return -EINVAL;
63
64 return count;
65}
66
67struct mlog_attribute {
68 struct attribute attr;
69 u64 mask;
70};
71
72#define to_mlog_attr(_attr) container_of(_attr, struct mlog_attribute, attr)
73
74#define define_mask(_name) { \
75 .attr = { \
76 .name = #_name, \
77 .mode = S_IRUGO | S_IWUSR, \
78 }, \
79 .mask = ML_##_name, \
80}
81
82static struct mlog_attribute mlog_attrs[MLOG_MAX_BITS] = {
83 define_mask(ENTRY),
84 define_mask(EXIT),
85 define_mask(TCP),
86 define_mask(MSG),
87 define_mask(SOCKET),
88 define_mask(HEARTBEAT),
89 define_mask(HB_BIO),
90 define_mask(DLMFS),
91 define_mask(DLM),
92 define_mask(DLM_DOMAIN),
93 define_mask(DLM_THREAD),
94 define_mask(DLM_MASTER),
95 define_mask(DLM_RECOVERY),
96 define_mask(AIO),
97 define_mask(JOURNAL),
98 define_mask(DISK_ALLOC),
99 define_mask(SUPER),
100 define_mask(FILE_IO),
101 define_mask(EXTENT_MAP),
102 define_mask(DLM_GLUE),
103 define_mask(BH_IO),
104 define_mask(UPTODATE),
105 define_mask(NAMEI),
106 define_mask(INODE),
107 define_mask(VOTE),
108 define_mask(DCACHE),
109 define_mask(CONN),
110 define_mask(QUORUM),
111 define_mask(EXPORT),
112 define_mask(ERROR),
113 define_mask(NOTICE),
114 define_mask(KTHREAD),
115};
116
117static struct attribute *mlog_attr_ptrs[MLOG_MAX_BITS] = {NULL, };
118
119static ssize_t mlog_show(struct kobject *obj, struct attribute *attr,
120 char *buf)
121{
122 struct mlog_attribute *mlog_attr = to_mlog_attr(attr);
123
124 return mlog_mask_show(mlog_attr->mask, buf);
125}
126
127static ssize_t mlog_store(struct kobject *obj, struct attribute *attr,
128 const char *buf, size_t count)
129{
130 struct mlog_attribute *mlog_attr = to_mlog_attr(attr);
131
132 return mlog_mask_store(mlog_attr->mask, buf, count);
133}
134
135static struct sysfs_ops mlog_attr_ops = {
136 .show = mlog_show,
137 .store = mlog_store,
138};
139
140static struct kobj_type mlog_ktype = {
141 .default_attrs = mlog_attr_ptrs,
142 .sysfs_ops = &mlog_attr_ops,
143};
144
145static struct kset mlog_kset = {
146 .kobj = {.name = "logmask", .ktype = &mlog_ktype},
147};
148
149int mlog_sys_init(struct subsystem *o2cb_subsys)
150{
151 int i = 0;
152
153 while (mlog_attrs[i].attr.mode) {
154 mlog_attr_ptrs[i] = &mlog_attrs[i].attr;
155 i++;
156 }
157 mlog_attr_ptrs[i] = NULL;
158
159 mlog_kset.subsys = o2cb_subsys;
160 return kset_register(&mlog_kset);
161}
162
163void mlog_sys_shutdown(void)
164{
165 kset_unregister(&mlog_kset);
166}
diff --git a/fs/ocfs2/cluster/masklog.h b/fs/ocfs2/cluster/masklog.h
new file mode 100644
index 000000000000..f5ef5ea61a05
--- /dev/null
+++ b/fs/ocfs2/cluster/masklog.h
@@ -0,0 +1,275 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * Copyright (C) 2005 Oracle. All rights reserved.
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public
17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA.
20 */
21
22#ifndef O2CLUSTER_MASKLOG_H
23#define O2CLUSTER_MASKLOG_H
24
25/*
26 * For now this is a trivial wrapper around printk() that gives the critical
27 * ability to enable sets of debugging output at run-time. In the future this
28 * will almost certainly be redirected to relayfs so that it can pay a
29 * substantially lower heisenberg tax.
30 *
31 * Callers associate the message with a bitmask and a global bitmask is
32 * maintained with help from /proc. If any of the bits match the message is
33 * output.
34 *
35 * We must have efficient bit tests on i386 and it seems gcc still emits crazy
36 * code for the 64bit compare. It emits very good code for the dual unsigned
37 * long tests, though, completely avoiding tests that can never pass if the
38 * caller gives a constant bitmask that fills one of the longs with all 0s. So
39 * the desire is to have almost all of the calls decided on by comparing just
40 * one of the longs. This leads to having infrequently given bits that are
41 * frequently matched in the high bits.
42 *
43 * _ERROR and _NOTICE are used for messages that always go to the console and
44 * have appropriate KERN_ prefixes. We wrap these in our function instead of
45 * just calling printk() so that this can eventually make its way through
46 * relayfs along with the debugging messages. Everything else gets KERN_DEBUG.
47 * The inline tests and macro dance give GCC the opportunity to quite cleverly
48 * only emit the appropriage printk() when the caller passes in a constant
49 * mask, as is almost always the case.
50 *
51 * All this bitmask nonsense is hidden from the /proc interface so that Joel
52 * doesn't have an aneurism. Reading the file gives a straight forward
53 * indication of which bits are on or off:
54 * ENTRY off
55 * EXIT off
56 * TCP off
57 * MSG off
58 * SOCKET off
59 * ERROR off
60 * NOTICE on
61 *
62 * Writing changes the state of a given bit and requires a strictly formatted
63 * single write() call:
64 *
65 * write(fd, "ENTRY on", 8);
66 *
67 * would turn the entry bit on. "1" is also accepted in the place of "on", and
68 * "off" and "0" behave as expected.
69 *
70 * Some trivial shell can flip all the bits on or off:
71 *
72 * log_mask="/proc/fs/ocfs2_nodemanager/log_mask"
73 * cat $log_mask | (
74 * while read bit status; do
75 * # $1 is "on" or "off", say
76 * echo "$bit $1" > $log_mask
77 * done
78 * )
79 */
80
81/* for task_struct */
82#include <linux/sched.h>
83
84/* bits that are frequently given and infrequently matched in the low word */
85/* NOTE: If you add a flag, you need to also update mlog.c! */
86#define ML_ENTRY 0x0000000000000001ULL /* func call entry */
87#define ML_EXIT 0x0000000000000002ULL /* func call exit */
88#define ML_TCP 0x0000000000000004ULL /* net cluster/tcp.c */
89#define ML_MSG 0x0000000000000008ULL /* net network messages */
90#define ML_SOCKET 0x0000000000000010ULL /* net socket lifetime */
91#define ML_HEARTBEAT 0x0000000000000020ULL /* hb all heartbeat tracking */
92#define ML_HB_BIO 0x0000000000000040ULL /* hb io tracing */
93#define ML_DLMFS 0x0000000000000080ULL /* dlm user dlmfs */
94#define ML_DLM 0x0000000000000100ULL /* dlm general debugging */
95#define ML_DLM_DOMAIN 0x0000000000000200ULL /* dlm domain debugging */
96#define ML_DLM_THREAD 0x0000000000000400ULL /* dlm domain thread */
97#define ML_DLM_MASTER 0x0000000000000800ULL /* dlm master functions */
98#define ML_DLM_RECOVERY 0x0000000000001000ULL /* dlm master functions */
99#define ML_AIO 0x0000000000002000ULL /* ocfs2 aio read and write */
100#define ML_JOURNAL 0x0000000000004000ULL /* ocfs2 journalling functions */
101#define ML_DISK_ALLOC 0x0000000000008000ULL /* ocfs2 disk allocation */
102#define ML_SUPER 0x0000000000010000ULL /* ocfs2 mount / umount */
103#define ML_FILE_IO 0x0000000000020000ULL /* ocfs2 file I/O */
104#define ML_EXTENT_MAP 0x0000000000040000ULL /* ocfs2 extent map caching */
105#define ML_DLM_GLUE 0x0000000000080000ULL /* ocfs2 dlm glue layer */
106#define ML_BH_IO 0x0000000000100000ULL /* ocfs2 buffer I/O */
107#define ML_UPTODATE 0x0000000000200000ULL /* ocfs2 caching sequence #'s */
108#define ML_NAMEI 0x0000000000400000ULL /* ocfs2 directory / namespace */
109#define ML_INODE 0x0000000000800000ULL /* ocfs2 inode manipulation */
110#define ML_VOTE 0x0000000001000000ULL /* ocfs2 node messaging */
111#define ML_DCACHE 0x0000000002000000ULL /* ocfs2 dcache operations */
112#define ML_CONN 0x0000000004000000ULL /* net connection management */
113#define ML_QUORUM 0x0000000008000000ULL /* net connection quorum */
114#define ML_EXPORT 0x0000000010000000ULL /* ocfs2 export operations */
115/* bits that are infrequently given and frequently matched in the high word */
116#define ML_ERROR 0x0000000100000000ULL /* sent to KERN_ERR */
117#define ML_NOTICE 0x0000000200000000ULL /* setn to KERN_NOTICE */
118#define ML_KTHREAD 0x0000000400000000ULL /* kernel thread activity */
119
120#define MLOG_INITIAL_AND_MASK (ML_ERROR|ML_NOTICE)
121#define MLOG_INITIAL_NOT_MASK (ML_ENTRY|ML_EXIT)
122#ifndef MLOG_MASK_PREFIX
123#define MLOG_MASK_PREFIX 0
124#endif
125
126#define MLOG_MAX_BITS 64
127
128struct mlog_bits {
129 unsigned long words[MLOG_MAX_BITS / BITS_PER_LONG];
130};
131
132extern struct mlog_bits mlog_and_bits, mlog_not_bits;
133
134#if BITS_PER_LONG == 32
135
136#define __mlog_test_u64(mask, bits) \
137 ( (u32)(mask & 0xffffffff) & bits.words[0] || \
138 ((u64)(mask) >> 32) & bits.words[1] )
139#define __mlog_set_u64(mask, bits) do { \
140 bits.words[0] |= (u32)(mask & 0xffffffff); \
141 bits.words[1] |= (u64)(mask) >> 32; \
142} while (0)
143#define __mlog_clear_u64(mask, bits) do { \
144 bits.words[0] &= ~((u32)(mask & 0xffffffff)); \
145 bits.words[1] &= ~((u64)(mask) >> 32); \
146} while (0)
147#define MLOG_BITS_RHS(mask) { \
148 { \
149 [0] = (u32)(mask & 0xffffffff), \
150 [1] = (u64)(mask) >> 32, \
151 } \
152}
153
154#else /* 32bit long above, 64bit long below */
155
156#define __mlog_test_u64(mask, bits) ((mask) & bits.words[0])
157#define __mlog_set_u64(mask, bits) do { \
158 bits.words[0] |= (mask); \
159} while (0)
160#define __mlog_clear_u64(mask, bits) do { \
161 bits.words[0] &= ~(mask); \
162} while (0)
163#define MLOG_BITS_RHS(mask) { { (mask) } }
164
165#endif
166
167/*
168 * smp_processor_id() "helpfully" screams when called outside preemptible
169 * regions in current kernels. sles doesn't have the variants that don't
170 * scream. just do this instead of trying to guess which we're building
171 * against.. *sigh*.
172 */
173#define __mlog_cpu_guess ({ \
174 unsigned long _cpu = get_cpu(); \
175 put_cpu(); \
176 _cpu; \
177})
178
179/* In the following two macros, the whitespace after the ',' just
180 * before ##args is intentional. Otherwise, gcc 2.95 will eat the
181 * previous token if args expands to nothing.
182 */
183#define __mlog_printk(level, fmt, args...) \
184 printk(level "(%u,%lu):%s:%d " fmt, current->pid, \
185 __mlog_cpu_guess, __PRETTY_FUNCTION__, __LINE__ , \
186 ##args)
187
188#define mlog(mask, fmt, args...) do { \
189 u64 __m = MLOG_MASK_PREFIX | (mask); \
190 if (__mlog_test_u64(__m, mlog_and_bits) && \
191 !__mlog_test_u64(__m, mlog_not_bits)) { \
192 if (__m & ML_ERROR) \
193 __mlog_printk(KERN_ERR, "ERROR: "fmt , ##args); \
194 else if (__m & ML_NOTICE) \
195 __mlog_printk(KERN_NOTICE, fmt , ##args); \
196 else __mlog_printk(KERN_INFO, fmt , ##args); \
197 } \
198} while (0)
199
200#define mlog_errno(st) do { \
201 int _st = (st); \
202 if (_st != -ERESTARTSYS && _st != -EINTR && \
203 _st != AOP_TRUNCATED_PAGE) \
204 mlog(ML_ERROR, "status = %lld\n", (long long)_st); \
205} while (0)
206
207#define mlog_entry(fmt, args...) do { \
208 mlog(ML_ENTRY, "ENTRY:" fmt , ##args); \
209} while (0)
210
211#define mlog_entry_void() do { \
212 mlog(ML_ENTRY, "ENTRY:\n"); \
213} while (0)
214
215/* We disable this for old compilers since they don't have support for
216 * __builtin_types_compatible_p.
217 */
218#if (__GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1)) && \
219 !defined(__CHECKER__)
220#define mlog_exit(st) do { \
221 if (__builtin_types_compatible_p(typeof(st), unsigned long)) \
222 mlog(ML_EXIT, "EXIT: %lu\n", (unsigned long) (st)); \
223 else if (__builtin_types_compatible_p(typeof(st), signed long)) \
224 mlog(ML_EXIT, "EXIT: %ld\n", (signed long) (st)); \
225 else if (__builtin_types_compatible_p(typeof(st), unsigned int) \
226 || __builtin_types_compatible_p(typeof(st), unsigned short) \
227 || __builtin_types_compatible_p(typeof(st), unsigned char)) \
228 mlog(ML_EXIT, "EXIT: %u\n", (unsigned int) (st)); \
229 else if (__builtin_types_compatible_p(typeof(st), signed int) \
230 || __builtin_types_compatible_p(typeof(st), signed short) \
231 || __builtin_types_compatible_p(typeof(st), signed char)) \
232 mlog(ML_EXIT, "EXIT: %d\n", (signed int) (st)); \
233 else if (__builtin_types_compatible_p(typeof(st), long long)) \
234 mlog(ML_EXIT, "EXIT: %lld\n", (long long) (st)); \
235 else \
236 mlog(ML_EXIT, "EXIT: %llu\n", (unsigned long long) (st)); \
237} while (0)
238#else
239#define mlog_exit(st) do { \
240 mlog(ML_EXIT, "EXIT: %lld\n", (long long) (st)); \
241} while (0)
242#endif
243
244#define mlog_exit_ptr(ptr) do { \
245 mlog(ML_EXIT, "EXIT: %p\n", ptr); \
246} while (0)
247
248#define mlog_exit_void() do { \
249 mlog(ML_EXIT, "EXIT\n"); \
250} while (0)
251
252#define mlog_bug_on_msg(cond, fmt, args...) do { \
253 if (cond) { \
254 mlog(ML_ERROR, "bug expression: " #cond "\n"); \
255 mlog(ML_ERROR, fmt, ##args); \
256 BUG(); \
257 } \
258} while (0)
259
260#if (BITS_PER_LONG == 32) || defined(CONFIG_X86_64)
261#define MLFi64 "lld"
262#define MLFu64 "llu"
263#define MLFx64 "llx"
264#else
265#define MLFi64 "ld"
266#define MLFu64 "lu"
267#define MLFx64 "lx"
268#endif
269
270#include <linux/kobject.h>
271#include <linux/sysfs.h>
272int mlog_sys_init(struct subsystem *o2cb_subsys);
273void mlog_sys_shutdown(void);
274
275#endif /* O2CLUSTER_MASKLOG_H */
diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c
new file mode 100644
index 000000000000..5fd60c105913
--- /dev/null
+++ b/fs/ocfs2/cluster/nodemanager.c
@@ -0,0 +1,791 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * Copyright (C) 2004, 2005 Oracle. All rights reserved.
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public
17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA.
20 */
21
22#include <linux/kernel.h>
23#include <linux/module.h>
24#include <linux/sysctl.h>
25#include <linux/configfs.h>
26
27#include "endian.h"
28#include "tcp.h"
29#include "nodemanager.h"
30#include "heartbeat.h"
31#include "masklog.h"
32#include "sys.h"
33#include "ver.h"
34
35/* for now we operate under the assertion that there can be only one
36 * cluster active at a time. Changing this will require trickling
37 * cluster references throughout where nodes are looked up */
38static struct o2nm_cluster *o2nm_single_cluster = NULL;
39
40#define OCFS2_MAX_HB_CTL_PATH 256
41static char ocfs2_hb_ctl_path[OCFS2_MAX_HB_CTL_PATH] = "/sbin/ocfs2_hb_ctl";
42
43static ctl_table ocfs2_nm_table[] = {
44 {
45 .ctl_name = 1,
46 .procname = "hb_ctl_path",
47 .data = ocfs2_hb_ctl_path,
48 .maxlen = OCFS2_MAX_HB_CTL_PATH,
49 .mode = 0644,
50 .proc_handler = &proc_dostring,
51 .strategy = &sysctl_string,
52 },
53 { .ctl_name = 0 }
54};
55
56static ctl_table ocfs2_mod_table[] = {
57 {
58 .ctl_name = KERN_OCFS2_NM,
59 .procname = "nm",
60 .data = NULL,
61 .maxlen = 0,
62 .mode = 0555,
63 .child = ocfs2_nm_table
64 },
65 { .ctl_name = 0}
66};
67
68static ctl_table ocfs2_kern_table[] = {
69 {
70 .ctl_name = KERN_OCFS2,
71 .procname = "ocfs2",
72 .data = NULL,
73 .maxlen = 0,
74 .mode = 0555,
75 .child = ocfs2_mod_table
76 },
77 { .ctl_name = 0}
78};
79
80static ctl_table ocfs2_root_table[] = {
81 {
82 .ctl_name = CTL_FS,
83 .procname = "fs",
84 .data = NULL,
85 .maxlen = 0,
86 .mode = 0555,
87 .child = ocfs2_kern_table
88 },
89 { .ctl_name = 0 }
90};
91
92static struct ctl_table_header *ocfs2_table_header = NULL;
93
94const char *o2nm_get_hb_ctl_path(void)
95{
96 return ocfs2_hb_ctl_path;
97}
98EXPORT_SYMBOL_GPL(o2nm_get_hb_ctl_path);
99
100struct o2nm_cluster {
101 struct config_group cl_group;
102 unsigned cl_has_local:1;
103 u8 cl_local_node;
104 rwlock_t cl_nodes_lock;
105 struct o2nm_node *cl_nodes[O2NM_MAX_NODES];
106 struct rb_root cl_node_ip_tree;
107 /* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */
108 unsigned long cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
109};
110
111struct o2nm_node *o2nm_get_node_by_num(u8 node_num)
112{
113 struct o2nm_node *node = NULL;
114
115 if (node_num >= O2NM_MAX_NODES || o2nm_single_cluster == NULL)
116 goto out;
117
118 read_lock(&o2nm_single_cluster->cl_nodes_lock);
119 node = o2nm_single_cluster->cl_nodes[node_num];
120 if (node)
121 config_item_get(&node->nd_item);
122 read_unlock(&o2nm_single_cluster->cl_nodes_lock);
123out:
124 return node;
125}
126EXPORT_SYMBOL_GPL(o2nm_get_node_by_num);
127
128int o2nm_configured_node_map(unsigned long *map, unsigned bytes)
129{
130 struct o2nm_cluster *cluster = o2nm_single_cluster;
131
132 BUG_ON(bytes < (sizeof(cluster->cl_nodes_bitmap)));
133
134 if (cluster == NULL)
135 return -EINVAL;
136
137 read_lock(&cluster->cl_nodes_lock);
138 memcpy(map, cluster->cl_nodes_bitmap, sizeof(cluster->cl_nodes_bitmap));
139 read_unlock(&cluster->cl_nodes_lock);
140
141 return 0;
142}
143EXPORT_SYMBOL_GPL(o2nm_configured_node_map);
144
145static struct o2nm_node *o2nm_node_ip_tree_lookup(struct o2nm_cluster *cluster,
146 __be32 ip_needle,
147 struct rb_node ***ret_p,
148 struct rb_node **ret_parent)
149{
150 struct rb_node **p = &cluster->cl_node_ip_tree.rb_node;
151 struct rb_node *parent = NULL;
152 struct o2nm_node *node, *ret = NULL;
153
154 while (*p) {
155 parent = *p;
156 node = rb_entry(parent, struct o2nm_node, nd_ip_node);
157
158 if (memcmp(&ip_needle, &node->nd_ipv4_address,
159 sizeof(ip_needle)) < 0)
160 p = &(*p)->rb_left;
161 else if (memcmp(&ip_needle, &node->nd_ipv4_address,
162 sizeof(ip_needle)) > 0)
163 p = &(*p)->rb_right;
164 else {
165 ret = node;
166 break;
167 }
168 }
169
170 if (ret_p != NULL)
171 *ret_p = p;
172 if (ret_parent != NULL)
173 *ret_parent = parent;
174
175 return ret;
176}
177
178struct o2nm_node *o2nm_get_node_by_ip(__be32 addr)
179{
180 struct o2nm_node *node = NULL;
181 struct o2nm_cluster *cluster = o2nm_single_cluster;
182
183 if (cluster == NULL)
184 goto out;
185
186 read_lock(&cluster->cl_nodes_lock);
187 node = o2nm_node_ip_tree_lookup(cluster, addr, NULL, NULL);
188 if (node)
189 config_item_get(&node->nd_item);
190 read_unlock(&cluster->cl_nodes_lock);
191
192out:
193 return node;
194}
195EXPORT_SYMBOL_GPL(o2nm_get_node_by_ip);
196
197void o2nm_node_put(struct o2nm_node *node)
198{
199 config_item_put(&node->nd_item);
200}
201EXPORT_SYMBOL_GPL(o2nm_node_put);
202
203void o2nm_node_get(struct o2nm_node *node)
204{
205 config_item_get(&node->nd_item);
206}
207EXPORT_SYMBOL_GPL(o2nm_node_get);
208
209u8 o2nm_this_node(void)
210{
211 u8 node_num = O2NM_MAX_NODES;
212
213 if (o2nm_single_cluster && o2nm_single_cluster->cl_has_local)
214 node_num = o2nm_single_cluster->cl_local_node;
215
216 return node_num;
217}
218EXPORT_SYMBOL_GPL(o2nm_this_node);
219
220/* node configfs bits */
221
222static struct o2nm_cluster *to_o2nm_cluster(struct config_item *item)
223{
224 return item ?
225 container_of(to_config_group(item), struct o2nm_cluster,
226 cl_group)
227 : NULL;
228}
229
230static struct o2nm_node *to_o2nm_node(struct config_item *item)
231{
232 return item ? container_of(item, struct o2nm_node, nd_item) : NULL;
233}
234
235static void o2nm_node_release(struct config_item *item)
236{
237 struct o2nm_node *node = to_o2nm_node(item);
238 kfree(node);
239}
240
241static ssize_t o2nm_node_num_read(struct o2nm_node *node, char *page)
242{
243 return sprintf(page, "%d\n", node->nd_num);
244}
245
246static struct o2nm_cluster *to_o2nm_cluster_from_node(struct o2nm_node *node)
247{
248 /* through the first node_set .parent
249 * mycluster/nodes/mynode == o2nm_cluster->o2nm_node_group->o2nm_node */
250 return to_o2nm_cluster(node->nd_item.ci_parent->ci_parent);
251}
252
253enum {
254 O2NM_NODE_ATTR_NUM = 0,
255 O2NM_NODE_ATTR_PORT,
256 O2NM_NODE_ATTR_ADDRESS,
257 O2NM_NODE_ATTR_LOCAL,
258};
259
260static ssize_t o2nm_node_num_write(struct o2nm_node *node, const char *page,
261 size_t count)
262{
263 struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
264 unsigned long tmp;
265 char *p = (char *)page;
266
267 tmp = simple_strtoul(p, &p, 0);
268 if (!p || (*p && (*p != '\n')))
269 return -EINVAL;
270
271 if (tmp >= O2NM_MAX_NODES)
272 return -ERANGE;
273
274 /* once we're in the cl_nodes tree networking can look us up by
275 * node number and try to use our address and port attributes
276 * to connect to this node.. make sure that they've been set
277 * before writing the node attribute? */
278 if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
279 !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
280 return -EINVAL; /* XXX */
281
282 write_lock(&cluster->cl_nodes_lock);
283 if (cluster->cl_nodes[tmp])
284 p = NULL;
285 else {
286 cluster->cl_nodes[tmp] = node;
287 node->nd_num = tmp;
288 set_bit(tmp, cluster->cl_nodes_bitmap);
289 }
290 write_unlock(&cluster->cl_nodes_lock);
291 if (p == NULL)
292 return -EEXIST;
293
294 return count;
295}
296static ssize_t o2nm_node_ipv4_port_read(struct o2nm_node *node, char *page)
297{
298 return sprintf(page, "%u\n", ntohs(node->nd_ipv4_port));
299}
300
301static ssize_t o2nm_node_ipv4_port_write(struct o2nm_node *node,
302 const char *page, size_t count)
303{
304 unsigned long tmp;
305 char *p = (char *)page;
306
307 tmp = simple_strtoul(p, &p, 0);
308 if (!p || (*p && (*p != '\n')))
309 return -EINVAL;
310
311 if (tmp == 0)
312 return -EINVAL;
313 if (tmp >= (u16)-1)
314 return -ERANGE;
315
316 node->nd_ipv4_port = htons(tmp);
317
318 return count;
319}
320
321static ssize_t o2nm_node_ipv4_address_read(struct o2nm_node *node, char *page)
322{
323 return sprintf(page, "%u.%u.%u.%u\n", NIPQUAD(node->nd_ipv4_address));
324}
325
326static ssize_t o2nm_node_ipv4_address_write(struct o2nm_node *node,
327 const char *page,
328 size_t count)
329{
330 struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
331 int ret, i;
332 struct rb_node **p, *parent;
333 unsigned int octets[4];
334 __be32 ipv4_addr = 0;
335
336 ret = sscanf(page, "%3u.%3u.%3u.%3u", &octets[3], &octets[2],
337 &octets[1], &octets[0]);
338 if (ret != 4)
339 return -EINVAL;
340
341 for (i = 0; i < ARRAY_SIZE(octets); i++) {
342 if (octets[i] > 255)
343 return -ERANGE;
344 be32_add_cpu(&ipv4_addr, octets[i] << (i * 8));
345 }
346
347 ret = 0;
348 write_lock(&cluster->cl_nodes_lock);
349 if (o2nm_node_ip_tree_lookup(cluster, ipv4_addr, &p, &parent))
350 ret = -EEXIST;
351 else {
352 rb_link_node(&node->nd_ip_node, parent, p);
353 rb_insert_color(&node->nd_ip_node, &cluster->cl_node_ip_tree);
354 }
355 write_unlock(&cluster->cl_nodes_lock);
356 if (ret)
357 return ret;
358
359 memcpy(&node->nd_ipv4_address, &ipv4_addr, sizeof(ipv4_addr));
360
361 return count;
362}
363
364static ssize_t o2nm_node_local_read(struct o2nm_node *node, char *page)
365{
366 return sprintf(page, "%d\n", node->nd_local);
367}
368
369static ssize_t o2nm_node_local_write(struct o2nm_node *node, const char *page,
370 size_t count)
371{
372 struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
373 unsigned long tmp;
374 char *p = (char *)page;
375 ssize_t ret;
376
377 tmp = simple_strtoul(p, &p, 0);
378 if (!p || (*p && (*p != '\n')))
379 return -EINVAL;
380
381 tmp = !!tmp; /* boolean of whether this node wants to be local */
382
383 /* setting local turns on networking rx for now so we require having
384 * set everything else first */
385 if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
386 !test_bit(O2NM_NODE_ATTR_NUM, &node->nd_set_attributes) ||
387 !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
388 return -EINVAL; /* XXX */
389
390 /* the only failure case is trying to set a new local node
391 * when a different one is already set */
392 if (tmp && tmp == cluster->cl_has_local &&
393 cluster->cl_local_node != node->nd_num)
394 return -EBUSY;
395
396 /* bring up the rx thread if we're setting the new local node. */
397 if (tmp && !cluster->cl_has_local) {
398 ret = o2net_start_listening(node);
399 if (ret)
400 return ret;
401 }
402
403 if (!tmp && cluster->cl_has_local &&
404 cluster->cl_local_node == node->nd_num) {
405 o2net_stop_listening(node);
406 cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
407 }
408
409 node->nd_local = tmp;
410 if (node->nd_local) {
411 cluster->cl_has_local = tmp;
412 cluster->cl_local_node = node->nd_num;
413 }
414
415 return count;
416}
417
418struct o2nm_node_attribute {
419 struct configfs_attribute attr;
420 ssize_t (*show)(struct o2nm_node *, char *);
421 ssize_t (*store)(struct o2nm_node *, const char *, size_t);
422};
423
424static struct o2nm_node_attribute o2nm_node_attr_num = {
425 .attr = { .ca_owner = THIS_MODULE,
426 .ca_name = "num",
427 .ca_mode = S_IRUGO | S_IWUSR },
428 .show = o2nm_node_num_read,
429 .store = o2nm_node_num_write,
430};
431
432static struct o2nm_node_attribute o2nm_node_attr_ipv4_port = {
433 .attr = { .ca_owner = THIS_MODULE,
434 .ca_name = "ipv4_port",
435 .ca_mode = S_IRUGO | S_IWUSR },
436 .show = o2nm_node_ipv4_port_read,
437 .store = o2nm_node_ipv4_port_write,
438};
439
440static struct o2nm_node_attribute o2nm_node_attr_ipv4_address = {
441 .attr = { .ca_owner = THIS_MODULE,
442 .ca_name = "ipv4_address",
443 .ca_mode = S_IRUGO | S_IWUSR },
444 .show = o2nm_node_ipv4_address_read,
445 .store = o2nm_node_ipv4_address_write,
446};
447
448static struct o2nm_node_attribute o2nm_node_attr_local = {
449 .attr = { .ca_owner = THIS_MODULE,
450 .ca_name = "local",
451 .ca_mode = S_IRUGO | S_IWUSR },
452 .show = o2nm_node_local_read,
453 .store = o2nm_node_local_write,
454};
455
456static struct configfs_attribute *o2nm_node_attrs[] = {
457 [O2NM_NODE_ATTR_NUM] = &o2nm_node_attr_num.attr,
458 [O2NM_NODE_ATTR_PORT] = &o2nm_node_attr_ipv4_port.attr,
459 [O2NM_NODE_ATTR_ADDRESS] = &o2nm_node_attr_ipv4_address.attr,
460 [O2NM_NODE_ATTR_LOCAL] = &o2nm_node_attr_local.attr,
461 NULL,
462};
463
464static int o2nm_attr_index(struct configfs_attribute *attr)
465{
466 int i;
467 for (i = 0; i < ARRAY_SIZE(o2nm_node_attrs); i++) {
468 if (attr == o2nm_node_attrs[i])
469 return i;
470 }
471 BUG();
472 return 0;
473}
474
475static ssize_t o2nm_node_show(struct config_item *item,
476 struct configfs_attribute *attr,
477 char *page)
478{
479 struct o2nm_node *node = to_o2nm_node(item);
480 struct o2nm_node_attribute *o2nm_node_attr =
481 container_of(attr, struct o2nm_node_attribute, attr);
482 ssize_t ret = 0;
483
484 if (o2nm_node_attr->show)
485 ret = o2nm_node_attr->show(node, page);
486 return ret;
487}
488
489static ssize_t o2nm_node_store(struct config_item *item,
490 struct configfs_attribute *attr,
491 const char *page, size_t count)
492{
493 struct o2nm_node *node = to_o2nm_node(item);
494 struct o2nm_node_attribute *o2nm_node_attr =
495 container_of(attr, struct o2nm_node_attribute, attr);
496 ssize_t ret;
497 int attr_index = o2nm_attr_index(attr);
498
499 if (o2nm_node_attr->store == NULL) {
500 ret = -EINVAL;
501 goto out;
502 }
503
504 if (test_bit(attr_index, &node->nd_set_attributes))
505 return -EBUSY;
506
507 ret = o2nm_node_attr->store(node, page, count);
508 if (ret < count)
509 goto out;
510
511 set_bit(attr_index, &node->nd_set_attributes);
512out:
513 return ret;
514}
515
516static struct configfs_item_operations o2nm_node_item_ops = {
517 .release = o2nm_node_release,
518 .show_attribute = o2nm_node_show,
519 .store_attribute = o2nm_node_store,
520};
521
522static struct config_item_type o2nm_node_type = {
523 .ct_item_ops = &o2nm_node_item_ops,
524 .ct_attrs = o2nm_node_attrs,
525 .ct_owner = THIS_MODULE,
526};
527
528/* node set */
529
530struct o2nm_node_group {
531 struct config_group ns_group;
532 /* some stuff? */
533};
534
535#if 0
536static struct o2nm_node_group *to_o2nm_node_group(struct config_group *group)
537{
538 return group ?
539 container_of(group, struct o2nm_node_group, ns_group)
540 : NULL;
541}
542#endif
543
544static struct config_item *o2nm_node_group_make_item(struct config_group *group,
545 const char *name)
546{
547 struct o2nm_node *node = NULL;
548 struct config_item *ret = NULL;
549
550 if (strlen(name) > O2NM_MAX_NAME_LEN)
551 goto out; /* ENAMETOOLONG */
552
553 node = kcalloc(1, sizeof(struct o2nm_node), GFP_KERNEL);
554 if (node == NULL)
555 goto out; /* ENOMEM */
556
557 strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
558 config_item_init_type_name(&node->nd_item, name, &o2nm_node_type);
559 spin_lock_init(&node->nd_lock);
560
561 ret = &node->nd_item;
562
563out:
564 if (ret == NULL)
565 kfree(node);
566
567 return ret;
568}
569
570static void o2nm_node_group_drop_item(struct config_group *group,
571 struct config_item *item)
572{
573 struct o2nm_node *node = to_o2nm_node(item);
574 struct o2nm_cluster *cluster = to_o2nm_cluster(group->cg_item.ci_parent);
575
576 o2net_disconnect_node(node);
577
578 if (cluster->cl_has_local &&
579 (cluster->cl_local_node == node->nd_num)) {
580 cluster->cl_has_local = 0;
581 cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
582 o2net_stop_listening(node);
583 }
584
585 /* XXX call into net to stop this node from trading messages */
586
587 write_lock(&cluster->cl_nodes_lock);
588
589 /* XXX sloppy */
590 if (node->nd_ipv4_address)
591 rb_erase(&node->nd_ip_node, &cluster->cl_node_ip_tree);
592
593 /* nd_num might be 0 if the node number hasn't been set.. */
594 if (cluster->cl_nodes[node->nd_num] == node) {
595 cluster->cl_nodes[node->nd_num] = NULL;
596 clear_bit(node->nd_num, cluster->cl_nodes_bitmap);
597 }
598 write_unlock(&cluster->cl_nodes_lock);
599
600 config_item_put(item);
601}
602
603static struct configfs_group_operations o2nm_node_group_group_ops = {
604 .make_item = o2nm_node_group_make_item,
605 .drop_item = o2nm_node_group_drop_item,
606};
607
608static struct config_item_type o2nm_node_group_type = {
609 .ct_group_ops = &o2nm_node_group_group_ops,
610 .ct_owner = THIS_MODULE,
611};
612
613/* cluster */
614
615static void o2nm_cluster_release(struct config_item *item)
616{
617 struct o2nm_cluster *cluster = to_o2nm_cluster(item);
618
619 kfree(cluster->cl_group.default_groups);
620 kfree(cluster);
621}
622
623static struct configfs_item_operations o2nm_cluster_item_ops = {
624 .release = o2nm_cluster_release,
625};
626
627static struct config_item_type o2nm_cluster_type = {
628 .ct_item_ops = &o2nm_cluster_item_ops,
629 .ct_owner = THIS_MODULE,
630};
631
632/* cluster set */
633
634struct o2nm_cluster_group {
635 struct configfs_subsystem cs_subsys;
636 /* some stuff? */
637};
638
639#if 0
640static struct o2nm_cluster_group *to_o2nm_cluster_group(struct config_group *group)
641{
642 return group ?
643 container_of(to_configfs_subsystem(group), struct o2nm_cluster_group, cs_subsys)
644 : NULL;
645}
646#endif
647
648static struct config_group *o2nm_cluster_group_make_group(struct config_group *group,
649 const char *name)
650{
651 struct o2nm_cluster *cluster = NULL;
652 struct o2nm_node_group *ns = NULL;
653 struct config_group *o2hb_group = NULL, *ret = NULL;
654 void *defs = NULL;
655
656 /* this runs under the parent dir's i_sem; there can be only
657 * one caller in here at a time */
658 if (o2nm_single_cluster)
659 goto out; /* ENOSPC */
660
661 cluster = kcalloc(1, sizeof(struct o2nm_cluster), GFP_KERNEL);
662 ns = kcalloc(1, sizeof(struct o2nm_node_group), GFP_KERNEL);
663 defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL);
664 o2hb_group = o2hb_alloc_hb_set();
665 if (cluster == NULL || ns == NULL || o2hb_group == NULL || defs == NULL)
666 goto out;
667
668 config_group_init_type_name(&cluster->cl_group, name,
669 &o2nm_cluster_type);
670 config_group_init_type_name(&ns->ns_group, "node",
671 &o2nm_node_group_type);
672
673 cluster->cl_group.default_groups = defs;
674 cluster->cl_group.default_groups[0] = &ns->ns_group;
675 cluster->cl_group.default_groups[1] = o2hb_group;
676 cluster->cl_group.default_groups[2] = NULL;
677 rwlock_init(&cluster->cl_nodes_lock);
678 cluster->cl_node_ip_tree = RB_ROOT;
679
680 ret = &cluster->cl_group;
681 o2nm_single_cluster = cluster;
682
683out:
684 if (ret == NULL) {
685 kfree(cluster);
686 kfree(ns);
687 o2hb_free_hb_set(o2hb_group);
688 kfree(defs);
689 }
690
691 return ret;
692}
693
694static void o2nm_cluster_group_drop_item(struct config_group *group, struct config_item *item)
695{
696 struct o2nm_cluster *cluster = to_o2nm_cluster(item);
697 int i;
698 struct config_item *killme;
699
700 BUG_ON(o2nm_single_cluster != cluster);
701 o2nm_single_cluster = NULL;
702
703 for (i = 0; cluster->cl_group.default_groups[i]; i++) {
704 killme = &cluster->cl_group.default_groups[i]->cg_item;
705 cluster->cl_group.default_groups[i] = NULL;
706 config_item_put(killme);
707 }
708
709 config_item_put(item);
710}
711
712static struct configfs_group_operations o2nm_cluster_group_group_ops = {
713 .make_group = o2nm_cluster_group_make_group,
714 .drop_item = o2nm_cluster_group_drop_item,
715};
716
717static struct config_item_type o2nm_cluster_group_type = {
718 .ct_group_ops = &o2nm_cluster_group_group_ops,
719 .ct_owner = THIS_MODULE,
720};
721
722static struct o2nm_cluster_group o2nm_cluster_group = {
723 .cs_subsys = {
724 .su_group = {
725 .cg_item = {
726 .ci_namebuf = "cluster",
727 .ci_type = &o2nm_cluster_group_type,
728 },
729 },
730 },
731};
732
733static void __exit exit_o2nm(void)
734{
735 if (ocfs2_table_header)
736 unregister_sysctl_table(ocfs2_table_header);
737
738 /* XXX sync with hb callbacks and shut down hb? */
739 o2net_unregister_hb_callbacks();
740 configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
741 o2cb_sys_shutdown();
742
743 o2net_exit();
744}
745
746static int __init init_o2nm(void)
747{
748 int ret = -1;
749
750 cluster_print_version();
751
752 o2hb_init();
753 o2net_init();
754
755 ocfs2_table_header = register_sysctl_table(ocfs2_root_table, 0);
756 if (!ocfs2_table_header) {
757 printk(KERN_ERR "nodemanager: unable to register sysctl\n");
758 ret = -ENOMEM; /* or something. */
759 goto out;
760 }
761
762 ret = o2net_register_hb_callbacks();
763 if (ret)
764 goto out_sysctl;
765
766 config_group_init(&o2nm_cluster_group.cs_subsys.su_group);
767 init_MUTEX(&o2nm_cluster_group.cs_subsys.su_sem);
768 ret = configfs_register_subsystem(&o2nm_cluster_group.cs_subsys);
769 if (ret) {
770 printk(KERN_ERR "nodemanager: Registration returned %d\n", ret);
771 goto out_callbacks;
772 }
773
774 ret = o2cb_sys_init();
775 if (!ret)
776 goto out;
777
778 configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
779out_callbacks:
780 o2net_unregister_hb_callbacks();
781out_sysctl:
782 unregister_sysctl_table(ocfs2_table_header);
783out:
784 return ret;
785}
786
787MODULE_AUTHOR("Oracle");
788MODULE_LICENSE("GPL");
789
790module_init(init_o2nm)
791module_exit(exit_o2nm)
diff --git a/fs/ocfs2/cluster/nodemanager.h b/fs/ocfs2/cluster/nodemanager.h
new file mode 100644
index 000000000000..fce8033c310f
--- /dev/null
+++ b/fs/ocfs2/cluster/nodemanager.h
@@ -0,0 +1,64 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * nodemanager.h
5 *
6 * Function prototypes
7 *
8 * Copyright (C) 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 *
25 */
26
27#ifndef O2CLUSTER_NODEMANAGER_H
28#define O2CLUSTER_NODEMANAGER_H
29
30#include "ocfs2_nodemanager.h"
31
32/* This totally doesn't belong here. */
33#include <linux/configfs.h>
34#include <linux/rbtree.h>
35
36#define KERN_OCFS2 988
37#define KERN_OCFS2_NM 1
38
39const char *o2nm_get_hb_ctl_path(void);
40
41struct o2nm_node {
42 spinlock_t nd_lock;
43 struct config_item nd_item;
44 char nd_name[O2NM_MAX_NAME_LEN+1]; /* replace? */
45 __u8 nd_num;
46 /* only one address per node, as attributes, for now. */
47 __be32 nd_ipv4_address;
48 __be16 nd_ipv4_port;
49 struct rb_node nd_ip_node;
50 /* there can be only one local node for now */
51 int nd_local;
52
53 unsigned long nd_set_attributes;
54};
55
56u8 o2nm_this_node(void);
57
58int o2nm_configured_node_map(unsigned long *map, unsigned bytes);
59struct o2nm_node *o2nm_get_node_by_num(u8 node_num);
60struct o2nm_node *o2nm_get_node_by_ip(__be32 addr);
61void o2nm_node_get(struct o2nm_node *node);
62void o2nm_node_put(struct o2nm_node *node);
63
64#endif /* O2CLUSTER_NODEMANAGER_H */
diff --git a/fs/ocfs2/cluster/ocfs2_heartbeat.h b/fs/ocfs2/cluster/ocfs2_heartbeat.h
new file mode 100644
index 000000000000..94096069cb43
--- /dev/null
+++ b/fs/ocfs2/cluster/ocfs2_heartbeat.h
@@ -0,0 +1,37 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * ocfs2_heartbeat.h
5 *
6 * On-disk structures for ocfs2_heartbeat
7 *
8 * Copyright (C) 2002, 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 */
25
26#ifndef _OCFS2_HEARTBEAT_H
27#define _OCFS2_HEARTBEAT_H
28
29struct o2hb_disk_heartbeat_block {
30 __le64 hb_seq;
31 __u8 hb_node;
32 __u8 hb_pad1[3];
33 __le32 hb_cksum;
34 __le64 hb_generation;
35};
36
37#endif /* _OCFS2_HEARTBEAT_H */
diff --git a/fs/ocfs2/cluster/ocfs2_nodemanager.h b/fs/ocfs2/cluster/ocfs2_nodemanager.h
new file mode 100644
index 000000000000..5b9854bad571
--- /dev/null
+++ b/fs/ocfs2/cluster/ocfs2_nodemanager.h
@@ -0,0 +1,39 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * ocfs2_nodemanager.h
5 *
6 * Header describing the interface between userspace and the kernel
7 * for the ocfs2_nodemanager module.
8 *
9 * Copyright (C) 2002, 2004 Oracle. All rights reserved.
10 *
11 * This program is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU General Public
13 * License as published by the Free Software Foundation; either
14 * version 2 of the License, or (at your option) any later version.
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 * General Public License for more details.
20 *
21 * You should have received a copy of the GNU General Public
22 * License along with this program; if not, write to the
23 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
24 * Boston, MA 021110-1307, USA.
25 *
26 */
27
28#ifndef _OCFS2_NODEMANAGER_H
29#define _OCFS2_NODEMANAGER_H
30
31#define O2NM_API_VERSION 5
32
33#define O2NM_MAX_NODES 255
34#define O2NM_INVALID_NODE_NUM 255
35
36/* host name, group name, cluster name all 64 bytes */
37#define O2NM_MAX_NAME_LEN 64 // __NEW_UTS_LEN
38
39#endif /* _OCFS2_NODEMANAGER_H */
diff --git a/fs/ocfs2/cluster/quorum.c b/fs/ocfs2/cluster/quorum.c
new file mode 100644
index 000000000000..7bba98fbfc15
--- /dev/null
+++ b/fs/ocfs2/cluster/quorum.c
@@ -0,0 +1,315 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 *
3 * vim: noexpandtab sw=8 ts=8 sts=0:
4 *
5 * Copyright (C) 2005 Oracle. All rights reserved.
6 *
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public
18 * License along with this program; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 021110-1307, USA.
21 */
22
23/* This quorum hack is only here until we transition to some more rational
24 * approach that is driven from userspace. Honest. No foolin'.
25 *
26 * Imagine two nodes lose network connectivity to each other but they're still
27 * up and operating in every other way. Presumably a network timeout indicates
28 * that a node is broken and should be recovered. They can't both recover each
29 * other and both carry on without serialising their access to the file system.
30 * They need to decide who is authoritative. Now extend that problem to
31 * arbitrary groups of nodes losing connectivity between each other.
32 *
33 * So we declare that a node which has given up on connecting to a majority
34 * of nodes who are still heartbeating will fence itself.
35 *
36 * There are huge opportunities for races here. After we give up on a node's
37 * connection we need to wait long enough to give heartbeat an opportunity
38 * to declare the node as truly dead. We also need to be careful with the
39 * race between when we see a node start heartbeating and when we connect
40 * to it.
41 *
42 * So nodes that are in this transtion put a hold on the quorum decision
43 * with a counter. As they fall out of this transition they drop the count
44 * and if they're the last, they fire off the decision.
45 */
46#include <linux/kernel.h>
47#include <linux/slab.h>
48#include <linux/workqueue.h>
49
50#include "heartbeat.h"
51#include "nodemanager.h"
52#define MLOG_MASK_PREFIX ML_QUORUM
53#include "masklog.h"
54#include "quorum.h"
55
56static struct o2quo_state {
57 spinlock_t qs_lock;
58 struct work_struct qs_work;
59 int qs_pending;
60 int qs_heartbeating;
61 unsigned long qs_hb_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
62 int qs_connected;
63 unsigned long qs_conn_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
64 int qs_holds;
65 unsigned long qs_hold_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
66} o2quo_state;
67
68/* this is horribly heavy-handed. It should instead flip the file
69 * system RO and call some userspace script. */
70static void o2quo_fence_self(void)
71{
72 /* panic spins with interrupts enabled. with preempt
73 * threads can still schedule, etc, etc */
74 o2hb_stop_all_regions();
75 panic("ocfs2 is very sorry to be fencing this system by panicing\n");
76}
77
78/* Indicate that a timeout occured on a hearbeat region write. The
79 * other nodes in the cluster may consider us dead at that time so we
80 * want to "fence" ourselves so that we don't scribble on the disk
81 * after they think they've recovered us. This can't solve all
82 * problems related to writeout after recovery but this hack can at
83 * least close some of those gaps. When we have real fencing, this can
84 * go away as our node would be fenced externally before other nodes
85 * begin recovery. */
86void o2quo_disk_timeout(void)
87{
88 o2quo_fence_self();
89}
90
91static void o2quo_make_decision(void *arg)
92{
93 int quorum;
94 int lowest_hb, lowest_reachable = 0, fence = 0;
95 struct o2quo_state *qs = &o2quo_state;
96
97 spin_lock(&qs->qs_lock);
98
99 lowest_hb = find_first_bit(qs->qs_hb_bm, O2NM_MAX_NODES);
100 if (lowest_hb != O2NM_MAX_NODES)
101 lowest_reachable = test_bit(lowest_hb, qs->qs_conn_bm);
102
103 mlog(0, "heartbeating: %d, connected: %d, "
104 "lowest: %d (%sreachable)\n", qs->qs_heartbeating,
105 qs->qs_connected, lowest_hb, lowest_reachable ? "" : "un");
106
107 if (!test_bit(o2nm_this_node(), qs->qs_hb_bm) ||
108 qs->qs_heartbeating == 1)
109 goto out;
110
111 if (qs->qs_heartbeating & 1) {
112 /* the odd numbered cluster case is straight forward --
113 * if we can't talk to the majority we're hosed */
114 quorum = (qs->qs_heartbeating + 1)/2;
115 if (qs->qs_connected < quorum) {
116 mlog(ML_ERROR, "fencing this node because it is "
117 "only connected to %u nodes and %u is needed "
118 "to make a quorum out of %u heartbeating nodes\n",
119 qs->qs_connected, quorum,
120 qs->qs_heartbeating);
121 fence = 1;
122 }
123 } else {
124 /* the even numbered cluster adds the possibility of each half
125 * of the cluster being able to talk amongst themselves.. in
126 * that case we're hosed if we can't talk to the group that has
127 * the lowest numbered node */
128 quorum = qs->qs_heartbeating / 2;
129 if (qs->qs_connected < quorum) {
130 mlog(ML_ERROR, "fencing this node because it is "
131 "only connected to %u nodes and %u is needed "
132 "to make a quorum out of %u heartbeating nodes\n",
133 qs->qs_connected, quorum,
134 qs->qs_heartbeating);
135 fence = 1;
136 }
137 else if ((qs->qs_connected == quorum) &&
138 !lowest_reachable) {
139 mlog(ML_ERROR, "fencing this node because it is "
140 "connected to a half-quorum of %u out of %u "
141 "nodes which doesn't include the lowest active "
142 "node %u\n", quorum, qs->qs_heartbeating,
143 lowest_hb);
144 fence = 1;
145 }
146 }
147
148out:
149 spin_unlock(&qs->qs_lock);
150 if (fence)
151 o2quo_fence_self();
152}
153
154static void o2quo_set_hold(struct o2quo_state *qs, u8 node)
155{
156 assert_spin_locked(&qs->qs_lock);
157
158 if (!test_and_set_bit(node, qs->qs_hold_bm)) {
159 qs->qs_holds++;
160 mlog_bug_on_msg(qs->qs_holds == O2NM_MAX_NODES,
161 "node %u\n", node);
162 mlog(0, "node %u, %d total\n", node, qs->qs_holds);
163 }
164}
165
166static void o2quo_clear_hold(struct o2quo_state *qs, u8 node)
167{
168 assert_spin_locked(&qs->qs_lock);
169
170 if (test_and_clear_bit(node, qs->qs_hold_bm)) {
171 mlog(0, "node %u, %d total\n", node, qs->qs_holds - 1);
172 if (--qs->qs_holds == 0) {
173 if (qs->qs_pending) {
174 qs->qs_pending = 0;
175 schedule_work(&qs->qs_work);
176 }
177 }
178 mlog_bug_on_msg(qs->qs_holds < 0, "node %u, holds %d\n",
179 node, qs->qs_holds);
180 }
181}
182
183/* as a node comes up we delay the quorum decision until we know the fate of
184 * the connection. the hold will be droped in conn_up or hb_down. it might be
185 * perpetuated by con_err until hb_down. if we already have a conn, we might
186 * be dropping a hold that conn_up got. */
187void o2quo_hb_up(u8 node)
188{
189 struct o2quo_state *qs = &o2quo_state;
190
191 spin_lock(&qs->qs_lock);
192
193 qs->qs_heartbeating++;
194 mlog_bug_on_msg(qs->qs_heartbeating == O2NM_MAX_NODES,
195 "node %u\n", node);
196 mlog_bug_on_msg(test_bit(node, qs->qs_hb_bm), "node %u\n", node);
197 set_bit(node, qs->qs_hb_bm);
198
199 mlog(0, "node %u, %d total\n", node, qs->qs_heartbeating);
200
201 if (!test_bit(node, qs->qs_conn_bm))
202 o2quo_set_hold(qs, node);
203 else
204 o2quo_clear_hold(qs, node);
205
206 spin_unlock(&qs->qs_lock);
207}
208
209/* hb going down releases any holds we might have had due to this node from
210 * conn_up, conn_err, or hb_up */
211void o2quo_hb_down(u8 node)
212{
213 struct o2quo_state *qs = &o2quo_state;
214
215 spin_lock(&qs->qs_lock);
216
217 qs->qs_heartbeating--;
218 mlog_bug_on_msg(qs->qs_heartbeating < 0,
219 "node %u, %d heartbeating\n",
220 node, qs->qs_heartbeating);
221 mlog_bug_on_msg(!test_bit(node, qs->qs_hb_bm), "node %u\n", node);
222 clear_bit(node, qs->qs_hb_bm);
223
224 mlog(0, "node %u, %d total\n", node, qs->qs_heartbeating);
225
226 o2quo_clear_hold(qs, node);
227
228 spin_unlock(&qs->qs_lock);
229}
230
231/* this tells us that we've decided that the node is still heartbeating
232 * even though we've lost it's conn. it must only be called after conn_err
233 * and indicates that we must now make a quorum decision in the future,
234 * though we might be doing so after waiting for holds to drain. Here
235 * we'll be dropping the hold from conn_err. */
236void o2quo_hb_still_up(u8 node)
237{
238 struct o2quo_state *qs = &o2quo_state;
239
240 spin_lock(&qs->qs_lock);
241
242 mlog(0, "node %u\n", node);
243
244 qs->qs_pending = 1;
245 o2quo_clear_hold(qs, node);
246
247 spin_unlock(&qs->qs_lock);
248}
249
250/* This is analagous to hb_up. as a node's connection comes up we delay the
251 * quorum decision until we see it heartbeating. the hold will be droped in
252 * hb_up or hb_down. it might be perpetuated by con_err until hb_down. if
253 * it's already heartbeating we we might be dropping a hold that conn_up got.
254 * */
255void o2quo_conn_up(u8 node)
256{
257 struct o2quo_state *qs = &o2quo_state;
258
259 spin_lock(&qs->qs_lock);
260
261 qs->qs_connected++;
262 mlog_bug_on_msg(qs->qs_connected == O2NM_MAX_NODES,
263 "node %u\n", node);
264 mlog_bug_on_msg(test_bit(node, qs->qs_conn_bm), "node %u\n", node);
265 set_bit(node, qs->qs_conn_bm);
266
267 mlog(0, "node %u, %d total\n", node, qs->qs_connected);
268
269 if (!test_bit(node, qs->qs_hb_bm))
270 o2quo_set_hold(qs, node);
271 else
272 o2quo_clear_hold(qs, node);
273
274 spin_unlock(&qs->qs_lock);
275}
276
277/* we've decided that we won't ever be connecting to the node again. if it's
278 * still heartbeating we grab a hold that will delay decisions until either the
279 * node stops heartbeating from hb_down or the caller decides that the node is
280 * still up and calls still_up */
281void o2quo_conn_err(u8 node)
282{
283 struct o2quo_state *qs = &o2quo_state;
284
285 spin_lock(&qs->qs_lock);
286
287 if (test_bit(node, qs->qs_conn_bm)) {
288 qs->qs_connected--;
289 mlog_bug_on_msg(qs->qs_connected < 0,
290 "node %u, connected %d\n",
291 node, qs->qs_connected);
292
293 clear_bit(node, qs->qs_conn_bm);
294 }
295
296 mlog(0, "node %u, %d total\n", node, qs->qs_connected);
297
298 if (test_bit(node, qs->qs_hb_bm))
299 o2quo_set_hold(qs, node);
300
301 spin_unlock(&qs->qs_lock);
302}
303
304void o2quo_init(void)
305{
306 struct o2quo_state *qs = &o2quo_state;
307
308 spin_lock_init(&qs->qs_lock);
309 INIT_WORK(&qs->qs_work, o2quo_make_decision, NULL);
310}
311
312void o2quo_exit(void)
313{
314 flush_scheduled_work();
315}
diff --git a/fs/ocfs2/cluster/quorum.h b/fs/ocfs2/cluster/quorum.h
new file mode 100644
index 000000000000..6649cc6f67c9
--- /dev/null
+++ b/fs/ocfs2/cluster/quorum.h
@@ -0,0 +1,36 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * Copyright (C) 2005 Oracle. All rights reserved.
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public
17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA.
20 *
21 */
22
23#ifndef O2CLUSTER_QUORUM_H
24#define O2CLUSTER_QUORUM_H
25
26void o2quo_init(void);
27void o2quo_exit(void);
28
29void o2quo_hb_up(u8 node);
30void o2quo_hb_down(u8 node);
31void o2quo_hb_still_up(u8 node);
32void o2quo_conn_up(u8 node);
33void o2quo_conn_err(u8 node);
34void o2quo_disk_timeout(void);
35
36#endif /* O2CLUSTER_QUORUM_H */
diff --git a/fs/ocfs2/cluster/sys.c b/fs/ocfs2/cluster/sys.c
new file mode 100644
index 000000000000..1d9f6acafa2e
--- /dev/null
+++ b/fs/ocfs2/cluster/sys.c
@@ -0,0 +1,124 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * sys.c
5 *
6 * OCFS2 cluster sysfs interface
7 *
8 * Copyright (C) 2005 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation,
13 * version 2 of the License.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 *
25 */
26
27#include <linux/kernel.h>
28#include <linux/module.h>
29#include <linux/kobject.h>
30#include <linux/sysfs.h>
31
32#include "ocfs2_nodemanager.h"
33#include "masklog.h"
34#include "sys.h"
35
36struct o2cb_attribute {
37 struct attribute attr;
38 ssize_t (*show)(char *buf);
39 ssize_t (*store)(const char *buf, size_t count);
40};
41
42#define O2CB_ATTR(_name, _mode, _show, _store) \
43struct o2cb_attribute o2cb_attr_##_name = __ATTR(_name, _mode, _show, _store)
44
45#define to_o2cb_subsys(k) container_of(to_kset(k), struct subsystem, kset)
46#define to_o2cb_attr(_attr) container_of(_attr, struct o2cb_attribute, attr)
47
48static ssize_t o2cb_interface_revision_show(char *buf)
49{
50 return snprintf(buf, PAGE_SIZE, "%u\n", O2NM_API_VERSION);
51}
52
53static O2CB_ATTR(interface_revision, S_IFREG | S_IRUGO, o2cb_interface_revision_show, NULL);
54
55static struct attribute *o2cb_attrs[] = {
56 &o2cb_attr_interface_revision.attr,
57 NULL,
58};
59
60static ssize_t
61o2cb_show(struct kobject * kobj, struct attribute * attr, char * buffer);
62static ssize_t
63o2cb_store(struct kobject * kobj, struct attribute * attr,
64 const char * buffer, size_t count);
65static struct sysfs_ops o2cb_sysfs_ops = {
66 .show = o2cb_show,
67 .store = o2cb_store,
68};
69
70static struct kobj_type o2cb_subsys_type = {
71 .default_attrs = o2cb_attrs,
72 .sysfs_ops = &o2cb_sysfs_ops,
73};
74
75/* gives us o2cb_subsys */
76static decl_subsys(o2cb, NULL, NULL);
77
78static ssize_t
79o2cb_show(struct kobject * kobj, struct attribute * attr, char * buffer)
80{
81 struct o2cb_attribute *o2cb_attr = to_o2cb_attr(attr);
82 struct subsystem *sbs = to_o2cb_subsys(kobj);
83
84 BUG_ON(sbs != &o2cb_subsys);
85
86 if (o2cb_attr->show)
87 return o2cb_attr->show(buffer);
88 return -EIO;
89}
90
91static ssize_t
92o2cb_store(struct kobject * kobj, struct attribute * attr,
93 const char * buffer, size_t count)
94{
95 struct o2cb_attribute *o2cb_attr = to_o2cb_attr(attr);
96 struct subsystem *sbs = to_o2cb_subsys(kobj);
97
98 BUG_ON(sbs != &o2cb_subsys);
99
100 if (o2cb_attr->store)
101 return o2cb_attr->store(buffer, count);
102 return -EIO;
103}
104
105void o2cb_sys_shutdown(void)
106{
107 mlog_sys_shutdown();
108 subsystem_unregister(&o2cb_subsys);
109}
110
111int o2cb_sys_init(void)
112{
113 int ret;
114
115 o2cb_subsys.kset.kobj.ktype = &o2cb_subsys_type;
116 ret = subsystem_register(&o2cb_subsys);
117 if (ret)
118 return ret;
119
120 ret = mlog_sys_init(&o2cb_subsys);
121 if (ret)
122 subsystem_unregister(&o2cb_subsys);
123 return ret;
124}
diff --git a/fs/ocfs2/cluster/sys.h b/fs/ocfs2/cluster/sys.h
new file mode 100644
index 000000000000..d66b8ab0045e
--- /dev/null
+++ b/fs/ocfs2/cluster/sys.h
@@ -0,0 +1,33 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * sys.h
5 *
6 * Function prototypes for o2cb sysfs interface
7 *
8 * Copyright (C) 2005 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation,
13 * version 2 of the License.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 *
25 */
26
27#ifndef O2CLUSTER_SYS_H
28#define O2CLUSTER_SYS_H
29
30void o2cb_sys_shutdown(void);
31int o2cb_sys_init(void);
32
33#endif /* O2CLUSTER_SYS_H */
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
new file mode 100644
index 000000000000..35d92c01a972
--- /dev/null
+++ b/fs/ocfs2/cluster/tcp.c
@@ -0,0 +1,1829 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 *
3 * vim: noexpandtab sw=8 ts=8 sts=0:
4 *
5 * Copyright (C) 2004 Oracle. All rights reserved.
6 *
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public
18 * License along with this program; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20 * Boston, MA 021110-1307, USA.
21 *
22 * ----
23 *
24 * Callers for this were originally written against a very simple synchronus
25 * API. This implementation reflects those simple callers. Some day I'm sure
26 * we'll need to move to a more robust posting/callback mechanism.
27 *
28 * Transmit calls pass in kernel virtual addresses and block copying this into
29 * the socket's tx buffers via a usual blocking sendmsg. They'll block waiting
30 * for a failed socket to timeout. TX callers can also pass in a poniter to an
31 * 'int' which gets filled with an errno off the wire in response to the
32 * message they send.
33 *
34 * Handlers for unsolicited messages are registered. Each socket has a page
35 * that incoming data is copied into. First the header, then the data.
36 * Handlers are called from only one thread with a reference to this per-socket
37 * page. This page is destroyed after the handler call, so it can't be
38 * referenced beyond the call. Handlers may block but are discouraged from
39 * doing so.
40 *
41 * Any framing errors (bad magic, large payload lengths) close a connection.
42 *
43 * Our sock_container holds the state we associate with a socket. It's current
44 * framing state is held there as well as the refcounting we do around when it
45 * is safe to tear down the socket. The socket is only finally torn down from
46 * the container when the container loses all of its references -- so as long
47 * as you hold a ref on the container you can trust that the socket is valid
48 * for use with kernel socket APIs.
49 *
50 * Connections are initiated between a pair of nodes when the node with the
51 * higher node number gets a heartbeat callback which indicates that the lower
52 * numbered node has started heartbeating. The lower numbered node is passive
53 * and only accepts the connection if the higher numbered node is heartbeating.
54 */
55
56#include <linux/kernel.h>
57#include <linux/jiffies.h>
58#include <linux/slab.h>
59#include <linux/idr.h>
60#include <linux/kref.h>
61#include <net/tcp.h>
62
63#include <asm/uaccess.h>
64
65#include "heartbeat.h"
66#include "tcp.h"
67#include "nodemanager.h"
68#define MLOG_MASK_PREFIX ML_TCP
69#include "masklog.h"
70#include "quorum.h"
71
72#include "tcp_internal.h"
73
74/*
75 * The linux network stack isn't sparse endian clean.. It has macros like
76 * ntohs() which perform the endian checks and structs like sockaddr_in
77 * which aren't annotated. So __force is found here to get the build
78 * clean. When they emerge from the dark ages and annotate the code
79 * we can remove these.
80 */
81
82#define SC_NODEF_FMT "node %s (num %u) at %u.%u.%u.%u:%u"
83#define SC_NODEF_ARGS(sc) sc->sc_node->nd_name, sc->sc_node->nd_num, \
84 NIPQUAD(sc->sc_node->nd_ipv4_address), \
85 ntohs(sc->sc_node->nd_ipv4_port)
86
87/*
88 * In the following two log macros, the whitespace after the ',' just
89 * before ##args is intentional. Otherwise, gcc 2.95 will eat the
90 * previous token if args expands to nothing.
91 */
92#define msglog(hdr, fmt, args...) do { \
93 typeof(hdr) __hdr = (hdr); \
94 mlog(ML_MSG, "[mag %u len %u typ %u stat %d sys_stat %d " \
95 "key %08x num %u] " fmt, \
96 be16_to_cpu(__hdr->magic), be16_to_cpu(__hdr->data_len), \
97 be16_to_cpu(__hdr->msg_type), be32_to_cpu(__hdr->status), \
98 be32_to_cpu(__hdr->sys_status), be32_to_cpu(__hdr->key), \
99 be32_to_cpu(__hdr->msg_num) , ##args); \
100} while (0)
101
102#define sclog(sc, fmt, args...) do { \
103 typeof(sc) __sc = (sc); \
104 mlog(ML_SOCKET, "[sc %p refs %d sock %p node %u page %p " \
105 "pg_off %zu] " fmt, __sc, \
106 atomic_read(&__sc->sc_kref.refcount), __sc->sc_sock, \
107 __sc->sc_node->nd_num, __sc->sc_page, __sc->sc_page_off , \
108 ##args); \
109} while (0)
110
111static rwlock_t o2net_handler_lock = RW_LOCK_UNLOCKED;
112static struct rb_root o2net_handler_tree = RB_ROOT;
113
114static struct o2net_node o2net_nodes[O2NM_MAX_NODES];
115
116/* XXX someday we'll need better accounting */
117static struct socket *o2net_listen_sock = NULL;
118
119/*
120 * listen work is only queued by the listening socket callbacks on the
121 * o2net_wq. teardown detaches the callbacks before destroying the workqueue.
122 * quorum work is queued as sock containers are shutdown.. stop_listening
123 * tears down all the node's sock containers, preventing future shutdowns
124 * and queued quroum work, before canceling delayed quorum work and
125 * destroying the work queue.
126 */
127static struct workqueue_struct *o2net_wq;
128static struct work_struct o2net_listen_work;
129
130static struct o2hb_callback_func o2net_hb_up, o2net_hb_down;
131#define O2NET_HB_PRI 0x1
132
133static struct o2net_handshake *o2net_hand;
134static struct o2net_msg *o2net_keep_req, *o2net_keep_resp;
135
136static int o2net_sys_err_translations[O2NET_ERR_MAX] =
137 {[O2NET_ERR_NONE] = 0,
138 [O2NET_ERR_NO_HNDLR] = -ENOPROTOOPT,
139 [O2NET_ERR_OVERFLOW] = -EOVERFLOW,
140 [O2NET_ERR_DIED] = -EHOSTDOWN,};
141
142/* can't quite avoid *all* internal declarations :/ */
143static void o2net_sc_connect_completed(void *arg);
144static void o2net_rx_until_empty(void *arg);
145static void o2net_shutdown_sc(void *arg);
146static void o2net_listen_data_ready(struct sock *sk, int bytes);
147static void o2net_sc_send_keep_req(void *arg);
148static void o2net_idle_timer(unsigned long data);
149static void o2net_sc_postpone_idle(struct o2net_sock_container *sc);
150
151static inline int o2net_sys_err_to_errno(enum o2net_system_error err)
152{
153 int trans;
154 BUG_ON(err >= O2NET_ERR_MAX);
155 trans = o2net_sys_err_translations[err];
156
157 /* Just in case we mess up the translation table above */
158 BUG_ON(err != O2NET_ERR_NONE && trans == 0);
159 return trans;
160}
161
162static struct o2net_node * o2net_nn_from_num(u8 node_num)
163{
164 BUG_ON(node_num >= ARRAY_SIZE(o2net_nodes));
165 return &o2net_nodes[node_num];
166}
167
168static u8 o2net_num_from_nn(struct o2net_node *nn)
169{
170 BUG_ON(nn == NULL);
171 return nn - o2net_nodes;
172}
173
174/* ------------------------------------------------------------ */
175
176static int o2net_prep_nsw(struct o2net_node *nn, struct o2net_status_wait *nsw)
177{
178 int ret = 0;
179
180 do {
181 if (!idr_pre_get(&nn->nn_status_idr, GFP_ATOMIC)) {
182 ret = -EAGAIN;
183 break;
184 }
185 spin_lock(&nn->nn_lock);
186 ret = idr_get_new(&nn->nn_status_idr, nsw, &nsw->ns_id);
187 if (ret == 0)
188 list_add_tail(&nsw->ns_node_item,
189 &nn->nn_status_list);
190 spin_unlock(&nn->nn_lock);
191 } while (ret == -EAGAIN);
192
193 if (ret == 0) {
194 init_waitqueue_head(&nsw->ns_wq);
195 nsw->ns_sys_status = O2NET_ERR_NONE;
196 nsw->ns_status = 0;
197 }
198
199 return ret;
200}
201
202static void o2net_complete_nsw_locked(struct o2net_node *nn,
203 struct o2net_status_wait *nsw,
204 enum o2net_system_error sys_status,
205 s32 status)
206{
207 assert_spin_locked(&nn->nn_lock);
208
209 if (!list_empty(&nsw->ns_node_item)) {
210 list_del_init(&nsw->ns_node_item);
211 nsw->ns_sys_status = sys_status;
212 nsw->ns_status = status;
213 idr_remove(&nn->nn_status_idr, nsw->ns_id);
214 wake_up(&nsw->ns_wq);
215 }
216}
217
218static void o2net_complete_nsw(struct o2net_node *nn,
219 struct o2net_status_wait *nsw,
220 u64 id, enum o2net_system_error sys_status,
221 s32 status)
222{
223 spin_lock(&nn->nn_lock);
224 if (nsw == NULL) {
225 if (id > INT_MAX)
226 goto out;
227
228 nsw = idr_find(&nn->nn_status_idr, id);
229 if (nsw == NULL)
230 goto out;
231 }
232
233 o2net_complete_nsw_locked(nn, nsw, sys_status, status);
234
235out:
236 spin_unlock(&nn->nn_lock);
237 return;
238}
239
240static void o2net_complete_nodes_nsw(struct o2net_node *nn)
241{
242 struct list_head *iter, *tmp;
243 unsigned int num_kills = 0;
244 struct o2net_status_wait *nsw;
245
246 assert_spin_locked(&nn->nn_lock);
247
248 list_for_each_safe(iter, tmp, &nn->nn_status_list) {
249 nsw = list_entry(iter, struct o2net_status_wait, ns_node_item);
250 o2net_complete_nsw_locked(nn, nsw, O2NET_ERR_DIED, 0);
251 num_kills++;
252 }
253
254 mlog(0, "completed %d messages for node %u\n", num_kills,
255 o2net_num_from_nn(nn));
256}
257
258static int o2net_nsw_completed(struct o2net_node *nn,
259 struct o2net_status_wait *nsw)
260{
261 int completed;
262 spin_lock(&nn->nn_lock);
263 completed = list_empty(&nsw->ns_node_item);
264 spin_unlock(&nn->nn_lock);
265 return completed;
266}
267
268/* ------------------------------------------------------------ */
269
270static void sc_kref_release(struct kref *kref)
271{
272 struct o2net_sock_container *sc = container_of(kref,
273 struct o2net_sock_container, sc_kref);
274 sclog(sc, "releasing\n");
275
276 if (sc->sc_sock) {
277 sock_release(sc->sc_sock);
278 sc->sc_sock = NULL;
279 }
280
281 o2nm_node_put(sc->sc_node);
282 sc->sc_node = NULL;
283
284 kfree(sc);
285}
286
287static void sc_put(struct o2net_sock_container *sc)
288{
289 sclog(sc, "put\n");
290 kref_put(&sc->sc_kref, sc_kref_release);
291}
292static void sc_get(struct o2net_sock_container *sc)
293{
294 sclog(sc, "get\n");
295 kref_get(&sc->sc_kref);
296}
297static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
298{
299 struct o2net_sock_container *sc, *ret = NULL;
300 struct page *page = NULL;
301
302 page = alloc_page(GFP_NOFS);
303 sc = kcalloc(1, sizeof(*sc), GFP_NOFS);
304 if (sc == NULL || page == NULL)
305 goto out;
306
307 kref_init(&sc->sc_kref);
308 o2nm_node_get(node);
309 sc->sc_node = node;
310
311 INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed, sc);
312 INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty, sc);
313 INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc, sc);
314 INIT_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req, sc);
315
316 init_timer(&sc->sc_idle_timeout);
317 sc->sc_idle_timeout.function = o2net_idle_timer;
318 sc->sc_idle_timeout.data = (unsigned long)sc;
319
320 sclog(sc, "alloced\n");
321
322 ret = sc;
323 sc->sc_page = page;
324 sc = NULL;
325 page = NULL;
326
327out:
328 if (page)
329 __free_page(page);
330 kfree(sc);
331
332 return ret;
333}
334
335/* ------------------------------------------------------------ */
336
337static void o2net_sc_queue_work(struct o2net_sock_container *sc,
338 struct work_struct *work)
339{
340 sc_get(sc);
341 if (!queue_work(o2net_wq, work))
342 sc_put(sc);
343}
344static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc,
345 struct work_struct *work,
346 int delay)
347{
348 sc_get(sc);
349 if (!queue_delayed_work(o2net_wq, work, delay))
350 sc_put(sc);
351}
352static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
353 struct work_struct *work)
354{
355 if (cancel_delayed_work(work))
356 sc_put(sc);
357}
358
359static void o2net_set_nn_state(struct o2net_node *nn,
360 struct o2net_sock_container *sc,
361 unsigned valid, int err)
362{
363 int was_valid = nn->nn_sc_valid;
364 int was_err = nn->nn_persistent_error;
365 struct o2net_sock_container *old_sc = nn->nn_sc;
366
367 assert_spin_locked(&nn->nn_lock);
368
369 /* the node num comparison and single connect/accept path should stop
370 * an non-null sc from being overwritten with another */
371 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
372 mlog_bug_on_msg(err && valid, "err %d valid %u\n", err, valid);
373 mlog_bug_on_msg(valid && !sc, "valid %u sc %p\n", valid, sc);
374
375 /* we won't reconnect after our valid conn goes away for
376 * this hb iteration.. here so it shows up in the logs */
377 if (was_valid && !valid && err == 0)
378 err = -ENOTCONN;
379
380 mlog(ML_CONN, "node %u sc: %p -> %p, valid %u -> %u, err %d -> %d\n",
381 o2net_num_from_nn(nn), nn->nn_sc, sc, nn->nn_sc_valid, valid,
382 nn->nn_persistent_error, err);
383
384 nn->nn_sc = sc;
385 nn->nn_sc_valid = valid ? 1 : 0;
386 nn->nn_persistent_error = err;
387
388 /* mirrors o2net_tx_can_proceed() */
389 if (nn->nn_persistent_error || nn->nn_sc_valid)
390 wake_up(&nn->nn_sc_wq);
391
392 if (!was_err && nn->nn_persistent_error) {
393 o2quo_conn_err(o2net_num_from_nn(nn));
394 queue_delayed_work(o2net_wq, &nn->nn_still_up,
395 msecs_to_jiffies(O2NET_QUORUM_DELAY_MS));
396 }
397
398 if (was_valid && !valid) {
399 mlog(ML_NOTICE, "no longer connected to " SC_NODEF_FMT "\n",
400 SC_NODEF_ARGS(old_sc));
401 o2net_complete_nodes_nsw(nn);
402 }
403
404 if (!was_valid && valid) {
405 o2quo_conn_up(o2net_num_from_nn(nn));
406 /* this is a bit of a hack. we only try reconnecting
407 * when heartbeating starts until we get a connection.
408 * if that connection then dies we don't try reconnecting.
409 * the only way to start connecting again is to down
410 * heartbeat and bring it back up. */
411 cancel_delayed_work(&nn->nn_connect_expired);
412 mlog(ML_NOTICE, "%s " SC_NODEF_FMT "\n",
413 o2nm_this_node() > sc->sc_node->nd_num ?
414 "connected to" : "accepted connection from",
415 SC_NODEF_ARGS(sc));
416 }
417
418 /* trigger the connecting worker func as long as we're not valid,
419 * it will back off if it shouldn't connect. This can be called
420 * from node config teardown and so needs to be careful about
421 * the work queue actually being up. */
422 if (!valid && o2net_wq) {
423 unsigned long delay;
424 /* delay if we're withing a RECONNECT_DELAY of the
425 * last attempt */
426 delay = (nn->nn_last_connect_attempt +
427 msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
428 - jiffies;
429 if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
430 delay = 0;
431 mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
432 queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay);
433 }
434
435 /* keep track of the nn's sc ref for the caller */
436 if ((old_sc == NULL) && sc)
437 sc_get(sc);
438 if (old_sc && (old_sc != sc)) {
439 o2net_sc_queue_work(old_sc, &old_sc->sc_shutdown_work);
440 sc_put(old_sc);
441 }
442}
443
444/* see o2net_register_callbacks() */
445static void o2net_data_ready(struct sock *sk, int bytes)
446{
447 void (*ready)(struct sock *sk, int bytes);
448
449 read_lock(&sk->sk_callback_lock);
450 if (sk->sk_user_data) {
451 struct o2net_sock_container *sc = sk->sk_user_data;
452 sclog(sc, "data_ready hit\n");
453 do_gettimeofday(&sc->sc_tv_data_ready);
454 o2net_sc_queue_work(sc, &sc->sc_rx_work);
455 ready = sc->sc_data_ready;
456 } else {
457 ready = sk->sk_data_ready;
458 }
459 read_unlock(&sk->sk_callback_lock);
460
461 ready(sk, bytes);
462}
463
464/* see o2net_register_callbacks() */
465static void o2net_state_change(struct sock *sk)
466{
467 void (*state_change)(struct sock *sk);
468 struct o2net_sock_container *sc;
469
470 read_lock(&sk->sk_callback_lock);
471 sc = sk->sk_user_data;
472 if (sc == NULL) {
473 state_change = sk->sk_state_change;
474 goto out;
475 }
476
477 sclog(sc, "state_change to %d\n", sk->sk_state);
478
479 state_change = sc->sc_state_change;
480
481 switch(sk->sk_state) {
482 /* ignore connecting sockets as they make progress */
483 case TCP_SYN_SENT:
484 case TCP_SYN_RECV:
485 break;
486 case TCP_ESTABLISHED:
487 o2net_sc_queue_work(sc, &sc->sc_connect_work);
488 break;
489 default:
490 o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
491 break;
492 }
493out:
494 read_unlock(&sk->sk_callback_lock);
495 state_change(sk);
496}
497
498/*
499 * we register callbacks so we can queue work on events before calling
500 * the original callbacks. our callbacks our careful to test user_data
501 * to discover when they've reaced with o2net_unregister_callbacks().
502 */
503static void o2net_register_callbacks(struct sock *sk,
504 struct o2net_sock_container *sc)
505{
506 write_lock_bh(&sk->sk_callback_lock);
507
508 /* accepted sockets inherit the old listen socket data ready */
509 if (sk->sk_data_ready == o2net_listen_data_ready) {
510 sk->sk_data_ready = sk->sk_user_data;
511 sk->sk_user_data = NULL;
512 }
513
514 BUG_ON(sk->sk_user_data != NULL);
515 sk->sk_user_data = sc;
516 sc_get(sc);
517
518 sc->sc_data_ready = sk->sk_data_ready;
519 sc->sc_state_change = sk->sk_state_change;
520 sk->sk_data_ready = o2net_data_ready;
521 sk->sk_state_change = o2net_state_change;
522
523 write_unlock_bh(&sk->sk_callback_lock);
524}
525
526static int o2net_unregister_callbacks(struct sock *sk,
527 struct o2net_sock_container *sc)
528{
529 int ret = 0;
530
531 write_lock_bh(&sk->sk_callback_lock);
532 if (sk->sk_user_data == sc) {
533 ret = 1;
534 sk->sk_user_data = NULL;
535 sk->sk_data_ready = sc->sc_data_ready;
536 sk->sk_state_change = sc->sc_state_change;
537 }
538 write_unlock_bh(&sk->sk_callback_lock);
539
540 return ret;
541}
542
543/*
544 * this is a little helper that is called by callers who have seen a problem
545 * with an sc and want to detach it from the nn if someone already hasn't beat
546 * them to it. if an error is given then the shutdown will be persistent
547 * and pending transmits will be canceled.
548 */
549static void o2net_ensure_shutdown(struct o2net_node *nn,
550 struct o2net_sock_container *sc,
551 int err)
552{
553 spin_lock(&nn->nn_lock);
554 if (nn->nn_sc == sc)
555 o2net_set_nn_state(nn, NULL, 0, err);
556 spin_unlock(&nn->nn_lock);
557}
558
559/*
560 * This work queue function performs the blocking parts of socket shutdown. A
561 * few paths lead here. set_nn_state will trigger this callback if it sees an
562 * sc detached from the nn. state_change will also trigger this callback
563 * directly when it sees errors. In that case we need to call set_nn_state
564 * ourselves as state_change couldn't get the nn_lock and call set_nn_state
565 * itself.
566 */
567static void o2net_shutdown_sc(void *arg)
568{
569 struct o2net_sock_container *sc = arg;
570 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
571
572 sclog(sc, "shutting down\n");
573
574 /* drop the callbacks ref and call shutdown only once */
575 if (o2net_unregister_callbacks(sc->sc_sock->sk, sc)) {
576 /* we shouldn't flush as we're in the thread, the
577 * races with pending sc work structs are harmless */
578 del_timer_sync(&sc->sc_idle_timeout);
579 o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
580 sc_put(sc);
581 sc->sc_sock->ops->shutdown(sc->sc_sock,
582 RCV_SHUTDOWN|SEND_SHUTDOWN);
583 }
584
585 /* not fatal so failed connects before the other guy has our
586 * heartbeat can be retried */
587 o2net_ensure_shutdown(nn, sc, 0);
588 sc_put(sc);
589}
590
591/* ------------------------------------------------------------ */
592
593static int o2net_handler_cmp(struct o2net_msg_handler *nmh, u32 msg_type,
594 u32 key)
595{
596 int ret = memcmp(&nmh->nh_key, &key, sizeof(key));
597
598 if (ret == 0)
599 ret = memcmp(&nmh->nh_msg_type, &msg_type, sizeof(msg_type));
600
601 return ret;
602}
603
604static struct o2net_msg_handler *
605o2net_handler_tree_lookup(u32 msg_type, u32 key, struct rb_node ***ret_p,
606 struct rb_node **ret_parent)
607{
608 struct rb_node **p = &o2net_handler_tree.rb_node;
609 struct rb_node *parent = NULL;
610 struct o2net_msg_handler *nmh, *ret = NULL;
611 int cmp;
612
613 while (*p) {
614 parent = *p;
615 nmh = rb_entry(parent, struct o2net_msg_handler, nh_node);
616 cmp = o2net_handler_cmp(nmh, msg_type, key);
617
618 if (cmp < 0)
619 p = &(*p)->rb_left;
620 else if (cmp > 0)
621 p = &(*p)->rb_right;
622 else {
623 ret = nmh;
624 break;
625 }
626 }
627
628 if (ret_p != NULL)
629 *ret_p = p;
630 if (ret_parent != NULL)
631 *ret_parent = parent;
632
633 return ret;
634}
635
636static void o2net_handler_kref_release(struct kref *kref)
637{
638 struct o2net_msg_handler *nmh;
639 nmh = container_of(kref, struct o2net_msg_handler, nh_kref);
640
641 kfree(nmh);
642}
643
644static void o2net_handler_put(struct o2net_msg_handler *nmh)
645{
646 kref_put(&nmh->nh_kref, o2net_handler_kref_release);
647}
648
649/* max_len is protection for the handler func. incoming messages won't
650 * be given to the handler if their payload is longer than the max. */
651int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
652 o2net_msg_handler_func *func, void *data,
653 struct list_head *unreg_list)
654{
655 struct o2net_msg_handler *nmh = NULL;
656 struct rb_node **p, *parent;
657 int ret = 0;
658
659 if (max_len > O2NET_MAX_PAYLOAD_BYTES) {
660 mlog(0, "max_len for message handler out of range: %u\n",
661 max_len);
662 ret = -EINVAL;
663 goto out;
664 }
665
666 if (!msg_type) {
667 mlog(0, "no message type provided: %u, %p\n", msg_type, func);
668 ret = -EINVAL;
669 goto out;
670
671 }
672 if (!func) {
673 mlog(0, "no message handler provided: %u, %p\n",
674 msg_type, func);
675 ret = -EINVAL;
676 goto out;
677 }
678
679 nmh = kcalloc(1, sizeof(struct o2net_msg_handler), GFP_NOFS);
680 if (nmh == NULL) {
681 ret = -ENOMEM;
682 goto out;
683 }
684
685 nmh->nh_func = func;
686 nmh->nh_func_data = data;
687 nmh->nh_msg_type = msg_type;
688 nmh->nh_max_len = max_len;
689 nmh->nh_key = key;
690 /* the tree and list get this ref.. they're both removed in
691 * unregister when this ref is dropped */
692 kref_init(&nmh->nh_kref);
693 INIT_LIST_HEAD(&nmh->nh_unregister_item);
694
695 write_lock(&o2net_handler_lock);
696 if (o2net_handler_tree_lookup(msg_type, key, &p, &parent))
697 ret = -EEXIST;
698 else {
699 rb_link_node(&nmh->nh_node, parent, p);
700 rb_insert_color(&nmh->nh_node, &o2net_handler_tree);
701 list_add_tail(&nmh->nh_unregister_item, unreg_list);
702
703 mlog(ML_TCP, "registered handler func %p type %u key %08x\n",
704 func, msg_type, key);
705 /* we've had some trouble with handlers seemingly vanishing. */
706 mlog_bug_on_msg(o2net_handler_tree_lookup(msg_type, key, &p,
707 &parent) == NULL,
708 "couldn't find handler we *just* registerd "
709 "for type %u key %08x\n", msg_type, key);
710 }
711 write_unlock(&o2net_handler_lock);
712 if (ret)
713 goto out;
714
715out:
716 if (ret)
717 kfree(nmh);
718
719 return ret;
720}
721EXPORT_SYMBOL_GPL(o2net_register_handler);
722
723void o2net_unregister_handler_list(struct list_head *list)
724{
725 struct list_head *pos, *n;
726 struct o2net_msg_handler *nmh;
727
728 write_lock(&o2net_handler_lock);
729 list_for_each_safe(pos, n, list) {
730 nmh = list_entry(pos, struct o2net_msg_handler,
731 nh_unregister_item);
732 mlog(ML_TCP, "unregistering handler func %p type %u key %08x\n",
733 nmh->nh_func, nmh->nh_msg_type, nmh->nh_key);
734 rb_erase(&nmh->nh_node, &o2net_handler_tree);
735 list_del_init(&nmh->nh_unregister_item);
736 kref_put(&nmh->nh_kref, o2net_handler_kref_release);
737 }
738 write_unlock(&o2net_handler_lock);
739}
740EXPORT_SYMBOL_GPL(o2net_unregister_handler_list);
741
742static struct o2net_msg_handler *o2net_handler_get(u32 msg_type, u32 key)
743{
744 struct o2net_msg_handler *nmh;
745
746 read_lock(&o2net_handler_lock);
747 nmh = o2net_handler_tree_lookup(msg_type, key, NULL, NULL);
748 if (nmh)
749 kref_get(&nmh->nh_kref);
750 read_unlock(&o2net_handler_lock);
751
752 return nmh;
753}
754
755/* ------------------------------------------------------------ */
756
757static int o2net_recv_tcp_msg(struct socket *sock, void *data, size_t len)
758{
759 int ret;
760 mm_segment_t oldfs;
761 struct kvec vec = {
762 .iov_len = len,
763 .iov_base = data,
764 };
765 struct msghdr msg = {
766 .msg_iovlen = 1,
767 .msg_iov = (struct iovec *)&vec,
768 .msg_flags = MSG_DONTWAIT,
769 };
770
771 oldfs = get_fs();
772 set_fs(get_ds());
773 ret = sock_recvmsg(sock, &msg, len, msg.msg_flags);
774 set_fs(oldfs);
775
776 return ret;
777}
778
779static int o2net_send_tcp_msg(struct socket *sock, struct kvec *vec,
780 size_t veclen, size_t total)
781{
782 int ret;
783 mm_segment_t oldfs;
784 struct msghdr msg = {
785 .msg_iov = (struct iovec *)vec,
786 .msg_iovlen = veclen,
787 };
788
789 if (sock == NULL) {
790 ret = -EINVAL;
791 goto out;
792 }
793
794 oldfs = get_fs();
795 set_fs(get_ds());
796 ret = sock_sendmsg(sock, &msg, total);
797 set_fs(oldfs);
798 if (ret != total) {
799 mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret,
800 total);
801 if (ret >= 0)
802 ret = -EPIPE; /* should be smarter, I bet */
803 goto out;
804 }
805
806 ret = 0;
807out:
808 if (ret < 0)
809 mlog(0, "returning error: %d\n", ret);
810 return ret;
811}
812
813static void o2net_sendpage(struct o2net_sock_container *sc,
814 void *kmalloced_virt,
815 size_t size)
816{
817 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
818 ssize_t ret;
819
820
821 ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
822 virt_to_page(kmalloced_virt),
823 (long)kmalloced_virt & ~PAGE_MASK,
824 size, MSG_DONTWAIT);
825 if (ret != size) {
826 mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT
827 " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret);
828 o2net_ensure_shutdown(nn, sc, 0);
829 }
830}
831
832static void o2net_init_msg(struct o2net_msg *msg, u16 data_len, u16 msg_type, u32 key)
833{
834 memset(msg, 0, sizeof(struct o2net_msg));
835 msg->magic = cpu_to_be16(O2NET_MSG_MAGIC);
836 msg->data_len = cpu_to_be16(data_len);
837 msg->msg_type = cpu_to_be16(msg_type);
838 msg->sys_status = cpu_to_be32(O2NET_ERR_NONE);
839 msg->status = 0;
840 msg->key = cpu_to_be32(key);
841}
842
843static int o2net_tx_can_proceed(struct o2net_node *nn,
844 struct o2net_sock_container **sc_ret,
845 int *error)
846{
847 int ret = 0;
848
849 spin_lock(&nn->nn_lock);
850 if (nn->nn_persistent_error) {
851 ret = 1;
852 *sc_ret = NULL;
853 *error = nn->nn_persistent_error;
854 } else if (nn->nn_sc_valid) {
855 kref_get(&nn->nn_sc->sc_kref);
856
857 ret = 1;
858 *sc_ret = nn->nn_sc;
859 *error = 0;
860 }
861 spin_unlock(&nn->nn_lock);
862
863 return ret;
864}
865
866int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *caller_vec,
867 size_t caller_veclen, u8 target_node, int *status)
868{
869 int ret, error = 0;
870 struct o2net_msg *msg = NULL;
871 size_t veclen, caller_bytes = 0;
872 struct kvec *vec = NULL;
873 struct o2net_sock_container *sc = NULL;
874 struct o2net_node *nn = o2net_nn_from_num(target_node);
875 struct o2net_status_wait nsw = {
876 .ns_node_item = LIST_HEAD_INIT(nsw.ns_node_item),
877 };
878
879 if (o2net_wq == NULL) {
880 mlog(0, "attempt to tx without o2netd running\n");
881 ret = -ESRCH;
882 goto out;
883 }
884
885 if (caller_veclen == 0) {
886 mlog(0, "bad kvec array length\n");
887 ret = -EINVAL;
888 goto out;
889 }
890
891 caller_bytes = iov_length((struct iovec *)caller_vec, caller_veclen);
892 if (caller_bytes > O2NET_MAX_PAYLOAD_BYTES) {
893 mlog(0, "total payload len %zu too large\n", caller_bytes);
894 ret = -EINVAL;
895 goto out;
896 }
897
898 if (target_node == o2nm_this_node()) {
899 ret = -ELOOP;
900 goto out;
901 }
902
903 ret = wait_event_interruptible(nn->nn_sc_wq,
904 o2net_tx_can_proceed(nn, &sc, &error));
905 if (!ret && error)
906 ret = error;
907 if (ret)
908 goto out;
909
910 veclen = caller_veclen + 1;
911 vec = kmalloc(sizeof(struct kvec) * veclen, GFP_ATOMIC);
912 if (vec == NULL) {
913 mlog(0, "failed to %zu element kvec!\n", veclen);
914 ret = -ENOMEM;
915 goto out;
916 }
917
918 msg = kmalloc(sizeof(struct o2net_msg), GFP_ATOMIC);
919 if (!msg) {
920 mlog(0, "failed to allocate a o2net_msg!\n");
921 ret = -ENOMEM;
922 goto out;
923 }
924
925 o2net_init_msg(msg, caller_bytes, msg_type, key);
926
927 vec[0].iov_len = sizeof(struct o2net_msg);
928 vec[0].iov_base = msg;
929 memcpy(&vec[1], caller_vec, caller_veclen * sizeof(struct kvec));
930
931 ret = o2net_prep_nsw(nn, &nsw);
932 if (ret)
933 goto out;
934
935 msg->msg_num = cpu_to_be32(nsw.ns_id);
936
937 /* finally, convert the message header to network byte-order
938 * and send */
939 ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen,
940 sizeof(struct o2net_msg) + caller_bytes);
941 msglog(msg, "sending returned %d\n", ret);
942 if (ret < 0) {
943 mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret);
944 goto out;
945 }
946
947 /* wait on other node's handler */
948 wait_event(nsw.ns_wq, o2net_nsw_completed(nn, &nsw));
949
950 /* Note that we avoid overwriting the callers status return
951 * variable if a system error was reported on the other
952 * side. Callers beware. */
953 ret = o2net_sys_err_to_errno(nsw.ns_sys_status);
954 if (status && !ret)
955 *status = nsw.ns_status;
956
957 mlog(0, "woken, returning system status %d, user status %d\n",
958 ret, nsw.ns_status);
959out:
960 if (sc)
961 sc_put(sc);
962 if (vec)
963 kfree(vec);
964 if (msg)
965 kfree(msg);
966 o2net_complete_nsw(nn, &nsw, 0, 0, 0);
967 return ret;
968}
969EXPORT_SYMBOL_GPL(o2net_send_message_vec);
970
971int o2net_send_message(u32 msg_type, u32 key, void *data, u32 len,
972 u8 target_node, int *status)
973{
974 struct kvec vec = {
975 .iov_base = data,
976 .iov_len = len,
977 };
978 return o2net_send_message_vec(msg_type, key, &vec, 1,
979 target_node, status);
980}
981EXPORT_SYMBOL_GPL(o2net_send_message);
982
983static int o2net_send_status_magic(struct socket *sock, struct o2net_msg *hdr,
984 enum o2net_system_error syserr, int err)
985{
986 struct kvec vec = {
987 .iov_base = hdr,
988 .iov_len = sizeof(struct o2net_msg),
989 };
990
991 BUG_ON(syserr >= O2NET_ERR_MAX);
992
993 /* leave other fields intact from the incoming message, msg_num
994 * in particular */
995 hdr->sys_status = cpu_to_be32(syserr);
996 hdr->status = cpu_to_be32(err);
997 hdr->magic = cpu_to_be16(O2NET_MSG_STATUS_MAGIC); // twiddle the magic
998 hdr->data_len = 0;
999
1000 msglog(hdr, "about to send status magic %d\n", err);
1001 /* hdr has been in host byteorder this whole time */
1002 return o2net_send_tcp_msg(sock, &vec, 1, sizeof(struct o2net_msg));
1003}
1004
1005/* this returns -errno if the header was unknown or too large, etc.
1006 * after this is called the buffer us reused for the next message */
1007static int o2net_process_message(struct o2net_sock_container *sc,
1008 struct o2net_msg *hdr)
1009{
1010 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
1011 int ret = 0, handler_status;
1012 enum o2net_system_error syserr;
1013 struct o2net_msg_handler *nmh = NULL;
1014
1015 msglog(hdr, "processing message\n");
1016
1017 o2net_sc_postpone_idle(sc);
1018
1019 switch(be16_to_cpu(hdr->magic)) {
1020 case O2NET_MSG_STATUS_MAGIC:
1021 /* special type for returning message status */
1022 o2net_complete_nsw(nn, NULL,
1023 be32_to_cpu(hdr->msg_num),
1024 be32_to_cpu(hdr->sys_status),
1025 be32_to_cpu(hdr->status));
1026 goto out;
1027 case O2NET_MSG_KEEP_REQ_MAGIC:
1028 o2net_sendpage(sc, o2net_keep_resp,
1029 sizeof(*o2net_keep_resp));
1030 goto out;
1031 case O2NET_MSG_KEEP_RESP_MAGIC:
1032 goto out;
1033 case O2NET_MSG_MAGIC:
1034 break;
1035 default:
1036 msglog(hdr, "bad magic\n");
1037 ret = -EINVAL;
1038 goto out;
1039 break;
1040 }
1041
1042 /* find a handler for it */
1043 handler_status = 0;
1044 nmh = o2net_handler_get(be16_to_cpu(hdr->msg_type),
1045 be32_to_cpu(hdr->key));
1046 if (!nmh) {
1047 mlog(ML_TCP, "couldn't find handler for type %u key %08x\n",
1048 be16_to_cpu(hdr->msg_type), be32_to_cpu(hdr->key));
1049 syserr = O2NET_ERR_NO_HNDLR;
1050 goto out_respond;
1051 }
1052
1053 syserr = O2NET_ERR_NONE;
1054
1055 if (be16_to_cpu(hdr->data_len) > nmh->nh_max_len)
1056 syserr = O2NET_ERR_OVERFLOW;
1057
1058 if (syserr != O2NET_ERR_NONE)
1059 goto out_respond;
1060
1061 do_gettimeofday(&sc->sc_tv_func_start);
1062 sc->sc_msg_key = be32_to_cpu(hdr->key);
1063 sc->sc_msg_type = be16_to_cpu(hdr->msg_type);
1064 handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) +
1065 be16_to_cpu(hdr->data_len),
1066 nmh->nh_func_data);
1067 do_gettimeofday(&sc->sc_tv_func_stop);
1068
1069out_respond:
1070 /* this destroys the hdr, so don't use it after this */
1071 ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr,
1072 handler_status);
1073 hdr = NULL;
1074 mlog(0, "sending handler status %d, syserr %d returned %d\n",
1075 handler_status, syserr, ret);
1076
1077out:
1078 if (nmh)
1079 o2net_handler_put(nmh);
1080 return ret;
1081}
1082
1083static int o2net_check_handshake(struct o2net_sock_container *sc)
1084{
1085 struct o2net_handshake *hand = page_address(sc->sc_page);
1086 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
1087
1088 if (hand->protocol_version != cpu_to_be64(O2NET_PROTOCOL_VERSION)) {
1089 mlog(ML_NOTICE, SC_NODEF_FMT " advertised net protocol "
1090 "version %llu but %llu is required, disconnecting\n",
1091 SC_NODEF_ARGS(sc),
1092 (unsigned long long)be64_to_cpu(hand->protocol_version),
1093 O2NET_PROTOCOL_VERSION);
1094
1095 /* don't bother reconnecting if its the wrong version. */
1096 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1097 return -1;
1098 }
1099
1100 sc->sc_handshake_ok = 1;
1101
1102 spin_lock(&nn->nn_lock);
1103 /* set valid and queue the idle timers only if it hasn't been
1104 * shut down already */
1105 if (nn->nn_sc == sc) {
1106 o2net_sc_postpone_idle(sc);
1107 o2net_set_nn_state(nn, sc, 1, 0);
1108 }
1109 spin_unlock(&nn->nn_lock);
1110
1111 /* shift everything up as though it wasn't there */
1112 sc->sc_page_off -= sizeof(struct o2net_handshake);
1113 if (sc->sc_page_off)
1114 memmove(hand, hand + 1, sc->sc_page_off);
1115
1116 return 0;
1117}
1118
1119/* this demuxes the queued rx bytes into header or payload bits and calls
1120 * handlers as each full message is read off the socket. it returns -error,
1121 * == 0 eof, or > 0 for progress made.*/
1122static int o2net_advance_rx(struct o2net_sock_container *sc)
1123{
1124 struct o2net_msg *hdr;
1125 int ret = 0;
1126 void *data;
1127 size_t datalen;
1128
1129 sclog(sc, "receiving\n");
1130 do_gettimeofday(&sc->sc_tv_advance_start);
1131
1132 /* do we need more header? */
1133 if (sc->sc_page_off < sizeof(struct o2net_msg)) {
1134 data = page_address(sc->sc_page) + sc->sc_page_off;
1135 datalen = sizeof(struct o2net_msg) - sc->sc_page_off;
1136 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1137 if (ret > 0) {
1138 sc->sc_page_off += ret;
1139
1140 /* this working relies on the handshake being
1141 * smaller than the normal message header */
1142 if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
1143 !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
1144 ret = -EPROTO;
1145 goto out;
1146 }
1147
1148 /* only swab incoming here.. we can
1149 * only get here once as we cross from
1150 * being under to over */
1151 if (sc->sc_page_off == sizeof(struct o2net_msg)) {
1152 hdr = page_address(sc->sc_page);
1153 if (be16_to_cpu(hdr->data_len) >
1154 O2NET_MAX_PAYLOAD_BYTES)
1155 ret = -EOVERFLOW;
1156 }
1157 }
1158 if (ret <= 0)
1159 goto out;
1160 }
1161
1162 if (sc->sc_page_off < sizeof(struct o2net_msg)) {
1163 /* oof, still don't have a header */
1164 goto out;
1165 }
1166
1167 /* this was swabbed above when we first read it */
1168 hdr = page_address(sc->sc_page);
1169
1170 msglog(hdr, "at page_off %zu\n", sc->sc_page_off);
1171
1172 /* do we need more payload? */
1173 if (sc->sc_page_off - sizeof(struct o2net_msg) < be16_to_cpu(hdr->data_len)) {
1174 /* need more payload */
1175 data = page_address(sc->sc_page) + sc->sc_page_off;
1176 datalen = (sizeof(struct o2net_msg) + be16_to_cpu(hdr->data_len)) -
1177 sc->sc_page_off;
1178 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1179 if (ret > 0)
1180 sc->sc_page_off += ret;
1181 if (ret <= 0)
1182 goto out;
1183 }
1184
1185 if (sc->sc_page_off - sizeof(struct o2net_msg) == be16_to_cpu(hdr->data_len)) {
1186 /* we can only get here once, the first time we read
1187 * the payload.. so set ret to progress if the handler
1188 * works out. after calling this the message is toast */
1189 ret = o2net_process_message(sc, hdr);
1190 if (ret == 0)
1191 ret = 1;
1192 sc->sc_page_off = 0;
1193 }
1194
1195out:
1196 sclog(sc, "ret = %d\n", ret);
1197 do_gettimeofday(&sc->sc_tv_advance_stop);
1198 return ret;
1199}
1200
1201/* this work func is triggerd by data ready. it reads until it can read no
1202 * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing
1203 * our work the work struct will be marked and we'll be called again. */
1204static void o2net_rx_until_empty(void *arg)
1205{
1206 struct o2net_sock_container *sc = arg;
1207 int ret;
1208
1209 do {
1210 ret = o2net_advance_rx(sc);
1211 } while (ret > 0);
1212
1213 if (ret <= 0 && ret != -EAGAIN) {
1214 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
1215 sclog(sc, "saw error %d, closing\n", ret);
1216 /* not permanent so read failed handshake can retry */
1217 o2net_ensure_shutdown(nn, sc, 0);
1218 }
1219
1220 sc_put(sc);
1221}
1222
1223static int o2net_set_nodelay(struct socket *sock)
1224{
1225 int ret, val = 1;
1226 mm_segment_t oldfs;
1227
1228 oldfs = get_fs();
1229 set_fs(KERNEL_DS);
1230
1231 /*
1232 * Dear unsuspecting programmer,
1233 *
1234 * Don't use sock_setsockopt() for SOL_TCP. It doesn't check its level
1235 * argument and assumes SOL_SOCKET so, say, your TCP_NODELAY will
1236 * silently turn into SO_DEBUG.
1237 *
1238 * Yours,
1239 * Keeper of hilariously fragile interfaces.
1240 */
1241 ret = sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY,
1242 (char __user *)&val, sizeof(val));
1243
1244 set_fs(oldfs);
1245 return ret;
1246}
1247
1248/* ------------------------------------------------------------ */
1249
1250/* called when a connect completes and after a sock is accepted. the
1251 * rx path will see the response and mark the sc valid */
1252static void o2net_sc_connect_completed(void *arg)
1253{
1254 struct o2net_sock_container *sc = arg;
1255
1256 mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n",
1257 (unsigned long long)O2NET_PROTOCOL_VERSION,
1258 (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
1259
1260 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1261 sc_put(sc);
1262}
1263
1264/* this is called as a work_struct func. */
1265static void o2net_sc_send_keep_req(void *arg)
1266{
1267 struct o2net_sock_container *sc = arg;
1268
1269 o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req));
1270 sc_put(sc);
1271}
1272
1273/* socket shutdown does a del_timer_sync against this as it tears down.
1274 * we can't start this timer until we've got to the point in sc buildup
1275 * where shutdown is going to be involved */
1276static void o2net_idle_timer(unsigned long data)
1277{
1278 struct o2net_sock_container *sc = (struct o2net_sock_container *)data;
1279 struct timeval now;
1280
1281 do_gettimeofday(&now);
1282
1283 mlog(ML_NOTICE, "connection to " SC_NODEF_FMT " has been idle for 10 "
1284 "seconds, shutting it down.\n", SC_NODEF_ARGS(sc));
1285 mlog(ML_NOTICE, "here are some times that might help debug the "
1286 "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv "
1287 "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n",
1288 sc->sc_tv_timer.tv_sec, sc->sc_tv_timer.tv_usec,
1289 now.tv_sec, now.tv_usec,
1290 sc->sc_tv_data_ready.tv_sec, sc->sc_tv_data_ready.tv_usec,
1291 sc->sc_tv_advance_start.tv_sec, sc->sc_tv_advance_start.tv_usec,
1292 sc->sc_tv_advance_stop.tv_sec, sc->sc_tv_advance_stop.tv_usec,
1293 sc->sc_msg_key, sc->sc_msg_type,
1294 sc->sc_tv_func_start.tv_sec, sc->sc_tv_func_start.tv_usec,
1295 sc->sc_tv_func_stop.tv_sec, sc->sc_tv_func_stop.tv_usec);
1296
1297 o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
1298}
1299
1300static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
1301{
1302 o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
1303 o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work,
1304 O2NET_KEEPALIVE_DELAY_SECS * HZ);
1305 do_gettimeofday(&sc->sc_tv_timer);
1306 mod_timer(&sc->sc_idle_timeout,
1307 jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ));
1308}
1309
1310/* this work func is kicked whenever a path sets the nn state which doesn't
1311 * have valid set. This includes seeing hb come up, losing a connection,
1312 * having a connect attempt fail, etc. This centralizes the logic which decides
1313 * if a connect attempt should be made or if we should give up and all future
1314 * transmit attempts should fail */
1315static void o2net_start_connect(void *arg)
1316{
1317 struct o2net_node *nn = arg;
1318 struct o2net_sock_container *sc = NULL;
1319 struct o2nm_node *node = NULL;
1320 struct socket *sock = NULL;
1321 struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
1322 int ret = 0;
1323
1324 /* if we're greater we initiate tx, otherwise we accept */
1325 if (o2nm_this_node() <= o2net_num_from_nn(nn))
1326 goto out;
1327
1328 /* watch for racing with tearing a node down */
1329 node = o2nm_get_node_by_num(o2net_num_from_nn(nn));
1330 if (node == NULL) {
1331 ret = 0;
1332 goto out;
1333 }
1334
1335 spin_lock(&nn->nn_lock);
1336 /* see if we already have one pending or have given up */
1337 if (nn->nn_sc || nn->nn_persistent_error)
1338 arg = NULL;
1339 spin_unlock(&nn->nn_lock);
1340 if (arg == NULL) /* *shrug*, needed some indicator */
1341 goto out;
1342
1343 nn->nn_last_connect_attempt = jiffies;
1344
1345 sc = sc_alloc(node);
1346 if (sc == NULL) {
1347 mlog(0, "couldn't allocate sc\n");
1348 ret = -ENOMEM;
1349 goto out;
1350 }
1351
1352 ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1353 if (ret < 0) {
1354 mlog(0, "can't create socket: %d\n", ret);
1355 goto out;
1356 }
1357 sc->sc_sock = sock; /* freed by sc_kref_release */
1358
1359 sock->sk->sk_allocation = GFP_ATOMIC;
1360
1361 myaddr.sin_family = AF_INET;
1362 myaddr.sin_port = (__force u16)htons(0); /* any port */
1363
1364 ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr,
1365 sizeof(myaddr));
1366 if (ret) {
1367 mlog(0, "bind failed: %d\n", ret);
1368 goto out;
1369 }
1370
1371 ret = o2net_set_nodelay(sc->sc_sock);
1372 if (ret) {
1373 mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret);
1374 goto out;
1375 }
1376
1377 o2net_register_callbacks(sc->sc_sock->sk, sc);
1378
1379 spin_lock(&nn->nn_lock);
1380 /* handshake completion will set nn->nn_sc_valid */
1381 o2net_set_nn_state(nn, sc, 0, 0);
1382 spin_unlock(&nn->nn_lock);
1383
1384 remoteaddr.sin_family = AF_INET;
1385 remoteaddr.sin_addr.s_addr = (__force u32)node->nd_ipv4_address;
1386 remoteaddr.sin_port = (__force u16)node->nd_ipv4_port;
1387
1388 ret = sc->sc_sock->ops->connect(sc->sc_sock,
1389 (struct sockaddr *)&remoteaddr,
1390 sizeof(remoteaddr),
1391 O_NONBLOCK);
1392 if (ret == -EINPROGRESS)
1393 ret = 0;
1394
1395out:
1396 if (ret) {
1397 mlog(ML_NOTICE, "connect attempt to " SC_NODEF_FMT " failed "
1398 "with errno %d\n", SC_NODEF_ARGS(sc), ret);
1399 /* 0 err so that another will be queued and attempted
1400 * from set_nn_state */
1401 if (sc)
1402 o2net_ensure_shutdown(nn, sc, 0);
1403 }
1404 if (sc)
1405 sc_put(sc);
1406 if (node)
1407 o2nm_node_put(node);
1408
1409 return;
1410}
1411
1412static void o2net_connect_expired(void *arg)
1413{
1414 struct o2net_node *nn = arg;
1415
1416 spin_lock(&nn->nn_lock);
1417 if (!nn->nn_sc_valid) {
1418 mlog(ML_ERROR, "no connection established with node %u after "
1419 "%u seconds, giving up and returning errors.\n",
1420 o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS);
1421
1422 o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
1423 }
1424 spin_unlock(&nn->nn_lock);
1425}
1426
1427static void o2net_still_up(void *arg)
1428{
1429 struct o2net_node *nn = arg;
1430
1431 o2quo_hb_still_up(o2net_num_from_nn(nn));
1432}
1433
1434/* ------------------------------------------------------------ */
1435
1436void o2net_disconnect_node(struct o2nm_node *node)
1437{
1438 struct o2net_node *nn = o2net_nn_from_num(node->nd_num);
1439
1440 /* don't reconnect until it's heartbeating again */
1441 spin_lock(&nn->nn_lock);
1442 o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
1443 spin_unlock(&nn->nn_lock);
1444
1445 if (o2net_wq) {
1446 cancel_delayed_work(&nn->nn_connect_expired);
1447 cancel_delayed_work(&nn->nn_connect_work);
1448 cancel_delayed_work(&nn->nn_still_up);
1449 flush_workqueue(o2net_wq);
1450 }
1451}
1452
1453static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
1454 void *data)
1455{
1456 o2quo_hb_down(node_num);
1457
1458 if (node_num != o2nm_this_node())
1459 o2net_disconnect_node(node);
1460}
1461
1462static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
1463 void *data)
1464{
1465 struct o2net_node *nn = o2net_nn_from_num(node_num);
1466
1467 o2quo_hb_up(node_num);
1468
1469 /* ensure an immediate connect attempt */
1470 nn->nn_last_connect_attempt = jiffies -
1471 (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1);
1472
1473 if (node_num != o2nm_this_node()) {
1474 /* heartbeat doesn't work unless a local node number is
1475 * configured and doing so brings up the o2net_wq, so we can
1476 * use it.. */
1477 queue_delayed_work(o2net_wq, &nn->nn_connect_expired,
1478 O2NET_IDLE_TIMEOUT_SECS * HZ);
1479
1480 /* believe it or not, accept and node hearbeating testing
1481 * can succeed for this node before we got here.. so
1482 * only use set_nn_state to clear the persistent error
1483 * if that hasn't already happened */
1484 spin_lock(&nn->nn_lock);
1485 if (nn->nn_persistent_error)
1486 o2net_set_nn_state(nn, NULL, 0, 0);
1487 spin_unlock(&nn->nn_lock);
1488 }
1489}
1490
1491void o2net_unregister_hb_callbacks(void)
1492{
1493 int ret;
1494
1495 ret = o2hb_unregister_callback(&o2net_hb_up);
1496 if (ret < 0)
1497 mlog(ML_ERROR, "Status return %d unregistering heartbeat up "
1498 "callback!\n", ret);
1499
1500 ret = o2hb_unregister_callback(&o2net_hb_down);
1501 if (ret < 0)
1502 mlog(ML_ERROR, "Status return %d unregistering heartbeat down "
1503 "callback!\n", ret);
1504}
1505
1506int o2net_register_hb_callbacks(void)
1507{
1508 int ret;
1509
1510 o2hb_setup_callback(&o2net_hb_down, O2HB_NODE_DOWN_CB,
1511 o2net_hb_node_down_cb, NULL, O2NET_HB_PRI);
1512 o2hb_setup_callback(&o2net_hb_up, O2HB_NODE_UP_CB,
1513 o2net_hb_node_up_cb, NULL, O2NET_HB_PRI);
1514
1515 ret = o2hb_register_callback(&o2net_hb_up);
1516 if (ret == 0)
1517 ret = o2hb_register_callback(&o2net_hb_down);
1518
1519 if (ret)
1520 o2net_unregister_hb_callbacks();
1521
1522 return ret;
1523}
1524
1525/* ------------------------------------------------------------ */
1526
1527static int o2net_accept_one(struct socket *sock)
1528{
1529 int ret, slen;
1530 struct sockaddr_in sin;
1531 struct socket *new_sock = NULL;
1532 struct o2nm_node *node = NULL;
1533 struct o2net_sock_container *sc = NULL;
1534 struct o2net_node *nn;
1535
1536 BUG_ON(sock == NULL);
1537 ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
1538 sock->sk->sk_protocol, &new_sock);
1539 if (ret)
1540 goto out;
1541
1542 new_sock->type = sock->type;
1543 new_sock->ops = sock->ops;
1544 ret = sock->ops->accept(sock, new_sock, O_NONBLOCK);
1545 if (ret < 0)
1546 goto out;
1547
1548 new_sock->sk->sk_allocation = GFP_ATOMIC;
1549
1550 ret = o2net_set_nodelay(new_sock);
1551 if (ret) {
1552 mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret);
1553 goto out;
1554 }
1555
1556 slen = sizeof(sin);
1557 ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
1558 &slen, 1);
1559 if (ret < 0)
1560 goto out;
1561
1562 node = o2nm_get_node_by_ip((__force __be32)sin.sin_addr.s_addr);
1563 if (node == NULL) {
1564 mlog(ML_NOTICE, "attempt to connect from unknown node at "
1565 "%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr),
1566 ntohs((__force __be16)sin.sin_port));
1567 ret = -EINVAL;
1568 goto out;
1569 }
1570
1571 if (o2nm_this_node() > node->nd_num) {
1572 mlog(ML_NOTICE, "unexpected connect attempted from a lower "
1573 "numbered node '%s' at " "%u.%u.%u.%u:%d with num %u\n",
1574 node->nd_name, NIPQUAD(sin.sin_addr.s_addr),
1575 ntohs((__force __be16)sin.sin_port), node->nd_num);
1576 ret = -EINVAL;
1577 goto out;
1578 }
1579
1580 /* this happens all the time when the other node sees our heartbeat
1581 * and tries to connect before we see their heartbeat */
1582 if (!o2hb_check_node_heartbeating_from_callback(node->nd_num)) {
1583 mlog(ML_CONN, "attempt to connect from node '%s' at "
1584 "%u.%u.%u.%u:%d but it isn't heartbeating\n",
1585 node->nd_name, NIPQUAD(sin.sin_addr.s_addr),
1586 ntohs((__force __be16)sin.sin_port));
1587 ret = -EINVAL;
1588 goto out;
1589 }
1590
1591 nn = o2net_nn_from_num(node->nd_num);
1592
1593 spin_lock(&nn->nn_lock);
1594 if (nn->nn_sc)
1595 ret = -EBUSY;
1596 else
1597 ret = 0;
1598 spin_unlock(&nn->nn_lock);
1599 if (ret) {
1600 mlog(ML_NOTICE, "attempt to connect from node '%s' at "
1601 "%u.%u.%u.%u:%d but it already has an open connection\n",
1602 node->nd_name, NIPQUAD(sin.sin_addr.s_addr),
1603 ntohs((__force __be16)sin.sin_port));
1604 goto out;
1605 }
1606
1607 sc = sc_alloc(node);
1608 if (sc == NULL) {
1609 ret = -ENOMEM;
1610 goto out;
1611 }
1612
1613 sc->sc_sock = new_sock;
1614 new_sock = NULL;
1615
1616 spin_lock(&nn->nn_lock);
1617 o2net_set_nn_state(nn, sc, 0, 0);
1618 spin_unlock(&nn->nn_lock);
1619
1620 o2net_register_callbacks(sc->sc_sock->sk, sc);
1621 o2net_sc_queue_work(sc, &sc->sc_rx_work);
1622
1623 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1624
1625out:
1626 if (new_sock)
1627 sock_release(new_sock);
1628 if (node)
1629 o2nm_node_put(node);
1630 if (sc)
1631 sc_put(sc);
1632 return ret;
1633}
1634
1635static void o2net_accept_many(void *arg)
1636{
1637 struct socket *sock = arg;
1638 while (o2net_accept_one(sock) == 0)
1639 cond_resched();
1640}
1641
1642static void o2net_listen_data_ready(struct sock *sk, int bytes)
1643{
1644 void (*ready)(struct sock *sk, int bytes);
1645
1646 read_lock(&sk->sk_callback_lock);
1647 ready = sk->sk_user_data;
1648 if (ready == NULL) { /* check for teardown race */
1649 ready = sk->sk_data_ready;
1650 goto out;
1651 }
1652
1653 /* ->sk_data_ready is also called for a newly established child socket
1654 * before it has been accepted and the acceptor has set up their
1655 * data_ready.. we only want to queue listen work for our listening
1656 * socket */
1657 if (sk->sk_state == TCP_LISTEN) {
1658 mlog(ML_TCP, "bytes: %d\n", bytes);
1659 queue_work(o2net_wq, &o2net_listen_work);
1660 }
1661
1662out:
1663 read_unlock(&sk->sk_callback_lock);
1664 ready(sk, bytes);
1665}
1666
1667static int o2net_open_listening_sock(__be16 port)
1668{
1669 struct socket *sock = NULL;
1670 int ret;
1671 struct sockaddr_in sin = {
1672 .sin_family = PF_INET,
1673 .sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) },
1674 .sin_port = (__force u16)port,
1675 };
1676
1677 ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1678 if (ret < 0) {
1679 mlog(ML_ERROR, "unable to create socket, ret=%d\n", ret);
1680 goto out;
1681 }
1682
1683 sock->sk->sk_allocation = GFP_ATOMIC;
1684
1685 write_lock_bh(&sock->sk->sk_callback_lock);
1686 sock->sk->sk_user_data = sock->sk->sk_data_ready;
1687 sock->sk->sk_data_ready = o2net_listen_data_ready;
1688 write_unlock_bh(&sock->sk->sk_callback_lock);
1689
1690 o2net_listen_sock = sock;
1691 INIT_WORK(&o2net_listen_work, o2net_accept_many, sock);
1692
1693 sock->sk->sk_reuse = 1;
1694 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
1695 if (ret < 0) {
1696 mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n",
1697 ntohs(port), ret);
1698 goto out;
1699 }
1700
1701 ret = sock->ops->listen(sock, 64);
1702 if (ret < 0) {
1703 mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n",
1704 ntohs(port), ret);
1705 }
1706
1707out:
1708 if (ret) {
1709 o2net_listen_sock = NULL;
1710 if (sock)
1711 sock_release(sock);
1712 }
1713 return ret;
1714}
1715
1716/*
1717 * called from node manager when we should bring up our network listening
1718 * socket. node manager handles all the serialization to only call this
1719 * once and to match it with o2net_stop_listening(). note,
1720 * o2nm_this_node() doesn't work yet as we're being called while it
1721 * is being set up.
1722 */
1723int o2net_start_listening(struct o2nm_node *node)
1724{
1725 int ret = 0;
1726
1727 BUG_ON(o2net_wq != NULL);
1728 BUG_ON(o2net_listen_sock != NULL);
1729
1730 mlog(ML_KTHREAD, "starting o2net thread...\n");
1731 o2net_wq = create_singlethread_workqueue("o2net");
1732 if (o2net_wq == NULL) {
1733 mlog(ML_ERROR, "unable to launch o2net thread\n");
1734 return -ENOMEM; /* ? */
1735 }
1736
1737 ret = o2net_open_listening_sock(node->nd_ipv4_port);
1738 if (ret) {
1739 destroy_workqueue(o2net_wq);
1740 o2net_wq = NULL;
1741 } else
1742 o2quo_conn_up(node->nd_num);
1743
1744 return ret;
1745}
1746
1747/* again, o2nm_this_node() doesn't work here as we're involved in
1748 * tearing it down */
1749void o2net_stop_listening(struct o2nm_node *node)
1750{
1751 struct socket *sock = o2net_listen_sock;
1752 size_t i;
1753
1754 BUG_ON(o2net_wq == NULL);
1755 BUG_ON(o2net_listen_sock == NULL);
1756
1757 /* stop the listening socket from generating work */
1758 write_lock_bh(&sock->sk->sk_callback_lock);
1759 sock->sk->sk_data_ready = sock->sk->sk_user_data;
1760 sock->sk->sk_user_data = NULL;
1761 write_unlock_bh(&sock->sk->sk_callback_lock);
1762
1763 for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {
1764 struct o2nm_node *node = o2nm_get_node_by_num(i);
1765 if (node) {
1766 o2net_disconnect_node(node);
1767 o2nm_node_put(node);
1768 }
1769 }
1770
1771 /* finish all work and tear down the work queue */
1772 mlog(ML_KTHREAD, "waiting for o2net thread to exit....\n");
1773 destroy_workqueue(o2net_wq);
1774 o2net_wq = NULL;
1775
1776 sock_release(o2net_listen_sock);
1777 o2net_listen_sock = NULL;
1778
1779 o2quo_conn_err(node->nd_num);
1780}
1781
1782/* ------------------------------------------------------------ */
1783
1784int o2net_init(void)
1785{
1786 unsigned long i;
1787
1788 o2quo_init();
1789
1790 o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL);
1791 o2net_keep_req = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL);
1792 o2net_keep_resp = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL);
1793 if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) {
1794 kfree(o2net_hand);
1795 kfree(o2net_keep_req);
1796 kfree(o2net_keep_resp);
1797 return -ENOMEM;
1798 }
1799
1800 o2net_hand->protocol_version = cpu_to_be64(O2NET_PROTOCOL_VERSION);
1801 o2net_hand->connector_id = cpu_to_be64(1);
1802
1803 o2net_keep_req->magic = cpu_to_be16(O2NET_MSG_KEEP_REQ_MAGIC);
1804 o2net_keep_resp->magic = cpu_to_be16(O2NET_MSG_KEEP_RESP_MAGIC);
1805
1806 for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {
1807 struct o2net_node *nn = o2net_nn_from_num(i);
1808
1809 spin_lock_init(&nn->nn_lock);
1810 INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn);
1811 INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn);
1812 INIT_WORK(&nn->nn_still_up, o2net_still_up, nn);
1813 /* until we see hb from a node we'll return einval */
1814 nn->nn_persistent_error = -ENOTCONN;
1815 init_waitqueue_head(&nn->nn_sc_wq);
1816 idr_init(&nn->nn_status_idr);
1817 INIT_LIST_HEAD(&nn->nn_status_list);
1818 }
1819
1820 return 0;
1821}
1822
1823void o2net_exit(void)
1824{
1825 o2quo_exit();
1826 kfree(o2net_hand);
1827 kfree(o2net_keep_req);
1828 kfree(o2net_keep_resp);
1829}
diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h
new file mode 100644
index 000000000000..a6f4585501c8
--- /dev/null
+++ b/fs/ocfs2/cluster/tcp.h
@@ -0,0 +1,113 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * tcp.h
5 *
6 * Function prototypes
7 *
8 * Copyright (C) 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 *
25 */
26
27#ifndef O2CLUSTER_TCP_H
28#define O2CLUSTER_TCP_H
29
30#include <linux/socket.h>
31#ifdef __KERNEL__
32#include <net/sock.h>
33#include <linux/tcp.h>
34#else
35#include <sys/socket.h>
36#endif
37#include <linux/inet.h>
38#include <linux/in.h>
39
40struct o2net_msg
41{
42 __be16 magic;
43 __be16 data_len;
44 __be16 msg_type;
45 __be16 pad1;
46 __be32 sys_status;
47 __be32 status;
48 __be32 key;
49 __be32 msg_num;
50 __u8 buf[0];
51};
52
53typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data);
54
55#define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(struct o2net_msg))
56
57/* TODO: figure this out.... */
58static inline int o2net_link_down(int err, struct socket *sock)
59{
60 if (sock) {
61 if (sock->sk->sk_state != TCP_ESTABLISHED &&
62 sock->sk->sk_state != TCP_CLOSE_WAIT)
63 return 1;
64 }
65
66 if (err >= 0)
67 return 0;
68 switch (err) {
69 /* ????????????????????????? */
70 case -ERESTARTSYS:
71 case -EBADF:
72 /* When the server has died, an ICMP port unreachable
73 * message prompts ECONNREFUSED. */
74 case -ECONNREFUSED:
75 case -ENOTCONN:
76 case -ECONNRESET:
77 case -EPIPE:
78 return 1;
79 }
80 return 0;
81}
82
83enum {
84 O2NET_DRIVER_UNINITED,
85 O2NET_DRIVER_READY,
86};
87
88int o2net_init_tcp_sock(struct inode *inode);
89int o2net_send_message(u32 msg_type, u32 key, void *data, u32 len,
90 u8 target_node, int *status);
91int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec *vec,
92 size_t veclen, u8 target_node, int *status);
93int o2net_broadcast_message(u32 msg_type, u32 key, void *data, u32 len,
94 struct inode *group);
95
96int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
97 o2net_msg_handler_func *func, void *data,
98 struct list_head *unreg_list);
99void o2net_unregister_handler_list(struct list_head *list);
100
101struct o2nm_node;
102int o2net_register_hb_callbacks(void);
103void o2net_unregister_hb_callbacks(void);
104int o2net_start_listening(struct o2nm_node *node);
105void o2net_stop_listening(struct o2nm_node *node);
106void o2net_disconnect_node(struct o2nm_node *node);
107
108int o2net_init(void);
109void o2net_exit(void);
110int o2net_proc_init(struct proc_dir_entry *parent);
111void o2net_proc_exit(struct proc_dir_entry *parent);
112
113#endif /* O2CLUSTER_TCP_H */
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
new file mode 100644
index 000000000000..ff9e2e2104c2
--- /dev/null
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -0,0 +1,174 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * Copyright (C) 2005 Oracle. All rights reserved.
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public
17 * License along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA.
20 */
21
22#ifndef O2CLUSTER_TCP_INTERNAL_H
23#define O2CLUSTER_TCP_INTERNAL_H
24
25#define O2NET_MSG_MAGIC ((u16)0xfa55)
26#define O2NET_MSG_STATUS_MAGIC ((u16)0xfa56)
27#define O2NET_MSG_KEEP_REQ_MAGIC ((u16)0xfa57)
28#define O2NET_MSG_KEEP_RESP_MAGIC ((u16)0xfa58)
29
30/* same as hb delay, we're waiting for another node to recognize our hb */
31#define O2NET_RECONNECT_DELAY_MS O2HB_REGION_TIMEOUT_MS
32
33/* we're delaying our quorum decision so that heartbeat will have timed
34 * out truly dead nodes by the time we come around to making decisions
35 * on their number */
36#define O2NET_QUORUM_DELAY_MS ((o2hb_dead_threshold + 2) * O2HB_REGION_TIMEOUT_MS)
37
38#define O2NET_KEEPALIVE_DELAY_SECS 5
39#define O2NET_IDLE_TIMEOUT_SECS 10
40
41/*
42 * This version number represents quite a lot, unfortunately. It not
43 * only represents the raw network message protocol on the wire but also
44 * locking semantics of the file system using the protocol. It should
45 * be somewhere else, I'm sure, but right now it isn't.
46 *
47 * New in version 2:
48 * - full 64 bit i_size in the metadata lock lvbs
49 * - introduction of "rw" lock and pushing meta/data locking down
50 */
51#define O2NET_PROTOCOL_VERSION 2ULL
52struct o2net_handshake {
53 __be64 protocol_version;
54 __be64 connector_id;
55};
56
57struct o2net_node {
58 /* this is never called from int/bh */
59 spinlock_t nn_lock;
60
61 /* set the moment an sc is allocated and a connect is started */
62 struct o2net_sock_container *nn_sc;
63 /* _valid is only set after the handshake passes and tx can happen */
64 unsigned nn_sc_valid:1;
65 /* if this is set tx just returns it */
66 int nn_persistent_error;
67
68 /* threads waiting for an sc to arrive wait on the wq for generation
69 * to increase. it is increased when a connecting socket succeeds
70 * or fails or when an accepted socket is attached. */
71 wait_queue_head_t nn_sc_wq;
72
73 struct idr nn_status_idr;
74 struct list_head nn_status_list;
75
76 /* connects are attempted from when heartbeat comes up until either hb
77 * goes down, the node is unconfigured, no connect attempts succeed
78 * before O2NET_CONN_IDLE_DELAY, or a connect succeeds. connect_work
79 * is queued from set_nn_state both from hb up and from itself if a
80 * connect attempt fails and so can be self-arming. shutdown is
81 * careful to first mark the nn such that no connects will be attempted
82 * before canceling delayed connect work and flushing the queue. */
83 struct work_struct nn_connect_work;
84 unsigned long nn_last_connect_attempt;
85
86 /* this is queued as nodes come up and is canceled when a connection is
87 * established. this expiring gives up on the node and errors out
88 * transmits */
89 struct work_struct nn_connect_expired;
90
91 /* after we give up on a socket we wait a while before deciding
92 * that it is still heartbeating and that we should do some
93 * quorum work */
94 struct work_struct nn_still_up;
95};
96
97struct o2net_sock_container {
98 struct kref sc_kref;
99 /* the next two are vaild for the life time of the sc */
100 struct socket *sc_sock;
101 struct o2nm_node *sc_node;
102
103 /* all of these sc work structs hold refs on the sc while they are
104 * queued. they should not be able to ref a freed sc. the teardown
105 * race is with o2net_wq destruction in o2net_stop_listening() */
106
107 /* rx and connect work are generated from socket callbacks. sc
108 * shutdown removes the callbacks and then flushes the work queue */
109 struct work_struct sc_rx_work;
110 struct work_struct sc_connect_work;
111 /* shutdown work is triggered in two ways. the simple way is
112 * for a code path calls ensure_shutdown which gets a lock, removes
113 * the sc from the nn, and queues the work. in this case the
114 * work is single-shot. the work is also queued from a sock
115 * callback, though, and in this case the work will find the sc
116 * still on the nn and will call ensure_shutdown itself.. this
117 * ends up triggering the shutdown work again, though nothing
118 * will be done in that second iteration. so work queue teardown
119 * has to be careful to remove the sc from the nn before waiting
120 * on the work queue so that the shutdown work doesn't remove the
121 * sc and rearm itself.
122 */
123 struct work_struct sc_shutdown_work;
124
125 struct timer_list sc_idle_timeout;
126 struct work_struct sc_keepalive_work;
127
128 unsigned sc_handshake_ok:1;
129
130 struct page *sc_page;
131 size_t sc_page_off;
132
133 /* original handlers for the sockets */
134 void (*sc_state_change)(struct sock *sk);
135 void (*sc_data_ready)(struct sock *sk, int bytes);
136
137 struct timeval sc_tv_timer;
138 struct timeval sc_tv_data_ready;
139 struct timeval sc_tv_advance_start;
140 struct timeval sc_tv_advance_stop;
141 struct timeval sc_tv_func_start;
142 struct timeval sc_tv_func_stop;
143 u32 sc_msg_key;
144 u16 sc_msg_type;
145};
146
147struct o2net_msg_handler {
148 struct rb_node nh_node;
149 u32 nh_max_len;
150 u32 nh_msg_type;
151 u32 nh_key;
152 o2net_msg_handler_func *nh_func;
153 o2net_msg_handler_func *nh_func_data;
154 struct kref nh_kref;
155 struct list_head nh_unregister_item;
156};
157
158enum o2net_system_error {
159 O2NET_ERR_NONE = 0,
160 O2NET_ERR_NO_HNDLR,
161 O2NET_ERR_OVERFLOW,
162 O2NET_ERR_DIED,
163 O2NET_ERR_MAX
164};
165
166struct o2net_status_wait {
167 enum o2net_system_error ns_sys_status;
168 s32 ns_status;
169 int ns_id;
170 wait_queue_head_t ns_wq;
171 struct list_head ns_node_item;
172};
173
174#endif /* O2CLUSTER_TCP_INTERNAL_H */
diff --git a/fs/ocfs2/cluster/ver.c b/fs/ocfs2/cluster/ver.c
new file mode 100644
index 000000000000..7286c48bb30d
--- /dev/null
+++ b/fs/ocfs2/cluster/ver.c
@@ -0,0 +1,42 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * ver.c
5 *
6 * version string
7 *
8 * Copyright (C) 2002, 2005 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 */
25
26#include <linux/module.h>
27#include <linux/kernel.h>
28
29#include "ver.h"
30
31#define CLUSTER_BUILD_VERSION "1.3.3"
32
33#define VERSION_STR "OCFS2 Node Manager " CLUSTER_BUILD_VERSION
34
35void cluster_print_version(void)
36{
37 printk(KERN_INFO "%s\n", VERSION_STR);
38}
39
40MODULE_DESCRIPTION(VERSION_STR);
41
42MODULE_VERSION(CLUSTER_BUILD_VERSION);
diff --git a/fs/ocfs2/cluster/ver.h b/fs/ocfs2/cluster/ver.h
new file mode 100644
index 000000000000..32554c3382c2
--- /dev/null
+++ b/fs/ocfs2/cluster/ver.h
@@ -0,0 +1,31 @@
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * ver.h
5 *
6 * Function prototypes
7 *
8 * Copyright (C) 2005 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 */
25
26#ifndef O2CLUSTER_VER_H
27#define O2CLUSTER_VER_H
28
29void cluster_print_version(void);
30
31#endif /* O2CLUSTER_VER_H */