aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@woody.osdl.org>2006-12-07 12:13:20 -0500
committerLinus Torvalds <torvalds@woody.osdl.org>2006-12-07 12:13:20 -0500
commit1c1afa3c053d4ccdf44e5a4e159005cdfd48bfc6 (patch)
tree3e686ad4cf1ae2300e7190ff83afc3f3dd4ba740 /fs/dlm
parent0a01707b289853f56d1c000057b27e243c039722 (diff)
parentac33d0710595579e3cfca42dde2257eb0b123f6d (diff)
Merge master.kernel.org:/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw
* master.kernel.org:/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw: (73 commits) [DLM] Clean up lowcomms [GFS2] Change gfs2_fsync() to use write_inode_now() [GFS2] Fix indent in recovery.c [GFS2] Don't flush everything on fdatasync [GFS2] Add a comment about reading the super block [GFS2] Mount problem with the GFS2 code [GFS2] Remove gfs2_check_acl() [DLM] fix format warnings in rcom.c and recoverd.c [GFS2] lock function parameter [DLM] don't accept replies to old recovery messages [DLM] fix size of STATUS_REPLY message [GFS2] fs/gfs2/log.c:log_bmap() fix printk format warning [DLM] fix add_requestqueue checking nodes list [GFS2] Fix recursive locking in gfs2_getattr [GFS2] Fix recursive locking in gfs2_permission [GFS2] Reduce number of arguments to meta_io.c:getbuf() [GFS2] Move gfs2_meta_syncfs() into log.c [GFS2] Fix journal flush problem [GFS2] mark_inode_dirty after write to stuffed file [GFS2] Fix glock ordering on inode creation ...
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/Kconfig20
-rw-r--r--fs/dlm/Makefile4
-rw-r--r--fs/dlm/dlm_internal.h4
-rw-r--r--fs/dlm/lock.c16
-rw-r--r--fs/dlm/lockspace.c4
-rw-r--r--fs/dlm/lowcomms-sctp.c (renamed from fs/dlm/lowcomms.c)264
-rw-r--r--fs/dlm/lowcomms-tcp.c1189
-rw-r--r--fs/dlm/lowcomms.h2
-rw-r--r--fs/dlm/main.c10
-rw-r--r--fs/dlm/member.c8
-rw-r--r--fs/dlm/rcom.c58
-rw-r--r--fs/dlm/recover.c1
-rw-r--r--fs/dlm/recoverd.c44
-rw-r--r--fs/dlm/requestqueue.c26
-rw-r--r--fs/dlm/requestqueue.h2
15 files changed, 1463 insertions, 189 deletions
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig
index 81b2c6465eeb..b5654a284fef 100644
--- a/fs/dlm/Kconfig
+++ b/fs/dlm/Kconfig
@@ -1,14 +1,32 @@
1menu "Distributed Lock Manager" 1menu "Distributed Lock Manager"
2 depends on INET && IP_SCTP && EXPERIMENTAL 2 depends on EXPERIMENTAL && INET
3 3
4config DLM 4config DLM
5 tristate "Distributed Lock Manager (DLM)" 5 tristate "Distributed Lock Manager (DLM)"
6 depends on IPV6 || IPV6=n 6 depends on IPV6 || IPV6=n
7 select CONFIGFS_FS 7 select CONFIGFS_FS
8 select IP_SCTP if DLM_SCTP
8 help 9 help
9 A general purpose distributed lock manager for kernel or userspace 10 A general purpose distributed lock manager for kernel or userspace
10 applications. 11 applications.
11 12
13choice
14 prompt "Select DLM communications protocol"
15 depends on DLM
16 default DLM_TCP
17 help
18 The DLM Can use TCP or SCTP for it's network communications.
19 SCTP supports multi-homed operations whereas TCP doesn't.
20 However, SCTP seems to have stability problems at the moment.
21
22config DLM_TCP
23 bool "TCP/IP"
24
25config DLM_SCTP
26 bool "SCTP"
27
28endchoice
29
12config DLM_DEBUG 30config DLM_DEBUG
13 bool "DLM debugging" 31 bool "DLM debugging"
14 depends on DLM 32 depends on DLM
diff --git a/fs/dlm/Makefile b/fs/dlm/Makefile
index 1832e0297f7d..65388944eba0 100644
--- a/fs/dlm/Makefile
+++ b/fs/dlm/Makefile
@@ -4,7 +4,6 @@ dlm-y := ast.o \
4 dir.o \ 4 dir.o \
5 lock.o \ 5 lock.o \
6 lockspace.o \ 6 lockspace.o \
7 lowcomms.o \
8 main.o \ 7 main.o \
9 member.o \ 8 member.o \
10 memory.o \ 9 memory.o \
@@ -17,3 +16,6 @@ dlm-y := ast.o \
17 util.o 16 util.o
18dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o 17dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o
19 18
19dlm-$(CONFIG_DLM_TCP) += lowcomms-tcp.o
20
21dlm-$(CONFIG_DLM_SCTP) += lowcomms-sctp.o \ No newline at end of file
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 1e5cd67e1b7a..1ee8195e6fc0 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -471,6 +471,7 @@ struct dlm_ls {
471 char *ls_recover_buf; 471 char *ls_recover_buf;
472 int ls_recover_nodeid; /* for debugging */ 472 int ls_recover_nodeid; /* for debugging */
473 uint64_t ls_rcom_seq; 473 uint64_t ls_rcom_seq;
474 spinlock_t ls_rcom_spin;
474 struct list_head ls_recover_list; 475 struct list_head ls_recover_list;
475 spinlock_t ls_recover_list_lock; 476 spinlock_t ls_recover_list_lock;
476 int ls_recover_list_count; 477 int ls_recover_list_count;
@@ -488,7 +489,8 @@ struct dlm_ls {
488#define LSFL_RUNNING 1 489#define LSFL_RUNNING 1
489#define LSFL_RECOVERY_STOP 2 490#define LSFL_RECOVERY_STOP 2
490#define LSFL_RCOM_READY 3 491#define LSFL_RCOM_READY 3
491#define LSFL_UEVENT_WAIT 4 492#define LSFL_RCOM_WAIT 4
493#define LSFL_UEVENT_WAIT 5
492 494
493/* much of this is just saving user space pointers associated with the 495/* much of this is just saving user space pointers associated with the
494 lock that we pass back to the user lib with an ast */ 496 lock that we pass back to the user lib with an ast */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 3f2befa4797b..30878defaeb6 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -2372,6 +2372,7 @@ static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2372static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) 2372static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2373{ 2373{
2374 lkb->lkb_exflags = ms->m_exflags; 2374 lkb->lkb_exflags = ms->m_exflags;
2375 lkb->lkb_sbflags = ms->m_sbflags;
2375 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 2376 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2376 (ms->m_flags & 0x0000FFFF); 2377 (ms->m_flags & 0x0000FFFF);
2377} 2378}
@@ -3028,10 +3029,17 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3028 3029
3029 while (1) { 3030 while (1) {
3030 if (dlm_locking_stopped(ls)) { 3031 if (dlm_locking_stopped(ls)) {
3031 if (!recovery) 3032 if (recovery) {
3032 dlm_add_requestqueue(ls, nodeid, hd); 3033 error = -EINTR;
3033 error = -EINTR; 3034 goto out;
3034 goto out; 3035 }
3036 error = dlm_add_requestqueue(ls, nodeid, hd);
3037 if (error == -EAGAIN)
3038 continue;
3039 else {
3040 error = -EINTR;
3041 goto out;
3042 }
3035 } 3043 }
3036 3044
3037 if (lock_recovery_try(ls)) 3045 if (lock_recovery_try(ls))
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index f8842ca443c2..59012b089e8d 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -22,6 +22,7 @@
22#include "memory.h" 22#include "memory.h"
23#include "lock.h" 23#include "lock.h"
24#include "recover.h" 24#include "recover.h"
25#include "requestqueue.h"
25 26
26#ifdef CONFIG_DLM_DEBUG 27#ifdef CONFIG_DLM_DEBUG
27int dlm_create_debug_file(struct dlm_ls *ls); 28int dlm_create_debug_file(struct dlm_ls *ls);
@@ -478,6 +479,8 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
478 ls->ls_recoverd_task = NULL; 479 ls->ls_recoverd_task = NULL;
479 mutex_init(&ls->ls_recoverd_active); 480 mutex_init(&ls->ls_recoverd_active);
480 spin_lock_init(&ls->ls_recover_lock); 481 spin_lock_init(&ls->ls_recover_lock);
482 spin_lock_init(&ls->ls_rcom_spin);
483 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
481 ls->ls_recover_status = 0; 484 ls->ls_recover_status = 0;
482 ls->ls_recover_seq = 0; 485 ls->ls_recover_seq = 0;
483 ls->ls_recover_args = NULL; 486 ls->ls_recover_args = NULL;
@@ -684,6 +687,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
684 * Free structures on any other lists 687 * Free structures on any other lists
685 */ 688 */
686 689
690 dlm_purge_requestqueue(ls);
687 kfree(ls->ls_recover_args); 691 kfree(ls->ls_recover_args);
688 dlm_clear_free_entries(ls); 692 dlm_clear_free_entries(ls);
689 dlm_clear_members(ls); 693 dlm_clear_members(ls);
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms-sctp.c
index 6da6b14d5a61..fe158d7a9285 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms-sctp.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -75,13 +75,13 @@ struct nodeinfo {
75}; 75};
76 76
77static DEFINE_IDR(nodeinfo_idr); 77static DEFINE_IDR(nodeinfo_idr);
78static struct rw_semaphore nodeinfo_lock; 78static DECLARE_RWSEM(nodeinfo_lock);
79static int max_nodeid; 79static int max_nodeid;
80 80
81struct cbuf { 81struct cbuf {
82 unsigned base; 82 unsigned int base;
83 unsigned len; 83 unsigned int len;
84 unsigned mask; 84 unsigned int mask;
85}; 85};
86 86
87/* Just the one of these, now. But this struct keeps 87/* Just the one of these, now. But this struct keeps
@@ -90,9 +90,9 @@ struct cbuf {
90#define CF_READ_PENDING 1 90#define CF_READ_PENDING 1
91 91
92struct connection { 92struct connection {
93 struct socket *sock; 93 struct socket *sock;
94 unsigned long flags; 94 unsigned long flags;
95 struct page *rx_page; 95 struct page *rx_page;
96 atomic_t waiting_requests; 96 atomic_t waiting_requests;
97 struct cbuf cb; 97 struct cbuf cb;
98 int eagain_flag; 98 int eagain_flag;
@@ -102,36 +102,40 @@ struct connection {
102 102
103struct writequeue_entry { 103struct writequeue_entry {
104 struct list_head list; 104 struct list_head list;
105 struct page *page; 105 struct page *page;
106 int offset; 106 int offset;
107 int len; 107 int len;
108 int end; 108 int end;
109 int users; 109 int users;
110 struct nodeinfo *ni; 110 struct nodeinfo *ni;
111}; 111};
112 112
113#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0) 113static void cbuf_add(struct cbuf *cb, int n)
114#define CBUF_EMPTY(cb) ((cb)->len == 0) 114{
115#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1)) 115 cb->len += n;
116#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask) 116}
117 117
118#define CBUF_INIT(cb, size) \ 118static int cbuf_data(struct cbuf *cb)
119do { \ 119{
120 (cb)->base = (cb)->len = 0; \ 120 return ((cb->base + cb->len) & cb->mask);
121 (cb)->mask = ((size)-1); \ 121}
122} while(0)
123 122
124#define CBUF_EAT(cb, n) \ 123static void cbuf_init(struct cbuf *cb, int size)
125do { \ 124{
126 (cb)->len -= (n); \ 125 cb->base = cb->len = 0;
127 (cb)->base += (n); \ 126 cb->mask = size-1;
128 (cb)->base &= (cb)->mask; \ 127}
129} while(0)
130 128
129static void cbuf_eat(struct cbuf *cb, int n)
130{
131 cb->len -= n;
132 cb->base += n;
133 cb->base &= cb->mask;
134}
131 135
132/* List of nodes which have writes pending */ 136/* List of nodes which have writes pending */
133static struct list_head write_nodes; 137static LIST_HEAD(write_nodes);
134static spinlock_t write_nodes_lock; 138static DEFINE_SPINLOCK(write_nodes_lock);
135 139
136/* Maximum number of incoming messages to process before 140/* Maximum number of incoming messages to process before
137 * doing a schedule() 141 * doing a schedule()
@@ -141,8 +145,7 @@ static spinlock_t write_nodes_lock;
141/* Manage daemons */ 145/* Manage daemons */
142static struct task_struct *recv_task; 146static struct task_struct *recv_task;
143static struct task_struct *send_task; 147static struct task_struct *send_task;
144static wait_queue_head_t lowcomms_recv_wait; 148static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);
145static atomic_t accepting;
146 149
147/* The SCTP connection */ 150/* The SCTP connection */
148static struct connection sctp_con; 151static struct connection sctp_con;
@@ -161,11 +164,11 @@ static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
161 return error; 164 return error;
162 165
163 if (dlm_local_addr[0]->ss_family == AF_INET) { 166 if (dlm_local_addr[0]->ss_family == AF_INET) {
164 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; 167 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
165 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; 168 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
166 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 169 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
167 } else { 170 } else {
168 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; 171 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
169 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; 172 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
170 memcpy(&ret6->sin6_addr, &in6->sin6_addr, 173 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
171 sizeof(in6->sin6_addr)); 174 sizeof(in6->sin6_addr));
@@ -174,6 +177,8 @@ static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
174 return 0; 177 return 0;
175} 178}
176 179
180/* If alloc is 0 here we will not attempt to allocate a new
181 nodeinfo struct */
177static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc) 182static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
178{ 183{
179 struct nodeinfo *ni; 184 struct nodeinfo *ni;
@@ -184,44 +189,45 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
184 ni = idr_find(&nodeinfo_idr, nodeid); 189 ni = idr_find(&nodeinfo_idr, nodeid);
185 up_read(&nodeinfo_lock); 190 up_read(&nodeinfo_lock);
186 191
187 if (!ni && alloc) { 192 if (ni || !alloc)
188 down_write(&nodeinfo_lock); 193 return ni;
189 194
190 ni = idr_find(&nodeinfo_idr, nodeid); 195 down_write(&nodeinfo_lock);
191 if (ni)
192 goto out_up;
193 196
194 r = idr_pre_get(&nodeinfo_idr, alloc); 197 ni = idr_find(&nodeinfo_idr, nodeid);
195 if (!r) 198 if (ni)
196 goto out_up; 199 goto out_up;
197 200
198 ni = kmalloc(sizeof(struct nodeinfo), alloc); 201 r = idr_pre_get(&nodeinfo_idr, alloc);
199 if (!ni) 202 if (!r)
200 goto out_up; 203 goto out_up;
201 204
202 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n); 205 ni = kmalloc(sizeof(struct nodeinfo), alloc);
203 if (r) { 206 if (!ni)
204 kfree(ni); 207 goto out_up;
205 ni = NULL; 208
206 goto out_up; 209 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
207 } 210 if (r) {
208 if (n != nodeid) { 211 kfree(ni);
209 idr_remove(&nodeinfo_idr, n); 212 ni = NULL;
210 kfree(ni); 213 goto out_up;
211 ni = NULL;
212 goto out_up;
213 }
214 memset(ni, 0, sizeof(struct nodeinfo));
215 spin_lock_init(&ni->lock);
216 INIT_LIST_HEAD(&ni->writequeue);
217 spin_lock_init(&ni->writequeue_lock);
218 ni->nodeid = nodeid;
219
220 if (nodeid > max_nodeid)
221 max_nodeid = nodeid;
222 out_up:
223 up_write(&nodeinfo_lock);
224 } 214 }
215 if (n != nodeid) {
216 idr_remove(&nodeinfo_idr, n);
217 kfree(ni);
218 ni = NULL;
219 goto out_up;
220 }
221 memset(ni, 0, sizeof(struct nodeinfo));
222 spin_lock_init(&ni->lock);
223 INIT_LIST_HEAD(&ni->writequeue);
224 spin_lock_init(&ni->writequeue_lock);
225 ni->nodeid = nodeid;
226
227 if (nodeid > max_nodeid)
228 max_nodeid = nodeid;
229out_up:
230 up_write(&nodeinfo_lock);
225 231
226 return ni; 232 return ni;
227} 233}
@@ -279,13 +285,13 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
279 in4_addr->sin_port = cpu_to_be16(port); 285 in4_addr->sin_port = cpu_to_be16(port);
280 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 286 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
281 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) - 287 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
282 sizeof(struct sockaddr_in)); 288 sizeof(struct sockaddr_in));
283 *addr_len = sizeof(struct sockaddr_in); 289 *addr_len = sizeof(struct sockaddr_in);
284 } else { 290 } else {
285 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 291 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
286 in6_addr->sin6_port = cpu_to_be16(port); 292 in6_addr->sin6_port = cpu_to_be16(port);
287 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) - 293 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
288 sizeof(struct sockaddr_in6)); 294 sizeof(struct sockaddr_in6));
289 *addr_len = sizeof(struct sockaddr_in6); 295 *addr_len = sizeof(struct sockaddr_in6);
290 } 296 }
291} 297}
@@ -324,7 +330,7 @@ static void send_shutdown(sctp_assoc_t associd)
324 cmsg->cmsg_type = SCTP_SNDRCV; 330 cmsg->cmsg_type = SCTP_SNDRCV;
325 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 331 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
326 outmessage.msg_controllen = cmsg->cmsg_len; 332 outmessage.msg_controllen = cmsg->cmsg_len;
327 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 333 sinfo = CMSG_DATA(cmsg);
328 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 334 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
329 335
330 sinfo->sinfo_flags |= MSG_EOF; 336 sinfo->sinfo_flags |= MSG_EOF;
@@ -387,7 +393,7 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
387 393
388 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { 394 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
389 log_print("COMM_UP for invalid assoc ID %d", 395 log_print("COMM_UP for invalid assoc ID %d",
390 (int)sn->sn_assoc_change.sac_assoc_id); 396 (int)sn->sn_assoc_change.sac_assoc_id);
391 init_failed(); 397 init_failed();
392 return; 398 return;
393 } 399 }
@@ -398,15 +404,18 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
398 fs = get_fs(); 404 fs = get_fs();
399 set_fs(get_ds()); 405 set_fs(get_ds());
400 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock, 406 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
401 IPPROTO_SCTP, SCTP_PRIMARY_ADDR, 407 IPPROTO_SCTP,
402 (char*)&prim, &prim_len); 408 SCTP_PRIMARY_ADDR,
409 (char*)&prim,
410 &prim_len);
403 set_fs(fs); 411 set_fs(fs);
404 if (ret < 0) { 412 if (ret < 0) {
405 struct nodeinfo *ni; 413 struct nodeinfo *ni;
406 414
407 log_print("getsockopt/sctp_primary_addr on " 415 log_print("getsockopt/sctp_primary_addr on "
408 "new assoc %d failed : %d", 416 "new assoc %d failed : %d",
409 (int)sn->sn_assoc_change.sac_assoc_id, ret); 417 (int)sn->sn_assoc_change.sac_assoc_id,
418 ret);
410 419
411 /* Retry INIT later */ 420 /* Retry INIT later */
412 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id); 421 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
@@ -426,12 +435,10 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
426 return; 435 return;
427 436
428 /* Save the assoc ID */ 437 /* Save the assoc ID */
429 spin_lock(&ni->lock);
430 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id; 438 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
431 spin_unlock(&ni->lock);
432 439
433 log_print("got new/restarted association %d nodeid %d", 440 log_print("got new/restarted association %d nodeid %d",
434 (int)sn->sn_assoc_change.sac_assoc_id, nodeid); 441 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
435 442
436 /* Send any pending writes */ 443 /* Send any pending writes */
437 clear_bit(NI_INIT_PENDING, &ni->flags); 444 clear_bit(NI_INIT_PENDING, &ni->flags);
@@ -507,13 +514,12 @@ static int receive_from_sock(void)
507 sctp_con.rx_page = alloc_page(GFP_ATOMIC); 514 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
508 if (sctp_con.rx_page == NULL) 515 if (sctp_con.rx_page == NULL)
509 goto out_resched; 516 goto out_resched;
510 CBUF_INIT(&sctp_con.cb, PAGE_CACHE_SIZE); 517 cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
511 } 518 }
512 519
513 memset(&incmsg, 0, sizeof(incmsg)); 520 memset(&incmsg, 0, sizeof(incmsg));
514 memset(&msgname, 0, sizeof(msgname)); 521 memset(&msgname, 0, sizeof(msgname));
515 522
516 memset(incmsg, 0, sizeof(incmsg));
517 msg.msg_name = &msgname; 523 msg.msg_name = &msgname;
518 msg.msg_namelen = sizeof(msgname); 524 msg.msg_namelen = sizeof(msgname);
519 msg.msg_flags = 0; 525 msg.msg_flags = 0;
@@ -532,17 +538,17 @@ static int receive_from_sock(void)
532 * iov[0] is the bit of the circular buffer between the current end 538 * iov[0] is the bit of the circular buffer between the current end
533 * point (cb.base + cb.len) and the end of the buffer. 539 * point (cb.base + cb.len) and the end of the buffer.
534 */ 540 */
535 iov[0].iov_len = sctp_con.cb.base - CBUF_DATA(&sctp_con.cb); 541 iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
536 iov[0].iov_base = page_address(sctp_con.rx_page) + 542 iov[0].iov_base = page_address(sctp_con.rx_page) +
537 CBUF_DATA(&sctp_con.cb); 543 cbuf_data(&sctp_con.cb);
538 iov[1].iov_len = 0; 544 iov[1].iov_len = 0;
539 545
540 /* 546 /*
541 * iov[1] is the bit of the circular buffer between the start of the 547 * iov[1] is the bit of the circular buffer between the start of the
542 * buffer and the start of the currently used section (cb.base) 548 * buffer and the start of the currently used section (cb.base)
543 */ 549 */
544 if (CBUF_DATA(&sctp_con.cb) >= sctp_con.cb.base) { 550 if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
545 iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&sctp_con.cb); 551 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
546 iov[1].iov_len = sctp_con.cb.base; 552 iov[1].iov_len = sctp_con.cb.base;
547 iov[1].iov_base = page_address(sctp_con.rx_page); 553 iov[1].iov_base = page_address(sctp_con.rx_page);
548 msg.msg_iovlen = 2; 554 msg.msg_iovlen = 2;
@@ -557,7 +563,7 @@ static int receive_from_sock(void)
557 msg.msg_control = incmsg; 563 msg.msg_control = incmsg;
558 msg.msg_controllen = sizeof(incmsg); 564 msg.msg_controllen = sizeof(incmsg);
559 cmsg = CMSG_FIRSTHDR(&msg); 565 cmsg = CMSG_FIRSTHDR(&msg);
560 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 566 sinfo = CMSG_DATA(cmsg);
561 567
562 if (msg.msg_flags & MSG_NOTIFICATION) { 568 if (msg.msg_flags & MSG_NOTIFICATION) {
563 process_sctp_notification(&msg, page_address(sctp_con.rx_page)); 569 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
@@ -583,29 +589,29 @@ static int receive_from_sock(void)
583 if (r == 1) 589 if (r == 1)
584 return 0; 590 return 0;
585 591
586 CBUF_ADD(&sctp_con.cb, ret); 592 cbuf_add(&sctp_con.cb, ret);
587 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), 593 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
588 page_address(sctp_con.rx_page), 594 page_address(sctp_con.rx_page),
589 sctp_con.cb.base, sctp_con.cb.len, 595 sctp_con.cb.base, sctp_con.cb.len,
590 PAGE_CACHE_SIZE); 596 PAGE_CACHE_SIZE);
591 if (ret < 0) 597 if (ret < 0)
592 goto out_close; 598 goto out_close;
593 CBUF_EAT(&sctp_con.cb, ret); 599 cbuf_eat(&sctp_con.cb, ret);
594 600
595 out: 601out:
596 ret = 0; 602 ret = 0;
597 goto out_ret; 603 goto out_ret;
598 604
599 out_resched: 605out_resched:
600 lowcomms_data_ready(sctp_con.sock->sk, 0); 606 lowcomms_data_ready(sctp_con.sock->sk, 0);
601 ret = 0; 607 ret = 0;
602 schedule(); 608 cond_resched();
603 goto out_ret; 609 goto out_ret;
604 610
605 out_close: 611out_close:
606 if (ret != -EAGAIN) 612 if (ret != -EAGAIN)
607 log_print("error reading from sctp socket: %d", ret); 613 log_print("error reading from sctp socket: %d", ret);
608 out_ret: 614out_ret:
609 return ret; 615 return ret;
610} 616}
611 617
@@ -619,10 +625,12 @@ static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
619 set_fs(get_ds()); 625 set_fs(get_ds());
620 if (num == 1) 626 if (num == 1)
621 result = sctp_con.sock->ops->bind(sctp_con.sock, 627 result = sctp_con.sock->ops->bind(sctp_con.sock,
622 (struct sockaddr *) addr, addr_len); 628 (struct sockaddr *) addr,
629 addr_len);
623 else 630 else
624 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP, 631 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
625 SCTP_SOCKOPT_BINDX_ADD, (char *)addr, addr_len); 632 SCTP_SOCKOPT_BINDX_ADD,
633 (char *)addr, addr_len);
626 set_fs(fs); 634 set_fs(fs);
627 635
628 if (result < 0) 636 if (result < 0)
@@ -719,10 +727,10 @@ static int init_sock(void)
719 727
720 return 0; 728 return 0;
721 729
722 create_delsock: 730create_delsock:
723 sock_release(sock); 731 sock_release(sock);
724 sctp_con.sock = NULL; 732 sctp_con.sock = NULL;
725 out: 733out:
726 return result; 734 return result;
727} 735}
728 736
@@ -756,16 +764,13 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
756 int users = 0; 764 int users = 0;
757 struct nodeinfo *ni; 765 struct nodeinfo *ni;
758 766
759 if (!atomic_read(&accepting))
760 return NULL;
761
762 ni = nodeid2nodeinfo(nodeid, allocation); 767 ni = nodeid2nodeinfo(nodeid, allocation);
763 if (!ni) 768 if (!ni)
764 return NULL; 769 return NULL;
765 770
766 spin_lock(&ni->writequeue_lock); 771 spin_lock(&ni->writequeue_lock);
767 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list); 772 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
768 if (((struct list_head *) e == &ni->writequeue) || 773 if ((&e->list == &ni->writequeue) ||
769 (PAGE_CACHE_SIZE - e->end < len)) { 774 (PAGE_CACHE_SIZE - e->end < len)) {
770 e = NULL; 775 e = NULL;
771 } else { 776 } else {
@@ -776,7 +781,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
776 spin_unlock(&ni->writequeue_lock); 781 spin_unlock(&ni->writequeue_lock);
777 782
778 if (e) { 783 if (e) {
779 got_one: 784 got_one:
780 if (users == 0) 785 if (users == 0)
781 kmap(e->page); 786 kmap(e->page);
782 *ppc = page_address(e->page) + offset; 787 *ppc = page_address(e->page) + offset;
@@ -803,9 +808,6 @@ void dlm_lowcomms_commit_buffer(void *arg)
803 int users; 808 int users;
804 struct nodeinfo *ni = e->ni; 809 struct nodeinfo *ni = e->ni;
805 810
806 if (!atomic_read(&accepting))
807 return;
808
809 spin_lock(&ni->writequeue_lock); 811 spin_lock(&ni->writequeue_lock);
810 users = --e->users; 812 users = --e->users;
811 if (users) 813 if (users)
@@ -822,7 +824,7 @@ void dlm_lowcomms_commit_buffer(void *arg)
822 } 824 }
823 return; 825 return;
824 826
825 out: 827out:
826 spin_unlock(&ni->writequeue_lock); 828 spin_unlock(&ni->writequeue_lock);
827 return; 829 return;
828} 830}
@@ -878,7 +880,7 @@ static void initiate_association(int nodeid)
878 cmsg->cmsg_level = IPPROTO_SCTP; 880 cmsg->cmsg_level = IPPROTO_SCTP;
879 cmsg->cmsg_type = SCTP_SNDRCV; 881 cmsg->cmsg_type = SCTP_SNDRCV;
880 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 882 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
881 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 883 sinfo = CMSG_DATA(cmsg);
882 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 884 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
883 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); 885 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
884 886
@@ -892,7 +894,7 @@ static void initiate_association(int nodeid)
892} 894}
893 895
894/* Send a message */ 896/* Send a message */
895static int send_to_sock(struct nodeinfo *ni) 897static void send_to_sock(struct nodeinfo *ni)
896{ 898{
897 int ret = 0; 899 int ret = 0;
898 struct writequeue_entry *e; 900 struct writequeue_entry *e;
@@ -903,13 +905,13 @@ static int send_to_sock(struct nodeinfo *ni)
903 struct sctp_sndrcvinfo *sinfo; 905 struct sctp_sndrcvinfo *sinfo;
904 struct kvec iov; 906 struct kvec iov;
905 907
906 /* See if we need to init an association before we start 908 /* See if we need to init an association before we start
907 sending precious messages */ 909 sending precious messages */
908 spin_lock(&ni->lock); 910 spin_lock(&ni->lock);
909 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { 911 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
910 spin_unlock(&ni->lock); 912 spin_unlock(&ni->lock);
911 initiate_association(ni->nodeid); 913 initiate_association(ni->nodeid);
912 return 0; 914 return;
913 } 915 }
914 spin_unlock(&ni->lock); 916 spin_unlock(&ni->lock);
915 917
@@ -923,7 +925,7 @@ static int send_to_sock(struct nodeinfo *ni)
923 cmsg->cmsg_level = IPPROTO_SCTP; 925 cmsg->cmsg_level = IPPROTO_SCTP;
924 cmsg->cmsg_type = SCTP_SNDRCV; 926 cmsg->cmsg_type = SCTP_SNDRCV;
925 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 927 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
926 sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); 928 sinfo = CMSG_DATA(cmsg);
927 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 929 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
928 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); 930 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
929 sinfo->sinfo_assoc_id = ni->assoc_id; 931 sinfo->sinfo_assoc_id = ni->assoc_id;
@@ -955,7 +957,7 @@ static int send_to_sock(struct nodeinfo *ni)
955 goto send_error; 957 goto send_error;
956 } else { 958 } else {
957 /* Don't starve people filling buffers */ 959 /* Don't starve people filling buffers */
958 schedule(); 960 cond_resched();
959 } 961 }
960 962
961 spin_lock(&ni->writequeue_lock); 963 spin_lock(&ni->writequeue_lock);
@@ -964,15 +966,16 @@ static int send_to_sock(struct nodeinfo *ni)
964 966
965 if (e->len == 0 && e->users == 0) { 967 if (e->len == 0 && e->users == 0) {
966 list_del(&e->list); 968 list_del(&e->list);
969 kunmap(e->page);
967 free_entry(e); 970 free_entry(e);
968 continue; 971 continue;
969 } 972 }
970 } 973 }
971 spin_unlock(&ni->writequeue_lock); 974 spin_unlock(&ni->writequeue_lock);
972 out: 975out:
973 return ret; 976 return;
974 977
975 send_error: 978send_error:
976 log_print("Error sending to node %d %d", ni->nodeid, ret); 979 log_print("Error sending to node %d %d", ni->nodeid, ret);
977 spin_lock(&ni->lock); 980 spin_lock(&ni->lock);
978 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { 981 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
@@ -982,7 +985,7 @@ static int send_to_sock(struct nodeinfo *ni)
982 } else 985 } else
983 spin_unlock(&ni->lock); 986 spin_unlock(&ni->lock);
984 987
985 return ret; 988 return;
986} 989}
987 990
988/* Try to send any messages that are pending */ 991/* Try to send any messages that are pending */
@@ -994,7 +997,7 @@ static void process_output_queue(void)
994 spin_lock_bh(&write_nodes_lock); 997 spin_lock_bh(&write_nodes_lock);
995 list_for_each_safe(list, temp, &write_nodes) { 998 list_for_each_safe(list, temp, &write_nodes) {
996 struct nodeinfo *ni = 999 struct nodeinfo *ni =
997 list_entry(list, struct nodeinfo, write_list); 1000 list_entry(list, struct nodeinfo, write_list);
998 clear_bit(NI_WRITE_PENDING, &ni->flags); 1001 clear_bit(NI_WRITE_PENDING, &ni->flags);
999 list_del(&ni->write_list); 1002 list_del(&ni->write_list);
1000 1003
@@ -1106,7 +1109,7 @@ static int dlm_recvd(void *data)
1106 set_current_state(TASK_INTERRUPTIBLE); 1109 set_current_state(TASK_INTERRUPTIBLE);
1107 add_wait_queue(&lowcomms_recv_wait, &wait); 1110 add_wait_queue(&lowcomms_recv_wait, &wait);
1108 if (!test_bit(CF_READ_PENDING, &sctp_con.flags)) 1111 if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
1109 schedule(); 1112 cond_resched();
1110 remove_wait_queue(&lowcomms_recv_wait, &wait); 1113 remove_wait_queue(&lowcomms_recv_wait, &wait);
1111 set_current_state(TASK_RUNNING); 1114 set_current_state(TASK_RUNNING);
1112 1115
@@ -1118,12 +1121,12 @@ static int dlm_recvd(void *data)
1118 1121
1119 /* Don't starve out everyone else */ 1122 /* Don't starve out everyone else */
1120 if (++count >= MAX_RX_MSG_COUNT) { 1123 if (++count >= MAX_RX_MSG_COUNT) {
1121 schedule(); 1124 cond_resched();
1122 count = 0; 1125 count = 0;
1123 } 1126 }
1124 } while (!kthread_should_stop() && ret >=0); 1127 } while (!kthread_should_stop() && ret >=0);
1125 } 1128 }
1126 schedule(); 1129 cond_resched();
1127 } 1130 }
1128 1131
1129 return 0; 1132 return 0;
@@ -1138,7 +1141,7 @@ static int dlm_sendd(void *data)
1138 while (!kthread_should_stop()) { 1141 while (!kthread_should_stop()) {
1139 set_current_state(TASK_INTERRUPTIBLE); 1142 set_current_state(TASK_INTERRUPTIBLE);
1140 if (write_list_empty()) 1143 if (write_list_empty())
1141 schedule(); 1144 cond_resched();
1142 set_current_state(TASK_RUNNING); 1145 set_current_state(TASK_RUNNING);
1143 1146
1144 if (sctp_con.eagain_flag) { 1147 if (sctp_con.eagain_flag) {
@@ -1166,7 +1169,7 @@ static int daemons_start(void)
1166 1169
1167 p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); 1170 p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1168 error = IS_ERR(p); 1171 error = IS_ERR(p);
1169 if (error) { 1172 if (error) {
1170 log_print("can't start dlm_recvd %d", error); 1173 log_print("can't start dlm_recvd %d", error);
1171 return error; 1174 return error;
1172 } 1175 }
@@ -1174,7 +1177,7 @@ static int daemons_start(void)
1174 1177
1175 p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); 1178 p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1176 error = IS_ERR(p); 1179 error = IS_ERR(p);
1177 if (error) { 1180 if (error) {
1178 log_print("can't start dlm_sendd %d", error); 1181 log_print("can't start dlm_sendd %d", error);
1179 kthread_stop(recv_task); 1182 kthread_stop(recv_task);
1180 return error; 1183 return error;
@@ -1197,43 +1200,28 @@ int dlm_lowcomms_start(void)
1197 error = daemons_start(); 1200 error = daemons_start();
1198 if (error) 1201 if (error)
1199 goto fail_sock; 1202 goto fail_sock;
1200 atomic_set(&accepting, 1);
1201 return 0; 1203 return 0;
1202 1204
1203 fail_sock: 1205fail_sock:
1204 close_connection(); 1206 close_connection();
1205 return error; 1207 return error;
1206} 1208}
1207 1209
1208/* Set all the activity flags to prevent any socket activity. */
1209
1210void dlm_lowcomms_stop(void) 1210void dlm_lowcomms_stop(void)
1211{ 1211{
1212 atomic_set(&accepting, 0); 1212 int i;
1213
1213 sctp_con.flags = 0x7; 1214 sctp_con.flags = 0x7;
1214 daemons_stop(); 1215 daemons_stop();
1215 clean_writequeues(); 1216 clean_writequeues();
1216 close_connection(); 1217 close_connection();
1217 dealloc_nodeinfo(); 1218 dealloc_nodeinfo();
1218 max_nodeid = 0; 1219 max_nodeid = 0;
1219}
1220 1220
1221int dlm_lowcomms_init(void) 1221 dlm_local_count = 0;
1222{ 1222 dlm_local_nodeid = 0;
1223 init_waitqueue_head(&lowcomms_recv_wait);
1224 spin_lock_init(&write_nodes_lock);
1225 INIT_LIST_HEAD(&write_nodes);
1226 init_rwsem(&nodeinfo_lock);
1227 return 0;
1228}
1229
1230void dlm_lowcomms_exit(void)
1231{
1232 int i;
1233 1223
1234 for (i = 0; i < dlm_local_count; i++) 1224 for (i = 0; i < dlm_local_count; i++)
1235 kfree(dlm_local_addr[i]); 1225 kfree(dlm_local_addr[i]);
1236 dlm_local_count = 0;
1237 dlm_local_nodeid = 0;
1238} 1226}
1239 1227
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
new file mode 100644
index 000000000000..8f2791fc8447
--- /dev/null
+++ b/fs/dlm/lowcomms-tcp.c
@@ -0,0 +1,1189 @@
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14/*
15 * lowcomms.c
16 *
17 * This is the "low-level" comms layer.
18 *
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
21 *
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is it's
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
28 *
29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
38 *
39 * I don't see any problem with the recv thread executing the locking
40 * code on behalf of remote processes as the locking code is
41 * short, efficient and never waits.
42 *
43 */
44
45
46#include <asm/ioctls.h>
47#include <net/sock.h>
48#include <net/tcp.h>
49#include <linux/pagemap.h>
50
51#include "dlm_internal.h"
52#include "lowcomms.h"
53#include "midcomms.h"
54#include "config.h"
55
56struct cbuf {
57 unsigned int base;
58 unsigned int len;
59 unsigned int mask;
60};
61
62#define NODE_INCREMENT 32
63static void cbuf_add(struct cbuf *cb, int n)
64{
65 cb->len += n;
66}
67
68static int cbuf_data(struct cbuf *cb)
69{
70 return ((cb->base + cb->len) & cb->mask);
71}
72
73static void cbuf_init(struct cbuf *cb, int size)
74{
75 cb->base = cb->len = 0;
76 cb->mask = size-1;
77}
78
79static void cbuf_eat(struct cbuf *cb, int n)
80{
81 cb->len -= n;
82 cb->base += n;
83 cb->base &= cb->mask;
84}
85
86static bool cbuf_empty(struct cbuf *cb)
87{
88 return cb->len == 0;
89}
90
91/* Maximum number of incoming messages to process before
92 doing a cond_resched()
93*/
94#define MAX_RX_MSG_COUNT 25
95
96struct connection {
97 struct socket *sock; /* NULL if not connected */
98 uint32_t nodeid; /* So we know who we are in the list */
99 struct rw_semaphore sock_sem; /* Stop connect races */
100 struct list_head read_list; /* On this list when ready for reading */
101 struct list_head write_list; /* On this list when ready for writing */
102 struct list_head state_list; /* On this list when ready to connect */
103 unsigned long flags; /* bit 1,2 = We are on the read/write lists */
104#define CF_READ_PENDING 1
105#define CF_WRITE_PENDING 2
106#define CF_CONNECT_PENDING 3
107#define CF_IS_OTHERCON 4
108 struct list_head writequeue; /* List of outgoing writequeue_entries */
109 struct list_head listenlist; /* List of allocated listening sockets */
110 spinlock_t writequeue_lock;
111 int (*rx_action) (struct connection *); /* What to do when active */
112 struct page *rx_page;
113 struct cbuf cb;
114 int retries;
115 atomic_t waiting_requests;
116#define MAX_CONNECT_RETRIES 3
117 struct connection *othercon;
118};
119#define sock2con(x) ((struct connection *)(x)->sk_user_data)
120
121/* An entry waiting to be sent */
122struct writequeue_entry {
123 struct list_head list;
124 struct page *page;
125 int offset;
126 int len;
127 int end;
128 int users;
129 struct connection *con;
130};
131
132static struct sockaddr_storage dlm_local_addr;
133
134/* Manage daemons */
135static struct task_struct *recv_task;
136static struct task_struct *send_task;
137
138static wait_queue_t lowcomms_send_waitq_head;
139static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
140static wait_queue_t lowcomms_recv_waitq_head;
141static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
142
143/* An array of pointers to connections, indexed by NODEID */
144static struct connection **connections;
145static DECLARE_MUTEX(connections_lock);
146static kmem_cache_t *con_cache;
147static int conn_array_size;
148
149/* List of sockets that have reads pending */
150static LIST_HEAD(read_sockets);
151static DEFINE_SPINLOCK(read_sockets_lock);
152
153/* List of sockets which have writes pending */
154static LIST_HEAD(write_sockets);
155static DEFINE_SPINLOCK(write_sockets_lock);
156
157/* List of sockets which have connects pending */
158static LIST_HEAD(state_sockets);
159static DEFINE_SPINLOCK(state_sockets_lock);
160
161static struct connection *nodeid2con(int nodeid, gfp_t allocation)
162{
163 struct connection *con = NULL;
164
165 down(&connections_lock);
166 if (nodeid >= conn_array_size) {
167 int new_size = nodeid + NODE_INCREMENT;
168 struct connection **new_conns;
169
170 new_conns = kzalloc(sizeof(struct connection *) *
171 new_size, allocation);
172 if (!new_conns)
173 goto finish;
174
175 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
176 conn_array_size = new_size;
177 kfree(connections);
178 connections = new_conns;
179
180 }
181
182 con = connections[nodeid];
183 if (con == NULL && allocation) {
184 con = kmem_cache_zalloc(con_cache, allocation);
185 if (!con)
186 goto finish;
187
188 con->nodeid = nodeid;
189 init_rwsem(&con->sock_sem);
190 INIT_LIST_HEAD(&con->writequeue);
191 spin_lock_init(&con->writequeue_lock);
192
193 connections[nodeid] = con;
194 }
195
196finish:
197 up(&connections_lock);
198 return con;
199}
200
201/* Data available on socket or listen socket received a connect */
202static void lowcomms_data_ready(struct sock *sk, int count_unused)
203{
204 struct connection *con = sock2con(sk);
205
206 atomic_inc(&con->waiting_requests);
207 if (test_and_set_bit(CF_READ_PENDING, &con->flags))
208 return;
209
210 spin_lock_bh(&read_sockets_lock);
211 list_add_tail(&con->read_list, &read_sockets);
212 spin_unlock_bh(&read_sockets_lock);
213
214 wake_up_interruptible(&lowcomms_recv_waitq);
215}
216
217static void lowcomms_write_space(struct sock *sk)
218{
219 struct connection *con = sock2con(sk);
220
221 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
222 return;
223
224 spin_lock_bh(&write_sockets_lock);
225 list_add_tail(&con->write_list, &write_sockets);
226 spin_unlock_bh(&write_sockets_lock);
227
228 wake_up_interruptible(&lowcomms_send_waitq);
229}
230
231static inline void lowcomms_connect_sock(struct connection *con)
232{
233 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
234 return;
235
236 spin_lock_bh(&state_sockets_lock);
237 list_add_tail(&con->state_list, &state_sockets);
238 spin_unlock_bh(&state_sockets_lock);
239
240 wake_up_interruptible(&lowcomms_send_waitq);
241}
242
243static void lowcomms_state_change(struct sock *sk)
244{
245 if (sk->sk_state == TCP_ESTABLISHED)
246 lowcomms_write_space(sk);
247}
248
249/* Make a socket active */
250static int add_sock(struct socket *sock, struct connection *con)
251{
252 con->sock = sock;
253
254 /* Install a data_ready callback */
255 con->sock->sk->sk_data_ready = lowcomms_data_ready;
256 con->sock->sk->sk_write_space = lowcomms_write_space;
257 con->sock->sk->sk_state_change = lowcomms_state_change;
258
259 return 0;
260}
261
262/* Add the port number to an IP6 or 4 sockaddr and return the address
263 length */
264static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
265 int *addr_len)
266{
267 saddr->ss_family = dlm_local_addr.ss_family;
268 if (saddr->ss_family == AF_INET) {
269 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
270 in4_addr->sin_port = cpu_to_be16(port);
271 *addr_len = sizeof(struct sockaddr_in);
272 } else {
273 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
274 in6_addr->sin6_port = cpu_to_be16(port);
275 *addr_len = sizeof(struct sockaddr_in6);
276 }
277}
278
279/* Close a remote connection and tidy up */
280static void close_connection(struct connection *con, bool and_other)
281{
282 down_write(&con->sock_sem);
283
284 if (con->sock) {
285 sock_release(con->sock);
286 con->sock = NULL;
287 }
288 if (con->othercon && and_other) {
289 /* Will only re-enter once. */
290 close_connection(con->othercon, false);
291 }
292 if (con->rx_page) {
293 __free_page(con->rx_page);
294 con->rx_page = NULL;
295 }
296 con->retries = 0;
297 up_write(&con->sock_sem);
298}
299
300/* Data received from remote end */
301static int receive_from_sock(struct connection *con)
302{
303 int ret = 0;
304 struct msghdr msg;
305 struct iovec iov[2];
306 mm_segment_t fs;
307 unsigned len;
308 int r;
309 int call_again_soon = 0;
310
311 down_read(&con->sock_sem);
312
313 if (con->sock == NULL)
314 goto out;
315 if (con->rx_page == NULL) {
316 /*
317 * This doesn't need to be atomic, but I think it should
318 * improve performance if it is.
319 */
320 con->rx_page = alloc_page(GFP_ATOMIC);
321 if (con->rx_page == NULL)
322 goto out_resched;
323 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
324 }
325
326 msg.msg_control = NULL;
327 msg.msg_controllen = 0;
328 msg.msg_iovlen = 1;
329 msg.msg_iov = iov;
330 msg.msg_name = NULL;
331 msg.msg_namelen = 0;
332 msg.msg_flags = 0;
333
334 /*
335 * iov[0] is the bit of the circular buffer between the current end
336 * point (cb.base + cb.len) and the end of the buffer.
337 */
338 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
339 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
340 iov[1].iov_len = 0;
341
342 /*
343 * iov[1] is the bit of the circular buffer between the start of the
344 * buffer and the start of the currently used section (cb.base)
345 */
346 if (cbuf_data(&con->cb) >= con->cb.base) {
347 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
348 iov[1].iov_len = con->cb.base;
349 iov[1].iov_base = page_address(con->rx_page);
350 msg.msg_iovlen = 2;
351 }
352 len = iov[0].iov_len + iov[1].iov_len;
353
354 fs = get_fs();
355 set_fs(get_ds());
356 r = ret = sock_recvmsg(con->sock, &msg, len,
357 MSG_DONTWAIT | MSG_NOSIGNAL);
358 set_fs(fs);
359
360 if (ret <= 0)
361 goto out_close;
362 if (ret == len)
363 call_again_soon = 1;
364 cbuf_add(&con->cb, ret);
365 ret = dlm_process_incoming_buffer(con->nodeid,
366 page_address(con->rx_page),
367 con->cb.base, con->cb.len,
368 PAGE_CACHE_SIZE);
369 if (ret == -EBADMSG) {
370 printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
371 "iov_len=%u, iov_base[0]=%p, read=%d\n",
372 page_address(con->rx_page), con->cb.base, con->cb.len,
373 len, iov[0].iov_base, r);
374 }
375 if (ret < 0)
376 goto out_close;
377 cbuf_eat(&con->cb, ret);
378
379 if (cbuf_empty(&con->cb) && !call_again_soon) {
380 __free_page(con->rx_page);
381 con->rx_page = NULL;
382 }
383
384out:
385 if (call_again_soon)
386 goto out_resched;
387 up_read(&con->sock_sem);
388 return 0;
389
390out_resched:
391 lowcomms_data_ready(con->sock->sk, 0);
392 up_read(&con->sock_sem);
393 cond_resched();
394 return 0;
395
396out_close:
397 up_read(&con->sock_sem);
398 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
399 close_connection(con, false);
400 /* Reconnect when there is something to send */
401 }
402
403 return ret;
404}
405
406/* Listening socket is busy, accept a connection */
407static int accept_from_sock(struct connection *con)
408{
409 int result;
410 struct sockaddr_storage peeraddr;
411 struct socket *newsock;
412 int len;
413 int nodeid;
414 struct connection *newcon;
415
416 memset(&peeraddr, 0, sizeof(peeraddr));
417 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
418 IPPROTO_TCP, &newsock);
419 if (result < 0)
420 return -ENOMEM;
421
422 down_read(&con->sock_sem);
423
424 result = -ENOTCONN;
425 if (con->sock == NULL)
426 goto accept_err;
427
428 newsock->type = con->sock->type;
429 newsock->ops = con->sock->ops;
430
431 result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
432 if (result < 0)
433 goto accept_err;
434
435 /* Get the connected socket's peer */
436 memset(&peeraddr, 0, sizeof(peeraddr));
437 if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
438 &len, 2)) {
439 result = -ECONNABORTED;
440 goto accept_err;
441 }
442
443 /* Get the new node's NODEID */
444 make_sockaddr(&peeraddr, 0, &len);
445 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
446 printk("dlm: connect from non cluster node\n");
447 sock_release(newsock);
448 up_read(&con->sock_sem);
449 return -1;
450 }
451
452 log_print("got connection from %d", nodeid);
453
454 /* Check to see if we already have a connection to this node. This
455 * could happen if the two nodes initiate a connection at roughly
456 * the same time and the connections cross on the wire.
457 * TEMPORARY FIX:
458 * In this case we store the incoming one in "othercon"
459 */
460 newcon = nodeid2con(nodeid, GFP_KERNEL);
461 if (!newcon) {
462 result = -ENOMEM;
463 goto accept_err;
464 }
465 down_write(&newcon->sock_sem);
466 if (newcon->sock) {
467 struct connection *othercon = newcon->othercon;
468
469 if (!othercon) {
470 othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
471 if (!othercon) {
472 printk("dlm: failed to allocate incoming socket\n");
473 up_write(&newcon->sock_sem);
474 result = -ENOMEM;
475 goto accept_err;
476 }
477 othercon->nodeid = nodeid;
478 othercon->rx_action = receive_from_sock;
479 init_rwsem(&othercon->sock_sem);
480 set_bit(CF_IS_OTHERCON, &othercon->flags);
481 newcon->othercon = othercon;
482 }
483 othercon->sock = newsock;
484 newsock->sk->sk_user_data = othercon;
485 add_sock(newsock, othercon);
486 }
487 else {
488 newsock->sk->sk_user_data = newcon;
489 newcon->rx_action = receive_from_sock;
490 add_sock(newsock, newcon);
491
492 }
493
494 up_write(&newcon->sock_sem);
495
496 /*
497 * Add it to the active queue in case we got data
498 * beween processing the accept adding the socket
499 * to the read_sockets list
500 */
501 lowcomms_data_ready(newsock->sk, 0);
502 up_read(&con->sock_sem);
503
504 return 0;
505
506accept_err:
507 up_read(&con->sock_sem);
508 sock_release(newsock);
509
510 if (result != -EAGAIN)
511 printk("dlm: error accepting connection from node: %d\n", result);
512 return result;
513}
514
515/* Connect a new socket to its peer */
516static void connect_to_sock(struct connection *con)
517{
518 int result = -EHOSTUNREACH;
519 struct sockaddr_storage saddr;
520 int addr_len;
521 struct socket *sock;
522
523 if (con->nodeid == 0) {
524 log_print("attempt to connect sock 0 foiled");
525 return;
526 }
527
528 down_write(&con->sock_sem);
529 if (con->retries++ > MAX_CONNECT_RETRIES)
530 goto out;
531
532 /* Some odd races can cause double-connects, ignore them */
533 if (con->sock) {
534 result = 0;
535 goto out;
536 }
537
538 /* Create a socket to communicate with */
539 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
540 IPPROTO_TCP, &sock);
541 if (result < 0)
542 goto out_err;
543
544 memset(&saddr, 0, sizeof(saddr));
545 if (dlm_nodeid_to_addr(con->nodeid, &saddr))
546 goto out_err;
547
548 sock->sk->sk_user_data = con;
549 con->rx_action = receive_from_sock;
550
551 make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);
552
553 add_sock(sock, con);
554
555 log_print("connecting to %d", con->nodeid);
556 result =
557 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
558 O_NONBLOCK);
559 if (result == -EINPROGRESS)
560 result = 0;
561 if (result == 0)
562 goto out;
563
564out_err:
565 if (con->sock) {
566 sock_release(con->sock);
567 con->sock = NULL;
568 }
569 /*
570 * Some errors are fatal and this list might need adjusting. For other
571 * errors we try again until the max number of retries is reached.
572 */
573 if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
574 result != -ENETDOWN && result != EINVAL
575 && result != -EPROTONOSUPPORT) {
576 lowcomms_connect_sock(con);
577 result = 0;
578 }
579out:
580 up_write(&con->sock_sem);
581 return;
582}
583
584static struct socket *create_listen_sock(struct connection *con,
585 struct sockaddr_storage *saddr)
586{
587 struct socket *sock = NULL;
588 mm_segment_t fs;
589 int result = 0;
590 int one = 1;
591 int addr_len;
592
593 if (dlm_local_addr.ss_family == AF_INET)
594 addr_len = sizeof(struct sockaddr_in);
595 else
596 addr_len = sizeof(struct sockaddr_in6);
597
598 /* Create a socket to communicate with */
599 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
600 if (result < 0) {
601 printk("dlm: Can't create listening comms socket\n");
602 goto create_out;
603 }
604
605 fs = get_fs();
606 set_fs(get_ds());
607 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
608 (char *)&one, sizeof(one));
609 set_fs(fs);
610 if (result < 0) {
611 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
612 result);
613 }
614 sock->sk->sk_user_data = con;
615 con->rx_action = accept_from_sock;
616 con->sock = sock;
617
618 /* Bind to our port */
619 make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
620 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
621 if (result < 0) {
622 printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
623 sock_release(sock);
624 sock = NULL;
625 con->sock = NULL;
626 goto create_out;
627 }
628
629 fs = get_fs();
630 set_fs(get_ds());
631
632 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
633 (char *)&one, sizeof(one));
634 set_fs(fs);
635 if (result < 0) {
636 printk("dlm: Set keepalive failed: %d\n", result);
637 }
638
639 result = sock->ops->listen(sock, 5);
640 if (result < 0) {
641 printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
642 sock_release(sock);
643 sock = NULL;
644 goto create_out;
645 }
646
647create_out:
648 return sock;
649}
650
651
652/* Listen on all interfaces */
653static int listen_for_all(void)
654{
655 struct socket *sock = NULL;
656 struct connection *con = nodeid2con(0, GFP_KERNEL);
657 int result = -EINVAL;
658
659 /* We don't support multi-homed hosts */
660 set_bit(CF_IS_OTHERCON, &con->flags);
661
662 sock = create_listen_sock(con, &dlm_local_addr);
663 if (sock) {
664 add_sock(sock, con);
665 result = 0;
666 }
667 else {
668 result = -EADDRINUSE;
669 }
670
671 return result;
672}
673
674
675
676static struct writequeue_entry *new_writequeue_entry(struct connection *con,
677 gfp_t allocation)
678{
679 struct writequeue_entry *entry;
680
681 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
682 if (!entry)
683 return NULL;
684
685 entry->page = alloc_page(allocation);
686 if (!entry->page) {
687 kfree(entry);
688 return NULL;
689 }
690
691 entry->offset = 0;
692 entry->len = 0;
693 entry->end = 0;
694 entry->users = 0;
695 entry->con = con;
696
697 return entry;
698}
699
700void *dlm_lowcomms_get_buffer(int nodeid, int len,
701 gfp_t allocation, char **ppc)
702{
703 struct connection *con;
704 struct writequeue_entry *e;
705 int offset = 0;
706 int users = 0;
707
708 con = nodeid2con(nodeid, allocation);
709 if (!con)
710 return NULL;
711
712 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
713 if ((&e->list == &con->writequeue) ||
714 (PAGE_CACHE_SIZE - e->end < len)) {
715 e = NULL;
716 } else {
717 offset = e->end;
718 e->end += len;
719 users = e->users++;
720 }
721 spin_unlock(&con->writequeue_lock);
722
723 if (e) {
724 got_one:
725 if (users == 0)
726 kmap(e->page);
727 *ppc = page_address(e->page) + offset;
728 return e;
729 }
730
731 e = new_writequeue_entry(con, allocation);
732 if (e) {
733 spin_lock(&con->writequeue_lock);
734 offset = e->end;
735 e->end += len;
736 users = e->users++;
737 list_add_tail(&e->list, &con->writequeue);
738 spin_unlock(&con->writequeue_lock);
739 goto got_one;
740 }
741 return NULL;
742}
743
744void dlm_lowcomms_commit_buffer(void *mh)
745{
746 struct writequeue_entry *e = (struct writequeue_entry *)mh;
747 struct connection *con = e->con;
748 int users;
749
750 users = --e->users;
751 if (users)
752 goto out;
753 e->len = e->end - e->offset;
754 kunmap(e->page);
755 spin_unlock(&con->writequeue_lock);
756
757 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
758 spin_lock_bh(&write_sockets_lock);
759 list_add_tail(&con->write_list, &write_sockets);
760 spin_unlock_bh(&write_sockets_lock);
761
762 wake_up_interruptible(&lowcomms_send_waitq);
763 }
764 return;
765
766out:
767 spin_unlock(&con->writequeue_lock);
768 return;
769}
770
771static void free_entry(struct writequeue_entry *e)
772{
773 __free_page(e->page);
774 kfree(e);
775}
776
777/* Send a message */
778static void send_to_sock(struct connection *con)
779{
780 int ret = 0;
781 ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
782 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
783 struct writequeue_entry *e;
784 int len, offset;
785
786 down_read(&con->sock_sem);
787 if (con->sock == NULL)
788 goto out_connect;
789
790 sendpage = con->sock->ops->sendpage;
791
792 spin_lock(&con->writequeue_lock);
793 for (;;) {
794 e = list_entry(con->writequeue.next, struct writequeue_entry,
795 list);
796 if ((struct list_head *) e == &con->writequeue)
797 break;
798
799 len = e->len;
800 offset = e->offset;
801 BUG_ON(len == 0 && e->users == 0);
802 spin_unlock(&con->writequeue_lock);
803
804 ret = 0;
805 if (len) {
806 ret = sendpage(con->sock, e->page, offset, len,
807 msg_flags);
808 if (ret == -EAGAIN || ret == 0)
809 goto out;
810 if (ret <= 0)
811 goto send_error;
812 }
813 else {
814 /* Don't starve people filling buffers */
815 cond_resched();
816 }
817
818 spin_lock(&con->writequeue_lock);
819 e->offset += ret;
820 e->len -= ret;
821
822 if (e->len == 0 && e->users == 0) {
823 list_del(&e->list);
824 kunmap(e->page);
825 free_entry(e);
826 continue;
827 }
828 }
829 spin_unlock(&con->writequeue_lock);
830out:
831 up_read(&con->sock_sem);
832 return;
833
834send_error:
835 up_read(&con->sock_sem);
836 close_connection(con, false);
837 lowcomms_connect_sock(con);
838 return;
839
840out_connect:
841 up_read(&con->sock_sem);
842 lowcomms_connect_sock(con);
843 return;
844}
845
846static void clean_one_writequeue(struct connection *con)
847{
848 struct list_head *list;
849 struct list_head *temp;
850
851 spin_lock(&con->writequeue_lock);
852 list_for_each_safe(list, temp, &con->writequeue) {
853 struct writequeue_entry *e =
854 list_entry(list, struct writequeue_entry, list);
855 list_del(&e->list);
856 free_entry(e);
857 }
858 spin_unlock(&con->writequeue_lock);
859}
860
861/* Called from recovery when it knows that a node has
862 left the cluster */
863int dlm_lowcomms_close(int nodeid)
864{
865 struct connection *con;
866
867 if (!connections)
868 goto out;
869
870 log_print("closing connection to node %d", nodeid);
871 con = nodeid2con(nodeid, 0);
872 if (con) {
873 clean_one_writequeue(con);
874 close_connection(con, true);
875 atomic_set(&con->waiting_requests, 0);
876 }
877 return 0;
878
879out:
880 return -1;
881}
882
883/* API send message call, may queue the request */
884/* N.B. This is the old interface - use the new one for new calls */
885int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
886{
887 struct writequeue_entry *e;
888 char *b;
889
890 e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
891 if (e) {
892 memcpy(b, buf, len);
893 dlm_lowcomms_commit_buffer(e);
894 return 0;
895 }
896 return -ENOBUFS;
897}
898
899/* Look for activity on active sockets */
900static void process_sockets(void)
901{
902 struct list_head *list;
903 struct list_head *temp;
904 int count = 0;
905
906 spin_lock_bh(&read_sockets_lock);
907 list_for_each_safe(list, temp, &read_sockets) {
908
909 struct connection *con =
910 list_entry(list, struct connection, read_list);
911 list_del(&con->read_list);
912 clear_bit(CF_READ_PENDING, &con->flags);
913
914 spin_unlock_bh(&read_sockets_lock);
915
916 /* This can reach zero if we are processing requests
917 * as they come in.
918 */
919 if (atomic_read(&con->waiting_requests) == 0) {
920 spin_lock_bh(&read_sockets_lock);
921 continue;
922 }
923
924 do {
925 con->rx_action(con);
926
927 /* Don't starve out everyone else */
928 if (++count >= MAX_RX_MSG_COUNT) {
929 cond_resched();
930 count = 0;
931 }
932
933 } while (!atomic_dec_and_test(&con->waiting_requests) &&
934 !kthread_should_stop());
935
936 spin_lock_bh(&read_sockets_lock);
937 }
938 spin_unlock_bh(&read_sockets_lock);
939}
940
941/* Try to send any messages that are pending
942 */
943static void process_output_queue(void)
944{
945 struct list_head *list;
946 struct list_head *temp;
947
948 spin_lock_bh(&write_sockets_lock);
949 list_for_each_safe(list, temp, &write_sockets) {
950 struct connection *con =
951 list_entry(list, struct connection, write_list);
952 clear_bit(CF_WRITE_PENDING, &con->flags);
953 list_del(&con->write_list);
954
955 spin_unlock_bh(&write_sockets_lock);
956 send_to_sock(con);
957 spin_lock_bh(&write_sockets_lock);
958 }
959 spin_unlock_bh(&write_sockets_lock);
960}
961
962static void process_state_queue(void)
963{
964 struct list_head *list;
965 struct list_head *temp;
966
967 spin_lock_bh(&state_sockets_lock);
968 list_for_each_safe(list, temp, &state_sockets) {
969 struct connection *con =
970 list_entry(list, struct connection, state_list);
971 list_del(&con->state_list);
972 clear_bit(CF_CONNECT_PENDING, &con->flags);
973 spin_unlock_bh(&state_sockets_lock);
974
975 connect_to_sock(con);
976 spin_lock_bh(&state_sockets_lock);
977 }
978 spin_unlock_bh(&state_sockets_lock);
979}
980
981
982/* Discard all entries on the write queues */
983static void clean_writequeues(void)
984{
985 int nodeid;
986
987 for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
988 struct connection *con = nodeid2con(nodeid, 0);
989
990 if (con)
991 clean_one_writequeue(con);
992 }
993}
994
995static int read_list_empty(void)
996{
997 int status;
998
999 spin_lock_bh(&read_sockets_lock);
1000 status = list_empty(&read_sockets);
1001 spin_unlock_bh(&read_sockets_lock);
1002
1003 return status;
1004}
1005
1006/* DLM Transport comms receive daemon */
1007static int dlm_recvd(void *data)
1008{
1009 init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
1010 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
1011
1012 while (!kthread_should_stop()) {
1013 set_current_state(TASK_INTERRUPTIBLE);
1014 if (read_list_empty())
1015 cond_resched();
1016 set_current_state(TASK_RUNNING);
1017
1018 process_sockets();
1019 }
1020
1021 return 0;
1022}
1023
1024static int write_and_state_lists_empty(void)
1025{
1026 int status;
1027
1028 spin_lock_bh(&write_sockets_lock);
1029 status = list_empty(&write_sockets);
1030 spin_unlock_bh(&write_sockets_lock);
1031
1032 spin_lock_bh(&state_sockets_lock);
1033 if (list_empty(&state_sockets) == 0)
1034 status = 0;
1035 spin_unlock_bh(&state_sockets_lock);
1036
1037 return status;
1038}
1039
1040/* DLM Transport send daemon */
1041static int dlm_sendd(void *data)
1042{
1043 init_waitqueue_entry(&lowcomms_send_waitq_head, current);
1044 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
1045
1046 while (!kthread_should_stop()) {
1047 set_current_state(TASK_INTERRUPTIBLE);
1048 if (write_and_state_lists_empty())
1049 cond_resched();
1050 set_current_state(TASK_RUNNING);
1051
1052 process_state_queue();
1053 process_output_queue();
1054 }
1055
1056 return 0;
1057}
1058
1059static void daemons_stop(void)
1060{
1061 kthread_stop(recv_task);
1062 kthread_stop(send_task);
1063}
1064
1065static int daemons_start(void)
1066{
1067 struct task_struct *p;
1068 int error;
1069
1070 p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1071 error = IS_ERR(p);
1072 if (error) {
1073 log_print("can't start dlm_recvd %d", error);
1074 return error;
1075 }
1076 recv_task = p;
1077
1078 p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1079 error = IS_ERR(p);
1080 if (error) {
1081 log_print("can't start dlm_sendd %d", error);
1082 kthread_stop(recv_task);
1083 return error;
1084 }
1085 send_task = p;
1086
1087 return 0;
1088}
1089
1090/*
1091 * Return the largest buffer size we can cope with.
1092 */
1093int lowcomms_max_buffer_size(void)
1094{
1095 return PAGE_CACHE_SIZE;
1096}
1097
1098void dlm_lowcomms_stop(void)
1099{
1100 int i;
1101
1102 /* Set all the flags to prevent any
1103 socket activity.
1104 */
1105 for (i = 0; i < conn_array_size; i++) {
1106 if (connections[i])
1107 connections[i]->flags |= 0xFF;
1108 }
1109
1110 daemons_stop();
1111 clean_writequeues();
1112
1113 for (i = 0; i < conn_array_size; i++) {
1114 if (connections[i]) {
1115 close_connection(connections[i], true);
1116 if (connections[i]->othercon)
1117 kmem_cache_free(con_cache, connections[i]->othercon);
1118 kmem_cache_free(con_cache, connections[i]);
1119 }
1120 }
1121
1122 kfree(connections);
1123 connections = NULL;
1124
1125 kmem_cache_destroy(con_cache);
1126}
1127
1128/* This is quite likely to sleep... */
1129int dlm_lowcomms_start(void)
1130{
1131 int error = 0;
1132
1133 error = -ENOMEM;
1134 connections = kzalloc(sizeof(struct connection *) *
1135 NODE_INCREMENT, GFP_KERNEL);
1136 if (!connections)
1137 goto out;
1138
1139 conn_array_size = NODE_INCREMENT;
1140
1141 if (dlm_our_addr(&dlm_local_addr, 0)) {
1142 log_print("no local IP address has been set");
1143 goto fail_free_conn;
1144 }
1145 if (!dlm_our_addr(&dlm_local_addr, 1)) {
1146 log_print("This dlm comms module does not support multi-homed clustering");
1147 goto fail_free_conn;
1148 }
1149
1150 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1151 __alignof__(struct connection), 0,
1152 NULL, NULL);
1153 if (!con_cache)
1154 goto fail_free_conn;
1155
1156
1157 /* Start listening */
1158 error = listen_for_all();
1159 if (error)
1160 goto fail_unlisten;
1161
1162 error = daemons_start();
1163 if (error)
1164 goto fail_unlisten;
1165
1166 return 0;
1167
1168fail_unlisten:
1169 close_connection(connections[0], false);
1170 kmem_cache_free(con_cache, connections[0]);
1171 kmem_cache_destroy(con_cache);
1172
1173fail_free_conn:
1174 kfree(connections);
1175
1176out:
1177 return error;
1178}
1179
1180/*
1181 * Overrides for Emacs so that we follow Linus's tabbing style.
1182 * Emacs will notice this stuff at the end of the file and automatically
1183 * adjust the settings for this buffer only. This must remain at the end
1184 * of the file.
1185 * ---------------------------------------------------------------------------
1186 * Local variables:
1187 * c-file-style: "linux"
1188 * End:
1189 */
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 2d045e0daae1..a9a9618c0d3f 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -14,8 +14,6 @@
14#ifndef __LOWCOMMS_DOT_H__ 14#ifndef __LOWCOMMS_DOT_H__
15#define __LOWCOMMS_DOT_H__ 15#define __LOWCOMMS_DOT_H__
16 16
17int dlm_lowcomms_init(void);
18void dlm_lowcomms_exit(void);
19int dlm_lowcomms_start(void); 17int dlm_lowcomms_start(void);
20void dlm_lowcomms_stop(void); 18void dlm_lowcomms_stop(void);
21int dlm_lowcomms_close(int nodeid); 19int dlm_lowcomms_close(int nodeid);
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index a8da8dc36b2e..162fbae58fe5 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -16,7 +16,6 @@
16#include "lock.h" 16#include "lock.h"
17#include "user.h" 17#include "user.h"
18#include "memory.h" 18#include "memory.h"
19#include "lowcomms.h"
20#include "config.h" 19#include "config.h"
21 20
22#ifdef CONFIG_DLM_DEBUG 21#ifdef CONFIG_DLM_DEBUG
@@ -47,20 +46,14 @@ static int __init init_dlm(void)
47 if (error) 46 if (error)
48 goto out_config; 47 goto out_config;
49 48
50 error = dlm_lowcomms_init();
51 if (error)
52 goto out_debug;
53
54 error = dlm_user_init(); 49 error = dlm_user_init();
55 if (error) 50 if (error)
56 goto out_lowcomms; 51 goto out_debug;
57 52
58 printk("DLM (built %s %s) installed\n", __DATE__, __TIME__); 53 printk("DLM (built %s %s) installed\n", __DATE__, __TIME__);
59 54
60 return 0; 55 return 0;
61 56
62 out_lowcomms:
63 dlm_lowcomms_exit();
64 out_debug: 57 out_debug:
65 dlm_unregister_debugfs(); 58 dlm_unregister_debugfs();
66 out_config: 59 out_config:
@@ -76,7 +69,6 @@ static int __init init_dlm(void)
76static void __exit exit_dlm(void) 69static void __exit exit_dlm(void)
77{ 70{
78 dlm_user_exit(); 71 dlm_user_exit();
79 dlm_lowcomms_exit();
80 dlm_config_exit(); 72 dlm_config_exit();
81 dlm_memory_exit(); 73 dlm_memory_exit();
82 dlm_lockspace_exit(); 74 dlm_lockspace_exit();
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index a3f7de7f3a8f..85e2897bd740 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -186,6 +186,14 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
186 struct dlm_member *memb, *safe; 186 struct dlm_member *memb, *safe;
187 int i, error, found, pos = 0, neg = 0, low = -1; 187 int i, error, found, pos = 0, neg = 0, low = -1;
188 188
189 /* previously removed members that we've not finished removing need to
190 count as a negative change so the "neg" recovery steps will happen */
191
192 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
193 log_debug(ls, "prev removed member %d", memb->nodeid);
194 neg++;
195 }
196
189 /* move departed members from ls_nodes to ls_nodes_gone */ 197 /* move departed members from ls_nodes to ls_nodes_gone */
190 198
191 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { 199 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 518239a8b1e9..4cc31be9cd9d 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -90,13 +90,28 @@ static int check_config(struct dlm_ls *ls, struct rcom_config *rf, int nodeid)
90 return 0; 90 return 0;
91} 91}
92 92
93static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
94{
95 spin_lock(&ls->ls_rcom_spin);
96 *new_seq = ++ls->ls_rcom_seq;
97 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
98 spin_unlock(&ls->ls_rcom_spin);
99}
100
101static void disallow_sync_reply(struct dlm_ls *ls)
102{
103 spin_lock(&ls->ls_rcom_spin);
104 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
105 clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
106 spin_unlock(&ls->ls_rcom_spin);
107}
108
93int dlm_rcom_status(struct dlm_ls *ls, int nodeid) 109int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
94{ 110{
95 struct dlm_rcom *rc; 111 struct dlm_rcom *rc;
96 struct dlm_mhandle *mh; 112 struct dlm_mhandle *mh;
97 int error = 0; 113 int error = 0;
98 114
99 memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
100 ls->ls_recover_nodeid = nodeid; 115 ls->ls_recover_nodeid = nodeid;
101 116
102 if (nodeid == dlm_our_nodeid()) { 117 if (nodeid == dlm_our_nodeid()) {
@@ -108,12 +123,14 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
108 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh); 123 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
109 if (error) 124 if (error)
110 goto out; 125 goto out;
111 rc->rc_id = ++ls->ls_rcom_seq; 126
127 allow_sync_reply(ls, &rc->rc_id);
128 memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
112 129
113 send_rcom(ls, mh, rc); 130 send_rcom(ls, mh, rc);
114 131
115 error = dlm_wait_function(ls, &rcom_response); 132 error = dlm_wait_function(ls, &rcom_response);
116 clear_bit(LSFL_RCOM_READY, &ls->ls_flags); 133 disallow_sync_reply(ls);
117 if (error) 134 if (error)
118 goto out; 135 goto out;
119 136
@@ -150,14 +167,21 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
150 167
151static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 168static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
152{ 169{
153 if (rc_in->rc_id != ls->ls_rcom_seq) { 170 spin_lock(&ls->ls_rcom_spin);
154 log_debug(ls, "reject old reply %d got %llx wanted %llx", 171 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
155 rc_in->rc_type, rc_in->rc_id, ls->ls_rcom_seq); 172 rc_in->rc_id != ls->ls_rcom_seq) {
156 return; 173 log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
174 rc_in->rc_type, rc_in->rc_header.h_nodeid,
175 (unsigned long long)rc_in->rc_id,
176 (unsigned long long)ls->ls_rcom_seq);
177 goto out;
157 } 178 }
158 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length); 179 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
159 set_bit(LSFL_RCOM_READY, &ls->ls_flags); 180 set_bit(LSFL_RCOM_READY, &ls->ls_flags);
181 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
160 wake_up(&ls->ls_wait_general); 182 wake_up(&ls->ls_wait_general);
183 out:
184 spin_unlock(&ls->ls_rcom_spin);
161} 185}
162 186
163static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 187static void receive_rcom_status_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
@@ -171,7 +195,6 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
171 struct dlm_mhandle *mh; 195 struct dlm_mhandle *mh;
172 int error = 0, len = sizeof(struct dlm_rcom); 196 int error = 0, len = sizeof(struct dlm_rcom);
173 197
174 memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
175 ls->ls_recover_nodeid = nodeid; 198 ls->ls_recover_nodeid = nodeid;
176 199
177 if (nodeid == dlm_our_nodeid()) { 200 if (nodeid == dlm_our_nodeid()) {
@@ -185,12 +208,14 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
185 if (error) 208 if (error)
186 goto out; 209 goto out;
187 memcpy(rc->rc_buf, last_name, last_len); 210 memcpy(rc->rc_buf, last_name, last_len);
188 rc->rc_id = ++ls->ls_rcom_seq; 211
212 allow_sync_reply(ls, &rc->rc_id);
213 memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
189 214
190 send_rcom(ls, mh, rc); 215 send_rcom(ls, mh, rc);
191 216
192 error = dlm_wait_function(ls, &rcom_response); 217 error = dlm_wait_function(ls, &rcom_response);
193 clear_bit(LSFL_RCOM_READY, &ls->ls_flags); 218 disallow_sync_reply(ls);
194 out: 219 out:
195 return error; 220 return error;
196} 221}
@@ -370,9 +395,10 @@ static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
370static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) 395static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
371{ 396{
372 struct dlm_rcom *rc; 397 struct dlm_rcom *rc;
398 struct rcom_config *rf;
373 struct dlm_mhandle *mh; 399 struct dlm_mhandle *mh;
374 char *mb; 400 char *mb;
375 int mb_len = sizeof(struct dlm_rcom); 401 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
376 402
377 mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_KERNEL, &mb); 403 mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_KERNEL, &mb);
378 if (!mh) 404 if (!mh)
@@ -391,6 +417,9 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
391 rc->rc_id = rc_in->rc_id; 417 rc->rc_id = rc_in->rc_id;
392 rc->rc_result = -ESRCH; 418 rc->rc_result = -ESRCH;
393 419
420 rf = (struct rcom_config *) rc->rc_buf;
421 rf->rf_lvblen = -1;
422
394 dlm_rcom_out(rc); 423 dlm_rcom_out(rc);
395 dlm_lowcomms_commit_buffer(mh); 424 dlm_lowcomms_commit_buffer(mh);
396 425
@@ -412,9 +441,10 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
412 441
413 ls = dlm_find_lockspace_global(hd->h_lockspace); 442 ls = dlm_find_lockspace_global(hd->h_lockspace);
414 if (!ls) { 443 if (!ls) {
415 log_print("lockspace %x from %d not found", 444 log_print("lockspace %x from %d type %x not found",
416 hd->h_lockspace, nodeid); 445 hd->h_lockspace, nodeid, rc->rc_type);
417 send_ls_not_ready(nodeid, rc); 446 if (rc->rc_type == DLM_RCOM_STATUS)
447 send_ls_not_ready(nodeid, rc);
418 return; 448 return;
419 } 449 }
420 450
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index a5e6d184872e..cf9f6831bab5 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -252,6 +252,7 @@ static void recover_list_clear(struct dlm_ls *ls)
252 spin_lock(&ls->ls_recover_list_lock); 252 spin_lock(&ls->ls_recover_list_lock);
253 list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) { 253 list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
254 list_del_init(&r->res_recover_list); 254 list_del_init(&r->res_recover_list);
255 r->res_recover_locks_count = 0;
255 dlm_put_rsb(r); 256 dlm_put_rsb(r);
256 ls->ls_recover_list_count--; 257 ls->ls_recover_list_count--;
257 } 258 }
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 362e3eff4dc9..650536aa5139 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -45,7 +45,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
45 unsigned long start; 45 unsigned long start;
46 int error, neg = 0; 46 int error, neg = 0;
47 47
48 log_debug(ls, "recover %llx", rv->seq); 48 log_debug(ls, "recover %llx", (unsigned long long)rv->seq);
49 49
50 mutex_lock(&ls->ls_recoverd_active); 50 mutex_lock(&ls->ls_recoverd_active);
51 51
@@ -94,14 +94,6 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
94 } 94 }
95 95
96 /* 96 /*
97 * Purge directory-related requests that are saved in requestqueue.
98 * All dir requests from before recovery are invalid now due to the dir
99 * rebuild and will be resent by the requesting nodes.
100 */
101
102 dlm_purge_requestqueue(ls);
103
104 /*
105 * Wait for all nodes to complete directory rebuild. 97 * Wait for all nodes to complete directory rebuild.
106 */ 98 */
107 99
@@ -164,10 +156,31 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
164 */ 156 */
165 157
166 dlm_recover_rsbs(ls); 158 dlm_recover_rsbs(ls);
159 } else {
160 /*
161 * Other lockspace members may be going through the "neg" steps
162 * while also adding us to the lockspace, in which case they'll
163 * be doing the recover_locks (RS_LOCKS) barrier.
164 */
165 dlm_set_recover_status(ls, DLM_RS_LOCKS);
166
167 error = dlm_recover_locks_wait(ls);
168 if (error) {
169 log_error(ls, "recover_locks_wait failed %d", error);
170 goto fail;
171 }
167 } 172 }
168 173
169 dlm_release_root_list(ls); 174 dlm_release_root_list(ls);
170 175
176 /*
177 * Purge directory-related requests that are saved in requestqueue.
178 * All dir requests from before recovery are invalid now due to the dir
179 * rebuild and will be resent by the requesting nodes.
180 */
181
182 dlm_purge_requestqueue(ls);
183
171 dlm_set_recover_status(ls, DLM_RS_DONE); 184 dlm_set_recover_status(ls, DLM_RS_DONE);
172 error = dlm_recover_done_wait(ls); 185 error = dlm_recover_done_wait(ls);
173 if (error) { 186 if (error) {
@@ -199,7 +212,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
199 212
200 dlm_astd_wake(); 213 dlm_astd_wake();
201 214
202 log_debug(ls, "recover %llx done: %u ms", rv->seq, 215 log_debug(ls, "recover %llx done: %u ms",
216 (unsigned long long)rv->seq,
203 jiffies_to_msecs(jiffies - start)); 217 jiffies_to_msecs(jiffies - start));
204 mutex_unlock(&ls->ls_recoverd_active); 218 mutex_unlock(&ls->ls_recoverd_active);
205 219
@@ -207,11 +221,16 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
207 221
208 fail: 222 fail:
209 dlm_release_root_list(ls); 223 dlm_release_root_list(ls);
210 log_debug(ls, "recover %llx error %d", rv->seq, error); 224 log_debug(ls, "recover %llx error %d",
225 (unsigned long long)rv->seq, error);
211 mutex_unlock(&ls->ls_recoverd_active); 226 mutex_unlock(&ls->ls_recoverd_active);
212 return error; 227 return error;
213} 228}
214 229
230/* The dlm_ls_start() that created the rv we take here may already have been
231 stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
232 flag set. */
233
215static void do_ls_recovery(struct dlm_ls *ls) 234static void do_ls_recovery(struct dlm_ls *ls)
216{ 235{
217 struct dlm_recover *rv = NULL; 236 struct dlm_recover *rv = NULL;
@@ -219,7 +238,8 @@ static void do_ls_recovery(struct dlm_ls *ls)
219 spin_lock(&ls->ls_recover_lock); 238 spin_lock(&ls->ls_recover_lock);
220 rv = ls->ls_recover_args; 239 rv = ls->ls_recover_args;
221 ls->ls_recover_args = NULL; 240 ls->ls_recover_args = NULL;
222 clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 241 if (rv && ls->ls_recover_seq == rv->seq)
242 clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
223 spin_unlock(&ls->ls_recover_lock); 243 spin_unlock(&ls->ls_recover_lock);
224 244
225 if (rv) { 245 if (rv) {
diff --git a/fs/dlm/requestqueue.c b/fs/dlm/requestqueue.c
index 7b2b089634a2..65008d79c96d 100644
--- a/fs/dlm/requestqueue.c
+++ b/fs/dlm/requestqueue.c
@@ -30,26 +30,36 @@ struct rq_entry {
30 * lockspace is enabled on some while still suspended on others. 30 * lockspace is enabled on some while still suspended on others.
31 */ 31 */
32 32
33void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd) 33int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd)
34{ 34{
35 struct rq_entry *e; 35 struct rq_entry *e;
36 int length = hd->h_length; 36 int length = hd->h_length;
37 37 int rv = 0;
38 if (dlm_is_removed(ls, nodeid))
39 return;
40 38
41 e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL); 39 e = kmalloc(sizeof(struct rq_entry) + length, GFP_KERNEL);
42 if (!e) { 40 if (!e) {
43 log_print("dlm_add_requestqueue: out of memory\n"); 41 log_print("dlm_add_requestqueue: out of memory\n");
44 return; 42 return 0;
45 } 43 }
46 44
47 e->nodeid = nodeid; 45 e->nodeid = nodeid;
48 memcpy(e->request, hd, length); 46 memcpy(e->request, hd, length);
49 47
48 /* We need to check dlm_locking_stopped() after taking the mutex to
49 avoid a race where dlm_recoverd enables locking and runs
50 process_requestqueue between our earlier dlm_locking_stopped check
51 and this addition to the requestqueue. */
52
50 mutex_lock(&ls->ls_requestqueue_mutex); 53 mutex_lock(&ls->ls_requestqueue_mutex);
51 list_add_tail(&e->list, &ls->ls_requestqueue); 54 if (dlm_locking_stopped(ls))
55 list_add_tail(&e->list, &ls->ls_requestqueue);
56 else {
57 log_debug(ls, "dlm_add_requestqueue skip from %d", nodeid);
58 kfree(e);
59 rv = -EAGAIN;
60 }
52 mutex_unlock(&ls->ls_requestqueue_mutex); 61 mutex_unlock(&ls->ls_requestqueue_mutex);
62 return rv;
53} 63}
54 64
55int dlm_process_requestqueue(struct dlm_ls *ls) 65int dlm_process_requestqueue(struct dlm_ls *ls)
@@ -120,6 +130,10 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
120{ 130{
121 uint32_t type = ms->m_type; 131 uint32_t type = ms->m_type;
122 132
133 /* the ls is being cleaned up and freed by release_lockspace */
134 if (!ls->ls_count)
135 return 1;
136
123 if (dlm_is_removed(ls, nodeid)) 137 if (dlm_is_removed(ls, nodeid))
124 return 1; 138 return 1;
125 139
diff --git a/fs/dlm/requestqueue.h b/fs/dlm/requestqueue.h
index 349f0d292d95..6a53ea03335d 100644
--- a/fs/dlm/requestqueue.h
+++ b/fs/dlm/requestqueue.h
@@ -13,7 +13,7 @@
13#ifndef __REQUESTQUEUE_DOT_H__ 13#ifndef __REQUESTQUEUE_DOT_H__
14#define __REQUESTQUEUE_DOT_H__ 14#define __REQUESTQUEUE_DOT_H__
15 15
16void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd); 16int dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_header *hd);
17int dlm_process_requestqueue(struct dlm_ls *ls); 17int dlm_process_requestqueue(struct dlm_ls *ls);
18void dlm_wait_requestqueue(struct dlm_ls *ls); 18void dlm_wait_requestqueue(struct dlm_ls *ls);
19void dlm_purge_requestqueue(struct dlm_ls *ls); 19void dlm_purge_requestqueue(struct dlm_ls *ls);