aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-01-08 15:03:26 -0500
committerDavid S. Miller <davem@davemloft.net>2018-01-09 12:35:57 -0500
commit7ad32bcb7855ae8a60a8cf98e1b9da77cfdba4d0 (patch)
treeda31cea7f49f2793e536b42da165cea1caf00a1d /net/tipc
parent0233493a5fad227645f7f02539cb42db72e76030 (diff)
tipc: create group member event messages when they are needed
In the current implementation, a group socket receiving topology events about other members just converts the topology event message into a group event message and stores it until it reaches the right state to issue it to the user. This complicates the code unnecessarily, and becomes impractical when we in the coming commits will need to create and issue membership events independently. In this commit, we change this so that we just notice the type and origin of the incoming topology event, and then drop the buffer. Only when it is time to actually send a group event to the user do we explicitly create a new message and send it upwards. Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/group.c95
-rw-r--r--net/tipc/group.h2
-rw-r--r--net/tipc/socket.c3
3 files changed, 56 insertions, 44 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index a352e098f0e7..e08b7acc7b2d 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -64,7 +64,6 @@ struct tipc_member {
64 struct rb_node tree_node; 64 struct rb_node tree_node;
65 struct list_head list; 65 struct list_head list;
66 struct list_head small_win; 66 struct list_head small_win;
67 struct sk_buff *event_msg;
68 struct sk_buff_head deferredq; 67 struct sk_buff_head deferredq;
69 struct tipc_group *group; 68 struct tipc_group *group;
70 u32 node; 69 u32 node;
@@ -632,6 +631,40 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
632 } 631 }
633} 632}
634 633
634static void tipc_group_create_event(struct tipc_group *grp,
635 struct tipc_member *m,
636 u32 event, u16 seqno,
637 struct sk_buff_head *inputq)
638{ u32 dnode = tipc_own_addr(grp->net);
639 struct tipc_event evt;
640 struct sk_buff *skb;
641 struct tipc_msg *hdr;
642
643 evt.event = event;
644 evt.found_lower = m->instance;
645 evt.found_upper = m->instance;
646 evt.port.ref = m->port;
647 evt.port.node = m->node;
648 evt.s.seq.type = grp->type;
649 evt.s.seq.lower = m->instance;
650 evt.s.seq.upper = m->instance;
651
652 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT,
653 GROUP_H_SIZE, sizeof(evt), dnode, m->node,
654 grp->portid, m->port, 0);
655 if (!skb)
656 return;
657
658 hdr = buf_msg(skb);
659 msg_set_nametype(hdr, grp->type);
660 msg_set_grp_evt(hdr, event);
661 msg_set_dest_droppable(hdr, true);
662 msg_set_grp_bc_seqno(hdr, seqno);
663 memcpy(msg_data(hdr), &evt, sizeof(evt));
664 TIPC_SKB_CB(skb)->orig_member = m->instance;
665 __skb_queue_tail(inputq, skb);
666}
667
635static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 668static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
636 int mtyp, struct sk_buff_head *xmitq) 669 int mtyp, struct sk_buff_head *xmitq)
637{ 670{
@@ -677,7 +710,6 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
677 u32 node = msg_orignode(hdr); 710 u32 node = msg_orignode(hdr);
678 u32 port = msg_origport(hdr); 711 u32 port = msg_origport(hdr);
679 struct tipc_member *m, *pm; 712 struct tipc_member *m, *pm;
680 struct tipc_msg *ehdr;
681 u16 remitted, in_flight; 713 u16 remitted, in_flight;
682 714
683 if (!grp) 715 if (!grp)
@@ -704,9 +736,8 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
704 *usr_wakeup = true; 736 *usr_wakeup = true;
705 m->usr_pending = false; 737 m->usr_pending = false;
706 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 738 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
707 ehdr = buf_msg(m->event_msg); 739 tipc_group_create_event(grp, m, TIPC_PUBLISHED,
708 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); 740 m->bc_syncpt, inputq);
709 __skb_queue_tail(inputq, m->event_msg);
710 } 741 }
711 list_del_init(&m->small_win); 742 list_del_init(&m->small_win);
712 tipc_group_update_member(m, 0); 743 tipc_group_update_member(m, 0);
@@ -725,10 +756,9 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
725 m->state = MBR_LEAVING; 756 m->state = MBR_LEAVING;
726 return; 757 return;
727 } 758 }
728 /* Otherwise deliver already received WITHDRAW event */ 759 /* Otherwise deliver member WITHDRAW event */
729 ehdr = buf_msg(m->event_msg); 760 tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
730 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); 761 m->bc_syncpt, inputq);
731 __skb_queue_tail(inputq, m->event_msg);
732 return; 762 return;
733 case GRP_ADV_MSG: 763 case GRP_ADV_MSG:
734 if (!m) 764 if (!m)
@@ -797,11 +827,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
797void tipc_group_member_evt(struct tipc_group *grp, 827void tipc_group_member_evt(struct tipc_group *grp,
798 bool *usr_wakeup, 828 bool *usr_wakeup,
799 int *sk_rcvbuf, 829 int *sk_rcvbuf,
800 struct sk_buff *skb, 830 struct tipc_msg *hdr,
801 struct sk_buff_head *inputq, 831 struct sk_buff_head *inputq,
802 struct sk_buff_head *xmitq) 832 struct sk_buff_head *xmitq)
803{ 833{
804 struct tipc_msg *hdr = buf_msg(skb);
805 struct tipc_event *evt = (void *)msg_data(hdr); 834 struct tipc_event *evt = (void *)msg_data(hdr);
806 u32 instance = evt->found_lower; 835 u32 instance = evt->found_lower;
807 u32 node = evt->port.node; 836 u32 node = evt->port.node;
@@ -813,21 +842,12 @@ void tipc_group_member_evt(struct tipc_group *grp,
813 u32 self; 842 u32 self;
814 843
815 if (!grp) 844 if (!grp)
816 goto drop; 845 return;
817 846
818 net = grp->net; 847 net = grp->net;
819 self = tipc_own_addr(net); 848 self = tipc_own_addr(net);
820 if (!grp->loopback && node == self && port == grp->portid) 849 if (!grp->loopback && node == self && port == grp->portid)
821 goto drop; 850 return;
822
823 /* Convert message before delivery to user */
824 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
825 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
826 msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
827 msg_set_origport(hdr, port);
828 msg_set_orignode(hdr, node);
829 msg_set_nametype(hdr, grp->type);
830 msg_set_grp_evt(hdr, event);
831 851
832 m = tipc_group_find_member(grp, node, port); 852 m = tipc_group_find_member(grp, node, port);
833 853
@@ -836,59 +856,52 @@ void tipc_group_member_evt(struct tipc_group *grp,
836 m = tipc_group_create_member(grp, node, port, 856 m = tipc_group_create_member(grp, node, port,
837 MBR_DISCOVERED); 857 MBR_DISCOVERED);
838 if (!m) 858 if (!m)
839 goto drop; 859 return;
860
861 m->instance = instance;
840 862
841 /* Hold back event if JOIN message not yet received */ 863 /* Hold back event if JOIN message not yet received */
842 if (m->state == MBR_DISCOVERED) { 864 if (m->state == MBR_DISCOVERED) {
843 m->event_msg = skb;
844 m->state = MBR_PUBLISHED; 865 m->state = MBR_PUBLISHED;
845 } else { 866 } else {
846 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 867 tipc_group_create_event(grp, m, TIPC_PUBLISHED,
847 __skb_queue_tail(inputq, skb); 868 m->bc_syncpt, inputq);
848 m->state = MBR_JOINED; 869 m->state = MBR_JOINED;
849 *usr_wakeup = true; 870 *usr_wakeup = true;
850 m->usr_pending = false; 871 m->usr_pending = false;
851 } 872 }
852 m->instance = instance;
853 TIPC_SKB_CB(skb)->orig_member = m->instance;
854 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 873 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
855 tipc_group_update_member(m, 0); 874 tipc_group_update_member(m, 0);
856 } else if (event == TIPC_WITHDRAWN) { 875 } else if (event == TIPC_WITHDRAWN) {
857 if (!m) 876 if (!m)
858 goto drop; 877 return;
859
860 TIPC_SKB_CB(skb)->orig_member = m->instance;
861 878
862 *usr_wakeup = true; 879 *usr_wakeup = true;
863 m->usr_pending = false; 880 m->usr_pending = false;
864 node_up = tipc_node_is_up(net, node); 881 node_up = tipc_node_is_up(net, node);
865 m->event_msg = NULL;
866 882
867 if (node_up) { 883 if (node_up) {
868 /* Hold back event if a LEAVE msg should be expected */ 884 /* Hold back event if a LEAVE msg should be expected */
869 if (m->state != MBR_LEAVING) { 885 if (m->state != MBR_LEAVING) {
870 m->event_msg = skb;
871 tipc_group_decr_active(grp, m); 886 tipc_group_decr_active(grp, m);
872 m->state = MBR_LEAVING; 887 m->state = MBR_LEAVING;
873 } else { 888 } else {
874 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 889 tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
875 __skb_queue_tail(inputq, skb); 890 m->bc_syncpt, inputq);
876 } 891 }
877 } else { 892 } else {
878 if (m->state != MBR_LEAVING) { 893 if (m->state != MBR_LEAVING) {
879 tipc_group_decr_active(grp, m); 894 tipc_group_decr_active(grp, m);
880 m->state = MBR_LEAVING; 895 m->state = MBR_LEAVING;
881 msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt); 896 tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
897 m->bc_rcv_nxt, inputq);
882 } else { 898 } else {
883 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 899 tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
900 m->bc_syncpt, inputq);
884 } 901 }
885 __skb_queue_tail(inputq, skb);
886 } 902 }
887 list_del_init(&m->list); 903 list_del_init(&m->list);
888 list_del_init(&m->small_win); 904 list_del_init(&m->small_win);
889 } 905 }
890 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 906 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
891 return;
892drop:
893 kfree_skb(skb);
894} 907}
diff --git a/net/tipc/group.h b/net/tipc/group.h
index d525e1cd7de5..5ffffd0121a2 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -54,7 +54,7 @@ void tipc_group_filter_msg(struct tipc_group *grp,
54 struct sk_buff_head *inputq, 54 struct sk_buff_head *inputq,
55 struct sk_buff_head *xmitq); 55 struct sk_buff_head *xmitq);
56void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup, 56void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup,
57 int *sk_rcvbuf, struct sk_buff *skb, 57 int *sk_rcvbuf, struct tipc_msg *hdr,
58 struct sk_buff_head *inputq, 58 struct sk_buff_head *inputq,
59 struct sk_buff_head *xmitq); 59 struct sk_buff_head *xmitq);
60void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, 60void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index b51d5cba5094..36744ebef74f 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1933,8 +1933,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
1933 break; 1933 break;
1934 case TOP_SRV: 1934 case TOP_SRV:
1935 tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf, 1935 tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
1936 skb, inputq, xmitq); 1936 hdr, inputq, xmitq);
1937 skb = NULL;
1938 break; 1937 break;
1939 default: 1938 default:
1940 break; 1939 break;