aboutsummaryrefslogtreecommitdiffstats
path: root/fs/dlm
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@woody.linux-foundation.org>2007-05-07 15:26:27 -0400
committerLinus Torvalds <torvalds@woody.linux-foundation.org>2007-05-07 15:26:27 -0400
commit5cefcab3db2b13093480f2a42bf081574dd72d3d (patch)
treec3755a241553436a1b84d65ad3c00f77ce6d02ad /fs/dlm
parent5f757f91e70a97eda8f0cc13bddc853209b2d173 (diff)
parent37fde8ca6c60ea61f5e9d7cb877c25ac60e74167 (diff)
Merge git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw
* git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw: (34 commits) [GFS2] Uncomment sprintf_symbol calling code [DLM] lowcomms style [GFS2] printk warning fixes [GFS2] Patch to fix mmap of stuffed files [GFS2] use lib/parser for parsing mount options [DLM] Lowcomms nodeid range & initialisation fixes [DLM] Fix dlm_lowcoms_stop hang [DLM] fix mode munging [GFS2] lockdump improvements [GFS2] Patch to detect corrupt number of dir entries in leaf and/or inode blocks [GFS2] bz 236008: Kernel gpf doing cat /debugfs/gfs2/xxx (lock dump) [DLM] fs/dlm/ast.c should #include "ast.h" [DLM] Consolidate transport protocols [DLM] Remove redundant assignment [GFS2] Fix bz 234168 (ignoring rgrp flags) [DLM] change lkid format [DLM] interface for purge (2/2) [DLM] add orphan purging code (1/2) [DLM] split create_message function [GFS2] Set drop_count to 0 (off) by default ...
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/Kconfig31
-rw-r--r--fs/dlm/Makefile6
-rw-r--r--fs/dlm/ast.c1
-rw-r--r--fs/dlm/config.c10
-rw-r--r--fs/dlm/config.h3
-rw-r--r--fs/dlm/dlm_internal.h11
-rw-r--r--fs/dlm/lock.c955
-rw-r--r--fs/dlm/lock.h2
-rw-r--r--fs/dlm/lockspace.c4
-rw-r--r--fs/dlm/lowcomms-sctp.c1210
-rw-r--r--fs/dlm/lowcomms.c (renamed from fs/dlm/lowcomms-tcp.c)788
-rw-r--r--fs/dlm/user.c163
12 files changed, 1508 insertions, 1676 deletions
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig
index 6fa7b0d5c043..69a94690e493 100644
--- a/fs/dlm/Kconfig
+++ b/fs/dlm/Kconfig
@@ -3,36 +3,19 @@ menu "Distributed Lock Manager"
3 3
4config DLM 4config DLM
5 tristate "Distributed Lock Manager (DLM)" 5 tristate "Distributed Lock Manager (DLM)"
6 depends on SYSFS && (IPV6 || IPV6=n) 6 depends on IPV6 || IPV6=n
7 select CONFIGFS_FS 7 select CONFIGFS_FS
8 select IP_SCTP if DLM_SCTP 8 select IP_SCTP
9 help 9 help
10 A general purpose distributed lock manager for kernel or userspace 10 A general purpose distributed lock manager for kernel or userspace
11 applications. 11 applications.
12
13choice
14 prompt "Select DLM communications protocol"
15 depends on DLM
16 default DLM_TCP
17 help
18 The DLM Can use TCP or SCTP for it's network communications.
19 SCTP supports multi-homed operations whereas TCP doesn't.
20 However, SCTP seems to have stability problems at the moment.
21
22config DLM_TCP
23 bool "TCP/IP"
24
25config DLM_SCTP
26 bool "SCTP"
27
28endchoice
29 12
30config DLM_DEBUG 13config DLM_DEBUG
31 bool "DLM debugging" 14 bool "DLM debugging"
32 depends on DLM 15 depends on DLM
33 help 16 help
34 Under the debugfs mount point, the name of each lockspace will 17 Under the debugfs mount point, the name of each lockspace will
35 appear as a file in the "dlm" directory. The output is the 18 appear as a file in the "dlm" directory. The output is the
36 list of resource and locks the local node knows about. 19 list of resource and locks the local node knows about.
37 20
38endmenu 21endmenu
diff --git a/fs/dlm/Makefile b/fs/dlm/Makefile
index 65388944eba0..604cf7dc5f39 100644
--- a/fs/dlm/Makefile
+++ b/fs/dlm/Makefile
@@ -8,14 +8,12 @@ dlm-y := ast.o \
8 member.o \ 8 member.o \
9 memory.o \ 9 memory.o \
10 midcomms.o \ 10 midcomms.o \
11 lowcomms.o \
11 rcom.o \ 12 rcom.o \
12 recover.o \ 13 recover.o \
13 recoverd.o \ 14 recoverd.o \
14 requestqueue.o \ 15 requestqueue.o \
15 user.o \ 16 user.o \
16 util.o 17 util.o
17dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o 18dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o
18 19
19dlm-$(CONFIG_DLM_TCP) += lowcomms-tcp.o
20
21dlm-$(CONFIG_DLM_SCTP) += lowcomms-sctp.o \ No newline at end of file
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index f91d39cb1e0b..6308122890ca 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -14,6 +14,7 @@
14#include "dlm_internal.h" 14#include "dlm_internal.h"
15#include "lock.h" 15#include "lock.h"
16#include "user.h" 16#include "user.h"
17#include "ast.h"
17 18
18#define WAKE_ASTS 0 19#define WAKE_ASTS 0
19 20
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 8665c88e5af2..822abdcd1434 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -89,6 +89,7 @@ struct cluster {
89 unsigned int cl_toss_secs; 89 unsigned int cl_toss_secs;
90 unsigned int cl_scan_secs; 90 unsigned int cl_scan_secs;
91 unsigned int cl_log_debug; 91 unsigned int cl_log_debug;
92 unsigned int cl_protocol;
92}; 93};
93 94
94enum { 95enum {
@@ -101,6 +102,7 @@ enum {
101 CLUSTER_ATTR_TOSS_SECS, 102 CLUSTER_ATTR_TOSS_SECS,
102 CLUSTER_ATTR_SCAN_SECS, 103 CLUSTER_ATTR_SCAN_SECS,
103 CLUSTER_ATTR_LOG_DEBUG, 104 CLUSTER_ATTR_LOG_DEBUG,
105 CLUSTER_ATTR_PROTOCOL,
104}; 106};
105 107
106struct cluster_attribute { 108struct cluster_attribute {
@@ -159,6 +161,7 @@ CLUSTER_ATTR(recover_timer, 1);
159CLUSTER_ATTR(toss_secs, 1); 161CLUSTER_ATTR(toss_secs, 1);
160CLUSTER_ATTR(scan_secs, 1); 162CLUSTER_ATTR(scan_secs, 1);
161CLUSTER_ATTR(log_debug, 0); 163CLUSTER_ATTR(log_debug, 0);
164CLUSTER_ATTR(protocol, 0);
162 165
163static struct configfs_attribute *cluster_attrs[] = { 166static struct configfs_attribute *cluster_attrs[] = {
164 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, 167 [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
@@ -170,6 +173,7 @@ static struct configfs_attribute *cluster_attrs[] = {
170 [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, 173 [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
171 [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, 174 [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr,
172 [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr, 175 [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr,
176 [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
173 NULL, 177 NULL,
174}; 178};
175 179
@@ -904,6 +908,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
904#define DEFAULT_TOSS_SECS 10 908#define DEFAULT_TOSS_SECS 10
905#define DEFAULT_SCAN_SECS 5 909#define DEFAULT_SCAN_SECS 5
906#define DEFAULT_LOG_DEBUG 0 910#define DEFAULT_LOG_DEBUG 0
911#define DEFAULT_PROTOCOL 0
907 912
908struct dlm_config_info dlm_config = { 913struct dlm_config_info dlm_config = {
909 .ci_tcp_port = DEFAULT_TCP_PORT, 914 .ci_tcp_port = DEFAULT_TCP_PORT,
@@ -914,6 +919,7 @@ struct dlm_config_info dlm_config = {
914 .ci_recover_timer = DEFAULT_RECOVER_TIMER, 919 .ci_recover_timer = DEFAULT_RECOVER_TIMER,
915 .ci_toss_secs = DEFAULT_TOSS_SECS, 920 .ci_toss_secs = DEFAULT_TOSS_SECS,
916 .ci_scan_secs = DEFAULT_SCAN_SECS, 921 .ci_scan_secs = DEFAULT_SCAN_SECS,
917 .ci_log_debug = DEFAULT_LOG_DEBUG 922 .ci_log_debug = DEFAULT_LOG_DEBUG,
923 .ci_protocol = DEFAULT_PROTOCOL
918}; 924};
919 925
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 1e978611a96e..967cc3d72e5e 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -26,6 +26,7 @@ struct dlm_config_info {
26 int ci_toss_secs; 26 int ci_toss_secs;
27 int ci_scan_secs; 27 int ci_scan_secs;
28 int ci_log_debug; 28 int ci_log_debug;
29 int ci_protocol;
29}; 30};
30 31
31extern struct dlm_config_info dlm_config; 32extern struct dlm_config_info dlm_config;
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 61d93201e1b2..30994d68f6a0 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -210,6 +210,9 @@ struct dlm_args {
210#define DLM_IFL_MSTCPY 0x00010000 210#define DLM_IFL_MSTCPY 0x00010000
211#define DLM_IFL_RESEND 0x00020000 211#define DLM_IFL_RESEND 0x00020000
212#define DLM_IFL_DEAD 0x00040000 212#define DLM_IFL_DEAD 0x00040000
213#define DLM_IFL_OVERLAP_UNLOCK 0x00080000
214#define DLM_IFL_OVERLAP_CANCEL 0x00100000
215#define DLM_IFL_ENDOFLIFE 0x00200000
213#define DLM_IFL_USER 0x00000001 216#define DLM_IFL_USER 0x00000001
214#define DLM_IFL_ORPHAN 0x00000002 217#define DLM_IFL_ORPHAN 0x00000002
215 218
@@ -230,8 +233,8 @@ struct dlm_lkb {
230 int8_t lkb_grmode; /* granted lock mode */ 233 int8_t lkb_grmode; /* granted lock mode */
231 int8_t lkb_bastmode; /* requested mode */ 234 int8_t lkb_bastmode; /* requested mode */
232 int8_t lkb_highbast; /* highest mode bast sent for */ 235 int8_t lkb_highbast; /* highest mode bast sent for */
233
234 int8_t lkb_wait_type; /* type of reply waiting for */ 236 int8_t lkb_wait_type; /* type of reply waiting for */
237 int8_t lkb_wait_count;
235 int8_t lkb_ast_type; /* type of ast queued for */ 238 int8_t lkb_ast_type; /* type of ast queued for */
236 239
237 struct list_head lkb_idtbl_list; /* lockspace lkbtbl */ 240 struct list_head lkb_idtbl_list; /* lockspace lkbtbl */
@@ -339,6 +342,7 @@ struct dlm_header {
339#define DLM_MSG_LOOKUP 11 342#define DLM_MSG_LOOKUP 11
340#define DLM_MSG_REMOVE 12 343#define DLM_MSG_REMOVE 12
341#define DLM_MSG_LOOKUP_REPLY 13 344#define DLM_MSG_LOOKUP_REPLY 13
345#define DLM_MSG_PURGE 14
342 346
343struct dlm_message { 347struct dlm_message {
344 struct dlm_header m_header; 348 struct dlm_header m_header;
@@ -440,6 +444,9 @@ struct dlm_ls {
440 struct mutex ls_waiters_mutex; 444 struct mutex ls_waiters_mutex;
441 struct list_head ls_waiters; /* lkbs needing a reply */ 445 struct list_head ls_waiters; /* lkbs needing a reply */
442 446
447 struct mutex ls_orphans_mutex;
448 struct list_head ls_orphans;
449
443 struct list_head ls_nodes; /* current nodes in ls */ 450 struct list_head ls_nodes; /* current nodes in ls */
444 struct list_head ls_nodes_gone; /* dead node list, recovery */ 451 struct list_head ls_nodes_gone; /* dead node list, recovery */
445 int ls_num_nodes; /* number of nodes in ls */ 452 int ls_num_nodes; /* number of nodes in ls */
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index e725005fafd0..d8d6e729f96b 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1,7 +1,7 @@
1/****************************************************************************** 1/******************************************************************************
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) 2005 Red Hat, Inc. All rights reserved. 4** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
5** 5**
6** This copyrighted material is made available to anyone wishing to use, 6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions 7** modify, copy, or redistribute it subject to the terms and conditions
@@ -85,6 +85,7 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 85static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 struct dlm_message *ms); 86 struct dlm_message *ms);
87static int receive_extralen(struct dlm_message *ms); 87static int receive_extralen(struct dlm_message *ms);
88static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
88 89
89/* 90/*
90 * Lock compatibilty matrix - thanks Steve 91 * Lock compatibilty matrix - thanks Steve
@@ -223,6 +224,16 @@ static inline int is_demoted(struct dlm_lkb *lkb)
223 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED); 224 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
224} 225}
225 226
227static inline int is_altmode(struct dlm_lkb *lkb)
228{
229 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
230}
231
232static inline int is_granted(struct dlm_lkb *lkb)
233{
234 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
235}
236
226static inline int is_remote(struct dlm_rsb *r) 237static inline int is_remote(struct dlm_rsb *r)
227{ 238{
228 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); 239 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
@@ -254,6 +265,22 @@ static inline int down_conversion(struct dlm_lkb *lkb)
254 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); 265 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
255} 266}
256 267
268static inline int is_overlap_unlock(struct dlm_lkb *lkb)
269{
270 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
271}
272
273static inline int is_overlap_cancel(struct dlm_lkb *lkb)
274{
275 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
276}
277
278static inline int is_overlap(struct dlm_lkb *lkb)
279{
280 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
281 DLM_IFL_OVERLAP_CANCEL));
282}
283
257static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 284static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
258{ 285{
259 if (is_master_copy(lkb)) 286 if (is_master_copy(lkb))
@@ -267,6 +294,12 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
267 dlm_add_ast(lkb, AST_COMP); 294 dlm_add_ast(lkb, AST_COMP);
268} 295}
269 296
297static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
298{
299 queue_cast(r, lkb,
300 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
301}
302
270static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) 303static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
271{ 304{
272 if (is_master_copy(lkb)) 305 if (is_master_copy(lkb))
@@ -547,6 +580,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
547 lkb->lkb_grmode = DLM_LOCK_IV; 580 lkb->lkb_grmode = DLM_LOCK_IV;
548 kref_init(&lkb->lkb_ref); 581 kref_init(&lkb->lkb_ref);
549 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 582 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
583 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
550 584
551 get_random_bytes(&bucket, sizeof(bucket)); 585 get_random_bytes(&bucket, sizeof(bucket));
552 bucket &= (ls->ls_lkbtbl_size - 1); 586 bucket &= (ls->ls_lkbtbl_size - 1);
@@ -556,7 +590,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
556 /* counter can roll over so we must verify lkid is not in use */ 590 /* counter can roll over so we must verify lkid is not in use */
557 591
558 while (lkid == 0) { 592 while (lkid == 0) {
559 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16); 593 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
560 594
561 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list, 595 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
562 lkb_idtbl_list) { 596 lkb_idtbl_list) {
@@ -577,8 +611,8 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
577 611
578static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) 612static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
579{ 613{
580 uint16_t bucket = lkid & 0xFFFF;
581 struct dlm_lkb *lkb; 614 struct dlm_lkb *lkb;
615 uint16_t bucket = (lkid >> 16);
582 616
583 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { 617 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
584 if (lkb->lkb_id == lkid) 618 if (lkb->lkb_id == lkid)
@@ -590,7 +624,7 @@ static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
590static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) 624static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
591{ 625{
592 struct dlm_lkb *lkb; 626 struct dlm_lkb *lkb;
593 uint16_t bucket = lkid & 0xFFFF; 627 uint16_t bucket = (lkid >> 16);
594 628
595 if (bucket >= ls->ls_lkbtbl_size) 629 if (bucket >= ls->ls_lkbtbl_size)
596 return -EBADSLT; 630 return -EBADSLT;
@@ -620,7 +654,7 @@ static void kill_lkb(struct kref *kref)
620 654
621static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) 655static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
622{ 656{
623 uint16_t bucket = lkb->lkb_id & 0xFFFF; 657 uint16_t bucket = (lkb->lkb_id >> 16);
624 658
625 write_lock(&ls->ls_lkbtbl[bucket].lock); 659 write_lock(&ls->ls_lkbtbl[bucket].lock);
626 if (kref_put(&lkb->lkb_ref, kill_lkb)) { 660 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
@@ -735,23 +769,75 @@ static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
735 unhold_lkb(lkb); 769 unhold_lkb(lkb);
736} 770}
737 771
772static int msg_reply_type(int mstype)
773{
774 switch (mstype) {
775 case DLM_MSG_REQUEST:
776 return DLM_MSG_REQUEST_REPLY;
777 case DLM_MSG_CONVERT:
778 return DLM_MSG_CONVERT_REPLY;
779 case DLM_MSG_UNLOCK:
780 return DLM_MSG_UNLOCK_REPLY;
781 case DLM_MSG_CANCEL:
782 return DLM_MSG_CANCEL_REPLY;
783 case DLM_MSG_LOOKUP:
784 return DLM_MSG_LOOKUP_REPLY;
785 }
786 return -1;
787}
788
738/* add/remove lkb from global waiters list of lkb's waiting for 789/* add/remove lkb from global waiters list of lkb's waiting for
739 a reply from a remote node */ 790 a reply from a remote node */
740 791
741static void add_to_waiters(struct dlm_lkb *lkb, int mstype) 792static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
742{ 793{
743 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 794 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
795 int error = 0;
744 796
745 mutex_lock(&ls->ls_waiters_mutex); 797 mutex_lock(&ls->ls_waiters_mutex);
746 if (lkb->lkb_wait_type) { 798
747 log_print("add_to_waiters error %d", lkb->lkb_wait_type); 799 if (is_overlap_unlock(lkb) ||
800 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
801 error = -EINVAL;
802 goto out;
803 }
804
805 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
806 switch (mstype) {
807 case DLM_MSG_UNLOCK:
808 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
809 break;
810 case DLM_MSG_CANCEL:
811 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
812 break;
813 default:
814 error = -EBUSY;
815 goto out;
816 }
817 lkb->lkb_wait_count++;
818 hold_lkb(lkb);
819
820 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
821 lkb->lkb_id, lkb->lkb_wait_type, mstype,
822 lkb->lkb_wait_count, lkb->lkb_flags);
748 goto out; 823 goto out;
749 } 824 }
825
826 DLM_ASSERT(!lkb->lkb_wait_count,
827 dlm_print_lkb(lkb);
828 printk("wait_count %d\n", lkb->lkb_wait_count););
829
830 lkb->lkb_wait_count++;
750 lkb->lkb_wait_type = mstype; 831 lkb->lkb_wait_type = mstype;
751 kref_get(&lkb->lkb_ref); 832 hold_lkb(lkb);
752 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); 833 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
753 out: 834 out:
835 if (error)
836 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
837 lkb->lkb_id, error, lkb->lkb_flags, mstype,
838 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
754 mutex_unlock(&ls->ls_waiters_mutex); 839 mutex_unlock(&ls->ls_waiters_mutex);
840 return error;
755} 841}
756 842
757/* We clear the RESEND flag because we might be taking an lkb off the waiters 843/* We clear the RESEND flag because we might be taking an lkb off the waiters
@@ -759,34 +845,85 @@ static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
759 request reply on the requestqueue) between dlm_recover_waiters_pre() which 845 request reply on the requestqueue) between dlm_recover_waiters_pre() which
760 set RESEND and dlm_recover_waiters_post() */ 846 set RESEND and dlm_recover_waiters_post() */
761 847
762static int _remove_from_waiters(struct dlm_lkb *lkb) 848static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
763{ 849{
764 int error = 0; 850 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
851 int overlap_done = 0;
765 852
766 if (!lkb->lkb_wait_type) { 853 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
767 log_print("remove_from_waiters error"); 854 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
768 error = -EINVAL; 855 overlap_done = 1;
769 goto out; 856 goto out_del;
857 }
858
859 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
860 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
861 overlap_done = 1;
862 goto out_del;
863 }
864
865 /* N.B. type of reply may not always correspond to type of original
866 msg due to lookup->request optimization, verify others? */
867
868 if (lkb->lkb_wait_type) {
869 lkb->lkb_wait_type = 0;
870 goto out_del;
871 }
872
873 log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
874 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
875 return -1;
876
877 out_del:
878 /* the force-unlock/cancel has completed and we haven't recvd a reply
879 to the op that was in progress prior to the unlock/cancel; we
880 give up on any reply to the earlier op. FIXME: not sure when/how
881 this would happen */
882
883 if (overlap_done && lkb->lkb_wait_type) {
884 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
885 lkb->lkb_id, mstype, lkb->lkb_wait_type);
886 lkb->lkb_wait_count--;
887 lkb->lkb_wait_type = 0;
770 } 888 }
771 lkb->lkb_wait_type = 0; 889
890 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
891
772 lkb->lkb_flags &= ~DLM_IFL_RESEND; 892 lkb->lkb_flags &= ~DLM_IFL_RESEND;
773 list_del(&lkb->lkb_wait_reply); 893 lkb->lkb_wait_count--;
894 if (!lkb->lkb_wait_count)
895 list_del_init(&lkb->lkb_wait_reply);
774 unhold_lkb(lkb); 896 unhold_lkb(lkb);
775 out: 897 return 0;
776 return error;
777} 898}
778 899
779static int remove_from_waiters(struct dlm_lkb *lkb) 900static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
780{ 901{
781 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 902 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
782 int error; 903 int error;
783 904
784 mutex_lock(&ls->ls_waiters_mutex); 905 mutex_lock(&ls->ls_waiters_mutex);
785 error = _remove_from_waiters(lkb); 906 error = _remove_from_waiters(lkb, mstype);
786 mutex_unlock(&ls->ls_waiters_mutex); 907 mutex_unlock(&ls->ls_waiters_mutex);
787 return error; 908 return error;
788} 909}
789 910
911/* Handles situations where we might be processing a "fake" or "stub" reply in
912 which we can't try to take waiters_mutex again. */
913
914static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
915{
916 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
917 int error;
918
919 if (ms != &ls->ls_stub_ms)
920 mutex_lock(&ls->ls_waiters_mutex);
921 error = _remove_from_waiters(lkb, ms->m_type);
922 if (ms != &ls->ls_stub_ms)
923 mutex_unlock(&ls->ls_waiters_mutex);
924 return error;
925}
926
790static void dir_remove(struct dlm_rsb *r) 927static void dir_remove(struct dlm_rsb *r)
791{ 928{
792 int to_nodeid; 929 int to_nodeid;
@@ -988,8 +1125,14 @@ static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
988 _remove_lock(r, lkb); 1125 _remove_lock(r, lkb);
989} 1126}
990 1127
991static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1128/* returns: 0 did nothing
1129 1 moved lock to granted
1130 -1 removed lock */
1131
1132static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
992{ 1133{
1134 int rv = 0;
1135
993 lkb->lkb_rqmode = DLM_LOCK_IV; 1136 lkb->lkb_rqmode = DLM_LOCK_IV;
994 1137
995 switch (lkb->lkb_status) { 1138 switch (lkb->lkb_status) {
@@ -997,6 +1140,7 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
997 break; 1140 break;
998 case DLM_LKSTS_CONVERT: 1141 case DLM_LKSTS_CONVERT:
999 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 1142 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1143 rv = 1;
1000 break; 1144 break;
1001 case DLM_LKSTS_WAITING: 1145 case DLM_LKSTS_WAITING:
1002 del_lkb(r, lkb); 1146 del_lkb(r, lkb);
@@ -1004,15 +1148,17 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1004 /* this unhold undoes the original ref from create_lkb() 1148 /* this unhold undoes the original ref from create_lkb()
1005 so this leads to the lkb being freed */ 1149 so this leads to the lkb being freed */
1006 unhold_lkb(lkb); 1150 unhold_lkb(lkb);
1151 rv = -1;
1007 break; 1152 break;
1008 default: 1153 default:
1009 log_print("invalid status for revert %d", lkb->lkb_status); 1154 log_print("invalid status for revert %d", lkb->lkb_status);
1010 } 1155 }
1156 return rv;
1011} 1157}
1012 1158
1013static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 1159static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1014{ 1160{
1015 revert_lock(r, lkb); 1161 return revert_lock(r, lkb);
1016} 1162}
1017 1163
1018static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1164static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -1055,6 +1201,50 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1055 queue_cast(r, lkb, 0); 1201 queue_cast(r, lkb, 0);
1056} 1202}
1057 1203
1204/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1205 change the granted/requested modes. We're munging things accordingly in
1206 the process copy.
1207 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1208 conversion deadlock
1209 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1210 compatible with other granted locks */
1211
1212static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1213{
1214 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1215 log_print("munge_demoted %x invalid reply type %d",
1216 lkb->lkb_id, ms->m_type);
1217 return;
1218 }
1219
1220 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1221 log_print("munge_demoted %x invalid modes gr %d rq %d",
1222 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1223 return;
1224 }
1225
1226 lkb->lkb_grmode = DLM_LOCK_NL;
1227}
1228
1229static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1230{
1231 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1232 ms->m_type != DLM_MSG_GRANT) {
1233 log_print("munge_altmode %x invalid reply type %d",
1234 lkb->lkb_id, ms->m_type);
1235 return;
1236 }
1237
1238 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1239 lkb->lkb_rqmode = DLM_LOCK_PR;
1240 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1241 lkb->lkb_rqmode = DLM_LOCK_CW;
1242 else {
1243 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1244 dlm_print_lkb(lkb);
1245 }
1246}
1247
1058static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) 1248static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1059{ 1249{
1060 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, 1250 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
@@ -1499,7 +1689,7 @@ static void process_lookup_list(struct dlm_rsb *r)
1499 struct dlm_lkb *lkb, *safe; 1689 struct dlm_lkb *lkb, *safe;
1500 1690
1501 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { 1691 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1502 list_del(&lkb->lkb_rsb_lookup); 1692 list_del_init(&lkb->lkb_rsb_lookup);
1503 _request_lock(r, lkb); 1693 _request_lock(r, lkb);
1504 schedule(); 1694 schedule();
1505 } 1695 }
@@ -1530,7 +1720,7 @@ static void confirm_master(struct dlm_rsb *r, int error)
1530 if (!list_empty(&r->res_lookup)) { 1720 if (!list_empty(&r->res_lookup)) {
1531 lkb = list_entry(r->res_lookup.next, struct dlm_lkb, 1721 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1532 lkb_rsb_lookup); 1722 lkb_rsb_lookup);
1533 list_del(&lkb->lkb_rsb_lookup); 1723 list_del_init(&lkb->lkb_rsb_lookup);
1534 r->res_first_lkid = lkb->lkb_id; 1724 r->res_first_lkid = lkb->lkb_id;
1535 _request_lock(r, lkb); 1725 _request_lock(r, lkb);
1536 } else 1726 } else
@@ -1614,6 +1804,9 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1614 DLM_LKF_FORCEUNLOCK)) 1804 DLM_LKF_FORCEUNLOCK))
1615 return -EINVAL; 1805 return -EINVAL;
1616 1806
1807 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1808 return -EINVAL;
1809
1617 args->flags = flags; 1810 args->flags = flags;
1618 args->astparam = (long) astarg; 1811 args->astparam = (long) astarg;
1619 return 0; 1812 return 0;
@@ -1638,6 +1831,9 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1638 1831
1639 if (lkb->lkb_wait_type) 1832 if (lkb->lkb_wait_type)
1640 goto out; 1833 goto out;
1834
1835 if (is_overlap(lkb))
1836 goto out;
1641 } 1837 }
1642 1838
1643 lkb->lkb_exflags = args->flags; 1839 lkb->lkb_exflags = args->flags;
@@ -1654,35 +1850,126 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1654 return rv; 1850 return rv;
1655} 1851}
1656 1852
1853/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1854 for success */
1855
1856/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1857 because there may be a lookup in progress and it's valid to do
1858 cancel/unlockf on it */
1859
1657static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) 1860static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1658{ 1861{
1862 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1659 int rv = -EINVAL; 1863 int rv = -EINVAL;
1660 1864
1661 if (lkb->lkb_flags & DLM_IFL_MSTCPY) 1865 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1866 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1867 dlm_print_lkb(lkb);
1662 goto out; 1868 goto out;
1869 }
1663 1870
1664 if (args->flags & DLM_LKF_FORCEUNLOCK) 1871 /* an lkb may still exist even though the lock is EOL'ed due to a
1665 goto out_ok; 1872 cancel, unlock or failed noqueue request; an app can't use these
1873 locks; return same error as if the lkid had not been found at all */
1666 1874
1667 if (args->flags & DLM_LKF_CANCEL && 1875 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1668 lkb->lkb_status == DLM_LKSTS_GRANTED) 1876 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1877 rv = -ENOENT;
1669 goto out; 1878 goto out;
1879 }
1670 1880
1671 if (!(args->flags & DLM_LKF_CANCEL) && 1881 /* an lkb may be waiting for an rsb lookup to complete where the
1672 lkb->lkb_status != DLM_LKSTS_GRANTED) 1882 lookup was initiated by another lock */
1673 goto out; 1883
1884 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1885 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1886 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1887 list_del_init(&lkb->lkb_rsb_lookup);
1888 queue_cast(lkb->lkb_resource, lkb,
1889 args->flags & DLM_LKF_CANCEL ?
1890 -DLM_ECANCEL : -DLM_EUNLOCK);
1891 unhold_lkb(lkb); /* undoes create_lkb() */
1892 rv = -EBUSY;
1893 goto out;
1894 }
1895 }
1896
1897 /* cancel not allowed with another cancel/unlock in progress */
1898
1899 if (args->flags & DLM_LKF_CANCEL) {
1900 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1901 goto out;
1902
1903 if (is_overlap(lkb))
1904 goto out;
1905
1906 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1907 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1908 rv = -EBUSY;
1909 goto out;
1910 }
1911
1912 switch (lkb->lkb_wait_type) {
1913 case DLM_MSG_LOOKUP:
1914 case DLM_MSG_REQUEST:
1915 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1916 rv = -EBUSY;
1917 goto out;
1918 case DLM_MSG_UNLOCK:
1919 case DLM_MSG_CANCEL:
1920 goto out;
1921 }
1922 /* add_to_waiters() will set OVERLAP_CANCEL */
1923 goto out_ok;
1924 }
1925
1926 /* do we need to allow a force-unlock if there's a normal unlock
1927 already in progress? in what conditions could the normal unlock
1928 fail such that we'd want to send a force-unlock to be sure? */
1929
1930 if (args->flags & DLM_LKF_FORCEUNLOCK) {
1931 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1932 goto out;
1933
1934 if (is_overlap_unlock(lkb))
1935 goto out;
1674 1936
1937 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1938 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1939 rv = -EBUSY;
1940 goto out;
1941 }
1942
1943 switch (lkb->lkb_wait_type) {
1944 case DLM_MSG_LOOKUP:
1945 case DLM_MSG_REQUEST:
1946 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1947 rv = -EBUSY;
1948 goto out;
1949 case DLM_MSG_UNLOCK:
1950 goto out;
1951 }
1952 /* add_to_waiters() will set OVERLAP_UNLOCK */
1953 goto out_ok;
1954 }
1955
1956 /* normal unlock not allowed if there's any op in progress */
1675 rv = -EBUSY; 1957 rv = -EBUSY;
1676 if (lkb->lkb_wait_type) 1958 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
1677 goto out; 1959 goto out;
1678 1960
1679 out_ok: 1961 out_ok:
1680 lkb->lkb_exflags = args->flags; 1962 /* an overlapping op shouldn't blow away exflags from other op */
1963 lkb->lkb_exflags |= args->flags;
1681 lkb->lkb_sbflags = 0; 1964 lkb->lkb_sbflags = 0;
1682 lkb->lkb_astparam = args->astparam; 1965 lkb->lkb_astparam = args->astparam;
1683
1684 rv = 0; 1966 rv = 0;
1685 out: 1967 out:
1968 if (rv)
1969 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1970 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1971 args->flags, lkb->lkb_wait_type,
1972 lkb->lkb_resource->res_name);
1686 return rv; 1973 return rv;
1687} 1974}
1688 1975
@@ -1732,9 +2019,24 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1732 goto out; 2019 goto out;
1733 } 2020 }
1734 2021
1735 if (can_be_queued(lkb)) { 2022 /* is_demoted() means the can_be_granted() above set the grmode
1736 if (is_demoted(lkb)) 2023 to NL, and left us on the granted queue. This auto-demotion
2024 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2025 now grantable. We have to try to grant other converting locks
2026 before we try again to grant this one. */
2027
2028 if (is_demoted(lkb)) {
2029 grant_pending_convert(r, DLM_LOCK_IV);
2030 if (_can_be_granted(r, lkb, 1)) {
2031 grant_lock(r, lkb);
2032 queue_cast(r, lkb, 0);
1737 grant_pending_locks(r); 2033 grant_pending_locks(r);
2034 goto out;
2035 }
2036 /* else fall through and move to convert queue */
2037 }
2038
2039 if (can_be_queued(lkb)) {
1738 error = -EINPROGRESS; 2040 error = -EINPROGRESS;
1739 del_lkb(r, lkb); 2041 del_lkb(r, lkb);
1740 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 2042 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
@@ -1759,17 +2061,19 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1759 return -DLM_EUNLOCK; 2061 return -DLM_EUNLOCK;
1760} 2062}
1761 2063
1762/* FIXME: if revert_lock() finds that the lkb is granted, we should 2064/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
1763 skip the queue_cast(ECANCEL). It indicates that the request/convert
1764 completed (and queued a normal ast) just before the cancel; we don't
1765 want to clobber the sb_result for the normal ast with ECANCEL. */
1766 2065
1767static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 2066static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1768{ 2067{
1769 revert_lock(r, lkb); 2068 int error;
1770 queue_cast(r, lkb, -DLM_ECANCEL); 2069
1771 grant_pending_locks(r); 2070 error = revert_lock(r, lkb);
1772 return -DLM_ECANCEL; 2071 if (error) {
2072 queue_cast(r, lkb, -DLM_ECANCEL);
2073 grant_pending_locks(r);
2074 return -DLM_ECANCEL;
2075 }
2076 return 0;
1773} 2077}
1774 2078
1775/* 2079/*
@@ -2035,6 +2339,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
2035 2339
2036 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) 2340 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2037 error = 0; 2341 error = 0;
2342 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2343 error = 0;
2038 out_put: 2344 out_put:
2039 dlm_put_lkb(lkb); 2345 dlm_put_lkb(lkb);
2040 out: 2346 out:
@@ -2065,31 +2371,14 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
2065 * receive_lookup_reply send_lookup_reply 2371 * receive_lookup_reply send_lookup_reply
2066 */ 2372 */
2067 2373
2068static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, 2374static int _create_message(struct dlm_ls *ls, int mb_len,
2069 int to_nodeid, int mstype, 2375 int to_nodeid, int mstype,
2070 struct dlm_message **ms_ret, 2376 struct dlm_message **ms_ret,
2071 struct dlm_mhandle **mh_ret) 2377 struct dlm_mhandle **mh_ret)
2072{ 2378{
2073 struct dlm_message *ms; 2379 struct dlm_message *ms;
2074 struct dlm_mhandle *mh; 2380 struct dlm_mhandle *mh;
2075 char *mb; 2381 char *mb;
2076 int mb_len = sizeof(struct dlm_message);
2077
2078 switch (mstype) {
2079 case DLM_MSG_REQUEST:
2080 case DLM_MSG_LOOKUP:
2081 case DLM_MSG_REMOVE:
2082 mb_len += r->res_length;
2083 break;
2084 case DLM_MSG_CONVERT:
2085 case DLM_MSG_UNLOCK:
2086 case DLM_MSG_REQUEST_REPLY:
2087 case DLM_MSG_CONVERT_REPLY:
2088 case DLM_MSG_GRANT:
2089 if (lkb && lkb->lkb_lvbptr)
2090 mb_len += r->res_ls->ls_lvblen;
2091 break;
2092 }
2093 2382
2094 /* get_buffer gives us a message handle (mh) that we need to 2383 /* get_buffer gives us a message handle (mh) that we need to
2095 pass into lowcomms_commit and a message buffer (mb) that we 2384 pass into lowcomms_commit and a message buffer (mb) that we
@@ -2104,7 +2393,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2104 ms = (struct dlm_message *) mb; 2393 ms = (struct dlm_message *) mb;
2105 2394
2106 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 2395 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2107 ms->m_header.h_lockspace = r->res_ls->ls_global_id; 2396 ms->m_header.h_lockspace = ls->ls_global_id;
2108 ms->m_header.h_nodeid = dlm_our_nodeid(); 2397 ms->m_header.h_nodeid = dlm_our_nodeid();
2109 ms->m_header.h_length = mb_len; 2398 ms->m_header.h_length = mb_len;
2110 ms->m_header.h_cmd = DLM_MSG; 2399 ms->m_header.h_cmd = DLM_MSG;
@@ -2116,6 +2405,33 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2116 return 0; 2405 return 0;
2117} 2406}
2118 2407
2408static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2409 int to_nodeid, int mstype,
2410 struct dlm_message **ms_ret,
2411 struct dlm_mhandle **mh_ret)
2412{
2413 int mb_len = sizeof(struct dlm_message);
2414
2415 switch (mstype) {
2416 case DLM_MSG_REQUEST:
2417 case DLM_MSG_LOOKUP:
2418 case DLM_MSG_REMOVE:
2419 mb_len += r->res_length;
2420 break;
2421 case DLM_MSG_CONVERT:
2422 case DLM_MSG_UNLOCK:
2423 case DLM_MSG_REQUEST_REPLY:
2424 case DLM_MSG_CONVERT_REPLY:
2425 case DLM_MSG_GRANT:
2426 if (lkb && lkb->lkb_lvbptr)
2427 mb_len += r->res_ls->ls_lvblen;
2428 break;
2429 }
2430
2431 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2432 ms_ret, mh_ret);
2433}
2434
2119/* further lowcomms enhancements or alternate implementations may make 2435/* further lowcomms enhancements or alternate implementations may make
2120 the return value from this function useful at some point */ 2436 the return value from this function useful at some point */
2121 2437
@@ -2176,7 +2492,9 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2176 struct dlm_mhandle *mh; 2492 struct dlm_mhandle *mh;
2177 int to_nodeid, error; 2493 int to_nodeid, error;
2178 2494
2179 add_to_waiters(lkb, mstype); 2495 error = add_to_waiters(lkb, mstype);
2496 if (error)
2497 return error;
2180 2498
2181 to_nodeid = r->res_nodeid; 2499 to_nodeid = r->res_nodeid;
2182 2500
@@ -2192,7 +2510,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2192 return 0; 2510 return 0;
2193 2511
2194 fail: 2512 fail:
2195 remove_from_waiters(lkb); 2513 remove_from_waiters(lkb, msg_reply_type(mstype));
2196 return error; 2514 return error;
2197} 2515}
2198 2516
@@ -2209,7 +2527,8 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2209 2527
2210 /* down conversions go without a reply from the master */ 2528 /* down conversions go without a reply from the master */
2211 if (!error && down_conversion(lkb)) { 2529 if (!error && down_conversion(lkb)) {
2212 remove_from_waiters(lkb); 2530 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2531 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2213 r->res_ls->ls_stub_ms.m_result = 0; 2532 r->res_ls->ls_stub_ms.m_result = 0;
2214 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags; 2533 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2215 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); 2534 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
@@ -2280,7 +2599,9 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2280 struct dlm_mhandle *mh; 2599 struct dlm_mhandle *mh;
2281 int to_nodeid, error; 2600 int to_nodeid, error;
2282 2601
2283 add_to_waiters(lkb, DLM_MSG_LOOKUP); 2602 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2603 if (error)
2604 return error;
2284 2605
2285 to_nodeid = dlm_dir_nodeid(r); 2606 to_nodeid = dlm_dir_nodeid(r);
2286 2607
@@ -2296,7 +2617,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2296 return 0; 2617 return 0;
2297 2618
2298 fail: 2619 fail:
2299 remove_from_waiters(lkb); 2620 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2300 return error; 2621 return error;
2301} 2622}
2302 2623
@@ -2656,6 +2977,8 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2656 lock_rsb(r); 2977 lock_rsb(r);
2657 2978
2658 receive_flags_reply(lkb, ms); 2979 receive_flags_reply(lkb, ms);
2980 if (is_altmode(lkb))
2981 munge_altmode(lkb, ms);
2659 grant_lock_pc(r, lkb, ms); 2982 grant_lock_pc(r, lkb, ms);
2660 queue_cast(r, lkb, 0); 2983 queue_cast(r, lkb, 0);
2661 2984
@@ -2736,11 +3059,16 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2736 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); 3059 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2737} 3060}
2738 3061
3062static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3063{
3064 do_purge(ls, ms->m_nodeid, ms->m_pid);
3065}
3066
2739static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 3067static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2740{ 3068{
2741 struct dlm_lkb *lkb; 3069 struct dlm_lkb *lkb;
2742 struct dlm_rsb *r; 3070 struct dlm_rsb *r;
2743 int error, mstype; 3071 int error, mstype, result;
2744 3072
2745 error = find_lkb(ls, ms->m_remid, &lkb); 3073 error = find_lkb(ls, ms->m_remid, &lkb);
2746 if (error) { 3074 if (error) {
@@ -2749,20 +3077,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2749 } 3077 }
2750 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3078 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2751 3079
2752 mstype = lkb->lkb_wait_type;
2753 error = remove_from_waiters(lkb);
2754 if (error) {
2755 log_error(ls, "receive_request_reply not on waiters");
2756 goto out;
2757 }
2758
2759 /* this is the value returned from do_request() on the master */
2760 error = ms->m_result;
2761
2762 r = lkb->lkb_resource; 3080 r = lkb->lkb_resource;
2763 hold_rsb(r); 3081 hold_rsb(r);
2764 lock_rsb(r); 3082 lock_rsb(r);
2765 3083
3084 mstype = lkb->lkb_wait_type;
3085 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3086 if (error)
3087 goto out;
3088
2766 /* Optimization: the dir node was also the master, so it took our 3089 /* Optimization: the dir node was also the master, so it took our
2767 lookup as a request and sent request reply instead of lookup reply */ 3090 lookup as a request and sent request reply instead of lookup reply */
2768 if (mstype == DLM_MSG_LOOKUP) { 3091 if (mstype == DLM_MSG_LOOKUP) {
@@ -2770,14 +3093,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2770 lkb->lkb_nodeid = r->res_nodeid; 3093 lkb->lkb_nodeid = r->res_nodeid;
2771 } 3094 }
2772 3095
2773 switch (error) { 3096 /* this is the value returned from do_request() on the master */
3097 result = ms->m_result;
3098
3099 switch (result) {
2774 case -EAGAIN: 3100 case -EAGAIN:
2775 /* request would block (be queued) on remote master; 3101 /* request would block (be queued) on remote master */
2776 the unhold undoes the original ref from create_lkb()
2777 so it leads to the lkb being freed */
2778 queue_cast(r, lkb, -EAGAIN); 3102 queue_cast(r, lkb, -EAGAIN);
2779 confirm_master(r, -EAGAIN); 3103 confirm_master(r, -EAGAIN);
2780 unhold_lkb(lkb); 3104 unhold_lkb(lkb); /* undoes create_lkb() */
2781 break; 3105 break;
2782 3106
2783 case -EINPROGRESS: 3107 case -EINPROGRESS:
@@ -2785,41 +3109,64 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2785 /* request was queued or granted on remote master */ 3109 /* request was queued or granted on remote master */
2786 receive_flags_reply(lkb, ms); 3110 receive_flags_reply(lkb, ms);
2787 lkb->lkb_remid = ms->m_lkid; 3111 lkb->lkb_remid = ms->m_lkid;
2788 if (error) 3112 if (is_altmode(lkb))
3113 munge_altmode(lkb, ms);
3114 if (result)
2789 add_lkb(r, lkb, DLM_LKSTS_WAITING); 3115 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2790 else { 3116 else {
2791 grant_lock_pc(r, lkb, ms); 3117 grant_lock_pc(r, lkb, ms);
2792 queue_cast(r, lkb, 0); 3118 queue_cast(r, lkb, 0);
2793 } 3119 }
2794 confirm_master(r, error); 3120 confirm_master(r, result);
2795 break; 3121 break;
2796 3122
2797 case -EBADR: 3123 case -EBADR:
2798 case -ENOTBLK: 3124 case -ENOTBLK:
2799 /* find_rsb failed to find rsb or rsb wasn't master */ 3125 /* find_rsb failed to find rsb or rsb wasn't master */
3126 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3127 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
2800 r->res_nodeid = -1; 3128 r->res_nodeid = -1;
2801 lkb->lkb_nodeid = -1; 3129 lkb->lkb_nodeid = -1;
2802 _request_lock(r, lkb); 3130
3131 if (is_overlap(lkb)) {
3132 /* we'll ignore error in cancel/unlock reply */
3133 queue_cast_overlap(r, lkb);
3134 unhold_lkb(lkb); /* undoes create_lkb() */
3135 } else
3136 _request_lock(r, lkb);
2803 break; 3137 break;
2804 3138
2805 default: 3139 default:
2806 log_error(ls, "receive_request_reply error %d", error); 3140 log_error(ls, "receive_request_reply %x error %d",
3141 lkb->lkb_id, result);
2807 } 3142 }
2808 3143
3144 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3145 log_debug(ls, "receive_request_reply %x result %d unlock",
3146 lkb->lkb_id, result);
3147 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3148 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3149 send_unlock(r, lkb);
3150 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3151 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3152 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3153 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3154 send_cancel(r, lkb);
3155 } else {
3156 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3157 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3158 }
3159 out:
2809 unlock_rsb(r); 3160 unlock_rsb(r);
2810 put_rsb(r); 3161 put_rsb(r);
2811 out:
2812 dlm_put_lkb(lkb); 3162 dlm_put_lkb(lkb);
2813} 3163}
2814 3164
2815static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3165static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2816 struct dlm_message *ms) 3166 struct dlm_message *ms)
2817{ 3167{
2818 int error = ms->m_result;
2819
2820 /* this is the value returned from do_convert() on the master */ 3168 /* this is the value returned from do_convert() on the master */
2821 3169 switch (ms->m_result) {
2822 switch (error) {
2823 case -EAGAIN: 3170 case -EAGAIN:
2824 /* convert would block (be queued) on remote master */ 3171 /* convert would block (be queued) on remote master */
2825 queue_cast(r, lkb, -EAGAIN); 3172 queue_cast(r, lkb, -EAGAIN);
@@ -2827,6 +3174,9 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2827 3174
2828 case -EINPROGRESS: 3175 case -EINPROGRESS:
2829 /* convert was queued on remote master */ 3176 /* convert was queued on remote master */
3177 receive_flags_reply(lkb, ms);
3178 if (is_demoted(lkb))
3179 munge_demoted(lkb, ms);
2830 del_lkb(r, lkb); 3180 del_lkb(r, lkb);
2831 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3181 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2832 break; 3182 break;
@@ -2834,24 +3184,33 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2834 case 0: 3184 case 0:
2835 /* convert was granted on remote master */ 3185 /* convert was granted on remote master */
2836 receive_flags_reply(lkb, ms); 3186 receive_flags_reply(lkb, ms);
3187 if (is_demoted(lkb))
3188 munge_demoted(lkb, ms);
2837 grant_lock_pc(r, lkb, ms); 3189 grant_lock_pc(r, lkb, ms);
2838 queue_cast(r, lkb, 0); 3190 queue_cast(r, lkb, 0);
2839 break; 3191 break;
2840 3192
2841 default: 3193 default:
2842 log_error(r->res_ls, "receive_convert_reply error %d", error); 3194 log_error(r->res_ls, "receive_convert_reply %x error %d",
3195 lkb->lkb_id, ms->m_result);
2843 } 3196 }
2844} 3197}
2845 3198
2846static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3199static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2847{ 3200{
2848 struct dlm_rsb *r = lkb->lkb_resource; 3201 struct dlm_rsb *r = lkb->lkb_resource;
3202 int error;
2849 3203
2850 hold_rsb(r); 3204 hold_rsb(r);
2851 lock_rsb(r); 3205 lock_rsb(r);
2852 3206
2853 __receive_convert_reply(r, lkb, ms); 3207 /* stub reply can happen with waiters_mutex held */
3208 error = remove_from_waiters_ms(lkb, ms);
3209 if (error)
3210 goto out;
2854 3211
3212 __receive_convert_reply(r, lkb, ms);
3213 out:
2855 unlock_rsb(r); 3214 unlock_rsb(r);
2856 put_rsb(r); 3215 put_rsb(r);
2857} 3216}
@@ -2868,37 +3227,38 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2868 } 3227 }
2869 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3228 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2870 3229
2871 error = remove_from_waiters(lkb);
2872 if (error) {
2873 log_error(ls, "receive_convert_reply not on waiters");
2874 goto out;
2875 }
2876
2877 _receive_convert_reply(lkb, ms); 3230 _receive_convert_reply(lkb, ms);
2878 out:
2879 dlm_put_lkb(lkb); 3231 dlm_put_lkb(lkb);
2880} 3232}
2881 3233
2882static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3234static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2883{ 3235{
2884 struct dlm_rsb *r = lkb->lkb_resource; 3236 struct dlm_rsb *r = lkb->lkb_resource;
2885 int error = ms->m_result; 3237 int error;
2886 3238
2887 hold_rsb(r); 3239 hold_rsb(r);
2888 lock_rsb(r); 3240 lock_rsb(r);
2889 3241
3242 /* stub reply can happen with waiters_mutex held */
3243 error = remove_from_waiters_ms(lkb, ms);
3244 if (error)
3245 goto out;
3246
2890 /* this is the value returned from do_unlock() on the master */ 3247 /* this is the value returned from do_unlock() on the master */
2891 3248
2892 switch (error) { 3249 switch (ms->m_result) {
2893 case -DLM_EUNLOCK: 3250 case -DLM_EUNLOCK:
2894 receive_flags_reply(lkb, ms); 3251 receive_flags_reply(lkb, ms);
2895 remove_lock_pc(r, lkb); 3252 remove_lock_pc(r, lkb);
2896 queue_cast(r, lkb, -DLM_EUNLOCK); 3253 queue_cast(r, lkb, -DLM_EUNLOCK);
2897 break; 3254 break;
3255 case -ENOENT:
3256 break;
2898 default: 3257 default:
2899 log_error(r->res_ls, "receive_unlock_reply error %d", error); 3258 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3259 lkb->lkb_id, ms->m_result);
2900 } 3260 }
2901 3261 out:
2902 unlock_rsb(r); 3262 unlock_rsb(r);
2903 put_rsb(r); 3263 put_rsb(r);
2904} 3264}
@@ -2915,37 +3275,39 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2915 } 3275 }
2916 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3276 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2917 3277
2918 error = remove_from_waiters(lkb);
2919 if (error) {
2920 log_error(ls, "receive_unlock_reply not on waiters");
2921 goto out;
2922 }
2923
2924 _receive_unlock_reply(lkb, ms); 3278 _receive_unlock_reply(lkb, ms);
2925 out:
2926 dlm_put_lkb(lkb); 3279 dlm_put_lkb(lkb);
2927} 3280}
2928 3281
2929static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3282static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2930{ 3283{
2931 struct dlm_rsb *r = lkb->lkb_resource; 3284 struct dlm_rsb *r = lkb->lkb_resource;
2932 int error = ms->m_result; 3285 int error;
2933 3286
2934 hold_rsb(r); 3287 hold_rsb(r);
2935 lock_rsb(r); 3288 lock_rsb(r);
2936 3289
3290 /* stub reply can happen with waiters_mutex held */
3291 error = remove_from_waiters_ms(lkb, ms);
3292 if (error)
3293 goto out;
3294
2937 /* this is the value returned from do_cancel() on the master */ 3295 /* this is the value returned from do_cancel() on the master */
2938 3296
2939 switch (error) { 3297 switch (ms->m_result) {
2940 case -DLM_ECANCEL: 3298 case -DLM_ECANCEL:
2941 receive_flags_reply(lkb, ms); 3299 receive_flags_reply(lkb, ms);
2942 revert_lock_pc(r, lkb); 3300 revert_lock_pc(r, lkb);
2943 queue_cast(r, lkb, -DLM_ECANCEL); 3301 if (ms->m_result)
3302 queue_cast(r, lkb, -DLM_ECANCEL);
3303 break;
3304 case 0:
2944 break; 3305 break;
2945 default: 3306 default:
2946 log_error(r->res_ls, "receive_cancel_reply error %d", error); 3307 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3308 lkb->lkb_id, ms->m_result);
2947 } 3309 }
2948 3310 out:
2949 unlock_rsb(r); 3311 unlock_rsb(r);
2950 put_rsb(r); 3312 put_rsb(r);
2951} 3313}
@@ -2962,14 +3324,7 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2962 } 3324 }
2963 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3325 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2964 3326
2965 error = remove_from_waiters(lkb);
2966 if (error) {
2967 log_error(ls, "receive_cancel_reply not on waiters");
2968 goto out;
2969 }
2970
2971 _receive_cancel_reply(lkb, ms); 3327 _receive_cancel_reply(lkb, ms);
2972 out:
2973 dlm_put_lkb(lkb); 3328 dlm_put_lkb(lkb);
2974} 3329}
2975 3330
@@ -2985,20 +3340,17 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2985 return; 3340 return;
2986 } 3341 }
2987 3342
2988 error = remove_from_waiters(lkb); 3343 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
2989 if (error) {
2990 log_error(ls, "receive_lookup_reply not on waiters");
2991 goto out;
2992 }
2993
2994 /* this is the value returned by dlm_dir_lookup on dir node
2995 FIXME: will a non-zero error ever be returned? */ 3344 FIXME: will a non-zero error ever be returned? */
2996 error = ms->m_result;
2997 3345
2998 r = lkb->lkb_resource; 3346 r = lkb->lkb_resource;
2999 hold_rsb(r); 3347 hold_rsb(r);
3000 lock_rsb(r); 3348 lock_rsb(r);
3001 3349
3350 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3351 if (error)
3352 goto out;
3353
3002 ret_nodeid = ms->m_nodeid; 3354 ret_nodeid = ms->m_nodeid;
3003 if (ret_nodeid == dlm_our_nodeid()) { 3355 if (ret_nodeid == dlm_our_nodeid()) {
3004 r->res_nodeid = 0; 3356 r->res_nodeid = 0;
@@ -3009,14 +3361,22 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3009 r->res_nodeid = ret_nodeid; 3361 r->res_nodeid = ret_nodeid;
3010 } 3362 }
3011 3363
3364 if (is_overlap(lkb)) {
3365 log_debug(ls, "receive_lookup_reply %x unlock %x",
3366 lkb->lkb_id, lkb->lkb_flags);
3367 queue_cast_overlap(r, lkb);
3368 unhold_lkb(lkb); /* undoes create_lkb() */
3369 goto out_list;
3370 }
3371
3012 _request_lock(r, lkb); 3372 _request_lock(r, lkb);
3013 3373
3374 out_list:
3014 if (!ret_nodeid) 3375 if (!ret_nodeid)
3015 process_lookup_list(r); 3376 process_lookup_list(r);
3016 3377 out:
3017 unlock_rsb(r); 3378 unlock_rsb(r);
3018 put_rsb(r); 3379 put_rsb(r);
3019 out:
3020 dlm_put_lkb(lkb); 3380 dlm_put_lkb(lkb);
3021} 3381}
3022 3382
@@ -3133,6 +3493,12 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3133 receive_lookup_reply(ls, ms); 3493 receive_lookup_reply(ls, ms);
3134 break; 3494 break;
3135 3495
3496 /* other messages */
3497
3498 case DLM_MSG_PURGE:
3499 receive_purge(ls, ms);
3500 break;
3501
3136 default: 3502 default:
3137 log_error(ls, "unknown message type %d", ms->m_type); 3503 log_error(ls, "unknown message type %d", ms->m_type);
3138 } 3504 }
@@ -3153,9 +3519,9 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3153{ 3519{
3154 if (middle_conversion(lkb)) { 3520 if (middle_conversion(lkb)) {
3155 hold_lkb(lkb); 3521 hold_lkb(lkb);
3522 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3156 ls->ls_stub_ms.m_result = -EINPROGRESS; 3523 ls->ls_stub_ms.m_result = -EINPROGRESS;
3157 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3524 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3158 _remove_from_waiters(lkb);
3159 _receive_convert_reply(lkb, &ls->ls_stub_ms); 3525 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3160 3526
3161 /* Same special case as in receive_rcom_lock_args() */ 3527 /* Same special case as in receive_rcom_lock_args() */
@@ -3227,18 +3593,18 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
3227 3593
3228 case DLM_MSG_UNLOCK: 3594 case DLM_MSG_UNLOCK:
3229 hold_lkb(lkb); 3595 hold_lkb(lkb);
3596 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3230 ls->ls_stub_ms.m_result = -DLM_EUNLOCK; 3597 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3231 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3598 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3232 _remove_from_waiters(lkb);
3233 _receive_unlock_reply(lkb, &ls->ls_stub_ms); 3599 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3234 dlm_put_lkb(lkb); 3600 dlm_put_lkb(lkb);
3235 break; 3601 break;
3236 3602
3237 case DLM_MSG_CANCEL: 3603 case DLM_MSG_CANCEL:
3238 hold_lkb(lkb); 3604 hold_lkb(lkb);
3605 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3239 ls->ls_stub_ms.m_result = -DLM_ECANCEL; 3606 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3240 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3607 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3241 _remove_from_waiters(lkb);
3242 _receive_cancel_reply(lkb, &ls->ls_stub_ms); 3608 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3243 dlm_put_lkb(lkb); 3609 dlm_put_lkb(lkb);
3244 break; 3610 break;
@@ -3252,37 +3618,47 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
3252 mutex_unlock(&ls->ls_waiters_mutex); 3618 mutex_unlock(&ls->ls_waiters_mutex);
3253} 3619}
3254 3620
3255static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 3621static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
3256{ 3622{
3257 struct dlm_lkb *lkb; 3623 struct dlm_lkb *lkb;
3258 int rv = 0; 3624 int found = 0;
3259 3625
3260 mutex_lock(&ls->ls_waiters_mutex); 3626 mutex_lock(&ls->ls_waiters_mutex);
3261 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 3627 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3262 if (lkb->lkb_flags & DLM_IFL_RESEND) { 3628 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3263 rv = lkb->lkb_wait_type; 3629 hold_lkb(lkb);
3264 _remove_from_waiters(lkb); 3630 found = 1;
3265 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3266 break; 3631 break;
3267 } 3632 }
3268 } 3633 }
3269 mutex_unlock(&ls->ls_waiters_mutex); 3634 mutex_unlock(&ls->ls_waiters_mutex);
3270 3635
3271 if (!rv) 3636 if (!found)
3272 lkb = NULL; 3637 lkb = NULL;
3273 *lkb_ret = lkb; 3638 return lkb;
3274 return rv;
3275} 3639}
3276 3640
3277/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the 3641/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3278 master or dir-node for r. Processing the lkb may result in it being placed 3642 master or dir-node for r. Processing the lkb may result in it being placed
3279 back on waiters. */ 3643 back on waiters. */
3280 3644
3645/* We do this after normal locking has been enabled and any saved messages
3646 (in requestqueue) have been processed. We should be confident that at
3647 this point we won't get or process a reply to any of these waiting
3648 operations. But, new ops may be coming in on the rsbs/locks here from
3649 userspace or remotely. */
3650
3651/* there may have been an overlap unlock/cancel prior to recovery or after
3652 recovery. if before, the lkb may still have a pos wait_count; if after, the
3653 overlap flag would just have been set and nothing new sent. we can be
3654 confident here than any replies to either the initial op or overlap ops
3655 prior to recovery have been received. */
3656
3281int dlm_recover_waiters_post(struct dlm_ls *ls) 3657int dlm_recover_waiters_post(struct dlm_ls *ls)
3282{ 3658{
3283 struct dlm_lkb *lkb; 3659 struct dlm_lkb *lkb;
3284 struct dlm_rsb *r; 3660 struct dlm_rsb *r;
3285 int error = 0, mstype; 3661 int error = 0, mstype, err, oc, ou;
3286 3662
3287 while (1) { 3663 while (1) {
3288 if (dlm_locking_stopped(ls)) { 3664 if (dlm_locking_stopped(ls)) {
@@ -3291,48 +3667,78 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
3291 break; 3667 break;
3292 } 3668 }
3293 3669
3294 mstype = remove_resend_waiter(ls, &lkb); 3670 lkb = find_resend_waiter(ls);
3295 if (!mstype) 3671 if (!lkb)
3296 break; 3672 break;
3297 3673
3298 r = lkb->lkb_resource; 3674 r = lkb->lkb_resource;
3675 hold_rsb(r);
3676 lock_rsb(r);
3677
3678 mstype = lkb->lkb_wait_type;
3679 oc = is_overlap_cancel(lkb);
3680 ou = is_overlap_unlock(lkb);
3681 err = 0;
3299 3682
3300 log_debug(ls, "recover_waiters_post %x type %d flags %x %s", 3683 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3301 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name); 3684 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3302 3685
3303 switch (mstype) { 3686 /* At this point we assume that we won't get a reply to any
3304 3687 previous op or overlap op on this lock. First, do a big
3305 case DLM_MSG_LOOKUP: 3688 remove_from_waiters() for all previous ops. */
3306 hold_rsb(r); 3689
3307 lock_rsb(r); 3690 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3308 _request_lock(r, lkb); 3691 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3309 if (is_master(r)) 3692 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3310 confirm_master(r, 0); 3693 lkb->lkb_wait_type = 0;
3311 unlock_rsb(r); 3694 lkb->lkb_wait_count = 0;
3312 put_rsb(r); 3695 mutex_lock(&ls->ls_waiters_mutex);
3313 break; 3696 list_del_init(&lkb->lkb_wait_reply);
3314 3697 mutex_unlock(&ls->ls_waiters_mutex);
3315 case DLM_MSG_REQUEST: 3698 unhold_lkb(lkb); /* for waiters list */
3316 hold_rsb(r); 3699
3317 lock_rsb(r); 3700 if (oc || ou) {
3318 _request_lock(r, lkb); 3701 /* do an unlock or cancel instead of resending */
3319 if (is_master(r)) 3702 switch (mstype) {
3320 confirm_master(r, 0); 3703 case DLM_MSG_LOOKUP:
3321 unlock_rsb(r); 3704 case DLM_MSG_REQUEST:
3322 put_rsb(r); 3705 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3323 break; 3706 -DLM_ECANCEL);
3324 3707 unhold_lkb(lkb); /* undoes create_lkb() */
3325 case DLM_MSG_CONVERT: 3708 break;
3326 hold_rsb(r); 3709 case DLM_MSG_CONVERT:
3327 lock_rsb(r); 3710 if (oc) {
3328 _convert_lock(r, lkb); 3711 queue_cast(r, lkb, -DLM_ECANCEL);
3329 unlock_rsb(r); 3712 } else {
3330 put_rsb(r); 3713 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3331 break; 3714 _unlock_lock(r, lkb);
3332 3715 }
3333 default: 3716 break;
3334 log_error(ls, "recover_waiters_post type %d", mstype); 3717 default:
3718 err = 1;
3719 }
3720 } else {
3721 switch (mstype) {
3722 case DLM_MSG_LOOKUP:
3723 case DLM_MSG_REQUEST:
3724 _request_lock(r, lkb);
3725 if (is_master(r))
3726 confirm_master(r, 0);
3727 break;
3728 case DLM_MSG_CONVERT:
3729 _convert_lock(r, lkb);
3730 break;
3731 default:
3732 err = 1;
3733 }
3335 } 3734 }
3735
3736 if (err)
3737 log_error(ls, "recover_waiters_post %x %d %x %d %d",
3738 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3739 unlock_rsb(r);
3740 put_rsb(r);
3741 dlm_put_lkb(lkb);
3336 } 3742 }
3337 3743
3338 return error; 3744 return error;
@@ -3684,7 +4090,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3684 4090
3685 /* add this new lkb to the per-process list of locks */ 4091 /* add this new lkb to the per-process list of locks */
3686 spin_lock(&ua->proc->locks_spin); 4092 spin_lock(&ua->proc->locks_spin);
3687 kref_get(&lkb->lkb_ref); 4093 hold_lkb(lkb);
3688 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 4094 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3689 spin_unlock(&ua->proc->locks_spin); 4095 spin_unlock(&ua->proc->locks_spin);
3690 out: 4096 out:
@@ -3774,6 +4180,9 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3774 4180
3775 if (error == -DLM_EUNLOCK) 4181 if (error == -DLM_EUNLOCK)
3776 error = 0; 4182 error = 0;
4183 /* from validate_unlock_args() */
4184 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4185 error = 0;
3777 if (error) 4186 if (error)
3778 goto out_put; 4187 goto out_put;
3779 4188
@@ -3786,6 +4195,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3786 dlm_put_lkb(lkb); 4195 dlm_put_lkb(lkb);
3787 out: 4196 out:
3788 unlock_recovery(ls); 4197 unlock_recovery(ls);
4198 kfree(ua_tmp);
3789 return error; 4199 return error;
3790} 4200}
3791 4201
@@ -3815,33 +4225,37 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3815 4225
3816 if (error == -DLM_ECANCEL) 4226 if (error == -DLM_ECANCEL)
3817 error = 0; 4227 error = 0;
3818 if (error) 4228 /* from validate_unlock_args() */
3819 goto out_put; 4229 if (error == -EBUSY)
3820 4230 error = 0;
3821 /* this lkb was removed from the WAITING queue */
3822 if (lkb->lkb_grmode == DLM_LOCK_IV) {
3823 spin_lock(&ua->proc->locks_spin);
3824 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3825 spin_unlock(&ua->proc->locks_spin);
3826 }
3827 out_put: 4231 out_put:
3828 dlm_put_lkb(lkb); 4232 dlm_put_lkb(lkb);
3829 out: 4233 out:
3830 unlock_recovery(ls); 4234 unlock_recovery(ls);
4235 kfree(ua_tmp);
3831 return error; 4236 return error;
3832} 4237}
3833 4238
4239/* lkb's that are removed from the waiters list by revert are just left on the
4240 orphans list with the granted orphan locks, to be freed by purge */
4241
3834static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) 4242static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3835{ 4243{
3836 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam; 4244 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4245 struct dlm_args args;
4246 int error;
3837 4247
3838 if (ua->lksb.sb_lvbptr) 4248 hold_lkb(lkb);
3839 kfree(ua->lksb.sb_lvbptr); 4249 mutex_lock(&ls->ls_orphans_mutex);
3840 kfree(ua); 4250 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
3841 lkb->lkb_astparam = (long)NULL; 4251 mutex_unlock(&ls->ls_orphans_mutex);
3842 4252
3843 /* TODO: propogate to master if needed */ 4253 set_unlock_args(0, ua, &args);
3844 return 0; 4254
4255 error = cancel_lock(ls, lkb, &args);
4256 if (error == -DLM_ECANCEL)
4257 error = 0;
4258 return error;
3845} 4259}
3846 4260
3847/* The force flag allows the unlock to go ahead even if the lkb isn't granted. 4261/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
@@ -3853,10 +4267,6 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3853 struct dlm_args args; 4267 struct dlm_args args;
3854 int error; 4268 int error;
3855 4269
3856 /* FIXME: we need to handle the case where the lkb is in limbo
3857 while the rsb is being looked up, currently we assert in
3858 _unlock_lock/is_remote because rsb nodeid is -1. */
3859
3860 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args); 4270 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3861 4271
3862 error = unlock_lock(ls, lkb, &args); 4272 error = unlock_lock(ls, lkb, &args);
@@ -3865,6 +4275,31 @@ static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3865 return error; 4275 return error;
3866} 4276}
3867 4277
4278/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4279 (which does lock_rsb) due to deadlock with receiving a message that does
4280 lock_rsb followed by dlm_user_add_ast() */
4281
4282static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4283 struct dlm_user_proc *proc)
4284{
4285 struct dlm_lkb *lkb = NULL;
4286
4287 mutex_lock(&ls->ls_clear_proc_locks);
4288 if (list_empty(&proc->locks))
4289 goto out;
4290
4291 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4292 list_del_init(&lkb->lkb_ownqueue);
4293
4294 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4295 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4296 else
4297 lkb->lkb_flags |= DLM_IFL_DEAD;
4298 out:
4299 mutex_unlock(&ls->ls_clear_proc_locks);
4300 return lkb;
4301}
4302
3868/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which 4303/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3869 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts, 4304 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3870 which we clear here. */ 4305 which we clear here. */
@@ -3880,18 +4315,15 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3880 struct dlm_lkb *lkb, *safe; 4315 struct dlm_lkb *lkb, *safe;
3881 4316
3882 lock_recovery(ls); 4317 lock_recovery(ls);
3883 mutex_lock(&ls->ls_clear_proc_locks);
3884 4318
3885 list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) { 4319 while (1) {
3886 list_del_init(&lkb->lkb_ownqueue); 4320 lkb = del_proc_lock(ls, proc);
3887 4321 if (!lkb)
3888 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) { 4322 break;
3889 lkb->lkb_flags |= DLM_IFL_ORPHAN; 4323 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
3890 orphan_proc_lock(ls, lkb); 4324 orphan_proc_lock(ls, lkb);
3891 } else { 4325 else
3892 lkb->lkb_flags |= DLM_IFL_DEAD;
3893 unlock_proc_lock(ls, lkb); 4326 unlock_proc_lock(ls, lkb);
3894 }
3895 4327
3896 /* this removes the reference for the proc->locks list 4328 /* this removes the reference for the proc->locks list
3897 added by dlm_user_request, it may result in the lkb 4329 added by dlm_user_request, it may result in the lkb
@@ -3900,6 +4332,8 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3900 dlm_put_lkb(lkb); 4332 dlm_put_lkb(lkb);
3901 } 4333 }
3902 4334
4335 mutex_lock(&ls->ls_clear_proc_locks);
4336
3903 /* in-progress unlocks */ 4337 /* in-progress unlocks */
3904 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { 4338 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
3905 list_del_init(&lkb->lkb_ownqueue); 4339 list_del_init(&lkb->lkb_ownqueue);
@@ -3916,3 +4350,92 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3916 unlock_recovery(ls); 4350 unlock_recovery(ls);
3917} 4351}
3918 4352
4353static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4354{
4355 struct dlm_lkb *lkb, *safe;
4356
4357 while (1) {
4358 lkb = NULL;
4359 spin_lock(&proc->locks_spin);
4360 if (!list_empty(&proc->locks)) {
4361 lkb = list_entry(proc->locks.next, struct dlm_lkb,
4362 lkb_ownqueue);
4363 list_del_init(&lkb->lkb_ownqueue);
4364 }
4365 spin_unlock(&proc->locks_spin);
4366
4367 if (!lkb)
4368 break;
4369
4370 lkb->lkb_flags |= DLM_IFL_DEAD;
4371 unlock_proc_lock(ls, lkb);
4372 dlm_put_lkb(lkb); /* ref from proc->locks list */
4373 }
4374
4375 spin_lock(&proc->locks_spin);
4376 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4377 list_del_init(&lkb->lkb_ownqueue);
4378 lkb->lkb_flags |= DLM_IFL_DEAD;
4379 dlm_put_lkb(lkb);
4380 }
4381 spin_unlock(&proc->locks_spin);
4382
4383 spin_lock(&proc->asts_spin);
4384 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4385 list_del(&lkb->lkb_astqueue);
4386 dlm_put_lkb(lkb);
4387 }
4388 spin_unlock(&proc->asts_spin);
4389}
4390
4391/* pid of 0 means purge all orphans */
4392
4393static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4394{
4395 struct dlm_lkb *lkb, *safe;
4396
4397 mutex_lock(&ls->ls_orphans_mutex);
4398 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4399 if (pid && lkb->lkb_ownpid != pid)
4400 continue;
4401 unlock_proc_lock(ls, lkb);
4402 list_del_init(&lkb->lkb_ownqueue);
4403 dlm_put_lkb(lkb);
4404 }
4405 mutex_unlock(&ls->ls_orphans_mutex);
4406}
4407
4408static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4409{
4410 struct dlm_message *ms;
4411 struct dlm_mhandle *mh;
4412 int error;
4413
4414 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4415 DLM_MSG_PURGE, &ms, &mh);
4416 if (error)
4417 return error;
4418 ms->m_nodeid = nodeid;
4419 ms->m_pid = pid;
4420
4421 return send_message(mh, ms);
4422}
4423
4424int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4425 int nodeid, int pid)
4426{
4427 int error = 0;
4428
4429 if (nodeid != dlm_our_nodeid()) {
4430 error = send_purge(ls, nodeid, pid);
4431 } else {
4432 lock_recovery(ls);
4433 if (pid == current->pid)
4434 purge_proc_locks(ls, proc);
4435 else
4436 do_purge(ls, nodeid, pid);
4437 unlock_recovery(ls);
4438 }
4439 return error;
4440}
4441
diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h
index 0843a3073ec3..64fc4ec40668 100644
--- a/fs/dlm/lock.h
+++ b/fs/dlm/lock.h
@@ -41,6 +41,8 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
41 uint32_t flags, uint32_t lkid, char *lvb_in); 41 uint32_t flags, uint32_t lkid, char *lvb_in);
42int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 42int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
43 uint32_t flags, uint32_t lkid); 43 uint32_t flags, uint32_t lkid);
44int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
45 int nodeid, int pid);
44void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc); 46void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc);
45 47
46static inline int is_master(struct dlm_rsb *r) 48static inline int is_master(struct dlm_rsb *r)
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 8e6ad7600236..a677b2a5eed4 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -2,7 +2,7 @@
2******************************************************************************* 2*******************************************************************************
3** 3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 5** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved.
6** 6**
7** This copyrighted material is made available to anyone wishing to use, 7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions 8** modify, copy, or redistribute it subject to the terms and conditions
@@ -459,6 +459,8 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
459 459
460 INIT_LIST_HEAD(&ls->ls_waiters); 460 INIT_LIST_HEAD(&ls->ls_waiters);
461 mutex_init(&ls->ls_waiters_mutex); 461 mutex_init(&ls->ls_waiters_mutex);
462 INIT_LIST_HEAD(&ls->ls_orphans);
463 mutex_init(&ls->ls_orphans_mutex);
462 464
463 INIT_LIST_HEAD(&ls->ls_nodes); 465 INIT_LIST_HEAD(&ls->ls_nodes);
464 INIT_LIST_HEAD(&ls->ls_nodes_gone); 466 INIT_LIST_HEAD(&ls->ls_nodes_gone);
diff --git a/fs/dlm/lowcomms-sctp.c b/fs/dlm/lowcomms-sctp.c
deleted file mode 100644
index dc83a9d979b5..000000000000
--- a/fs/dlm/lowcomms-sctp.c
+++ /dev/null
@@ -1,1210 +0,0 @@
1/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
6**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14/*
15 * lowcomms.c
16 *
17 * This is the "low-level" comms layer.
18 *
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
21 *
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is it's
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
28 *
29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
38 *
39 * I don't see any problem with the recv thread executing the locking
40 * code on behalf of remote processes as the locking code is
41 * short, efficient and never (well, hardly ever) waits.
42 *
43 */
44
45#include <asm/ioctls.h>
46#include <net/sock.h>
47#include <net/tcp.h>
48#include <net/sctp/user.h>
49#include <linux/pagemap.h>
50#include <linux/socket.h>
51#include <linux/idr.h>
52
53#include "dlm_internal.h"
54#include "lowcomms.h"
55#include "config.h"
56#include "midcomms.h"
57
58static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
59static int dlm_local_count;
60static int dlm_local_nodeid;
61
62/* One of these per connected node */
63
64#define NI_INIT_PENDING 1
65#define NI_WRITE_PENDING 2
66
67struct nodeinfo {
68 spinlock_t lock;
69 sctp_assoc_t assoc_id;
70 unsigned long flags;
71 struct list_head write_list; /* nodes with pending writes */
72 struct list_head writequeue; /* outgoing writequeue_entries */
73 spinlock_t writequeue_lock;
74 int nodeid;
75 struct work_struct swork; /* Send workqueue */
76 struct work_struct lwork; /* Locking workqueue */
77};
78
79static DEFINE_IDR(nodeinfo_idr);
80static DECLARE_RWSEM(nodeinfo_lock);
81static int max_nodeid;
82
83struct cbuf {
84 unsigned int base;
85 unsigned int len;
86 unsigned int mask;
87};
88
89/* Just the one of these, now. But this struct keeps
90 the connection-specific variables together */
91
92#define CF_READ_PENDING 1
93
94struct connection {
95 struct socket *sock;
96 unsigned long flags;
97 struct page *rx_page;
98 atomic_t waiting_requests;
99 struct cbuf cb;
100 int eagain_flag;
101 struct work_struct work; /* Send workqueue */
102};
103
104/* An entry waiting to be sent */
105
106struct writequeue_entry {
107 struct list_head list;
108 struct page *page;
109 int offset;
110 int len;
111 int end;
112 int users;
113 struct nodeinfo *ni;
114};
115
116static void cbuf_add(struct cbuf *cb, int n)
117{
118 cb->len += n;
119}
120
121static int cbuf_data(struct cbuf *cb)
122{
123 return ((cb->base + cb->len) & cb->mask);
124}
125
126static void cbuf_init(struct cbuf *cb, int size)
127{
128 cb->base = cb->len = 0;
129 cb->mask = size-1;
130}
131
132static void cbuf_eat(struct cbuf *cb, int n)
133{
134 cb->len -= n;
135 cb->base += n;
136 cb->base &= cb->mask;
137}
138
139/* List of nodes which have writes pending */
140static LIST_HEAD(write_nodes);
141static DEFINE_SPINLOCK(write_nodes_lock);
142
143
144/* Maximum number of incoming messages to process before
145 * doing a schedule()
146 */
147#define MAX_RX_MSG_COUNT 25
148
149/* Work queues */
150static struct workqueue_struct *recv_workqueue;
151static struct workqueue_struct *send_workqueue;
152static struct workqueue_struct *lock_workqueue;
153
154/* The SCTP connection */
155static struct connection sctp_con;
156
157static void process_send_sockets(struct work_struct *work);
158static void process_recv_sockets(struct work_struct *work);
159static void process_lock_request(struct work_struct *work);
160
161static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
162{
163 struct sockaddr_storage addr;
164 int error;
165
166 if (!dlm_local_count)
167 return -1;
168
169 error = dlm_nodeid_to_addr(nodeid, &addr);
170 if (error)
171 return error;
172
173 if (dlm_local_addr[0]->ss_family == AF_INET) {
174 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
175 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
176 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
177 } else {
178 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
179 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
180 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
181 sizeof(in6->sin6_addr));
182 }
183
184 return 0;
185}
186
187/* If alloc is 0 here we will not attempt to allocate a new
188 nodeinfo struct */
189static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
190{
191 struct nodeinfo *ni;
192 int r;
193 int n;
194
195 down_read(&nodeinfo_lock);
196 ni = idr_find(&nodeinfo_idr, nodeid);
197 up_read(&nodeinfo_lock);
198
199 if (ni || !alloc)
200 return ni;
201
202 down_write(&nodeinfo_lock);
203
204 ni = idr_find(&nodeinfo_idr, nodeid);
205 if (ni)
206 goto out_up;
207
208 r = idr_pre_get(&nodeinfo_idr, alloc);
209 if (!r)
210 goto out_up;
211
212 ni = kmalloc(sizeof(struct nodeinfo), alloc);
213 if (!ni)
214 goto out_up;
215
216 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
217 if (r) {
218 kfree(ni);
219 ni = NULL;
220 goto out_up;
221 }
222 if (n != nodeid) {
223 idr_remove(&nodeinfo_idr, n);
224 kfree(ni);
225 ni = NULL;
226 goto out_up;
227 }
228 memset(ni, 0, sizeof(struct nodeinfo));
229 spin_lock_init(&ni->lock);
230 INIT_LIST_HEAD(&ni->writequeue);
231 spin_lock_init(&ni->writequeue_lock);
232 INIT_WORK(&ni->lwork, process_lock_request);
233 INIT_WORK(&ni->swork, process_send_sockets);
234 ni->nodeid = nodeid;
235
236 if (nodeid > max_nodeid)
237 max_nodeid = nodeid;
238out_up:
239 up_write(&nodeinfo_lock);
240
241 return ni;
242}
243
244/* Don't call this too often... */
245static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
246{
247 int i;
248 struct nodeinfo *ni;
249
250 for (i=1; i<=max_nodeid; i++) {
251 ni = nodeid2nodeinfo(i, 0);
252 if (ni && ni->assoc_id == assoc)
253 return ni;
254 }
255 return NULL;
256}
257
258/* Data or notification available on socket */
259static void lowcomms_data_ready(struct sock *sk, int count_unused)
260{
261 if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
262 queue_work(recv_workqueue, &sctp_con.work);
263}
264
265
266/* Add the port number to an IP6 or 4 sockaddr and return the address length.
267 Also padd out the struct with zeros to make comparisons meaningful */
268
269static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
270 int *addr_len)
271{
272 struct sockaddr_in *local4_addr;
273 struct sockaddr_in6 *local6_addr;
274
275 if (!dlm_local_count)
276 return;
277
278 if (!port) {
279 if (dlm_local_addr[0]->ss_family == AF_INET) {
280 local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
281 port = be16_to_cpu(local4_addr->sin_port);
282 } else {
283 local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
284 port = be16_to_cpu(local6_addr->sin6_port);
285 }
286 }
287
288 saddr->ss_family = dlm_local_addr[0]->ss_family;
289 if (dlm_local_addr[0]->ss_family == AF_INET) {
290 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
291 in4_addr->sin_port = cpu_to_be16(port);
292 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
293 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
294 sizeof(struct sockaddr_in));
295 *addr_len = sizeof(struct sockaddr_in);
296 } else {
297 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
298 in6_addr->sin6_port = cpu_to_be16(port);
299 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
300 sizeof(struct sockaddr_in6));
301 *addr_len = sizeof(struct sockaddr_in6);
302 }
303}
304
305/* Close the connection and tidy up */
306static void close_connection(void)
307{
308 if (sctp_con.sock) {
309 sock_release(sctp_con.sock);
310 sctp_con.sock = NULL;
311 }
312
313 if (sctp_con.rx_page) {
314 __free_page(sctp_con.rx_page);
315 sctp_con.rx_page = NULL;
316 }
317}
318
319/* We only send shutdown messages to nodes that are not part of the cluster */
320static void send_shutdown(sctp_assoc_t associd)
321{
322 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
323 struct msghdr outmessage;
324 struct cmsghdr *cmsg;
325 struct sctp_sndrcvinfo *sinfo;
326 int ret;
327
328 outmessage.msg_name = NULL;
329 outmessage.msg_namelen = 0;
330 outmessage.msg_control = outcmsg;
331 outmessage.msg_controllen = sizeof(outcmsg);
332 outmessage.msg_flags = MSG_EOR;
333
334 cmsg = CMSG_FIRSTHDR(&outmessage);
335 cmsg->cmsg_level = IPPROTO_SCTP;
336 cmsg->cmsg_type = SCTP_SNDRCV;
337 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
338 outmessage.msg_controllen = cmsg->cmsg_len;
339 sinfo = CMSG_DATA(cmsg);
340 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
341
342 sinfo->sinfo_flags |= MSG_EOF;
343 sinfo->sinfo_assoc_id = associd;
344
345 ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
346
347 if (ret != 0)
348 log_print("send EOF to node failed: %d", ret);
349}
350
351
352/* INIT failed but we don't know which node...
353 restart INIT on all pending nodes */
354static void init_failed(void)
355{
356 int i;
357 struct nodeinfo *ni;
358
359 for (i=1; i<=max_nodeid; i++) {
360 ni = nodeid2nodeinfo(i, 0);
361 if (!ni)
362 continue;
363
364 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
365 ni->assoc_id = 0;
366 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
367 spin_lock_bh(&write_nodes_lock);
368 list_add_tail(&ni->write_list, &write_nodes);
369 spin_unlock_bh(&write_nodes_lock);
370 queue_work(send_workqueue, &ni->swork);
371 }
372 }
373 }
374}
375
376/* Something happened to an association */
377static void process_sctp_notification(struct msghdr *msg, char *buf)
378{
379 union sctp_notification *sn = (union sctp_notification *)buf;
380
381 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
382 switch (sn->sn_assoc_change.sac_state) {
383
384 case SCTP_COMM_UP:
385 case SCTP_RESTART:
386 {
387 /* Check that the new node is in the lockspace */
388 struct sctp_prim prim;
389 mm_segment_t fs;
390 int nodeid;
391 int prim_len, ret;
392 int addr_len;
393 struct nodeinfo *ni;
394
395 /* This seems to happen when we received a connection
396 * too early... or something... anyway, it happens but
397 * we always seem to get a real message too, see
398 * receive_from_sock */
399
400 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
401 log_print("COMM_UP for invalid assoc ID %d",
402 (int)sn->sn_assoc_change.sac_assoc_id);
403 init_failed();
404 return;
405 }
406 memset(&prim, 0, sizeof(struct sctp_prim));
407 prim_len = sizeof(struct sctp_prim);
408 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
409
410 fs = get_fs();
411 set_fs(get_ds());
412 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
413 IPPROTO_SCTP,
414 SCTP_PRIMARY_ADDR,
415 (char*)&prim,
416 &prim_len);
417 set_fs(fs);
418 if (ret < 0) {
419 struct nodeinfo *ni;
420
421 log_print("getsockopt/sctp_primary_addr on "
422 "new assoc %d failed : %d",
423 (int)sn->sn_assoc_change.sac_assoc_id,
424 ret);
425
426 /* Retry INIT later */
427 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
428 if (ni)
429 clear_bit(NI_INIT_PENDING, &ni->flags);
430 return;
431 }
432 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
433 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
434 log_print("reject connect from unknown addr");
435 send_shutdown(prim.ssp_assoc_id);
436 return;
437 }
438
439 ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
440 if (!ni)
441 return;
442
443 /* Save the assoc ID */
444 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
445
446 log_print("got new/restarted association %d nodeid %d",
447 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
448
449 /* Send any pending writes */
450 clear_bit(NI_INIT_PENDING, &ni->flags);
451 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
452 spin_lock_bh(&write_nodes_lock);
453 list_add_tail(&ni->write_list, &write_nodes);
454 spin_unlock_bh(&write_nodes_lock);
455 queue_work(send_workqueue, &ni->swork);
456 }
457 }
458 break;
459
460 case SCTP_COMM_LOST:
461 case SCTP_SHUTDOWN_COMP:
462 {
463 struct nodeinfo *ni;
464
465 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
466 if (ni) {
467 spin_lock(&ni->lock);
468 ni->assoc_id = 0;
469 spin_unlock(&ni->lock);
470 }
471 }
472 break;
473
474 /* We don't know which INIT failed, so clear the PENDING flags
475 * on them all. if assoc_id is zero then it will then try
476 * again */
477
478 case SCTP_CANT_STR_ASSOC:
479 {
480 log_print("Can't start SCTP association - retrying");
481 init_failed();
482 }
483 break;
484
485 default:
486 log_print("unexpected SCTP assoc change id=%d state=%d",
487 (int)sn->sn_assoc_change.sac_assoc_id,
488 sn->sn_assoc_change.sac_state);
489 }
490 }
491}
492
493/* Data received from remote end */
494static int receive_from_sock(void)
495{
496 int ret = 0;
497 struct msghdr msg;
498 struct kvec iov[2];
499 unsigned len;
500 int r;
501 struct sctp_sndrcvinfo *sinfo;
502 struct cmsghdr *cmsg;
503 struct nodeinfo *ni;
504
505 /* These two are marginally too big for stack allocation, but this
506 * function is (currently) only called by dlm_recvd so static should be
507 * OK.
508 */
509 static struct sockaddr_storage msgname;
510 static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
511
512 if (sctp_con.sock == NULL)
513 goto out;
514
515 if (sctp_con.rx_page == NULL) {
516 /*
517 * This doesn't need to be atomic, but I think it should
518 * improve performance if it is.
519 */
520 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
521 if (sctp_con.rx_page == NULL)
522 goto out_resched;
523 cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
524 }
525
526 memset(&incmsg, 0, sizeof(incmsg));
527 memset(&msgname, 0, sizeof(msgname));
528
529 msg.msg_name = &msgname;
530 msg.msg_namelen = sizeof(msgname);
531 msg.msg_flags = 0;
532 msg.msg_control = incmsg;
533 msg.msg_controllen = sizeof(incmsg);
534 msg.msg_iovlen = 1;
535
536 /* I don't see why this circular buffer stuff is necessary for SCTP
537 * which is a packet-based protocol, but the whole thing breaks under
538 * load without it! The overhead is minimal (and is in the TCP lowcomms
539 * anyway, of course) so I'll leave it in until I can figure out what's
540 * really happening.
541 */
542
543 /*
544 * iov[0] is the bit of the circular buffer between the current end
545 * point (cb.base + cb.len) and the end of the buffer.
546 */
547 iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
548 iov[0].iov_base = page_address(sctp_con.rx_page) +
549 cbuf_data(&sctp_con.cb);
550 iov[1].iov_len = 0;
551
552 /*
553 * iov[1] is the bit of the circular buffer between the start of the
554 * buffer and the start of the currently used section (cb.base)
555 */
556 if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
557 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
558 iov[1].iov_len = sctp_con.cb.base;
559 iov[1].iov_base = page_address(sctp_con.rx_page);
560 msg.msg_iovlen = 2;
561 }
562 len = iov[0].iov_len + iov[1].iov_len;
563
564 r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
565 MSG_NOSIGNAL | MSG_DONTWAIT);
566 if (ret <= 0)
567 goto out_close;
568
569 msg.msg_control = incmsg;
570 msg.msg_controllen = sizeof(incmsg);
571 cmsg = CMSG_FIRSTHDR(&msg);
572 sinfo = CMSG_DATA(cmsg);
573
574 if (msg.msg_flags & MSG_NOTIFICATION) {
575 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
576 return 0;
577 }
578
579 /* Is this a new association ? */
580 ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
581 if (ni) {
582 ni->assoc_id = sinfo->sinfo_assoc_id;
583 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
584
585 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
586 spin_lock_bh(&write_nodes_lock);
587 list_add_tail(&ni->write_list, &write_nodes);
588 spin_unlock_bh(&write_nodes_lock);
589 queue_work(send_workqueue, &ni->swork);
590 }
591 }
592 }
593
594 /* INIT sends a message with length of 1 - ignore it */
595 if (r == 1)
596 return 0;
597
598 cbuf_add(&sctp_con.cb, ret);
599 // PJC: TODO: Add to node's workqueue....can we ??
600 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
601 page_address(sctp_con.rx_page),
602 sctp_con.cb.base, sctp_con.cb.len,
603 PAGE_CACHE_SIZE);
604 if (ret < 0)
605 goto out_close;
606 cbuf_eat(&sctp_con.cb, ret);
607
608out:
609 ret = 0;
610 goto out_ret;
611
612out_resched:
613 lowcomms_data_ready(sctp_con.sock->sk, 0);
614 ret = 0;
615 cond_resched();
616 goto out_ret;
617
618out_close:
619 if (ret != -EAGAIN)
620 log_print("error reading from sctp socket: %d", ret);
621out_ret:
622 return ret;
623}
624
625/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
626static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
627{
628 mm_segment_t fs;
629 int result = 0;
630
631 fs = get_fs();
632 set_fs(get_ds());
633 if (num == 1)
634 result = sctp_con.sock->ops->bind(sctp_con.sock,
635 (struct sockaddr *) addr,
636 addr_len);
637 else
638 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
639 SCTP_SOCKOPT_BINDX_ADD,
640 (char *)addr, addr_len);
641 set_fs(fs);
642
643 if (result < 0)
644 log_print("Can't bind to port %d addr number %d",
645 dlm_config.ci_tcp_port, num);
646
647 return result;
648}
649
650static void init_local(void)
651{
652 struct sockaddr_storage sas, *addr;
653 int i;
654
655 dlm_local_nodeid = dlm_our_nodeid();
656
657 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
658 if (dlm_our_addr(&sas, i))
659 break;
660
661 addr = kmalloc(sizeof(*addr), GFP_KERNEL);
662 if (!addr)
663 break;
664 memcpy(addr, &sas, sizeof(*addr));
665 dlm_local_addr[dlm_local_count++] = addr;
666 }
667}
668
669/* Initialise SCTP socket and bind to all interfaces */
670static int init_sock(void)
671{
672 mm_segment_t fs;
673 struct socket *sock = NULL;
674 struct sockaddr_storage localaddr;
675 struct sctp_event_subscribe subscribe;
676 int result = -EINVAL, num = 1, i, addr_len;
677
678 if (!dlm_local_count) {
679 init_local();
680 if (!dlm_local_count) {
681 log_print("no local IP address has been set");
682 goto out;
683 }
684 }
685
686 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
687 IPPROTO_SCTP, &sock);
688 if (result < 0) {
689 log_print("Can't create comms socket, check SCTP is loaded");
690 goto out;
691 }
692
693 /* Listen for events */
694 memset(&subscribe, 0, sizeof(subscribe));
695 subscribe.sctp_data_io_event = 1;
696 subscribe.sctp_association_event = 1;
697 subscribe.sctp_send_failure_event = 1;
698 subscribe.sctp_shutdown_event = 1;
699 subscribe.sctp_partial_delivery_event = 1;
700
701 fs = get_fs();
702 set_fs(get_ds());
703 result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
704 (char *)&subscribe, sizeof(subscribe));
705 set_fs(fs);
706
707 if (result < 0) {
708 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
709 result);
710 goto create_delsock;
711 }
712
713 /* Init con struct */
714 sock->sk->sk_user_data = &sctp_con;
715 sctp_con.sock = sock;
716 sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
717
718 /* Bind to all interfaces. */
719 for (i = 0; i < dlm_local_count; i++) {
720 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
721 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
722
723 result = add_bind_addr(&localaddr, addr_len, num);
724 if (result)
725 goto create_delsock;
726 ++num;
727 }
728
729 result = sock->ops->listen(sock, 5);
730 if (result < 0) {
731 log_print("Can't set socket listening");
732 goto create_delsock;
733 }
734
735 return 0;
736
737create_delsock:
738 sock_release(sock);
739 sctp_con.sock = NULL;
740out:
741 return result;
742}
743
744
745static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
746{
747 struct writequeue_entry *entry;
748
749 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
750 if (!entry)
751 return NULL;
752
753 entry->page = alloc_page(allocation);
754 if (!entry->page) {
755 kfree(entry);
756 return NULL;
757 }
758
759 entry->offset = 0;
760 entry->len = 0;
761 entry->end = 0;
762 entry->users = 0;
763
764 return entry;
765}
766
767void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
768{
769 struct writequeue_entry *e;
770 int offset = 0;
771 int users = 0;
772 struct nodeinfo *ni;
773
774 ni = nodeid2nodeinfo(nodeid, allocation);
775 if (!ni)
776 return NULL;
777
778 spin_lock(&ni->writequeue_lock);
779 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
780 if ((&e->list == &ni->writequeue) ||
781 (PAGE_CACHE_SIZE - e->end < len)) {
782 e = NULL;
783 } else {
784 offset = e->end;
785 e->end += len;
786 users = e->users++;
787 }
788 spin_unlock(&ni->writequeue_lock);
789
790 if (e) {
791 got_one:
792 if (users == 0)
793 kmap(e->page);
794 *ppc = page_address(e->page) + offset;
795 return e;
796 }
797
798 e = new_writequeue_entry(allocation);
799 if (e) {
800 spin_lock(&ni->writequeue_lock);
801 offset = e->end;
802 e->end += len;
803 e->ni = ni;
804 users = e->users++;
805 list_add_tail(&e->list, &ni->writequeue);
806 spin_unlock(&ni->writequeue_lock);
807 goto got_one;
808 }
809 return NULL;
810}
811
812void dlm_lowcomms_commit_buffer(void *arg)
813{
814 struct writequeue_entry *e = (struct writequeue_entry *) arg;
815 int users;
816 struct nodeinfo *ni = e->ni;
817
818 spin_lock(&ni->writequeue_lock);
819 users = --e->users;
820 if (users)
821 goto out;
822 e->len = e->end - e->offset;
823 kunmap(e->page);
824 spin_unlock(&ni->writequeue_lock);
825
826 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
827 spin_lock_bh(&write_nodes_lock);
828 list_add_tail(&ni->write_list, &write_nodes);
829 spin_unlock_bh(&write_nodes_lock);
830
831 queue_work(send_workqueue, &ni->swork);
832 }
833 return;
834
835out:
836 spin_unlock(&ni->writequeue_lock);
837 return;
838}
839
840static void free_entry(struct writequeue_entry *e)
841{
842 __free_page(e->page);
843 kfree(e);
844}
845
846/* Initiate an SCTP association. In theory we could just use sendmsg() on
847 the first IP address and it should work, but this allows us to set up the
848 association before sending any valuable data that we can't afford to lose.
849 It also keeps the send path clean as it can now always use the association ID */
850static void initiate_association(int nodeid)
851{
852 struct sockaddr_storage rem_addr;
853 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
854 struct msghdr outmessage;
855 struct cmsghdr *cmsg;
856 struct sctp_sndrcvinfo *sinfo;
857 int ret;
858 int addrlen;
859 char buf[1];
860 struct kvec iov[1];
861 struct nodeinfo *ni;
862
863 log_print("Initiating association with node %d", nodeid);
864
865 ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
866 if (!ni)
867 return;
868
869 if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
870 log_print("no address for nodeid %d", nodeid);
871 return;
872 }
873
874 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
875
876 outmessage.msg_name = &rem_addr;
877 outmessage.msg_namelen = addrlen;
878 outmessage.msg_control = outcmsg;
879 outmessage.msg_controllen = sizeof(outcmsg);
880 outmessage.msg_flags = MSG_EOR;
881
882 iov[0].iov_base = buf;
883 iov[0].iov_len = 1;
884
885 /* Real INIT messages seem to cause trouble. Just send a 1 byte message
886 we can afford to lose */
887 cmsg = CMSG_FIRSTHDR(&outmessage);
888 cmsg->cmsg_level = IPPROTO_SCTP;
889 cmsg->cmsg_type = SCTP_SNDRCV;
890 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
891 sinfo = CMSG_DATA(cmsg);
892 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
893 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
894
895 outmessage.msg_controllen = cmsg->cmsg_len;
896 ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
897 if (ret < 0) {
898 log_print("send INIT to node failed: %d", ret);
899 /* Try again later */
900 clear_bit(NI_INIT_PENDING, &ni->flags);
901 }
902}
903
904/* Send a message */
905static void send_to_sock(struct nodeinfo *ni)
906{
907 int ret = 0;
908 struct writequeue_entry *e;
909 int len, offset;
910 struct msghdr outmsg;
911 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
912 struct cmsghdr *cmsg;
913 struct sctp_sndrcvinfo *sinfo;
914 struct kvec iov;
915
916 /* See if we need to init an association before we start
917 sending precious messages */
918 spin_lock(&ni->lock);
919 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
920 spin_unlock(&ni->lock);
921 initiate_association(ni->nodeid);
922 return;
923 }
924 spin_unlock(&ni->lock);
925
926 outmsg.msg_name = NULL; /* We use assoc_id */
927 outmsg.msg_namelen = 0;
928 outmsg.msg_control = outcmsg;
929 outmsg.msg_controllen = sizeof(outcmsg);
930 outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
931
932 cmsg = CMSG_FIRSTHDR(&outmsg);
933 cmsg->cmsg_level = IPPROTO_SCTP;
934 cmsg->cmsg_type = SCTP_SNDRCV;
935 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
936 sinfo = CMSG_DATA(cmsg);
937 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
938 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
939 sinfo->sinfo_assoc_id = ni->assoc_id;
940 outmsg.msg_controllen = cmsg->cmsg_len;
941
942 spin_lock(&ni->writequeue_lock);
943 for (;;) {
944 if (list_empty(&ni->writequeue))
945 break;
946 e = list_entry(ni->writequeue.next, struct writequeue_entry,
947 list);
948 len = e->len;
949 offset = e->offset;
950 BUG_ON(len == 0 && e->users == 0);
951 spin_unlock(&ni->writequeue_lock);
952 kmap(e->page);
953
954 ret = 0;
955 if (len) {
956 iov.iov_base = page_address(e->page)+offset;
957 iov.iov_len = len;
958
959 ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
960 len);
961 if (ret == -EAGAIN) {
962 sctp_con.eagain_flag = 1;
963 goto out;
964 } else if (ret < 0)
965 goto send_error;
966 } else {
967 /* Don't starve people filling buffers */
968 cond_resched();
969 }
970
971 spin_lock(&ni->writequeue_lock);
972 e->offset += ret;
973 e->len -= ret;
974
975 if (e->len == 0 && e->users == 0) {
976 list_del(&e->list);
977 kunmap(e->page);
978 free_entry(e);
979 continue;
980 }
981 }
982 spin_unlock(&ni->writequeue_lock);
983out:
984 return;
985
986send_error:
987 log_print("Error sending to node %d %d", ni->nodeid, ret);
988 spin_lock(&ni->lock);
989 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
990 ni->assoc_id = 0;
991 spin_unlock(&ni->lock);
992 initiate_association(ni->nodeid);
993 } else
994 spin_unlock(&ni->lock);
995
996 return;
997}
998
999/* Try to send any messages that are pending */
1000static void process_output_queue(void)
1001{
1002 struct list_head *list;
1003 struct list_head *temp;
1004
1005 spin_lock_bh(&write_nodes_lock);
1006 list_for_each_safe(list, temp, &write_nodes) {
1007 struct nodeinfo *ni =
1008 list_entry(list, struct nodeinfo, write_list);
1009 clear_bit(NI_WRITE_PENDING, &ni->flags);
1010 list_del(&ni->write_list);
1011
1012 spin_unlock_bh(&write_nodes_lock);
1013
1014 send_to_sock(ni);
1015 spin_lock_bh(&write_nodes_lock);
1016 }
1017 spin_unlock_bh(&write_nodes_lock);
1018}
1019
1020/* Called after we've had -EAGAIN and been woken up */
1021static void refill_write_queue(void)
1022{
1023 int i;
1024
1025 for (i=1; i<=max_nodeid; i++) {
1026 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1027
1028 if (ni) {
1029 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
1030 spin_lock_bh(&write_nodes_lock);
1031 list_add_tail(&ni->write_list, &write_nodes);
1032 spin_unlock_bh(&write_nodes_lock);
1033 }
1034 }
1035 }
1036}
1037
1038static void clean_one_writequeue(struct nodeinfo *ni)
1039{
1040 struct list_head *list;
1041 struct list_head *temp;
1042
1043 spin_lock(&ni->writequeue_lock);
1044 list_for_each_safe(list, temp, &ni->writequeue) {
1045 struct writequeue_entry *e =
1046 list_entry(list, struct writequeue_entry, list);
1047 list_del(&e->list);
1048 free_entry(e);
1049 }
1050 spin_unlock(&ni->writequeue_lock);
1051}
1052
1053static void clean_writequeues(void)
1054{
1055 int i;
1056
1057 for (i=1; i<=max_nodeid; i++) {
1058 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1059 if (ni)
1060 clean_one_writequeue(ni);
1061 }
1062}
1063
1064
1065static void dealloc_nodeinfo(void)
1066{
1067 int i;
1068
1069 for (i=1; i<=max_nodeid; i++) {
1070 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1071 if (ni) {
1072 idr_remove(&nodeinfo_idr, i);
1073 kfree(ni);
1074 }
1075 }
1076}
1077
1078int dlm_lowcomms_close(int nodeid)
1079{
1080 struct nodeinfo *ni;
1081
1082 ni = nodeid2nodeinfo(nodeid, 0);
1083 if (!ni)
1084 return -1;
1085
1086 spin_lock(&ni->lock);
1087 if (ni->assoc_id) {
1088 ni->assoc_id = 0;
1089 /* Don't send shutdown here, sctp will just queue it
1090 till the node comes back up! */
1091 }
1092 spin_unlock(&ni->lock);
1093
1094 clean_one_writequeue(ni);
1095 clear_bit(NI_INIT_PENDING, &ni->flags);
1096 return 0;
1097}
1098
1099// PJC: The work queue function for receiving.
1100static void process_recv_sockets(struct work_struct *work)
1101{
1102 if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1103 int ret;
1104 int count = 0;
1105
1106 do {
1107 ret = receive_from_sock();
1108
1109 /* Don't starve out everyone else */
1110 if (++count >= MAX_RX_MSG_COUNT) {
1111 cond_resched();
1112 count = 0;
1113 }
1114 } while (!kthread_should_stop() && ret >=0);
1115 }
1116 cond_resched();
1117}
1118
1119// PJC: the work queue function for sending
1120static void process_send_sockets(struct work_struct *work)
1121{
1122 if (sctp_con.eagain_flag) {
1123 sctp_con.eagain_flag = 0;
1124 refill_write_queue();
1125 }
1126 process_output_queue();
1127}
1128
1129// PJC: Process lock requests from a particular node.
1130// TODO: can we optimise this out on UP ??
1131static void process_lock_request(struct work_struct *work)
1132{
1133}
1134
1135static void daemons_stop(void)
1136{
1137 destroy_workqueue(recv_workqueue);
1138 destroy_workqueue(send_workqueue);
1139 destroy_workqueue(lock_workqueue);
1140}
1141
1142static int daemons_start(void)
1143{
1144 int error;
1145 recv_workqueue = create_workqueue("dlm_recv");
1146 error = IS_ERR(recv_workqueue);
1147 if (error) {
1148 log_print("can't start dlm_recv %d", error);
1149 return error;
1150 }
1151
1152 send_workqueue = create_singlethread_workqueue("dlm_send");
1153 error = IS_ERR(send_workqueue);
1154 if (error) {
1155 log_print("can't start dlm_send %d", error);
1156 destroy_workqueue(recv_workqueue);
1157 return error;
1158 }
1159
1160 lock_workqueue = create_workqueue("dlm_rlock");
1161 error = IS_ERR(lock_workqueue);
1162 if (error) {
1163 log_print("can't start dlm_rlock %d", error);
1164 destroy_workqueue(send_workqueue);
1165 destroy_workqueue(recv_workqueue);
1166 return error;
1167 }
1168
1169 return 0;
1170}
1171
1172/*
1173 * This is quite likely to sleep...
1174 */
1175int dlm_lowcomms_start(void)
1176{
1177 int error;
1178
1179 INIT_WORK(&sctp_con.work, process_recv_sockets);
1180
1181 error = init_sock();
1182 if (error)
1183 goto fail_sock;
1184 error = daemons_start();
1185 if (error)
1186 goto fail_sock;
1187 return 0;
1188
1189fail_sock:
1190 close_connection();
1191 return error;
1192}
1193
1194void dlm_lowcomms_stop(void)
1195{
1196 int i;
1197
1198 sctp_con.flags = 0x7;
1199 daemons_stop();
1200 clean_writequeues();
1201 close_connection();
1202 dealloc_nodeinfo();
1203 max_nodeid = 0;
1204
1205 dlm_local_count = 0;
1206 dlm_local_nodeid = 0;
1207
1208 for (i = 0; i < dlm_local_count; i++)
1209 kfree(dlm_local_addr[i]);
1210}
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms.c
index 07e0a122c32f..27970a58d29b 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms.c
@@ -36,30 +36,36 @@
36 * of high load. Also, this way, the sending thread can collect together 36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block. 37 * messages bound for one node and send them in one block.
38 * 38 *
39 * I don't see any problem with the recv thread executing the locking 39 * lowcomms will choose to use wither TCP or SCTP as its transport layer
40 * code on behalf of remote processes as the locking code is 40 * depending on the configuration variable 'protocol'. This should be set
41 * short, efficient and never waits. 41 * to 0 (default) for TCP or 1 for SCTP. It shouldbe configured using a
42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
43 * for the DLM to function.
42 * 44 *
43 */ 45 */
44 46
45
46#include <asm/ioctls.h> 47#include <asm/ioctls.h>
47#include <net/sock.h> 48#include <net/sock.h>
48#include <net/tcp.h> 49#include <net/tcp.h>
49#include <linux/pagemap.h> 50#include <linux/pagemap.h>
51#include <linux/idr.h>
52#include <linux/file.h>
53#include <linux/sctp.h>
54#include <net/sctp/user.h>
50 55
51#include "dlm_internal.h" 56#include "dlm_internal.h"
52#include "lowcomms.h" 57#include "lowcomms.h"
53#include "midcomms.h" 58#include "midcomms.h"
54#include "config.h" 59#include "config.h"
55 60
61#define NEEDED_RMEM (4*1024*1024)
62
56struct cbuf { 63struct cbuf {
57 unsigned int base; 64 unsigned int base;
58 unsigned int len; 65 unsigned int len;
59 unsigned int mask; 66 unsigned int mask;
60}; 67};
61 68
62#define NODE_INCREMENT 32
63static void cbuf_add(struct cbuf *cb, int n) 69static void cbuf_add(struct cbuf *cb, int n)
64{ 70{
65 cb->len += n; 71 cb->len += n;
@@ -88,28 +94,25 @@ static bool cbuf_empty(struct cbuf *cb)
88 return cb->len == 0; 94 return cb->len == 0;
89} 95}
90 96
91/* Maximum number of incoming messages to process before
92 doing a cond_resched()
93*/
94#define MAX_RX_MSG_COUNT 25
95
96struct connection { 97struct connection {
97 struct socket *sock; /* NULL if not connected */ 98 struct socket *sock; /* NULL if not connected */
98 uint32_t nodeid; /* So we know who we are in the list */ 99 uint32_t nodeid; /* So we know who we are in the list */
99 struct mutex sock_mutex; 100 struct mutex sock_mutex;
100 unsigned long flags; /* bit 1,2 = We are on the read/write lists */ 101 unsigned long flags;
101#define CF_READ_PENDING 1 102#define CF_READ_PENDING 1
102#define CF_WRITE_PENDING 2 103#define CF_WRITE_PENDING 2
103#define CF_CONNECT_PENDING 3 104#define CF_CONNECT_PENDING 3
104#define CF_IS_OTHERCON 4 105#define CF_INIT_PENDING 4
106#define CF_IS_OTHERCON 5
105 struct list_head writequeue; /* List of outgoing writequeue_entries */ 107 struct list_head writequeue; /* List of outgoing writequeue_entries */
106 struct list_head listenlist; /* List of allocated listening sockets */
107 spinlock_t writequeue_lock; 108 spinlock_t writequeue_lock;
108 int (*rx_action) (struct connection *); /* What to do when active */ 109 int (*rx_action) (struct connection *); /* What to do when active */
110 void (*connect_action) (struct connection *); /* What to do to connect */
109 struct page *rx_page; 111 struct page *rx_page;
110 struct cbuf cb; 112 struct cbuf cb;
111 int retries; 113 int retries;
112#define MAX_CONNECT_RETRIES 3 114#define MAX_CONNECT_RETRIES 3
115 int sctp_assoc;
113 struct connection *othercon; 116 struct connection *othercon;
114 struct work_struct rwork; /* Receive workqueue */ 117 struct work_struct rwork; /* Receive workqueue */
115 struct work_struct swork; /* Send workqueue */ 118 struct work_struct swork; /* Send workqueue */
@@ -127,68 +130,136 @@ struct writequeue_entry {
127 struct connection *con; 130 struct connection *con;
128}; 131};
129 132
130static struct sockaddr_storage dlm_local_addr; 133static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
134static int dlm_local_count;
131 135
132/* Work queues */ 136/* Work queues */
133static struct workqueue_struct *recv_workqueue; 137static struct workqueue_struct *recv_workqueue;
134static struct workqueue_struct *send_workqueue; 138static struct workqueue_struct *send_workqueue;
135 139
136/* An array of pointers to connections, indexed by NODEID */ 140static DEFINE_IDR(connections_idr);
137static struct connection **connections;
138static DECLARE_MUTEX(connections_lock); 141static DECLARE_MUTEX(connections_lock);
142static int max_nodeid;
139static struct kmem_cache *con_cache; 143static struct kmem_cache *con_cache;
140static int conn_array_size;
141 144
142static void process_recv_sockets(struct work_struct *work); 145static void process_recv_sockets(struct work_struct *work);
143static void process_send_sockets(struct work_struct *work); 146static void process_send_sockets(struct work_struct *work);
144 147
145static struct connection *nodeid2con(int nodeid, gfp_t allocation) 148/*
149 * If 'allocation' is zero then we don't attempt to create a new
150 * connection structure for this node.
151 */
152static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
146{ 153{
147 struct connection *con = NULL; 154 struct connection *con = NULL;
155 int r;
156 int n;
148 157
149 down(&connections_lock); 158 con = idr_find(&connections_idr, nodeid);
150 if (nodeid >= conn_array_size) { 159 if (con || !alloc)
151 int new_size = nodeid + NODE_INCREMENT; 160 return con;
152 struct connection **new_conns;
153 161
154 new_conns = kzalloc(sizeof(struct connection *) * 162 r = idr_pre_get(&connections_idr, alloc);
155 new_size, allocation); 163 if (!r)
156 if (!new_conns) 164 return NULL;
157 goto finish; 165
166 con = kmem_cache_zalloc(con_cache, alloc);
167 if (!con)
168 return NULL;
158 169
159 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size); 170 r = idr_get_new_above(&connections_idr, con, nodeid, &n);
160 conn_array_size = new_size; 171 if (r) {
161 kfree(connections); 172 kmem_cache_free(con_cache, con);
162 connections = new_conns; 173 return NULL;
174 }
163 175
176 if (n != nodeid) {
177 idr_remove(&connections_idr, n);
178 kmem_cache_free(con_cache, con);
179 return NULL;
164 } 180 }
165 181
166 con = connections[nodeid]; 182 con->nodeid = nodeid;
167 if (con == NULL && allocation) { 183 mutex_init(&con->sock_mutex);
168 con = kmem_cache_zalloc(con_cache, allocation); 184 INIT_LIST_HEAD(&con->writequeue);
169 if (!con) 185 spin_lock_init(&con->writequeue_lock);
170 goto finish; 186 INIT_WORK(&con->swork, process_send_sockets);
187 INIT_WORK(&con->rwork, process_recv_sockets);
171 188
172 con->nodeid = nodeid; 189 /* Setup action pointers for child sockets */
173 mutex_init(&con->sock_mutex); 190 if (con->nodeid) {
174 INIT_LIST_HEAD(&con->writequeue); 191 struct connection *zerocon = idr_find(&connections_idr, 0);
175 spin_lock_init(&con->writequeue_lock);
176 INIT_WORK(&con->swork, process_send_sockets);
177 INIT_WORK(&con->rwork, process_recv_sockets);
178 192
179 connections[nodeid] = con; 193 con->connect_action = zerocon->connect_action;
194 if (!con->rx_action)
195 con->rx_action = zerocon->rx_action;
180 } 196 }
181 197
182finish: 198 if (nodeid > max_nodeid)
199 max_nodeid = nodeid;
200
201 return con;
202}
203
204static struct connection *nodeid2con(int nodeid, gfp_t allocation)
205{
206 struct connection *con;
207
208 down(&connections_lock);
209 con = __nodeid2con(nodeid, allocation);
183 up(&connections_lock); 210 up(&connections_lock);
211
184 return con; 212 return con;
185} 213}
186 214
215/* This is a bit drastic, but only called when things go wrong */
216static struct connection *assoc2con(int assoc_id)
217{
218 int i;
219 struct connection *con;
220
221 down(&connections_lock);
222 for (i=0; i<=max_nodeid; i++) {
223 con = __nodeid2con(i, 0);
224 if (con && con->sctp_assoc == assoc_id) {
225 up(&connections_lock);
226 return con;
227 }
228 }
229 up(&connections_lock);
230 return NULL;
231}
232
233static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
234{
235 struct sockaddr_storage addr;
236 int error;
237
238 if (!dlm_local_count)
239 return -1;
240
241 error = dlm_nodeid_to_addr(nodeid, &addr);
242 if (error)
243 return error;
244
245 if (dlm_local_addr[0]->ss_family == AF_INET) {
246 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
247 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
248 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
249 } else {
250 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
251 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
252 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
253 sizeof(in6->sin6_addr));
254 }
255
256 return 0;
257}
258
187/* Data available on socket or listen socket received a connect */ 259/* Data available on socket or listen socket received a connect */
188static void lowcomms_data_ready(struct sock *sk, int count_unused) 260static void lowcomms_data_ready(struct sock *sk, int count_unused)
189{ 261{
190 struct connection *con = sock2con(sk); 262 struct connection *con = sock2con(sk);
191
192 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) 263 if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
193 queue_work(recv_workqueue, &con->rwork); 264 queue_work(recv_workqueue, &con->rwork);
194} 265}
@@ -222,20 +293,21 @@ static int add_sock(struct socket *sock, struct connection *con)
222 con->sock->sk->sk_data_ready = lowcomms_data_ready; 293 con->sock->sk->sk_data_ready = lowcomms_data_ready;
223 con->sock->sk->sk_write_space = lowcomms_write_space; 294 con->sock->sk->sk_write_space = lowcomms_write_space;
224 con->sock->sk->sk_state_change = lowcomms_state_change; 295 con->sock->sk->sk_state_change = lowcomms_state_change;
225 296 con->sock->sk->sk_user_data = con;
226 return 0; 297 return 0;
227} 298}
228 299
229/* Add the port number to an IP6 or 4 sockaddr and return the address 300/* Add the port number to an IPv6 or 4 sockaddr and return the address
230 length */ 301 length */
231static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 302static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
232 int *addr_len) 303 int *addr_len)
233{ 304{
234 saddr->ss_family = dlm_local_addr.ss_family; 305 saddr->ss_family = dlm_local_addr[0]->ss_family;
235 if (saddr->ss_family == AF_INET) { 306 if (saddr->ss_family == AF_INET) {
236 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 307 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
237 in4_addr->sin_port = cpu_to_be16(port); 308 in4_addr->sin_port = cpu_to_be16(port);
238 *addr_len = sizeof(struct sockaddr_in); 309 *addr_len = sizeof(struct sockaddr_in);
310 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
239 } else { 311 } else {
240 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 312 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
241 in6_addr->sin6_port = cpu_to_be16(port); 313 in6_addr->sin6_port = cpu_to_be16(port);
@@ -264,6 +336,193 @@ static void close_connection(struct connection *con, bool and_other)
264 mutex_unlock(&con->sock_mutex); 336 mutex_unlock(&con->sock_mutex);
265} 337}
266 338
339/* We only send shutdown messages to nodes that are not part of the cluster */
340static void sctp_send_shutdown(sctp_assoc_t associd)
341{
342 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
343 struct msghdr outmessage;
344 struct cmsghdr *cmsg;
345 struct sctp_sndrcvinfo *sinfo;
346 int ret;
347 struct connection *con;
348
349 con = nodeid2con(0,0);
350 BUG_ON(con == NULL);
351
352 outmessage.msg_name = NULL;
353 outmessage.msg_namelen = 0;
354 outmessage.msg_control = outcmsg;
355 outmessage.msg_controllen = sizeof(outcmsg);
356 outmessage.msg_flags = MSG_EOR;
357
358 cmsg = CMSG_FIRSTHDR(&outmessage);
359 cmsg->cmsg_level = IPPROTO_SCTP;
360 cmsg->cmsg_type = SCTP_SNDRCV;
361 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
362 outmessage.msg_controllen = cmsg->cmsg_len;
363 sinfo = CMSG_DATA(cmsg);
364 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
365
366 sinfo->sinfo_flags |= MSG_EOF;
367 sinfo->sinfo_assoc_id = associd;
368
369 ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
370
371 if (ret != 0)
372 log_print("send EOF to node failed: %d", ret);
373}
374
375/* INIT failed but we don't know which node...
376 restart INIT on all pending nodes */
377static void sctp_init_failed(void)
378{
379 int i;
380 struct connection *con;
381
382 down(&connections_lock);
383 for (i=1; i<=max_nodeid; i++) {
384 con = __nodeid2con(i, 0);
385 if (!con)
386 continue;
387 con->sctp_assoc = 0;
388 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
389 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
390 queue_work(send_workqueue, &con->swork);
391 }
392 }
393 }
394 up(&connections_lock);
395}
396
397/* Something happened to an association */
398static void process_sctp_notification(struct connection *con,
399 struct msghdr *msg, char *buf)
400{
401 union sctp_notification *sn = (union sctp_notification *)buf;
402
403 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
404 switch (sn->sn_assoc_change.sac_state) {
405
406 case SCTP_COMM_UP:
407 case SCTP_RESTART:
408 {
409 /* Check that the new node is in the lockspace */
410 struct sctp_prim prim;
411 int nodeid;
412 int prim_len, ret;
413 int addr_len;
414 struct connection *new_con;
415 struct file *file;
416 sctp_peeloff_arg_t parg;
417 int parglen = sizeof(parg);
418
419 /*
420 * We get this before any data for an association.
421 * We verify that the node is in the cluster and
422 * then peel off a socket for it.
423 */
424 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
425 log_print("COMM_UP for invalid assoc ID %d",
426 (int)sn->sn_assoc_change.sac_assoc_id);
427 sctp_init_failed();
428 return;
429 }
430 memset(&prim, 0, sizeof(struct sctp_prim));
431 prim_len = sizeof(struct sctp_prim);
432 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
433
434 ret = kernel_getsockopt(con->sock,
435 IPPROTO_SCTP,
436 SCTP_PRIMARY_ADDR,
437 (char*)&prim,
438 &prim_len);
439 if (ret < 0) {
440 log_print("getsockopt/sctp_primary_addr on "
441 "new assoc %d failed : %d",
442 (int)sn->sn_assoc_change.sac_assoc_id,
443 ret);
444
445 /* Retry INIT later */
446 new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
447 if (new_con)
448 clear_bit(CF_CONNECT_PENDING, &con->flags);
449 return;
450 }
451 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
452 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
453 int i;
454 unsigned char *b=(unsigned char *)&prim.ssp_addr;
455 log_print("reject connect from unknown addr");
456 for (i=0; i<sizeof(struct sockaddr_storage);i++)
457 printk("%02x ", b[i]);
458 printk("\n");
459 sctp_send_shutdown(prim.ssp_assoc_id);
460 return;
461 }
462
463 new_con = nodeid2con(nodeid, GFP_KERNEL);
464 if (!new_con)
465 return;
466
467 /* Peel off a new sock */
468 parg.associd = sn->sn_assoc_change.sac_assoc_id;
469 ret = kernel_getsockopt(con->sock, IPPROTO_SCTP,
470 SCTP_SOCKOPT_PEELOFF,
471 (void *)&parg, &parglen);
472 if (ret) {
473 log_print("Can't peel off a socket for "
474 "connection %d to node %d: err=%d\n",
475 parg.associd, nodeid, ret);
476 }
477 file = fget(parg.sd);
478 new_con->sock = SOCKET_I(file->f_dentry->d_inode);
479 add_sock(new_con->sock, new_con);
480 fput(file);
481 put_unused_fd(parg.sd);
482
483 log_print("got new/restarted association %d nodeid %d",
484 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
485
486 /* Send any pending writes */
487 clear_bit(CF_CONNECT_PENDING, &new_con->flags);
488 clear_bit(CF_INIT_PENDING, &con->flags);
489 if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
490 queue_work(send_workqueue, &new_con->swork);
491 }
492 if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
493 queue_work(recv_workqueue, &new_con->rwork);
494 }
495 break;
496
497 case SCTP_COMM_LOST:
498 case SCTP_SHUTDOWN_COMP:
499 {
500 con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
501 if (con) {
502 con->sctp_assoc = 0;
503 }
504 }
505 break;
506
507 /* We don't know which INIT failed, so clear the PENDING flags
508 * on them all. if assoc_id is zero then it will then try
509 * again */
510
511 case SCTP_CANT_STR_ASSOC:
512 {
513 log_print("Can't start SCTP association - retrying");
514 sctp_init_failed();
515 }
516 break;
517
518 default:
519 log_print("unexpected SCTP assoc change id=%d state=%d",
520 (int)sn->sn_assoc_change.sac_assoc_id,
521 sn->sn_assoc_change.sac_state);
522 }
523 }
524}
525
267/* Data received from remote end */ 526/* Data received from remote end */
268static int receive_from_sock(struct connection *con) 527static int receive_from_sock(struct connection *con)
269{ 528{
@@ -274,6 +533,7 @@ static int receive_from_sock(struct connection *con)
274 int r; 533 int r;
275 int call_again_soon = 0; 534 int call_again_soon = 0;
276 int nvec; 535 int nvec;
536 char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
277 537
278 mutex_lock(&con->sock_mutex); 538 mutex_lock(&con->sock_mutex);
279 539
@@ -293,12 +553,18 @@ static int receive_from_sock(struct connection *con)
293 cbuf_init(&con->cb, PAGE_CACHE_SIZE); 553 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
294 } 554 }
295 555
556 /* Only SCTP needs these really */
557 memset(&incmsg, 0, sizeof(incmsg));
558 msg.msg_control = incmsg;
559 msg.msg_controllen = sizeof(incmsg);
560
296 /* 561 /*
297 * iov[0] is the bit of the circular buffer between the current end 562 * iov[0] is the bit of the circular buffer between the current end
298 * point (cb.base + cb.len) and the end of the buffer. 563 * point (cb.base + cb.len) and the end of the buffer.
299 */ 564 */
300 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); 565 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
301 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); 566 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
567 iov[1].iov_len = 0;
302 nvec = 1; 568 nvec = 1;
303 569
304 /* 570 /*
@@ -315,11 +581,20 @@ static int receive_from_sock(struct connection *con)
315 581
316 r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len, 582 r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
317 MSG_DONTWAIT | MSG_NOSIGNAL); 583 MSG_DONTWAIT | MSG_NOSIGNAL);
318
319 if (ret <= 0) 584 if (ret <= 0)
320 goto out_close; 585 goto out_close;
321 if (ret == -EAGAIN) 586
322 goto out_resched; 587 /* Process SCTP notifications */
588 if (msg.msg_flags & MSG_NOTIFICATION) {
589 msg.msg_control = incmsg;
590 msg.msg_controllen = sizeof(incmsg);
591
592 process_sctp_notification(con, &msg,
593 page_address(con->rx_page) + con->cb.base);
594 mutex_unlock(&con->sock_mutex);
595 return 0;
596 }
597 BUG_ON(con->nodeid == 0);
323 598
324 if (ret == len) 599 if (ret == len)
325 call_again_soon = 1; 600 call_again_soon = 1;
@@ -329,10 +604,10 @@ static int receive_from_sock(struct connection *con)
329 con->cb.base, con->cb.len, 604 con->cb.base, con->cb.len,
330 PAGE_CACHE_SIZE); 605 PAGE_CACHE_SIZE);
331 if (ret == -EBADMSG) { 606 if (ret == -EBADMSG) {
332 printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, " 607 log_print("lowcomms: addr=%p, base=%u, len=%u, "
333 "iov_len=%u, iov_base[0]=%p, read=%d\n", 608 "iov_len=%u, iov_base[0]=%p, read=%d",
334 page_address(con->rx_page), con->cb.base, con->cb.len, 609 page_address(con->rx_page), con->cb.base, con->cb.len,
335 len, iov[0].iov_base, r); 610 len, iov[0].iov_base, r);
336 } 611 }
337 if (ret < 0) 612 if (ret < 0)
338 goto out_close; 613 goto out_close;
@@ -368,7 +643,7 @@ out_close:
368} 643}
369 644
370/* Listening socket is busy, accept a connection */ 645/* Listening socket is busy, accept a connection */
371static int accept_from_sock(struct connection *con) 646static int tcp_accept_from_sock(struct connection *con)
372{ 647{
373 int result; 648 int result;
374 struct sockaddr_storage peeraddr; 649 struct sockaddr_storage peeraddr;
@@ -379,7 +654,7 @@ static int accept_from_sock(struct connection *con)
379 struct connection *addcon; 654 struct connection *addcon;
380 655
381 memset(&peeraddr, 0, sizeof(peeraddr)); 656 memset(&peeraddr, 0, sizeof(peeraddr));
382 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, 657 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
383 IPPROTO_TCP, &newsock); 658 IPPROTO_TCP, &newsock);
384 if (result < 0) 659 if (result < 0)
385 return -ENOMEM; 660 return -ENOMEM;
@@ -408,7 +683,7 @@ static int accept_from_sock(struct connection *con)
408 /* Get the new node's NODEID */ 683 /* Get the new node's NODEID */
409 make_sockaddr(&peeraddr, 0, &len); 684 make_sockaddr(&peeraddr, 0, &len);
410 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { 685 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
411 printk("dlm: connect from non cluster node\n"); 686 log_print("connect from non cluster node");
412 sock_release(newsock); 687 sock_release(newsock);
413 mutex_unlock(&con->sock_mutex); 688 mutex_unlock(&con->sock_mutex);
414 return -1; 689 return -1;
@@ -419,7 +694,6 @@ static int accept_from_sock(struct connection *con)
419 /* Check to see if we already have a connection to this node. This 694 /* Check to see if we already have a connection to this node. This
420 * could happen if the two nodes initiate a connection at roughly 695 * could happen if the two nodes initiate a connection at roughly
421 * the same time and the connections cross on the wire. 696 * the same time and the connections cross on the wire.
422 * TEMPORARY FIX:
423 * In this case we store the incoming one in "othercon" 697 * In this case we store the incoming one in "othercon"
424 */ 698 */
425 newcon = nodeid2con(nodeid, GFP_KERNEL); 699 newcon = nodeid2con(nodeid, GFP_KERNEL);
@@ -434,7 +708,7 @@ static int accept_from_sock(struct connection *con)
434 if (!othercon) { 708 if (!othercon) {
435 othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL); 709 othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
436 if (!othercon) { 710 if (!othercon) {
437 printk("dlm: failed to allocate incoming socket\n"); 711 log_print("failed to allocate incoming socket");
438 mutex_unlock(&newcon->sock_mutex); 712 mutex_unlock(&newcon->sock_mutex);
439 result = -ENOMEM; 713 result = -ENOMEM;
440 goto accept_err; 714 goto accept_err;
@@ -477,12 +751,107 @@ accept_err:
477 sock_release(newsock); 751 sock_release(newsock);
478 752
479 if (result != -EAGAIN) 753 if (result != -EAGAIN)
480 printk("dlm: error accepting connection from node: %d\n", result); 754 log_print("error accepting connection from node: %d", result);
481 return result; 755 return result;
482} 756}
483 757
758static void free_entry(struct writequeue_entry *e)
759{
760 __free_page(e->page);
761 kfree(e);
762}
763
764/* Initiate an SCTP association.
765 This is a special case of send_to_sock() in that we don't yet have a
766 peeled-off socket for this association, so we use the listening socket
767 and add the primary IP address of the remote node.
768 */
769static void sctp_init_assoc(struct connection *con)
770{
771 struct sockaddr_storage rem_addr;
772 char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
773 struct msghdr outmessage;
774 struct cmsghdr *cmsg;
775 struct sctp_sndrcvinfo *sinfo;
776 struct connection *base_con;
777 struct writequeue_entry *e;
778 int len, offset;
779 int ret;
780 int addrlen;
781 struct kvec iov[1];
782
783 if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
784 return;
785
786 if (con->retries++ > MAX_CONNECT_RETRIES)
787 return;
788
789 log_print("Initiating association with node %d", con->nodeid);
790
791 if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) {
792 log_print("no address for nodeid %d", con->nodeid);
793 return;
794 }
795 base_con = nodeid2con(0, 0);
796 BUG_ON(base_con == NULL);
797
798 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
799
800 outmessage.msg_name = &rem_addr;
801 outmessage.msg_namelen = addrlen;
802 outmessage.msg_control = outcmsg;
803 outmessage.msg_controllen = sizeof(outcmsg);
804 outmessage.msg_flags = MSG_EOR;
805
806 spin_lock(&con->writequeue_lock);
807 e = list_entry(con->writequeue.next, struct writequeue_entry,
808 list);
809
810 BUG_ON((struct list_head *) e == &con->writequeue);
811
812 len = e->len;
813 offset = e->offset;
814 spin_unlock(&con->writequeue_lock);
815 kmap(e->page);
816
817 /* Send the first block off the write queue */
818 iov[0].iov_base = page_address(e->page)+offset;
819 iov[0].iov_len = len;
820
821 cmsg = CMSG_FIRSTHDR(&outmessage);
822 cmsg->cmsg_level = IPPROTO_SCTP;
823 cmsg->cmsg_type = SCTP_SNDRCV;
824 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
825 sinfo = CMSG_DATA(cmsg);
826 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
827 sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
828 outmessage.msg_controllen = cmsg->cmsg_len;
829
830 ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
831 if (ret < 0) {
832 log_print("Send first packet to node %d failed: %d",
833 con->nodeid, ret);
834
835 /* Try again later */
836 clear_bit(CF_CONNECT_PENDING, &con->flags);
837 clear_bit(CF_INIT_PENDING, &con->flags);
838 }
839 else {
840 spin_lock(&con->writequeue_lock);
841 e->offset += ret;
842 e->len -= ret;
843
844 if (e->len == 0 && e->users == 0) {
845 list_del(&e->list);
846 kunmap(e->page);
847 free_entry(e);
848 }
849 spin_unlock(&con->writequeue_lock);
850 }
851}
852
484/* Connect a new socket to its peer */ 853/* Connect a new socket to its peer */
485static void connect_to_sock(struct connection *con) 854static void tcp_connect_to_sock(struct connection *con)
486{ 855{
487 int result = -EHOSTUNREACH; 856 int result = -EHOSTUNREACH;
488 struct sockaddr_storage saddr; 857 struct sockaddr_storage saddr;
@@ -505,7 +874,7 @@ static void connect_to_sock(struct connection *con)
505 } 874 }
506 875
507 /* Create a socket to communicate with */ 876 /* Create a socket to communicate with */
508 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, 877 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
509 IPPROTO_TCP, &sock); 878 IPPROTO_TCP, &sock);
510 if (result < 0) 879 if (result < 0)
511 goto out_err; 880 goto out_err;
@@ -516,11 +885,11 @@ static void connect_to_sock(struct connection *con)
516 885
517 sock->sk->sk_user_data = con; 886 sock->sk->sk_user_data = con;
518 con->rx_action = receive_from_sock; 887 con->rx_action = receive_from_sock;
888 con->connect_action = tcp_connect_to_sock;
889 add_sock(sock, con);
519 890
520 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); 891 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
521 892
522 add_sock(sock, con);
523
524 log_print("connecting to %d", con->nodeid); 893 log_print("connecting to %d", con->nodeid);
525 result = 894 result =
526 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 895 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
@@ -550,64 +919,57 @@ out:
550 return; 919 return;
551} 920}
552 921
553static struct socket *create_listen_sock(struct connection *con, 922static struct socket *tcp_create_listen_sock(struct connection *con,
554 struct sockaddr_storage *saddr) 923 struct sockaddr_storage *saddr)
555{ 924{
556 struct socket *sock = NULL; 925 struct socket *sock = NULL;
557 mm_segment_t fs;
558 int result = 0; 926 int result = 0;
559 int one = 1; 927 int one = 1;
560 int addr_len; 928 int addr_len;
561 929
562 if (dlm_local_addr.ss_family == AF_INET) 930 if (dlm_local_addr[0]->ss_family == AF_INET)
563 addr_len = sizeof(struct sockaddr_in); 931 addr_len = sizeof(struct sockaddr_in);
564 else 932 else
565 addr_len = sizeof(struct sockaddr_in6); 933 addr_len = sizeof(struct sockaddr_in6);
566 934
567 /* Create a socket to communicate with */ 935 /* Create a socket to communicate with */
568 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); 936 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
937 IPPROTO_TCP, &sock);
569 if (result < 0) { 938 if (result < 0) {
570 printk("dlm: Can't create listening comms socket\n"); 939 log_print("Can't create listening comms socket");
571 goto create_out; 940 goto create_out;
572 } 941 }
573 942
574 fs = get_fs(); 943 result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
575 set_fs(get_ds()); 944 (char *)&one, sizeof(one));
576 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 945
577 (char *)&one, sizeof(one));
578 set_fs(fs);
579 if (result < 0) { 946 if (result < 0) {
580 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n", 947 log_print("Failed to set SO_REUSEADDR on socket: %d", result);
581 result);
582 } 948 }
583 sock->sk->sk_user_data = con; 949 sock->sk->sk_user_data = con;
584 con->rx_action = accept_from_sock; 950 con->rx_action = tcp_accept_from_sock;
951 con->connect_action = tcp_connect_to_sock;
585 con->sock = sock; 952 con->sock = sock;
586 953
587 /* Bind to our port */ 954 /* Bind to our port */
588 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); 955 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
589 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); 956 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
590 if (result < 0) { 957 if (result < 0) {
591 printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port); 958 log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
592 sock_release(sock); 959 sock_release(sock);
593 sock = NULL; 960 sock = NULL;
594 con->sock = NULL; 961 con->sock = NULL;
595 goto create_out; 962 goto create_out;
596 } 963 }
597 964 result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
598 fs = get_fs();
599 set_fs(get_ds());
600
601 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
602 (char *)&one, sizeof(one)); 965 (char *)&one, sizeof(one));
603 set_fs(fs);
604 if (result < 0) { 966 if (result < 0) {
605 printk("dlm: Set keepalive failed: %d\n", result); 967 log_print("Set keepalive failed: %d", result);
606 } 968 }
607 969
608 result = sock->ops->listen(sock, 5); 970 result = sock->ops->listen(sock, 5);
609 if (result < 0) { 971 if (result < 0) {
610 printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port); 972 log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
611 sock_release(sock); 973 sock_release(sock);
612 sock = NULL; 974 sock = NULL;
613 goto create_out; 975 goto create_out;
@@ -617,18 +979,146 @@ create_out:
617 return sock; 979 return sock;
618} 980}
619 981
982/* Get local addresses */
983static void init_local(void)
984{
985 struct sockaddr_storage sas, *addr;
986 int i;
987
988 dlm_local_count = 0;
989 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
990 if (dlm_our_addr(&sas, i))
991 break;
992
993 addr = kmalloc(sizeof(*addr), GFP_KERNEL);
994 if (!addr)
995 break;
996 memcpy(addr, &sas, sizeof(*addr));
997 dlm_local_addr[dlm_local_count++] = addr;
998 }
999}
1000
1001/* Bind to an IP address. SCTP allows multiple address so it can do
1002 multi-homing */
1003static int add_sctp_bind_addr(struct connection *sctp_con,
1004 struct sockaddr_storage *addr,
1005 int addr_len, int num)
1006{
1007 int result = 0;
1008
1009 if (num == 1)
1010 result = kernel_bind(sctp_con->sock,
1011 (struct sockaddr *) addr,
1012 addr_len);
1013 else
1014 result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1015 SCTP_SOCKOPT_BINDX_ADD,
1016 (char *)addr, addr_len);
1017
1018 if (result < 0)
1019 log_print("Can't bind to port %d addr number %d",
1020 dlm_config.ci_tcp_port, num);
1021
1022 return result;
1023}
620 1024
621/* Listen on all interfaces */ 1025/* Initialise SCTP socket and bind to all interfaces */
622static int listen_for_all(void) 1026static int sctp_listen_for_all(void)
1027{
1028 struct socket *sock = NULL;
1029 struct sockaddr_storage localaddr;
1030 struct sctp_event_subscribe subscribe;
1031 int result = -EINVAL, num = 1, i, addr_len;
1032 struct connection *con = nodeid2con(0, GFP_KERNEL);
1033 int bufsize = NEEDED_RMEM;
1034
1035 if (!con)
1036 return -ENOMEM;
1037
1038 log_print("Using SCTP for communications");
1039
1040 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
1041 IPPROTO_SCTP, &sock);
1042 if (result < 0) {
1043 log_print("Can't create comms socket, check SCTP is loaded");
1044 goto out;
1045 }
1046
1047 /* Listen for events */
1048 memset(&subscribe, 0, sizeof(subscribe));
1049 subscribe.sctp_data_io_event = 1;
1050 subscribe.sctp_association_event = 1;
1051 subscribe.sctp_send_failure_event = 1;
1052 subscribe.sctp_shutdown_event = 1;
1053 subscribe.sctp_partial_delivery_event = 1;
1054
1055 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
1056 (char *)&bufsize, sizeof(bufsize));
1057 if (result)
1058 log_print("Error increasing buffer space on socket %d", result);
1059
1060 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1061 (char *)&subscribe, sizeof(subscribe));
1062 if (result < 0) {
1063 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1064 result);
1065 goto create_delsock;
1066 }
1067
1068 /* Init con struct */
1069 sock->sk->sk_user_data = con;
1070 con->sock = sock;
1071 con->sock->sk->sk_data_ready = lowcomms_data_ready;
1072 con->rx_action = receive_from_sock;
1073 con->connect_action = sctp_init_assoc;
1074
1075 /* Bind to all interfaces. */
1076 for (i = 0; i < dlm_local_count; i++) {
1077 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1078 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1079
1080 result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1081 if (result)
1082 goto create_delsock;
1083 ++num;
1084 }
1085
1086 result = sock->ops->listen(sock, 5);
1087 if (result < 0) {
1088 log_print("Can't set socket listening");
1089 goto create_delsock;
1090 }
1091
1092 return 0;
1093
1094create_delsock:
1095 sock_release(sock);
1096 con->sock = NULL;
1097out:
1098 return result;
1099}
1100
1101static int tcp_listen_for_all(void)
623{ 1102{
624 struct socket *sock = NULL; 1103 struct socket *sock = NULL;
625 struct connection *con = nodeid2con(0, GFP_KERNEL); 1104 struct connection *con = nodeid2con(0, GFP_KERNEL);
626 int result = -EINVAL; 1105 int result = -EINVAL;
627 1106
1107 if (!con)
1108 return -ENOMEM;
1109
628 /* We don't support multi-homed hosts */ 1110 /* We don't support multi-homed hosts */
1111 if (dlm_local_addr[1] != NULL) {
1112 log_print("TCP protocol can't handle multi-homed hosts, "
1113 "try SCTP");
1114 return -EINVAL;
1115 }
1116
1117 log_print("Using TCP for communications");
1118
629 set_bit(CF_IS_OTHERCON, &con->flags); 1119 set_bit(CF_IS_OTHERCON, &con->flags);
630 1120
631 sock = create_listen_sock(con, &dlm_local_addr); 1121 sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
632 if (sock) { 1122 if (sock) {
633 add_sock(sock, con); 1123 add_sock(sock, con);
634 result = 0; 1124 result = 0;
@@ -666,8 +1156,7 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
666 return entry; 1156 return entry;
667} 1157}
668 1158
669void *dlm_lowcomms_get_buffer(int nodeid, int len, 1159void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
670 gfp_t allocation, char **ppc)
671{ 1160{
672 struct connection *con; 1161 struct connection *con;
673 struct writequeue_entry *e; 1162 struct writequeue_entry *e;
@@ -735,12 +1224,6 @@ out:
735 return; 1224 return;
736} 1225}
737 1226
738static void free_entry(struct writequeue_entry *e)
739{
740 __free_page(e->page);
741 kfree(e);
742}
743
744/* Send a message */ 1227/* Send a message */
745static void send_to_sock(struct connection *con) 1228static void send_to_sock(struct connection *con)
746{ 1229{
@@ -777,8 +1260,7 @@ static void send_to_sock(struct connection *con)
777 goto out; 1260 goto out;
778 if (ret <= 0) 1261 if (ret <= 0)
779 goto send_error; 1262 goto send_error;
780 } 1263 } else {
781 else {
782 /* Don't starve people filling buffers */ 1264 /* Don't starve people filling buffers */
783 cond_resched(); 1265 cond_resched();
784 } 1266 }
@@ -807,7 +1289,8 @@ send_error:
807 1289
808out_connect: 1290out_connect:
809 mutex_unlock(&con->sock_mutex); 1291 mutex_unlock(&con->sock_mutex);
810 connect_to_sock(con); 1292 if (!test_bit(CF_INIT_PENDING, &con->flags))
1293 lowcomms_connect_sock(con);
811 return; 1294 return;
812} 1295}
813 1296
@@ -832,9 +1315,6 @@ int dlm_lowcomms_close(int nodeid)
832{ 1315{
833 struct connection *con; 1316 struct connection *con;
834 1317
835 if (!connections)
836 goto out;
837
838 log_print("closing connection to node %d", nodeid); 1318 log_print("closing connection to node %d", nodeid);
839 con = nodeid2con(nodeid, 0); 1319 con = nodeid2con(nodeid, 0);
840 if (con) { 1320 if (con) {
@@ -842,12 +1322,9 @@ int dlm_lowcomms_close(int nodeid)
842 close_connection(con, true); 1322 close_connection(con, true);
843 } 1323 }
844 return 0; 1324 return 0;
845
846out:
847 return -1;
848} 1325}
849 1326
850/* Look for activity on active sockets */ 1327/* Receive workqueue function */
851static void process_recv_sockets(struct work_struct *work) 1328static void process_recv_sockets(struct work_struct *work)
852{ 1329{
853 struct connection *con = container_of(work, struct connection, rwork); 1330 struct connection *con = container_of(work, struct connection, rwork);
@@ -859,15 +1336,14 @@ static void process_recv_sockets(struct work_struct *work)
859 } while (!err); 1336 } while (!err);
860} 1337}
861 1338
862 1339/* Send workqueue function */
863static void process_send_sockets(struct work_struct *work) 1340static void process_send_sockets(struct work_struct *work)
864{ 1341{
865 struct connection *con = container_of(work, struct connection, swork); 1342 struct connection *con = container_of(work, struct connection, swork);
866 1343
867 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { 1344 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
868 connect_to_sock(con); 1345 con->connect_action(con);
869 } 1346 }
870
871 clear_bit(CF_WRITE_PENDING, &con->flags); 1347 clear_bit(CF_WRITE_PENDING, &con->flags);
872 send_to_sock(con); 1348 send_to_sock(con);
873} 1349}
@@ -878,8 +1354,8 @@ static void clean_writequeues(void)
878{ 1354{
879 int nodeid; 1355 int nodeid;
880 1356
881 for (nodeid = 1; nodeid < conn_array_size; nodeid++) { 1357 for (nodeid = 1; nodeid <= max_nodeid; nodeid++) {
882 struct connection *con = nodeid2con(nodeid, 0); 1358 struct connection *con = __nodeid2con(nodeid, 0);
883 1359
884 if (con) 1360 if (con)
885 clean_one_writequeue(con); 1361 clean_one_writequeue(con);
@@ -916,64 +1392,67 @@ static int work_start(void)
916void dlm_lowcomms_stop(void) 1392void dlm_lowcomms_stop(void)
917{ 1393{
918 int i; 1394 int i;
1395 struct connection *con;
919 1396
920 /* Set all the flags to prevent any 1397 /* Set all the flags to prevent any
921 socket activity. 1398 socket activity.
922 */ 1399 */
923 for (i = 0; i < conn_array_size; i++) { 1400 down(&connections_lock);
924 if (connections[i]) 1401 for (i = 0; i <= max_nodeid; i++) {
925 connections[i]->flags |= 0xFF; 1402 con = __nodeid2con(i, 0);
1403 if (con)
1404 con->flags |= 0xFF;
926 } 1405 }
1406 up(&connections_lock);
927 1407
928 work_stop(); 1408 work_stop();
1409
1410 down(&connections_lock);
929 clean_writequeues(); 1411 clean_writequeues();
930 1412
931 for (i = 0; i < conn_array_size; i++) { 1413 for (i = 0; i <= max_nodeid; i++) {
932 if (connections[i]) { 1414 con = __nodeid2con(i, 0);
933 close_connection(connections[i], true); 1415 if (con) {
934 if (connections[i]->othercon) 1416 close_connection(con, true);
935 kmem_cache_free(con_cache, connections[i]->othercon); 1417 if (con->othercon)
936 kmem_cache_free(con_cache, connections[i]); 1418 kmem_cache_free(con_cache, con->othercon);
1419 kmem_cache_free(con_cache, con);
937 } 1420 }
938 } 1421 }
939 1422 max_nodeid = 0;
940 kfree(connections); 1423 up(&connections_lock);
941 connections = NULL;
942
943 kmem_cache_destroy(con_cache); 1424 kmem_cache_destroy(con_cache);
1425 idr_init(&connections_idr);
944} 1426}
945 1427
946/* This is quite likely to sleep... */
947int dlm_lowcomms_start(void) 1428int dlm_lowcomms_start(void)
948{ 1429{
949 int error = 0; 1430 int error = -EINVAL;
950 1431 struct connection *con;
951 error = -ENOMEM;
952 connections = kzalloc(sizeof(struct connection *) *
953 NODE_INCREMENT, GFP_KERNEL);
954 if (!connections)
955 goto out;
956
957 conn_array_size = NODE_INCREMENT;
958 1432
959 if (dlm_our_addr(&dlm_local_addr, 0)) { 1433 init_local();
1434 if (!dlm_local_count) {
1435 error = -ENOTCONN;
960 log_print("no local IP address has been set"); 1436 log_print("no local IP address has been set");
961 goto fail_free_conn; 1437 goto out;
962 }
963 if (!dlm_our_addr(&dlm_local_addr, 1)) {
964 log_print("This dlm comms module does not support multi-homed clustering");
965 goto fail_free_conn;
966 } 1438 }
967 1439
1440 error = -ENOMEM;
968 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), 1441 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
969 __alignof__(struct connection), 0, 1442 __alignof__(struct connection), 0,
970 NULL, NULL); 1443 NULL, NULL);
971 if (!con_cache) 1444 if (!con_cache)
972 goto fail_free_conn; 1445 goto out;
973 1446
1447 /* Set some sysctl minima */
1448 if (sysctl_rmem_max < NEEDED_RMEM)
1449 sysctl_rmem_max = NEEDED_RMEM;
974 1450
975 /* Start listening */ 1451 /* Start listening */
976 error = listen_for_all(); 1452 if (dlm_config.ci_protocol == 0)
1453 error = tcp_listen_for_all();
1454 else
1455 error = sctp_listen_for_all();
977 if (error) 1456 if (error)
978 goto fail_unlisten; 1457 goto fail_unlisten;
979 1458
@@ -984,24 +1463,13 @@ int dlm_lowcomms_start(void)
984 return 0; 1463 return 0;
985 1464
986fail_unlisten: 1465fail_unlisten:
987 close_connection(connections[0], false); 1466 con = nodeid2con(0,0);
988 kmem_cache_free(con_cache, connections[0]); 1467 if (con) {
1468 close_connection(con, false);
1469 kmem_cache_free(con_cache, con);
1470 }
989 kmem_cache_destroy(con_cache); 1471 kmem_cache_destroy(con_cache);
990 1472
991fail_free_conn:
992 kfree(connections);
993
994out: 1473out:
995 return error; 1474 return error;
996} 1475}
997
998/*
999 * Overrides for Emacs so that we follow Linus's tabbing style.
1000 * Emacs will notice this stuff at the end of the file and automatically
1001 * adjust the settings for this buffer only. This must remain at the end
1002 * of the file.
1003 * ---------------------------------------------------------------------------
1004 * Local variables:
1005 * c-file-style: "linux"
1006 * End:
1007 */
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 3870150b83a4..b0201ec325a7 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -1,5 +1,5 @@
1/* 1/*
2 * Copyright (C) 2006 Red Hat, Inc. All rights reserved. 2 * Copyright (C) 2006-2007 Red Hat, Inc. All rights reserved.
3 * 3 *
4 * This copyrighted material is made available to anyone wishing to use, 4 * This copyrighted material is made available to anyone wishing to use,
5 * modify, copy, or redistribute it subject to the terms and conditions 5 * modify, copy, or redistribute it subject to the terms and conditions
@@ -56,6 +56,7 @@ struct dlm_write_request32 {
56 union { 56 union {
57 struct dlm_lock_params32 lock; 57 struct dlm_lock_params32 lock;
58 struct dlm_lspace_params lspace; 58 struct dlm_lspace_params lspace;
59 struct dlm_purge_params purge;
59 } i; 60 } i;
60}; 61};
61 62
@@ -92,6 +93,9 @@ static void compat_input(struct dlm_write_request *kb,
92 kb->i.lspace.flags = kb32->i.lspace.flags; 93 kb->i.lspace.flags = kb32->i.lspace.flags;
93 kb->i.lspace.minor = kb32->i.lspace.minor; 94 kb->i.lspace.minor = kb32->i.lspace.minor;
94 strcpy(kb->i.lspace.name, kb32->i.lspace.name); 95 strcpy(kb->i.lspace.name, kb32->i.lspace.name);
96 } else if (kb->cmd == DLM_USER_PURGE) {
97 kb->i.purge.nodeid = kb32->i.purge.nodeid;
98 kb->i.purge.pid = kb32->i.purge.pid;
95 } else { 99 } else {
96 kb->i.lock.mode = kb32->i.lock.mode; 100 kb->i.lock.mode = kb32->i.lock.mode;
97 kb->i.lock.namelen = kb32->i.lock.namelen; 101 kb->i.lock.namelen = kb32->i.lock.namelen;
@@ -111,8 +115,6 @@ static void compat_input(struct dlm_write_request *kb,
111static void compat_output(struct dlm_lock_result *res, 115static void compat_output(struct dlm_lock_result *res,
112 struct dlm_lock_result32 *res32) 116 struct dlm_lock_result32 *res32)
113{ 117{
114 res32->length = res->length - (sizeof(struct dlm_lock_result) -
115 sizeof(struct dlm_lock_result32));
116 res32->user_astaddr = (__u32)(long)res->user_astaddr; 118 res32->user_astaddr = (__u32)(long)res->user_astaddr;
117 res32->user_astparam = (__u32)(long)res->user_astparam; 119 res32->user_astparam = (__u32)(long)res->user_astparam;
118 res32->user_lksb = (__u32)(long)res->user_lksb; 120 res32->user_lksb = (__u32)(long)res->user_lksb;
@@ -128,35 +130,30 @@ static void compat_output(struct dlm_lock_result *res,
128} 130}
129#endif 131#endif
130 132
133/* we could possibly check if the cancel of an orphan has resulted in the lkb
134 being removed and then remove that lkb from the orphans list and free it */
131 135
132void dlm_user_add_ast(struct dlm_lkb *lkb, int type) 136void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
133{ 137{
134 struct dlm_ls *ls; 138 struct dlm_ls *ls;
135 struct dlm_user_args *ua; 139 struct dlm_user_args *ua;
136 struct dlm_user_proc *proc; 140 struct dlm_user_proc *proc;
137 int remove_ownqueue = 0; 141 int eol = 0, ast_type;
138 142
139 /* dlm_clear_proc_locks() sets ORPHAN/DEAD flag on each 143 if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
140 lkb before dealing with it. We need to check this
141 flag before taking ls_clear_proc_locks mutex because if
142 it's set, dlm_clear_proc_locks() holds the mutex. */
143
144 if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
145 /* log_print("user_add_ast skip1 %x", lkb->lkb_flags); */
146 return; 144 return;
147 }
148 145
149 ls = lkb->lkb_resource->res_ls; 146 ls = lkb->lkb_resource->res_ls;
150 mutex_lock(&ls->ls_clear_proc_locks); 147 mutex_lock(&ls->ls_clear_proc_locks);
151 148
152 /* If ORPHAN/DEAD flag is set, it means the process is dead so an ast 149 /* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
153 can't be delivered. For ORPHAN's, dlm_clear_proc_locks() freed 150 can't be delivered. For ORPHAN's, dlm_clear_proc_locks() freed
154 lkb->ua so we can't try to use it. */ 151 lkb->ua so we can't try to use it. This second check is necessary
152 for cases where a completion ast is received for an operation that
153 began before clear_proc_locks did its cancel/unlock. */
155 154
156 if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) { 155 if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
157 /* log_print("user_add_ast skip2 %x", lkb->lkb_flags); */
158 goto out; 156 goto out;
159 }
160 157
161 DLM_ASSERT(lkb->lkb_astparam, dlm_print_lkb(lkb);); 158 DLM_ASSERT(lkb->lkb_astparam, dlm_print_lkb(lkb););
162 ua = (struct dlm_user_args *)lkb->lkb_astparam; 159 ua = (struct dlm_user_args *)lkb->lkb_astparam;
@@ -166,28 +163,42 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
166 goto out; 163 goto out;
167 164
168 spin_lock(&proc->asts_spin); 165 spin_lock(&proc->asts_spin);
169 if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) { 166
167 ast_type = lkb->lkb_ast_type;
168 lkb->lkb_ast_type |= type;
169
170 if (!ast_type) {
170 kref_get(&lkb->lkb_ref); 171 kref_get(&lkb->lkb_ref);
171 list_add_tail(&lkb->lkb_astqueue, &proc->asts); 172 list_add_tail(&lkb->lkb_astqueue, &proc->asts);
172 lkb->lkb_ast_type |= type;
173 wake_up_interruptible(&proc->wait); 173 wake_up_interruptible(&proc->wait);
174 } 174 }
175 175 if (type == AST_COMP && (ast_type & AST_COMP))
176 /* noqueue requests that fail may need to be removed from the 176 log_debug(ls, "ast overlap %x status %x %x",
177 proc's locks list, there should be a better way of detecting 177 lkb->lkb_id, ua->lksb.sb_status, lkb->lkb_flags);
178 this situation than checking all these things... */ 178
179 179 /* Figure out if this lock is at the end of its life and no longer
180 if (type == AST_COMP && lkb->lkb_grmode == DLM_LOCK_IV && 180 available for the application to use. The lkb still exists until
181 ua->lksb.sb_status == -EAGAIN && !list_empty(&lkb->lkb_ownqueue)) 181 the final ast is read. A lock becomes EOL in three situations:
182 remove_ownqueue = 1; 182 1. a noqueue request fails with EAGAIN
183 183 2. an unlock completes with EUNLOCK
184 /* unlocks or cancels of waiting requests need to be removed from the 184 3. a cancel of a waiting request completes with ECANCEL
185 proc's unlocking list, again there must be a better way... */ 185 An EOL lock needs to be removed from the process's list of locks.
186 186 And we can't allow any new operation on an EOL lock. This is
187 if (ua->lksb.sb_status == -DLM_EUNLOCK || 187 not related to the lifetime of the lkb struct which is managed
188 entirely by refcount. */
189
190 if (type == AST_COMP &&
191 lkb->lkb_grmode == DLM_LOCK_IV &&
192 ua->lksb.sb_status == -EAGAIN)
193 eol = 1;
194 else if (ua->lksb.sb_status == -DLM_EUNLOCK ||
188 (ua->lksb.sb_status == -DLM_ECANCEL && 195 (ua->lksb.sb_status == -DLM_ECANCEL &&
189 lkb->lkb_grmode == DLM_LOCK_IV)) 196 lkb->lkb_grmode == DLM_LOCK_IV))
190 remove_ownqueue = 1; 197 eol = 1;
198 if (eol) {
199 lkb->lkb_ast_type &= ~AST_BAST;
200 lkb->lkb_flags |= DLM_IFL_ENDOFLIFE;
201 }
191 202
192 /* We want to copy the lvb to userspace when the completion 203 /* We want to copy the lvb to userspace when the completion
193 ast is read if the status is 0, the lock has an lvb and 204 ast is read if the status is 0, the lock has an lvb and
@@ -204,11 +215,13 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
204 215
205 spin_unlock(&proc->asts_spin); 216 spin_unlock(&proc->asts_spin);
206 217
207 if (remove_ownqueue) { 218 if (eol) {
208 spin_lock(&ua->proc->locks_spin); 219 spin_lock(&ua->proc->locks_spin);
209 list_del_init(&lkb->lkb_ownqueue); 220 if (!list_empty(&lkb->lkb_ownqueue)) {
221 list_del_init(&lkb->lkb_ownqueue);
222 dlm_put_lkb(lkb);
223 }
210 spin_unlock(&ua->proc->locks_spin); 224 spin_unlock(&ua->proc->locks_spin);
211 dlm_put_lkb(lkb);
212 } 225 }
213 out: 226 out:
214 mutex_unlock(&ls->ls_clear_proc_locks); 227 mutex_unlock(&ls->ls_clear_proc_locks);
@@ -286,47 +299,71 @@ static int device_user_unlock(struct dlm_user_proc *proc,
286 return error; 299 return error;
287} 300}
288 301
289static int device_create_lockspace(struct dlm_lspace_params *params) 302static int create_misc_device(struct dlm_ls *ls, char *name)
290{ 303{
291 dlm_lockspace_t *lockspace;
292 struct dlm_ls *ls;
293 int error, len; 304 int error, len;
294 305
295 if (!capable(CAP_SYS_ADMIN))
296 return -EPERM;
297
298 error = dlm_new_lockspace(params->name, strlen(params->name),
299 &lockspace, 0, DLM_USER_LVB_LEN);
300 if (error)
301 return error;
302
303 ls = dlm_find_lockspace_local(lockspace);
304 if (!ls)
305 return -ENOENT;
306
307 error = -ENOMEM; 306 error = -ENOMEM;
308 len = strlen(params->name) + strlen(name_prefix) + 2; 307 len = strlen(name) + strlen(name_prefix) + 2;
309 ls->ls_device.name = kzalloc(len, GFP_KERNEL); 308 ls->ls_device.name = kzalloc(len, GFP_KERNEL);
310 if (!ls->ls_device.name) 309 if (!ls->ls_device.name)
311 goto fail; 310 goto fail;
311
312 snprintf((char *)ls->ls_device.name, len, "%s_%s", name_prefix, 312 snprintf((char *)ls->ls_device.name, len, "%s_%s", name_prefix,
313 params->name); 313 name);
314 ls->ls_device.fops = &device_fops; 314 ls->ls_device.fops = &device_fops;
315 ls->ls_device.minor = MISC_DYNAMIC_MINOR; 315 ls->ls_device.minor = MISC_DYNAMIC_MINOR;
316 316
317 error = misc_register(&ls->ls_device); 317 error = misc_register(&ls->ls_device);
318 if (error) { 318 if (error) {
319 kfree(ls->ls_device.name); 319 kfree(ls->ls_device.name);
320 goto fail;
321 } 320 }
321fail:
322 return error;
323}
324
325static int device_user_purge(struct dlm_user_proc *proc,
326 struct dlm_purge_params *params)
327{
328 struct dlm_ls *ls;
329 int error;
330
331 ls = dlm_find_lockspace_local(proc->lockspace);
332 if (!ls)
333 return -ENOENT;
334
335 error = dlm_user_purge(ls, proc, params->nodeid, params->pid);
322 336
323 error = ls->ls_device.minor;
324 dlm_put_lockspace(ls); 337 dlm_put_lockspace(ls);
325 return error; 338 return error;
339}
340
341static int device_create_lockspace(struct dlm_lspace_params *params)
342{
343 dlm_lockspace_t *lockspace;
344 struct dlm_ls *ls;
345 int error;
326 346
327 fail: 347 if (!capable(CAP_SYS_ADMIN))
348 return -EPERM;
349
350 error = dlm_new_lockspace(params->name, strlen(params->name),
351 &lockspace, 0, DLM_USER_LVB_LEN);
352 if (error)
353 return error;
354
355 ls = dlm_find_lockspace_local(lockspace);
356 if (!ls)
357 return -ENOENT;
358
359 error = create_misc_device(ls, params->name);
328 dlm_put_lockspace(ls); 360 dlm_put_lockspace(ls);
329 dlm_release_lockspace(lockspace, 0); 361
362 if (error)
363 dlm_release_lockspace(lockspace, 0);
364 else
365 error = ls->ls_device.minor;
366
330 return error; 367 return error;
331} 368}
332 369
@@ -343,6 +380,10 @@ static int device_remove_lockspace(struct dlm_lspace_params *params)
343 if (!ls) 380 if (!ls)
344 return -ENOENT; 381 return -ENOENT;
345 382
383 /* Deregister the misc device first, so we don't have
384 * a device that's not attached to a lockspace. If
385 * dlm_release_lockspace fails then we can recreate it
386 */
346 error = misc_deregister(&ls->ls_device); 387 error = misc_deregister(&ls->ls_device);
347 if (error) { 388 if (error) {
348 dlm_put_lockspace(ls); 389 dlm_put_lockspace(ls);
@@ -361,6 +402,8 @@ static int device_remove_lockspace(struct dlm_lspace_params *params)
361 402
362 dlm_put_lockspace(ls); 403 dlm_put_lockspace(ls);
363 error = dlm_release_lockspace(lockspace, force); 404 error = dlm_release_lockspace(lockspace, force);
405 if (error)
406 create_misc_device(ls, ls->ls_name);
364 out: 407 out:
365 return error; 408 return error;
366} 409}
@@ -497,6 +540,14 @@ static ssize_t device_write(struct file *file, const char __user *buf,
497 error = device_remove_lockspace(&kbuf->i.lspace); 540 error = device_remove_lockspace(&kbuf->i.lspace);
498 break; 541 break;
499 542
543 case DLM_USER_PURGE:
544 if (!proc) {
545 log_print("no locking on control device");
546 goto out_sig;
547 }
548 error = device_user_purge(proc, &kbuf->i.purge);
549 break;
550
500 default: 551 default:
501 log_print("Unknown command passed to DLM device : %d\n", 552 log_print("Unknown command passed to DLM device : %d\n",
502 kbuf->cmd); 553 kbuf->cmd);