aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2008-01-09 10:59:41 -0500
committerDavid Teigland <teigland@redhat.com>2008-01-30 12:04:42 -0500
commitc54e04b00fe027da30ada5af76b6749772dd644a (patch)
tree753945bb24638e9798220b2880650b4ba3211e40 /fs
parent46b43eed7018bab3a4e8c259eed27697b9170cb8 (diff)
dlm: validate messages before processing
There was some hit and miss validation of messages that has now been cleaned up and unified. Before processing a message, the new validate_message() function checks that the lkb is the appropriate type, process-copy or master-copy, and that the message is from the correct nodeid for the the given lkb. Other checks and assertions on the lkb type and nodeid have been removed. The assertions were particularly bad since they would panic the machine instead of just ignoring the bad message. Although other recent patches have made processing old message unlikely, it still may be possible for an old message to be processed and caught by these checks. Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs')
-rw-r--r--fs/dlm/lock.c139
1 files changed, 104 insertions, 35 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index c3b9fca17044..c2890efb0259 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -3008,8 +3008,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3008 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST); 3008 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
3009 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP); 3009 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
3010 3010
3011 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
3012
3013 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3011 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3014 /* lkb was just created so there won't be an lvb yet */ 3012 /* lkb was just created so there won't be an lvb yet */
3015 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3013 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
@@ -3023,16 +3021,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3023static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3021static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3024 struct dlm_message *ms) 3022 struct dlm_message *ms)
3025{ 3023{
3026 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
3027 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
3028 lkb->lkb_nodeid, ms->m_header.h_nodeid,
3029 lkb->lkb_id, lkb->lkb_remid);
3030 return -EINVAL;
3031 }
3032
3033 if (!is_master_copy(lkb))
3034 return -EINVAL;
3035
3036 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 3024 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3037 return -EBUSY; 3025 return -EBUSY;
3038 3026
@@ -3048,8 +3036,6 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3048static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3036static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3049 struct dlm_message *ms) 3037 struct dlm_message *ms)
3050{ 3038{
3051 if (!is_master_copy(lkb))
3052 return -EINVAL;
3053 if (receive_lvb(ls, lkb, ms)) 3039 if (receive_lvb(ls, lkb, ms))
3054 return -ENOMEM; 3040 return -ENOMEM;
3055 return 0; 3041 return 0;
@@ -3065,6 +3051,50 @@ static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3065 lkb->lkb_remid = ms->m_lkid; 3051 lkb->lkb_remid = ms->m_lkid;
3066} 3052}
3067 3053
3054/* This is called after the rsb is locked so that we can safely inspect
3055 fields in the lkb. */
3056
3057static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3058{
3059 int from = ms->m_header.h_nodeid;
3060 int error = 0;
3061
3062 switch (ms->m_type) {
3063 case DLM_MSG_CONVERT:
3064 case DLM_MSG_UNLOCK:
3065 case DLM_MSG_CANCEL:
3066 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3067 error = -EINVAL;
3068 break;
3069
3070 case DLM_MSG_CONVERT_REPLY:
3071 case DLM_MSG_UNLOCK_REPLY:
3072 case DLM_MSG_CANCEL_REPLY:
3073 case DLM_MSG_GRANT:
3074 case DLM_MSG_BAST:
3075 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3076 error = -EINVAL;
3077 break;
3078
3079 case DLM_MSG_REQUEST_REPLY:
3080 if (!is_process_copy(lkb))
3081 error = -EINVAL;
3082 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3083 error = -EINVAL;
3084 break;
3085
3086 default:
3087 error = -EINVAL;
3088 }
3089
3090 if (error)
3091 log_error(lkb->lkb_resource->res_ls,
3092 "ignore invalid message %d from %d %x %x %x %d",
3093 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3094 lkb->lkb_flags, lkb->lkb_nodeid);
3095 return error;
3096}
3097
3068static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) 3098static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3069{ 3099{
3070 struct dlm_lkb *lkb; 3100 struct dlm_lkb *lkb;
@@ -3126,17 +3156,21 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3126 hold_rsb(r); 3156 hold_rsb(r);
3127 lock_rsb(r); 3157 lock_rsb(r);
3128 3158
3159 error = validate_message(lkb, ms);
3160 if (error)
3161 goto out;
3162
3129 receive_flags(lkb, ms); 3163 receive_flags(lkb, ms);
3130 error = receive_convert_args(ls, lkb, ms); 3164 error = receive_convert_args(ls, lkb, ms);
3131 if (error) 3165 if (error)
3132 goto out; 3166 goto out_reply;
3133 reply = !down_conversion(lkb); 3167 reply = !down_conversion(lkb);
3134 3168
3135 error = do_convert(r, lkb); 3169 error = do_convert(r, lkb);
3136 out: 3170 out_reply:
3137 if (reply) 3171 if (reply)
3138 send_convert_reply(r, lkb, error); 3172 send_convert_reply(r, lkb, error);
3139 3173 out:
3140 unlock_rsb(r); 3174 unlock_rsb(r);
3141 put_rsb(r); 3175 put_rsb(r);
3142 dlm_put_lkb(lkb); 3176 dlm_put_lkb(lkb);
@@ -3162,15 +3196,19 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3162 hold_rsb(r); 3196 hold_rsb(r);
3163 lock_rsb(r); 3197 lock_rsb(r);
3164 3198
3199 error = validate_message(lkb, ms);
3200 if (error)
3201 goto out;
3202
3165 receive_flags(lkb, ms); 3203 receive_flags(lkb, ms);
3166 error = receive_unlock_args(ls, lkb, ms); 3204 error = receive_unlock_args(ls, lkb, ms);
3167 if (error) 3205 if (error)
3168 goto out; 3206 goto out_reply;
3169 3207
3170 error = do_unlock(r, lkb); 3208 error = do_unlock(r, lkb);
3171 out: 3209 out_reply:
3172 send_unlock_reply(r, lkb, error); 3210 send_unlock_reply(r, lkb, error);
3173 3211 out:
3174 unlock_rsb(r); 3212 unlock_rsb(r);
3175 put_rsb(r); 3213 put_rsb(r);
3176 dlm_put_lkb(lkb); 3214 dlm_put_lkb(lkb);
@@ -3198,9 +3236,13 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3198 hold_rsb(r); 3236 hold_rsb(r);
3199 lock_rsb(r); 3237 lock_rsb(r);
3200 3238
3239 error = validate_message(lkb, ms);
3240 if (error)
3241 goto out;
3242
3201 error = do_cancel(r, lkb); 3243 error = do_cancel(r, lkb);
3202 send_cancel_reply(r, lkb, error); 3244 send_cancel_reply(r, lkb, error);
3203 3245 out:
3204 unlock_rsb(r); 3246 unlock_rsb(r);
3205 put_rsb(r); 3247 put_rsb(r);
3206 dlm_put_lkb(lkb); 3248 dlm_put_lkb(lkb);
@@ -3219,22 +3261,26 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3219 3261
3220 error = find_lkb(ls, ms->m_remid, &lkb); 3262 error = find_lkb(ls, ms->m_remid, &lkb);
3221 if (error) { 3263 if (error) {
3222 log_error(ls, "receive_grant no lkb"); 3264 log_debug(ls, "receive_grant from %d no lkb %x",
3265 ms->m_header.h_nodeid, ms->m_remid);
3223 return; 3266 return;
3224 } 3267 }
3225 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3226 3268
3227 r = lkb->lkb_resource; 3269 r = lkb->lkb_resource;
3228 3270
3229 hold_rsb(r); 3271 hold_rsb(r);
3230 lock_rsb(r); 3272 lock_rsb(r);
3231 3273
3274 error = validate_message(lkb, ms);
3275 if (error)
3276 goto out;
3277
3232 receive_flags_reply(lkb, ms); 3278 receive_flags_reply(lkb, ms);
3233 if (is_altmode(lkb)) 3279 if (is_altmode(lkb))
3234 munge_altmode(lkb, ms); 3280 munge_altmode(lkb, ms);
3235 grant_lock_pc(r, lkb, ms); 3281 grant_lock_pc(r, lkb, ms);
3236 queue_cast(r, lkb, 0); 3282 queue_cast(r, lkb, 0);
3237 3283 out:
3238 unlock_rsb(r); 3284 unlock_rsb(r);
3239 put_rsb(r); 3285 put_rsb(r);
3240 dlm_put_lkb(lkb); 3286 dlm_put_lkb(lkb);
@@ -3248,18 +3294,22 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3248 3294
3249 error = find_lkb(ls, ms->m_remid, &lkb); 3295 error = find_lkb(ls, ms->m_remid, &lkb);
3250 if (error) { 3296 if (error) {
3251 log_error(ls, "receive_bast no lkb"); 3297 log_debug(ls, "receive_bast from %d no lkb %x",
3298 ms->m_header.h_nodeid, ms->m_remid);
3252 return; 3299 return;
3253 } 3300 }
3254 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3255 3301
3256 r = lkb->lkb_resource; 3302 r = lkb->lkb_resource;
3257 3303
3258 hold_rsb(r); 3304 hold_rsb(r);
3259 lock_rsb(r); 3305 lock_rsb(r);
3260 3306
3261 queue_bast(r, lkb, ms->m_bastmode); 3307 error = validate_message(lkb, ms);
3308 if (error)
3309 goto out;
3262 3310
3311 queue_bast(r, lkb, ms->m_bastmode);
3312 out:
3263 unlock_rsb(r); 3313 unlock_rsb(r);
3264 put_rsb(r); 3314 put_rsb(r);
3265 dlm_put_lkb(lkb); 3315 dlm_put_lkb(lkb);
@@ -3325,15 +3375,19 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3325 3375
3326 error = find_lkb(ls, ms->m_remid, &lkb); 3376 error = find_lkb(ls, ms->m_remid, &lkb);
3327 if (error) { 3377 if (error) {
3328 log_error(ls, "receive_request_reply no lkb"); 3378 log_debug(ls, "receive_request_reply from %d no lkb %x",
3379 ms->m_header.h_nodeid, ms->m_remid);
3329 return; 3380 return;
3330 } 3381 }
3331 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3332 3382
3333 r = lkb->lkb_resource; 3383 r = lkb->lkb_resource;
3334 hold_rsb(r); 3384 hold_rsb(r);
3335 lock_rsb(r); 3385 lock_rsb(r);
3336 3386
3387 error = validate_message(lkb, ms);
3388 if (error)
3389 goto out;
3390
3337 mstype = lkb->lkb_wait_type; 3391 mstype = lkb->lkb_wait_type;
3338 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 3392 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3339 if (error) 3393 if (error)
@@ -3466,6 +3520,10 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3466 hold_rsb(r); 3520 hold_rsb(r);
3467 lock_rsb(r); 3521 lock_rsb(r);
3468 3522
3523 error = validate_message(lkb, ms);
3524 if (error)
3525 goto out;
3526
3469 /* stub reply can happen with waiters_mutex held */ 3527 /* stub reply can happen with waiters_mutex held */
3470 error = remove_from_waiters_ms(lkb, ms); 3528 error = remove_from_waiters_ms(lkb, ms);
3471 if (error) 3529 if (error)
@@ -3484,10 +3542,10 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3484 3542
3485 error = find_lkb(ls, ms->m_remid, &lkb); 3543 error = find_lkb(ls, ms->m_remid, &lkb);
3486 if (error) { 3544 if (error) {
3487 log_error(ls, "receive_convert_reply no lkb"); 3545 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3546 ms->m_header.h_nodeid, ms->m_remid);
3488 return; 3547 return;
3489 } 3548 }
3490 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3491 3549
3492 _receive_convert_reply(lkb, ms); 3550 _receive_convert_reply(lkb, ms);
3493 dlm_put_lkb(lkb); 3551 dlm_put_lkb(lkb);
@@ -3501,6 +3559,10 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3501 hold_rsb(r); 3559 hold_rsb(r);
3502 lock_rsb(r); 3560 lock_rsb(r);
3503 3561
3562 error = validate_message(lkb, ms);
3563 if (error)
3564 goto out;
3565
3504 /* stub reply can happen with waiters_mutex held */ 3566 /* stub reply can happen with waiters_mutex held */
3505 error = remove_from_waiters_ms(lkb, ms); 3567 error = remove_from_waiters_ms(lkb, ms);
3506 if (error) 3568 if (error)
@@ -3532,10 +3594,10 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3532 3594
3533 error = find_lkb(ls, ms->m_remid, &lkb); 3595 error = find_lkb(ls, ms->m_remid, &lkb);
3534 if (error) { 3596 if (error) {
3535 log_error(ls, "receive_unlock_reply no lkb"); 3597 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3598 ms->m_header.h_nodeid, ms->m_remid);
3536 return; 3599 return;
3537 } 3600 }
3538 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3539 3601
3540 _receive_unlock_reply(lkb, ms); 3602 _receive_unlock_reply(lkb, ms);
3541 dlm_put_lkb(lkb); 3603 dlm_put_lkb(lkb);
@@ -3549,6 +3611,10 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3549 hold_rsb(r); 3611 hold_rsb(r);
3550 lock_rsb(r); 3612 lock_rsb(r);
3551 3613
3614 error = validate_message(lkb, ms);
3615 if (error)
3616 goto out;
3617
3552 /* stub reply can happen with waiters_mutex held */ 3618 /* stub reply can happen with waiters_mutex held */
3553 error = remove_from_waiters_ms(lkb, ms); 3619 error = remove_from_waiters_ms(lkb, ms);
3554 if (error) 3620 if (error)
@@ -3580,10 +3646,10 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3580 3646
3581 error = find_lkb(ls, ms->m_remid, &lkb); 3647 error = find_lkb(ls, ms->m_remid, &lkb);
3582 if (error) { 3648 if (error) {
3583 log_error(ls, "receive_cancel_reply no lkb"); 3649 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3650 ms->m_header.h_nodeid, ms->m_remid);
3584 return; 3651 return;
3585 } 3652 }
3586 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3587 3653
3588 _receive_cancel_reply(lkb, ms); 3654 _receive_cancel_reply(lkb, ms);
3589 dlm_put_lkb(lkb); 3655 dlm_put_lkb(lkb);
@@ -3816,6 +3882,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3816 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; 3882 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3817 ls->ls_stub_ms.m_result = -EINPROGRESS; 3883 ls->ls_stub_ms.m_result = -EINPROGRESS;
3818 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3884 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3885 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3819 _receive_convert_reply(lkb, &ls->ls_stub_ms); 3886 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3820 3887
3821 /* Same special case as in receive_rcom_lock_args() */ 3888 /* Same special case as in receive_rcom_lock_args() */
@@ -3917,6 +3984,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
3917 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY; 3984 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
3918 ls->ls_stub_ms.m_result = stub_unlock_result; 3985 ls->ls_stub_ms.m_result = stub_unlock_result;
3919 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3986 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3987 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3920 _receive_unlock_reply(lkb, &ls->ls_stub_ms); 3988 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3921 dlm_put_lkb(lkb); 3989 dlm_put_lkb(lkb);
3922 break; 3990 break;
@@ -3926,6 +3994,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
3926 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY; 3994 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
3927 ls->ls_stub_ms.m_result = stub_cancel_result; 3995 ls->ls_stub_ms.m_result = stub_cancel_result;
3928 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3996 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3997 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3929 _receive_cancel_reply(lkb, &ls->ls_stub_ms); 3998 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3930 dlm_put_lkb(lkb); 3999 dlm_put_lkb(lkb);
3931 break; 4000 break;