aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/name_distr.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/name_distr.c')
-rw-r--r--net/tipc/name_distr.c76
1 files changed, 44 insertions, 32 deletions
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 8ce730984aa1..dcc15bcd5692 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -101,24 +101,22 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
101 101
102void named_cluster_distribute(struct sk_buff *buf) 102void named_cluster_distribute(struct sk_buff *buf)
103{ 103{
104 struct sk_buff *buf_copy; 104 struct sk_buff *obuf;
105 struct tipc_node *n_ptr; 105 struct tipc_node *node;
106 struct tipc_link *l_ptr; 106 u32 dnode;
107 107
108 rcu_read_lock(); 108 rcu_read_lock();
109 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { 109 list_for_each_entry_rcu(node, &tipc_node_list, list) {
110 tipc_node_lock(n_ptr); 110 dnode = node->addr;
111 l_ptr = n_ptr->active_links[n_ptr->addr & 1]; 111 if (in_own_node(dnode))
112 if (l_ptr) { 112 continue;
113 buf_copy = skb_copy(buf, GFP_ATOMIC); 113 if (!tipc_node_active_links(node))
114 if (!buf_copy) { 114 continue;
115 tipc_node_unlock(n_ptr); 115 obuf = skb_copy(buf, GFP_ATOMIC);
116 break; 116 if (!obuf)
117 } 117 break;
118 msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); 118 msg_set_destnode(buf_msg(obuf), dnode);
119 __tipc_link_xmit(l_ptr, buf_copy); 119 tipc_link_xmit(obuf, dnode, dnode);
120 }
121 tipc_node_unlock(n_ptr);
122 } 120 }
123 rcu_read_unlock(); 121 rcu_read_unlock();
124 122
@@ -175,34 +173,44 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
175 return buf; 173 return buf;
176} 174}
177 175
178/* 176/**
179 * named_distribute - prepare name info for bulk distribution to another node 177 * named_distribute - prepare name info for bulk distribution to another node
178 * @msg_list: list of messages (buffers) to be returned from this function
179 * @dnode: node to be updated
180 * @pls: linked list of publication items to be packed into buffer chain
180 */ 181 */
181static void named_distribute(struct list_head *message_list, u32 node, 182static void named_distribute(struct list_head *msg_list, u32 dnode,
182 struct publ_list *pls, u32 max_item_buf) 183 struct publ_list *pls)
183{ 184{
184 struct publication *publ; 185 struct publication *publ;
185 struct sk_buff *buf = NULL; 186 struct sk_buff *buf = NULL;
186 struct distr_item *item = NULL; 187 struct distr_item *item = NULL;
187 u32 left = 0; 188 uint dsz = pls->size * ITEM_SIZE;
188 u32 rest = pls->size * ITEM_SIZE; 189 uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
190 uint rem = dsz;
191 uint msg_rem = 0;
189 192
190 list_for_each_entry(publ, &pls->list, local_list) { 193 list_for_each_entry(publ, &pls->list, local_list) {
194 /* Prepare next buffer: */
191 if (!buf) { 195 if (!buf) {
192 left = (rest <= max_item_buf) ? rest : max_item_buf; 196 msg_rem = min_t(uint, rem, msg_dsz);
193 rest -= left; 197 rem -= msg_rem;
194 buf = named_prepare_buf(PUBLICATION, left, node); 198 buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
195 if (!buf) { 199 if (!buf) {
196 pr_warn("Bulk publication failure\n"); 200 pr_warn("Bulk publication failure\n");
197 return; 201 return;
198 } 202 }
199 item = (struct distr_item *)msg_data(buf_msg(buf)); 203 item = (struct distr_item *)msg_data(buf_msg(buf));
200 } 204 }
205
206 /* Pack publication into message: */
201 publ_to_item(item, publ); 207 publ_to_item(item, publ);
202 item++; 208 item++;
203 left -= ITEM_SIZE; 209 msg_rem -= ITEM_SIZE;
204 if (!left) { 210
205 list_add_tail((struct list_head *)buf, message_list); 211 /* Append full buffer to list: */
212 if (!msg_rem) {
213 list_add_tail((struct list_head *)buf, msg_list);
206 buf = NULL; 214 buf = NULL;
207 } 215 }
208 } 216 }
@@ -211,16 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
211/** 219/**
212 * tipc_named_node_up - tell specified node about all publications by this node 220 * tipc_named_node_up - tell specified node about all publications by this node
213 */ 221 */
214void tipc_named_node_up(u32 max_item_buf, u32 node) 222void tipc_named_node_up(u32 dnode)
215{ 223{
216 LIST_HEAD(message_list); 224 LIST_HEAD(msg_list);
225 struct sk_buff *buf_chain;
217 226
218 read_lock_bh(&tipc_nametbl_lock); 227 read_lock_bh(&tipc_nametbl_lock);
219 named_distribute(&message_list, node, &publ_cluster, max_item_buf); 228 named_distribute(&msg_list, dnode, &publ_cluster);
220 named_distribute(&message_list, node, &publ_zone, max_item_buf); 229 named_distribute(&msg_list, dnode, &publ_zone);
221 read_unlock_bh(&tipc_nametbl_lock); 230 read_unlock_bh(&tipc_nametbl_lock);
222 231
223 tipc_link_names_xmit(&message_list, node); 232 /* Convert circular list to linear list and send: */
233 buf_chain = (struct sk_buff *)msg_list.next;
234 ((struct sk_buff *)msg_list.prev)->next = NULL;
235 tipc_link_xmit(buf_chain, dnode, dnode);
224} 236}
225 237
226/** 238/**