aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/stack_user.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/stack_user.c')
-rw-r--r--fs/ocfs2/stack_user.c308
1 files changed, 268 insertions, 40 deletions
diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index 286edf1e231f..13a8537d8e8b 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -23,6 +23,7 @@
23#include <linux/mutex.h> 23#include <linux/mutex.h>
24#include <linux/slab.h> 24#include <linux/slab.h>
25#include <linux/reboot.h> 25#include <linux/reboot.h>
26#include <linux/sched.h>
26#include <asm/uaccess.h> 27#include <asm/uaccess.h>
27 28
28#include "stackglue.h" 29#include "stackglue.h"
@@ -102,6 +103,12 @@
102#define OCFS2_TEXT_UUID_LEN 32 103#define OCFS2_TEXT_UUID_LEN 32
103#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN 2 104#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN 2
104#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8 105#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8
106#define VERSION_LOCK "version_lock"
107
108enum ocfs2_connection_type {
109 WITH_CONTROLD,
110 NO_CONTROLD
111};
105 112
106/* 113/*
107 * ocfs2_live_connection is refcounted because the filesystem and 114 * ocfs2_live_connection is refcounted because the filesystem and
@@ -110,6 +117,13 @@
110struct ocfs2_live_connection { 117struct ocfs2_live_connection {
111 struct list_head oc_list; 118 struct list_head oc_list;
112 struct ocfs2_cluster_connection *oc_conn; 119 struct ocfs2_cluster_connection *oc_conn;
120 enum ocfs2_connection_type oc_type;
121 atomic_t oc_this_node;
122 int oc_our_slot;
123 struct dlm_lksb oc_version_lksb;
124 char oc_lvb[DLM_LVB_LEN];
125 struct completion oc_sync_wait;
126 wait_queue_head_t oc_wait;
113}; 127};
114 128
115struct ocfs2_control_private { 129struct ocfs2_control_private {
@@ -198,20 +212,15 @@ static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
198 * mount path. Since the VFS prevents multiple calls to 212 * mount path. Since the VFS prevents multiple calls to
199 * fill_super(), we can't get dupes here. 213 * fill_super(), we can't get dupes here.
200 */ 214 */
201static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn, 215static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
202 struct ocfs2_live_connection **c_ret) 216 struct ocfs2_live_connection *c)
203{ 217{
204 int rc = 0; 218 int rc = 0;
205 struct ocfs2_live_connection *c;
206
207 c = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
208 if (!c)
209 return -ENOMEM;
210 219
211 mutex_lock(&ocfs2_control_lock); 220 mutex_lock(&ocfs2_control_lock);
212 c->oc_conn = conn; 221 c->oc_conn = conn;
213 222
214 if (atomic_read(&ocfs2_control_opened)) 223 if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
215 list_add(&c->oc_list, &ocfs2_live_connection_list); 224 list_add(&c->oc_list, &ocfs2_live_connection_list);
216 else { 225 else {
217 printk(KERN_ERR 226 printk(KERN_ERR
@@ -220,12 +229,6 @@ static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn,
220 } 229 }
221 230
222 mutex_unlock(&ocfs2_control_lock); 231 mutex_unlock(&ocfs2_control_lock);
223
224 if (!rc)
225 *c_ret = c;
226 else
227 kfree(c);
228
229 return rc; 232 return rc;
230} 233}
231 234
@@ -799,18 +802,251 @@ static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
799 return 0; 802 return 0;
800} 803}
801 804
805static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
806{
807 struct ocfs2_protocol_version *pv =
808 (struct ocfs2_protocol_version *)lvb;
809 /*
810 * ocfs2_protocol_version has two u8 variables, so we don't
811 * need any endian conversion.
812 */
813 ver->pv_major = pv->pv_major;
814 ver->pv_minor = pv->pv_minor;
815}
816
817static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
818{
819 struct ocfs2_protocol_version *pv =
820 (struct ocfs2_protocol_version *)lvb;
821 /*
822 * ocfs2_protocol_version has two u8 variables, so we don't
823 * need any endian conversion.
824 */
825 pv->pv_major = ver->pv_major;
826 pv->pv_minor = ver->pv_minor;
827}
828
829static void sync_wait_cb(void *arg)
830{
831 struct ocfs2_cluster_connection *conn = arg;
832 struct ocfs2_live_connection *lc = conn->cc_private;
833 complete(&lc->oc_sync_wait);
834}
835
836static int sync_unlock(struct ocfs2_cluster_connection *conn,
837 struct dlm_lksb *lksb, char *name)
838{
839 int error;
840 struct ocfs2_live_connection *lc = conn->cc_private;
841
842 error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
843 if (error) {
844 printk(KERN_ERR "%s lkid %x error %d\n",
845 name, lksb->sb_lkid, error);
846 return error;
847 }
848
849 wait_for_completion(&lc->oc_sync_wait);
850
851 if (lksb->sb_status != -DLM_EUNLOCK) {
852 printk(KERN_ERR "%s lkid %x status %d\n",
853 name, lksb->sb_lkid, lksb->sb_status);
854 return -1;
855 }
856 return 0;
857}
858
859static int sync_lock(struct ocfs2_cluster_connection *conn,
860 int mode, uint32_t flags,
861 struct dlm_lksb *lksb, char *name)
862{
863 int error, status;
864 struct ocfs2_live_connection *lc = conn->cc_private;
865
866 error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
867 name, strlen(name),
868 0, sync_wait_cb, conn, NULL);
869 if (error) {
870 printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
871 name, lksb->sb_lkid, flags, mode, error);
872 return error;
873 }
874
875 wait_for_completion(&lc->oc_sync_wait);
876
877 status = lksb->sb_status;
878
879 if (status && status != -EAGAIN) {
880 printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
881 name, lksb->sb_lkid, flags, mode, status);
882 }
883
884 return status;
885}
886
887
888static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
889 int flags)
890{
891 struct ocfs2_live_connection *lc = conn->cc_private;
892 return sync_lock(conn, mode, flags,
893 &lc->oc_version_lksb, VERSION_LOCK);
894}
895
896static int version_unlock(struct ocfs2_cluster_connection *conn)
897{
898 struct ocfs2_live_connection *lc = conn->cc_private;
899 return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
900}
901
902/* get_protocol_version()
903 *
904 * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
905 * The algorithm is:
906 * 1. Attempt to take the lock in EX mode (non-blocking).
907 * 2. If successful (which means it is the first mount), write the
908 * version number and downconvert to PR lock.
909 * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
910 * taking the PR lock.
911 */
912
913static int get_protocol_version(struct ocfs2_cluster_connection *conn)
914{
915 int ret;
916 struct ocfs2_live_connection *lc = conn->cc_private;
917 struct ocfs2_protocol_version pv;
918
919 running_proto.pv_major =
920 ocfs2_user_plugin.sp_max_proto.pv_major;
921 running_proto.pv_minor =
922 ocfs2_user_plugin.sp_max_proto.pv_minor;
923
924 lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
925 ret = version_lock(conn, DLM_LOCK_EX,
926 DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
927 if (!ret) {
928 conn->cc_version.pv_major = running_proto.pv_major;
929 conn->cc_version.pv_minor = running_proto.pv_minor;
930 version_to_lvb(&running_proto, lc->oc_lvb);
931 version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
932 } else if (ret == -EAGAIN) {
933 ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
934 if (ret)
935 goto out;
936 lvb_to_version(lc->oc_lvb, &pv);
937
938 if ((pv.pv_major != running_proto.pv_major) ||
939 (pv.pv_minor > running_proto.pv_minor)) {
940 ret = -EINVAL;
941 goto out;
942 }
943
944 conn->cc_version.pv_major = pv.pv_major;
945 conn->cc_version.pv_minor = pv.pv_minor;
946 }
947out:
948 return ret;
949}
950
951static void user_recover_prep(void *arg)
952{
953}
954
955static void user_recover_slot(void *arg, struct dlm_slot *slot)
956{
957 struct ocfs2_cluster_connection *conn = arg;
958 printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
959 slot->nodeid, slot->slot);
960 conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
961
962}
963
964static void user_recover_done(void *arg, struct dlm_slot *slots,
965 int num_slots, int our_slot,
966 uint32_t generation)
967{
968 struct ocfs2_cluster_connection *conn = arg;
969 struct ocfs2_live_connection *lc = conn->cc_private;
970 int i;
971
972 for (i = 0; i < num_slots; i++)
973 if (slots[i].slot == our_slot) {
974 atomic_set(&lc->oc_this_node, slots[i].nodeid);
975 break;
976 }
977
978 lc->oc_our_slot = our_slot;
979 wake_up(&lc->oc_wait);
980}
981
982static const struct dlm_lockspace_ops ocfs2_ls_ops = {
983 .recover_prep = user_recover_prep,
984 .recover_slot = user_recover_slot,
985 .recover_done = user_recover_done,
986};
987
988static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
989{
990 version_unlock(conn);
991 dlm_release_lockspace(conn->cc_lockspace, 2);
992 conn->cc_lockspace = NULL;
993 ocfs2_live_connection_drop(conn->cc_private);
994 conn->cc_private = NULL;
995 return 0;
996}
997
802static int user_cluster_connect(struct ocfs2_cluster_connection *conn) 998static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
803{ 999{
804 dlm_lockspace_t *fsdlm; 1000 dlm_lockspace_t *fsdlm;
805 struct ocfs2_live_connection *uninitialized_var(control); 1001 struct ocfs2_live_connection *lc;
806 int rc = 0; 1002 int rc, ops_rv;
807 1003
808 BUG_ON(conn == NULL); 1004 BUG_ON(conn == NULL);
809 1005
810 rc = ocfs2_live_connection_new(conn, &control); 1006 lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
1007 if (!lc) {
1008 rc = -ENOMEM;
1009 goto out;
1010 }
1011
1012 init_waitqueue_head(&lc->oc_wait);
1013 init_completion(&lc->oc_sync_wait);
1014 atomic_set(&lc->oc_this_node, 0);
1015 conn->cc_private = lc;
1016 lc->oc_type = NO_CONTROLD;
1017
1018 rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
1019 DLM_LSFL_FS, DLM_LVB_LEN,
1020 &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
1021 if (rc)
1022 goto out;
1023
1024 if (ops_rv == -EOPNOTSUPP) {
1025 lc->oc_type = WITH_CONTROLD;
1026 printk(KERN_NOTICE "ocfs2: You seem to be using an older "
1027 "version of dlm_controld and/or ocfs2-tools."
1028 " Please consider upgrading.\n");
1029 } else if (ops_rv) {
1030 rc = ops_rv;
1031 goto out;
1032 }
1033 conn->cc_lockspace = fsdlm;
1034
1035 rc = ocfs2_live_connection_attach(conn, lc);
811 if (rc) 1036 if (rc)
812 goto out; 1037 goto out;
813 1038
1039 if (lc->oc_type == NO_CONTROLD) {
1040 rc = get_protocol_version(conn);
1041 if (rc) {
1042 printk(KERN_ERR "ocfs2: Could not determine"
1043 " locking version\n");
1044 user_cluster_disconnect(conn);
1045 goto out;
1046 }
1047 wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
1048 }
1049
814 /* 1050 /*
815 * running_proto must have been set before we allowed any mounts 1051 * running_proto must have been set before we allowed any mounts
816 * to proceed. 1052 * to proceed.
@@ -818,42 +1054,34 @@ static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
818 if (fs_protocol_compare(&running_proto, &conn->cc_version)) { 1054 if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
819 printk(KERN_ERR 1055 printk(KERN_ERR
820 "Unable to mount with fs locking protocol version " 1056 "Unable to mount with fs locking protocol version "
821 "%u.%u because the userspace control daemon has " 1057 "%u.%u because negotiated protocol is %u.%u\n",
822 "negotiated %u.%u\n",
823 conn->cc_version.pv_major, conn->cc_version.pv_minor, 1058 conn->cc_version.pv_major, conn->cc_version.pv_minor,
824 running_proto.pv_major, running_proto.pv_minor); 1059 running_proto.pv_major, running_proto.pv_minor);
825 rc = -EPROTO; 1060 rc = -EPROTO;
826 ocfs2_live_connection_drop(control); 1061 ocfs2_live_connection_drop(lc);
827 goto out; 1062 lc = NULL;
828 }
829
830 rc = dlm_new_lockspace(conn->cc_name, NULL, DLM_LSFL_FS, DLM_LVB_LEN,
831 NULL, NULL, NULL, &fsdlm);
832 if (rc) {
833 ocfs2_live_connection_drop(control);
834 goto out;
835 } 1063 }
836 1064
837 conn->cc_private = control;
838 conn->cc_lockspace = fsdlm;
839out: 1065out:
1066 if (rc && lc)
1067 kfree(lc);
840 return rc; 1068 return rc;
841} 1069}
842 1070
843static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
844{
845 dlm_release_lockspace(conn->cc_lockspace, 2);
846 conn->cc_lockspace = NULL;
847 ocfs2_live_connection_drop(conn->cc_private);
848 conn->cc_private = NULL;
849 return 0;
850}
851 1071
852static int user_cluster_this_node(unsigned int *this_node) 1072static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
1073 unsigned int *this_node)
853{ 1074{
854 int rc; 1075 int rc;
1076 struct ocfs2_live_connection *lc = conn->cc_private;
1077
1078 if (lc->oc_type == WITH_CONTROLD)
1079 rc = ocfs2_control_get_this_node();
1080 else if (lc->oc_type == NO_CONTROLD)
1081 rc = atomic_read(&lc->oc_this_node);
1082 else
1083 rc = -EINVAL;
855 1084
856 rc = ocfs2_control_get_this_node();
857 if (rc < 0) 1085 if (rc < 0)
858 return rc; 1086 return rc;
859 1087