author:    Jon Paul Maloy <jon.maloy@ericsson.com>  2014-07-16 20:40:58 -0400
committer: David S. Miller <davem@davemloft.net>    2014-07-17 00:38:18 -0400
commit:    dbdf6d24ad37d63938f29a2d134a1a9f6e9e673c (patch)
tree:      3c2046727403ba015f699b835fc374b58093c8b4  /net/tipc/name_distr.c
parent:    a9f559c37b582c9eb12f82ac9bb77476cfda6309 (diff)
tipc: make name table distributor use new send function
In a previous commit series ("tipc: new unicast transmission code") we
introduced a new message sending function, tipc_link_xmit2(), and moved
the unicast data users over to use that function. We now let the internal
name table distributor do the same.

The interaction between the name distributor and the node/link layer also
becomes significantly simpler, so we can eliminate the function
tipc_link_names_xmit().

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
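The central piece of the reworked named_distribute() below is its sizing
arithmetic: the per-message payload (msg_dsz) is the destination's link MTU
rounded down to a whole multiple of ITEM_SIZE, so a distr_item never
straddles a message boundary, and the remaining total (rem) drains by one
message payload per prepared buffer. The following standalone userspace
sketch models only that arithmetic; the ITEM_SIZE and MTU values are
illustrative assumptions, not the kernel's definitions.

#include <stdio.h>

#define ITEM_SIZE 20                    /* assumed size of one struct distr_item */

int main(void)
{
        unsigned int mtu = 1500;        /* assumed link MTU toward dnode */
        unsigned int publ_cnt = 200;    /* publications awaiting distribution */

        /* Round per-message capacity down to a whole number of items,
         * mirroring: msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
         */
        unsigned int msg_dsz = (mtu / ITEM_SIZE) * ITEM_SIZE;
        unsigned int rem = publ_cnt * ITEM_SIZE;        /* dsz in the patch */
        unsigned int msg_rem;
        int n = 0;

        while (rem) {
                msg_rem = rem < msg_dsz ? rem : msg_dsz; /* min_t(uint, rem, msg_dsz) */
                rem -= msg_rem;
                printf("message %d: %u bytes, %u items\n",
                       ++n, msg_rem, msg_rem / ITEM_SIZE);
        }
        return 0;
}

With these assumed numbers, 200 publications go out as two full 1500-byte
messages of 75 items each plus one 1000-byte tail, which is exactly how the
rem/msg_rem pair behaves in the patched loop.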
Diffstat (limited to 'net/tipc/name_distr.c')
net/tipc/name_distr.c | 76 ++++++++++++++++++++++++++++++++++++++++++++--------------------------------
1 file changed, 44 insertions(+), 32 deletions(-)
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 8ce730984aa1..d16f9475fa76 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
 
 void named_cluster_distribute(struct sk_buff *buf)
 {
-        struct sk_buff *buf_copy;
-        struct tipc_node *n_ptr;
-        struct tipc_link *l_ptr;
+        struct sk_buff *obuf;
+        struct tipc_node *node;
+        u32 dnode;
 
         rcu_read_lock();
-        list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
-                tipc_node_lock(n_ptr);
-                l_ptr = n_ptr->active_links[n_ptr->addr & 1];
-                if (l_ptr) {
-                        buf_copy = skb_copy(buf, GFP_ATOMIC);
-                        if (!buf_copy) {
-                                tipc_node_unlock(n_ptr);
-                                break;
-                        }
-                        msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
-                        __tipc_link_xmit(l_ptr, buf_copy);
-                }
-                tipc_node_unlock(n_ptr);
+        list_for_each_entry_rcu(node, &tipc_node_list, list) {
+                dnode = node->addr;
+                if (in_own_node(dnode))
+                        continue;
+                if (!tipc_node_active_links(node))
+                        continue;
+                obuf = skb_copy(buf, GFP_ATOMIC);
+                if (!obuf)
+                        break;
+                msg_set_destnode(buf_msg(obuf), dnode);
+                tipc_link_xmit2(obuf, dnode, dnode);
         }
         rcu_read_unlock();
 
@@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
         return buf;
 }
 
-/*
+/**
  * named_distribute - prepare name info for bulk distribution to another node
+ * @msg_list: list of messages (buffers) to be returned from this function
+ * @dnode: node to be updated
+ * @pls: linked list of publication items to be packed into buffer chain
  */
-static void named_distribute(struct list_head *message_list, u32 node,
-                             struct publ_list *pls, u32 max_item_buf)
+static void named_distribute(struct list_head *msg_list, u32 dnode,
+                             struct publ_list *pls)
 {
         struct publication *publ;
         struct sk_buff *buf = NULL;
         struct distr_item *item = NULL;
-        u32 left = 0;
-        u32 rest = pls->size * ITEM_SIZE;
+        uint dsz = pls->size * ITEM_SIZE;
+        uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
+        uint rem = dsz;
+        uint msg_rem = 0;
 
         list_for_each_entry(publ, &pls->list, local_list) {
+                /* Prepare next buffer: */
                 if (!buf) {
-                        left = (rest <= max_item_buf) ? rest : max_item_buf;
-                        rest -= left;
-                        buf = named_prepare_buf(PUBLICATION, left, node);
+                        msg_rem = min_t(uint, rem, msg_dsz);
+                        rem -= msg_rem;
+                        buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
                         if (!buf) {
                                 pr_warn("Bulk publication failure\n");
                                 return;
                         }
                         item = (struct distr_item *)msg_data(buf_msg(buf));
                 }
+
+                /* Pack publication into message: */
                 publ_to_item(item, publ);
                 item++;
-                left -= ITEM_SIZE;
-                if (!left) {
-                        list_add_tail((struct list_head *)buf, message_list);
+                msg_rem -= ITEM_SIZE;
+
+                /* Append full buffer to list: */
+                if (!msg_rem) {
+                        list_add_tail((struct list_head *)buf, msg_list);
                         buf = NULL;
                 }
         }
@@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
 /**
  * tipc_named_node_up - tell specified node about all publications by this node
  */
-void tipc_named_node_up(u32 max_item_buf, u32 node)
+void tipc_named_node_up(u32 dnode)
 {
-        LIST_HEAD(message_list);
+        LIST_HEAD(msg_list);
+        struct sk_buff *buf_chain;
 
         read_lock_bh(&tipc_nametbl_lock);
-        named_distribute(&message_list, node, &publ_cluster, max_item_buf);
-        named_distribute(&message_list, node, &publ_zone, max_item_buf);
+        named_distribute(&msg_list, dnode, &publ_cluster);
+        named_distribute(&msg_list, dnode, &publ_zone);
         read_unlock_bh(&tipc_nametbl_lock);
 
-        tipc_link_names_xmit(&message_list, node);
+        /* Convert circular list to linear list and send: */
+        buf_chain = (struct sk_buff *)msg_list.next;
+        ((struct sk_buff *)msg_list.prev)->next = NULL;
+        tipc_link_xmit2(buf_chain, dnode, dnode);
 }
 
 /**
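The tail of the new tipc_named_node_up() leans on a layout trick: struct
sk_buff begins with its next and prev pointers, so the list_head that
threads the message list can be reinterpreted as the head of a
NULL-terminated buffer chain. A standalone sketch of that conversion, with
simplified stand-in structs rather than the real kernel definitions:

#include <stdio.h>

struct sk_buff {
        struct sk_buff *next, *prev;    /* first members, as in the kernel */
        int id;
};

struct list_head {
        struct list_head *next, *prev;
};

int main(void)
{
        struct sk_buff a = { .id = 1 }, b = { .id = 2 }, c = { .id = 3 };
        struct list_head msg_list;
        struct sk_buff *buf_chain, *skb;

        /* Circular list as built by list_add_tail():
         * head -> a -> b -> c -> head
         */
        msg_list.next = (struct list_head *)&a;
        a.next = &b;
        b.next = &c;
        c.next = (struct sk_buff *)&msg_list;
        msg_list.prev = (struct list_head *)&c;

        /* The conversion from the patch: detach the head and
         * NULL-terminate the chain.
         */
        buf_chain = (struct sk_buff *)msg_list.next;
        ((struct sk_buff *)msg_list.prev)->next = NULL;

        for (skb = buf_chain; skb; skb = skb->next)
                printf("sending skb %d\n", skb->id);
        return 0;
}

The same layout assumption is what lets named_distribute() pass
(struct list_head *)buf to list_add_tail() in the first place: the list
primitives only ever touch the first two pointer-sized fields of the buffer.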