aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2017-10-13 05:04:25 -0400
committerDavid S. Miller <davem@davemloft.net>2017-10-13 11:46:00 -0400
commitae236fb208a6fbbd2e7a6913385e8fb78ac807f8 (patch)
tree451b5798223b599434923a90243a94372d7c6cf1 /net/tipc
parent31c82a2d9d51fccbb85cbd2be983eb115225301c (diff)
tipc: receive group membership events via member socket
Like with any other service, group members' availability can be subscribed for by connecting to be topology server. However, because the events arrive via a different socket than the member socket, there is a real risk that membership events my arrive out of synch with the actual JOIN/LEAVE action. I.e., it is possible to receive the first messages from a new member before the corresponding JOIN event arrives, just as it is possible to receive the last messages from a leaving member after the LEAVE event has already been received. Since each member socket is internally also subscribing for membership events, we now fix this problem by passing those events on to the user via the member socket. We leverage the already present member synch- ronization protocol to guarantee correct message/event order. An event is delivered to the user as an empty message where the two source addresses identify the new/lost member. Furthermore, we set the MSG_OOB bit in the message flags to mark it as an event. If the event is an indication about a member loss we also set the MSG_EOR bit, so it can be distinguished from a member addition event. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/group.c60
-rw-r--r--net/tipc/group.h2
-rw-r--r--net/tipc/msg.h22
-rw-r--r--net/tipc/socket.c49
4 files changed, 100 insertions, 33 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index beb214a3420c..1bfa9348b26d 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -59,6 +59,7 @@ enum mbr_state {
59struct tipc_member { 59struct tipc_member {
60 struct rb_node tree_node; 60 struct rb_node tree_node;
61 struct list_head list; 61 struct list_head list;
62 struct sk_buff *event_msg;
62 u32 node; 63 u32 node;
63 u32 port; 64 u32 port;
64 u32 instance; 65 u32 instance;
@@ -79,6 +80,7 @@ struct tipc_group {
79 u16 member_cnt; 80 u16 member_cnt;
80 u16 bc_snd_nxt; 81 u16 bc_snd_nxt;
81 bool loopback; 82 bool loopback;
83 bool events;
82}; 84};
83 85
84static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 86static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
@@ -117,6 +119,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
117 grp->instance = mreq->instance; 119 grp->instance = mreq->instance;
118 grp->scope = mreq->scope; 120 grp->scope = mreq->scope;
119 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 121 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
122 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
120 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) 123 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
121 return grp; 124 return grp;
122 kfree(grp); 125 kfree(grp);
@@ -279,6 +282,13 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
279 if (!msg_in_group(hdr)) 282 if (!msg_in_group(hdr))
280 goto drop; 283 goto drop;
281 284
285 if (mtyp == TIPC_GRP_MEMBER_EVT) {
286 if (!grp->events)
287 goto drop;
288 __skb_queue_tail(inputq, skb);
289 return;
290 }
291
282 m = tipc_group_find_member(grp, node, port); 292 m = tipc_group_find_member(grp, node, port);
283 if (!tipc_group_is_receiver(m)) 293 if (!tipc_group_is_receiver(m))
284 goto drop; 294 goto drop;
@@ -311,6 +321,7 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
311} 321}
312 322
313void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, 323void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
324 struct sk_buff_head *inputq,
314 struct sk_buff_head *xmitq) 325 struct sk_buff_head *xmitq)
315{ 326{
316 u32 node = msg_orignode(hdr); 327 u32 node = msg_orignode(hdr);
@@ -332,10 +343,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
332 m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); 343 m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
333 344
334 /* Wait until PUBLISH event is received */ 345 /* Wait until PUBLISH event is received */
335 if (m->state == MBR_DISCOVERED) 346 if (m->state == MBR_DISCOVERED) {
336 m->state = MBR_JOINING; 347 m->state = MBR_JOINING;
337 else if (m->state == MBR_PUBLISHED) 348 } else if (m->state == MBR_PUBLISHED) {
338 m->state = MBR_JOINED; 349 m->state = MBR_JOINED;
350 __skb_queue_tail(inputq, m->event_msg);
351 }
339 return; 352 return;
340 case GRP_LEAVE_MSG: 353 case GRP_LEAVE_MSG:
341 if (!m) 354 if (!m)
@@ -347,6 +360,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
347 return; 360 return;
348 } 361 }
349 /* Otherwise deliver already received WITHDRAW event */ 362 /* Otherwise deliver already received WITHDRAW event */
363 __skb_queue_tail(inputq, m->event_msg);
350 tipc_group_delete_member(grp, m); 364 tipc_group_delete_member(grp, m);
351 return; 365 return;
352 default: 366 default:
@@ -354,16 +368,17 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
354 } 368 }
355} 369}
356 370
357/* tipc_group_member_evt() - receive and handle a member up/down event
358 */
359void tipc_group_member_evt(struct tipc_group *grp, 371void tipc_group_member_evt(struct tipc_group *grp,
360 struct sk_buff *skb, 372 struct sk_buff *skb,
373 struct sk_buff_head *inputq,
361 struct sk_buff_head *xmitq) 374 struct sk_buff_head *xmitq)
362{ 375{
363 struct tipc_msg *hdr = buf_msg(skb); 376 struct tipc_msg *hdr = buf_msg(skb);
364 struct tipc_event *evt = (void *)msg_data(hdr); 377 struct tipc_event *evt = (void *)msg_data(hdr);
378 u32 instance = evt->found_lower;
365 u32 node = evt->port.node; 379 u32 node = evt->port.node;
366 u32 port = evt->port.ref; 380 u32 port = evt->port.ref;
381 int event = evt->event;
367 struct tipc_member *m; 382 struct tipc_member *m;
368 struct net *net; 383 struct net *net;
369 u32 self; 384 u32 self;
@@ -376,32 +391,51 @@ void tipc_group_member_evt(struct tipc_group *grp,
376 if (!grp->loopback && node == self && port == grp->portid) 391 if (!grp->loopback && node == self && port == grp->portid)
377 goto drop; 392 goto drop;
378 393
394 /* Convert message before delivery to user */
395 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
396 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
397 msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
398 msg_set_origport(hdr, port);
399 msg_set_orignode(hdr, node);
400 msg_set_nametype(hdr, grp->type);
401 msg_set_grp_evt(hdr, event);
402
379 m = tipc_group_find_member(grp, node, port); 403 m = tipc_group_find_member(grp, node, port);
380 404
381 if (evt->event == TIPC_PUBLISHED) { 405 if (event == TIPC_PUBLISHED) {
382 if (!m) 406 if (!m)
383 m = tipc_group_create_member(grp, node, port, 407 m = tipc_group_create_member(grp, node, port,
384 MBR_DISCOVERED); 408 MBR_DISCOVERED);
385 if (!m) 409 if (!m)
386 goto drop; 410 goto drop;
387 411
388 /* Wait if JOIN message not yet received */ 412 /* Hold back event if JOIN message not yet received */
389 if (m->state == MBR_DISCOVERED) 413 if (m->state == MBR_DISCOVERED) {
414 m->event_msg = skb;
390 m->state = MBR_PUBLISHED; 415 m->state = MBR_PUBLISHED;
391 else 416 } else {
417 __skb_queue_tail(inputq, skb);
392 m->state = MBR_JOINED; 418 m->state = MBR_JOINED;
393 m->instance = evt->found_lower; 419 }
420 m->instance = instance;
421 TIPC_SKB_CB(skb)->orig_member = m->instance;
394 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 422 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
395 } else if (evt->event == TIPC_WITHDRAWN) { 423 } else if (event == TIPC_WITHDRAWN) {
396 if (!m) 424 if (!m)
397 goto drop; 425 goto drop;
398 426
399 /* Keep back event if more messages might be expected */ 427 TIPC_SKB_CB(skb)->orig_member = m->instance;
400 if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) 428
429 /* Hold back event if more messages might be expected */
430 if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
431 m->event_msg = skb;
401 m->state = MBR_LEAVING; 432 m->state = MBR_LEAVING;
402 else 433 } else {
434 __skb_queue_tail(inputq, skb);
403 tipc_group_delete_member(grp, m); 435 tipc_group_delete_member(grp, m);
436 }
404 } 437 }
438 return;
405drop: 439drop:
406 kfree_skb(skb); 440 kfree_skb(skb);
407} 441}
diff --git a/net/tipc/group.h b/net/tipc/group.h
index 9bdf4479fc03..5d3f10d28967 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -54,9 +54,11 @@ void tipc_group_filter_msg(struct tipc_group *grp,
54 struct sk_buff_head *xmitq); 54 struct sk_buff_head *xmitq);
55void tipc_group_member_evt(struct tipc_group *grp, 55void tipc_group_member_evt(struct tipc_group *grp,
56 struct sk_buff *skb, 56 struct sk_buff *skb,
57 struct sk_buff_head *inputq,
57 struct sk_buff_head *xmitq); 58 struct sk_buff_head *xmitq);
58void tipc_group_proto_rcv(struct tipc_group *grp, 59void tipc_group_proto_rcv(struct tipc_group *grp,
59 struct tipc_msg *hdr, 60 struct tipc_msg *hdr,
61 struct sk_buff_head *inputq,
60 struct sk_buff_head *xmitq); 62 struct sk_buff_head *xmitq);
61void tipc_group_update_bc_members(struct tipc_group *grp); 63void tipc_group_update_bc_members(struct tipc_group *grp);
62u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); 64u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index e438716d2372..1b527b154e46 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -65,7 +65,8 @@ struct plist;
65#define TIPC_MCAST_MSG 1 65#define TIPC_MCAST_MSG 1
66#define TIPC_NAMED_MSG 2 66#define TIPC_NAMED_MSG 2
67#define TIPC_DIRECT_MSG 3 67#define TIPC_DIRECT_MSG 3
68#define TIPC_GRP_BCAST_MSG 4 68#define TIPC_GRP_MEMBER_EVT 4
69#define TIPC_GRP_BCAST_MSG 5
69 70
70/* 71/*
71 * Internal message users 72 * Internal message users
@@ -258,7 +259,14 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n)
258 259
259static inline int msg_in_group(struct tipc_msg *m) 260static inline int msg_in_group(struct tipc_msg *m)
260{ 261{
261 return (msg_type(m) == TIPC_GRP_BCAST_MSG); 262 int mtyp = msg_type(m);
263
264 return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT);
265}
266
267static inline bool msg_is_grp_evt(struct tipc_msg *m)
268{
269 return msg_type(m) == TIPC_GRP_MEMBER_EVT;
262} 270}
263 271
264static inline u32 msg_named(struct tipc_msg *m) 272static inline u32 msg_named(struct tipc_msg *m)
@@ -824,6 +832,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n)
824 832
825/* Word 10 833/* Word 10
826 */ 834 */
835static inline u16 msg_grp_evt(struct tipc_msg *m)
836{
837 return msg_bits(m, 10, 0, 0x3);
838}
839
840static inline void msg_set_grp_evt(struct tipc_msg *m, int n)
841{
842 msg_set_bits(m, 10, 0, 0x3, n);
843}
844
827static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) 845static inline u16 msg_grp_bc_seqno(struct tipc_msg *m)
828{ 846{
829 return msg_bits(m, 10, 16, 0xffff); 847 return msg_bits(m, 10, 16, 0xffff);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 25ecf1201527..0a2eac309177 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -709,41 +709,43 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
709 poll_table *wait) 709 poll_table *wait)
710{ 710{
711 struct sock *sk = sock->sk; 711 struct sock *sk = sock->sk;
712 struct sk_buff *skb = skb_peek(&sk->sk_receive_queue);
712 struct tipc_sock *tsk = tipc_sk(sk); 713 struct tipc_sock *tsk = tipc_sk(sk);
713 struct tipc_group *grp = tsk->group; 714 struct tipc_group *grp = tsk->group;
714 u32 mask = 0; 715 u32 revents = 0;
715 716
716 sock_poll_wait(file, sk_sleep(sk), wait); 717 sock_poll_wait(file, sk_sleep(sk), wait);
717 718
718 if (sk->sk_shutdown & RCV_SHUTDOWN) 719 if (sk->sk_shutdown & RCV_SHUTDOWN)
719 mask |= POLLRDHUP | POLLIN | POLLRDNORM; 720 revents |= POLLRDHUP | POLLIN | POLLRDNORM;
720 if (sk->sk_shutdown == SHUTDOWN_MASK) 721 if (sk->sk_shutdown == SHUTDOWN_MASK)
721 mask |= POLLHUP; 722 revents |= POLLHUP;
722 723
723 switch (sk->sk_state) { 724 switch (sk->sk_state) {
724 case TIPC_ESTABLISHED: 725 case TIPC_ESTABLISHED:
725 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) 726 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
726 mask |= POLLOUT; 727 revents |= POLLOUT;
727 /* fall thru' */ 728 /* fall thru' */
728 case TIPC_LISTEN: 729 case TIPC_LISTEN:
729 case TIPC_CONNECTING: 730 case TIPC_CONNECTING:
730 if (!skb_queue_empty(&sk->sk_receive_queue)) 731 if (skb)
731 mask |= (POLLIN | POLLRDNORM); 732 revents |= POLLIN | POLLRDNORM;
732 break; 733 break;
733 case TIPC_OPEN: 734 case TIPC_OPEN:
734 if (!grp || tipc_group_size(grp)) 735 if (!grp || tipc_group_size(grp))
735 if (!tsk->cong_link_cnt) 736 if (!tsk->cong_link_cnt)
736 mask |= POLLOUT; 737 revents |= POLLOUT;
737 if (tipc_sk_type_connectionless(sk) && 738 if (!tipc_sk_type_connectionless(sk))
738 (!skb_queue_empty(&sk->sk_receive_queue))) 739 break;
739 mask |= (POLLIN | POLLRDNORM); 740 if (!skb)
741 break;
742 revents |= POLLIN | POLLRDNORM;
740 break; 743 break;
741 case TIPC_DISCONNECTING: 744 case TIPC_DISCONNECTING:
742 mask = (POLLIN | POLLRDNORM | POLLHUP); 745 revents = POLLIN | POLLRDNORM | POLLHUP;
743 break; 746 break;
744 } 747 }
745 748 return revents;
746 return mask;
747} 749}
748 750
749/** 751/**
@@ -1415,11 +1417,12 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1415 size_t buflen, int flags) 1417 size_t buflen, int flags)
1416{ 1418{
1417 struct sock *sk = sock->sk; 1419 struct sock *sk = sock->sk;
1418 struct tipc_sock *tsk = tipc_sk(sk);
1419 struct sk_buff *skb;
1420 struct tipc_msg *hdr;
1421 bool connected = !tipc_sk_type_connectionless(sk); 1420 bool connected = !tipc_sk_type_connectionless(sk);
1421 struct tipc_sock *tsk = tipc_sk(sk);
1422 int rc, err, hlen, dlen, copy; 1422 int rc, err, hlen, dlen, copy;
1423 struct tipc_msg *hdr;
1424 struct sk_buff *skb;
1425 bool grp_evt;
1423 long timeout; 1426 long timeout;
1424 1427
1425 /* Catch invalid receive requests */ 1428 /* Catch invalid receive requests */
@@ -1443,6 +1446,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1443 dlen = msg_data_sz(hdr); 1446 dlen = msg_data_sz(hdr);
1444 hlen = msg_hdr_sz(hdr); 1447 hlen = msg_hdr_sz(hdr);
1445 err = msg_errcode(hdr); 1448 err = msg_errcode(hdr);
1449 grp_evt = msg_is_grp_evt(hdr);
1446 if (likely(dlen || err)) 1450 if (likely(dlen || err))
1447 break; 1451 break;
1448 tsk_advance_rx_queue(sk); 1452 tsk_advance_rx_queue(sk);
@@ -1469,11 +1473,20 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1469 if (unlikely(rc)) 1473 if (unlikely(rc))
1470 goto exit; 1474 goto exit;
1471 1475
1476 /* Mark message as group event if applicable */
1477 if (unlikely(grp_evt)) {
1478 if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
1479 m->msg_flags |= MSG_EOR;
1480 m->msg_flags |= MSG_OOB;
1481 copy = 0;
1482 }
1483
1472 /* Caption of data or error code/rejected data was successful */ 1484 /* Caption of data or error code/rejected data was successful */
1473 if (unlikely(flags & MSG_PEEK)) 1485 if (unlikely(flags & MSG_PEEK))
1474 goto exit; 1486 goto exit;
1475 1487
1476 tsk_advance_rx_queue(sk); 1488 tsk_advance_rx_queue(sk);
1489
1477 if (likely(!connected)) 1490 if (likely(!connected))
1478 goto exit; 1491 goto exit;
1479 1492
@@ -1648,10 +1661,10 @@ static void tipc_sk_proto_rcv(struct sock *sk,
1648 sk->sk_write_space(sk); 1661 sk->sk_write_space(sk);
1649 break; 1662 break;
1650 case GROUP_PROTOCOL: 1663 case GROUP_PROTOCOL:
1651 tipc_group_proto_rcv(grp, hdr, xmitq); 1664 tipc_group_proto_rcv(grp, hdr, inputq, xmitq);
1652 break; 1665 break;
1653 case TOP_SRV: 1666 case TOP_SRV:
1654 tipc_group_member_evt(tsk->group, skb, xmitq); 1667 tipc_group_member_evt(tsk->group, skb, inputq, xmitq);
1655 skb = NULL; 1668 skb = NULL;
1656 break; 1669 break;
1657 default: 1670 default: