aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2014-07-16 20:40:58 -0400
committerDavid S. Miller <davem@davemloft.net>2014-07-17 00:38:18 -0400
commitdbdf6d24ad37d63938f29a2d134a1a9f6e9e673c (patch)
tree3c2046727403ba015f699b835fc374b58093c8b4 /net/tipc
parenta9f559c37b582c9eb12f82ac9bb77476cfda6309 (diff)
tipc: make name table distributor use new send function
In a previous commit series ("tipc: new unicast transmission code") we introduced a new message sending function, tipc_link_xmit2(), and moved the unicast data users over to use that function. We now let the internal name table distributor do the same. The interaction between the name distributor and the node/link layer also becomes significantly simpler, so we can eliminate the function tipc_link_names_xmit(). Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Erik Hugne <erik.hugne@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/link.c41
-rw-r--r--net/tipc/link.h1
-rw-r--r--net/tipc/name_distr.c76
-rw-r--r--net/tipc/name_distr.h2
-rw-r--r--net/tipc/node.c13
5 files changed, 48 insertions, 85 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index a235b245f682..367b0f5886f8 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1033,47 +1033,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
1033} 1033}
1034 1034
1035/* 1035/*
1036 * tipc_link_names_xmit - send name table entries to new neighbor
1037 *
1038 * Send routine for bulk delivery of name table messages when contact
1039 * with a new neighbor occurs. No link congestion checking is performed
1040 * because name table messages *must* be delivered. The messages must be
1041 * small enough not to require fragmentation.
1042 * Called without any locks held.
1043 */
1044void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
1045{
1046 struct tipc_node *n_ptr;
1047 struct tipc_link *l_ptr;
1048 struct sk_buff *buf;
1049 struct sk_buff *temp_buf;
1050
1051 if (list_empty(message_list))
1052 return;
1053
1054 n_ptr = tipc_node_find(dest);
1055 if (n_ptr) {
1056 tipc_node_lock(n_ptr);
1057 l_ptr = n_ptr->active_links[0];
1058 if (l_ptr) {
1059 /* convert circular list to linear list */
1060 ((struct sk_buff *)message_list->prev)->next = NULL;
1061 link_add_chain_to_outqueue(l_ptr,
1062 (struct sk_buff *)message_list->next, 0);
1063 tipc_link_push_queue(l_ptr);
1064 INIT_LIST_HEAD(message_list);
1065 }
1066 tipc_node_unlock(n_ptr);
1067 }
1068
1069 /* discard the messages if they couldn't be sent */
1070 list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
1071 list_del((struct list_head *)buf);
1072 kfree_skb(buf);
1073 }
1074}
1075
1076/*
1077 * tipc_link_push_packet: Push one unsent packet to the media 1036 * tipc_link_push_packet: Push one unsent packet to the media
1078 */ 1037 */
1079static u32 tipc_link_push_packet(struct tipc_link *l_ptr) 1038static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 227ff8120897..04a59c58d385 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -228,7 +228,6 @@ void tipc_link_reset(struct tipc_link *l_ptr);
228void tipc_link_reset_list(unsigned int bearer_id); 228void tipc_link_reset_list(unsigned int bearer_id);
229int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); 229int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
230int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector); 230int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
231void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
232int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); 231int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
233int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf); 232int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
234int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf); 233int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 8ce730984aa1..d16f9475fa76 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
101 101
102void named_cluster_distribute(struct sk_buff *buf) 102void named_cluster_distribute(struct sk_buff *buf)
103{ 103{
104 struct sk_buff *buf_copy; 104 struct sk_buff *obuf;
105 struct tipc_node *n_ptr; 105 struct tipc_node *node;
106 struct tipc_link *l_ptr; 106 u32 dnode;
107 107
108 rcu_read_lock(); 108 rcu_read_lock();
109 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { 109 list_for_each_entry_rcu(node, &tipc_node_list, list) {
110 tipc_node_lock(n_ptr); 110 dnode = node->addr;
111 l_ptr = n_ptr->active_links[n_ptr->addr & 1]; 111 if (in_own_node(dnode))
112 if (l_ptr) { 112 continue;
113 buf_copy = skb_copy(buf, GFP_ATOMIC); 113 if (!tipc_node_active_links(node))
114 if (!buf_copy) { 114 continue;
115 tipc_node_unlock(n_ptr); 115 obuf = skb_copy(buf, GFP_ATOMIC);
116 break; 116 if (!obuf)
117 } 117 break;
118 msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); 118 msg_set_destnode(buf_msg(obuf), dnode);
119 __tipc_link_xmit(l_ptr, buf_copy); 119 tipc_link_xmit2(obuf, dnode, dnode);
120 }
121 tipc_node_unlock(n_ptr);
122 } 120 }
123 rcu_read_unlock(); 121 rcu_read_unlock();
124 122
@@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
175 return buf; 173 return buf;
176} 174}
177 175
178/* 176/**
179 * named_distribute - prepare name info for bulk distribution to another node 177 * named_distribute - prepare name info for bulk distribution to another node
178 * @msg_list: list of messages (buffers) to be returned from this function
179 * @dnode: node to be updated
180 * @pls: linked list of publication items to be packed into buffer chain
180 */ 181 */
181static void named_distribute(struct list_head *message_list, u32 node, 182static void named_distribute(struct list_head *msg_list, u32 dnode,
182 struct publ_list *pls, u32 max_item_buf) 183 struct publ_list *pls)
183{ 184{
184 struct publication *publ; 185 struct publication *publ;
185 struct sk_buff *buf = NULL; 186 struct sk_buff *buf = NULL;
186 struct distr_item *item = NULL; 187 struct distr_item *item = NULL;
187 u32 left = 0; 188 uint dsz = pls->size * ITEM_SIZE;
188 u32 rest = pls->size * ITEM_SIZE; 189 uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
190 uint rem = dsz;
191 uint msg_rem = 0;
189 192
190 list_for_each_entry(publ, &pls->list, local_list) { 193 list_for_each_entry(publ, &pls->list, local_list) {
194 /* Prepare next buffer: */
191 if (!buf) { 195 if (!buf) {
192 left = (rest <= max_item_buf) ? rest : max_item_buf; 196 msg_rem = min_t(uint, rem, msg_dsz);
193 rest -= left; 197 rem -= msg_rem;
194 buf = named_prepare_buf(PUBLICATION, left, node); 198 buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
195 if (!buf) { 199 if (!buf) {
196 pr_warn("Bulk publication failure\n"); 200 pr_warn("Bulk publication failure\n");
197 return; 201 return;
198 } 202 }
199 item = (struct distr_item *)msg_data(buf_msg(buf)); 203 item = (struct distr_item *)msg_data(buf_msg(buf));
200 } 204 }
205
206 /* Pack publication into message: */
201 publ_to_item(item, publ); 207 publ_to_item(item, publ);
202 item++; 208 item++;
203 left -= ITEM_SIZE; 209 msg_rem -= ITEM_SIZE;
204 if (!left) { 210
205 list_add_tail((struct list_head *)buf, message_list); 211 /* Append full buffer to list: */
212 if (!msg_rem) {
213 list_add_tail((struct list_head *)buf, msg_list);
206 buf = NULL; 214 buf = NULL;
207 } 215 }
208 } 216 }
@@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
211/** 219/**
212 * tipc_named_node_up - tell specified node about all publications by this node 220 * tipc_named_node_up - tell specified node about all publications by this node
213 */ 221 */
214void tipc_named_node_up(u32 max_item_buf, u32 node) 222void tipc_named_node_up(u32 dnode)
215{ 223{
216 LIST_HEAD(message_list); 224 LIST_HEAD(msg_list);
225 struct sk_buff *buf_chain;
217 226
218 read_lock_bh(&tipc_nametbl_lock); 227 read_lock_bh(&tipc_nametbl_lock);
219 named_distribute(&message_list, node, &publ_cluster, max_item_buf); 228 named_distribute(&msg_list, dnode, &publ_cluster);
220 named_distribute(&message_list, node, &publ_zone, max_item_buf); 229 named_distribute(&msg_list, dnode, &publ_zone);
221 read_unlock_bh(&tipc_nametbl_lock); 230 read_unlock_bh(&tipc_nametbl_lock);
222 231
223 tipc_link_names_xmit(&message_list, node); 232 /* Convert circular list to linear list and send: */
233 buf_chain = (struct sk_buff *)msg_list.next;
234 ((struct sk_buff *)msg_list.prev)->next = NULL;
235 tipc_link_xmit2(buf_chain, dnode, dnode);
224} 236}
225 237
226/** 238/**
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index b2eed4ec1526..8afe32b7fc9a 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -70,7 +70,7 @@ struct distr_item {
70struct sk_buff *tipc_named_publish(struct publication *publ); 70struct sk_buff *tipc_named_publish(struct publication *publ);
71struct sk_buff *tipc_named_withdraw(struct publication *publ); 71struct sk_buff *tipc_named_withdraw(struct publication *publ);
72void named_cluster_distribute(struct sk_buff *buf); 72void named_cluster_distribute(struct sk_buff *buf);
73void tipc_named_node_up(u32 max_item_buf, u32 node); 73void tipc_named_node_up(u32 dnode);
74void tipc_named_rcv(struct sk_buff *buf); 74void tipc_named_rcv(struct sk_buff *buf);
75void tipc_named_reinit(void); 75void tipc_named_reinit(void);
76 76
diff --git a/net/tipc/node.c b/net/tipc/node.c
index d959343043fd..f7069299943f 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -474,8 +474,6 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
474void tipc_node_unlock(struct tipc_node *node) 474void tipc_node_unlock(struct tipc_node *node)
475{ 475{
476 LIST_HEAD(nsub_list); 476 LIST_HEAD(nsub_list);
477 struct tipc_link *link;
478 int pkt_sz = 0;
479 u32 addr = 0; 477 u32 addr = 0;
480 478
481 if (likely(!node->action_flags)) { 479 if (likely(!node->action_flags)) {
@@ -488,18 +486,13 @@ void tipc_node_unlock(struct tipc_node *node)
488 node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN; 486 node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
489 } 487 }
490 if (node->action_flags & TIPC_NOTIFY_NODE_UP) { 488 if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
491 link = node->active_links[0];
492 node->action_flags &= ~TIPC_NOTIFY_NODE_UP; 489 node->action_flags &= ~TIPC_NOTIFY_NODE_UP;
493 if (link) { 490 addr = node->addr;
494 pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
495 ITEM_SIZE;
496 addr = node->addr;
497 }
498 } 491 }
499 spin_unlock_bh(&node->lock); 492 spin_unlock_bh(&node->lock);
500 493
501 if (!list_empty(&nsub_list)) 494 if (!list_empty(&nsub_list))
502 tipc_nodesub_notify(&nsub_list); 495 tipc_nodesub_notify(&nsub_list);
503 if (pkt_sz) 496 if (addr)
504 tipc_named_node_up(pkt_sz, addr); 497 tipc_named_node_up(addr);
505} 498}