aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/msg.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/msg.c')
-rw-r--r--net/tipc/msg.c270
1 files changed, 164 insertions, 106 deletions
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 74745a47d72a..b6eb90cd3ef7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -34,6 +34,7 @@
34 * POSSIBILITY OF SUCH DAMAGE. 34 * POSSIBILITY OF SUCH DAMAGE.
35 */ 35 */
36 36
37#include <net/sock.h>
37#include "core.h" 38#include "core.h"
38#include "msg.h" 39#include "msg.h"
39#include "addr.h" 40#include "addr.h"
@@ -46,25 +47,48 @@ static unsigned int align(unsigned int i)
46 return (i + 3) & ~3u; 47 return (i + 3) & ~3u;
47} 48}
48 49
49void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize, 50/**
50 u32 destnode) 51 * tipc_buf_acquire - creates a TIPC message buffer
52 * @size: message size (including TIPC header)
53 *
54 * Returns a new buffer with data pointers set to the specified size.
55 *
56 * NOTE: Headroom is reserved to allow prepending of a data link header.
57 * There may also be unrequested tailroom present at the buffer's end.
58 */
59struct sk_buff *tipc_buf_acquire(u32 size)
60{
61 struct sk_buff *skb;
62 unsigned int buf_size = (BUF_HEADROOM + size + 3) & ~3u;
63
64 skb = alloc_skb_fclone(buf_size, GFP_ATOMIC);
65 if (skb) {
66 skb_reserve(skb, BUF_HEADROOM);
67 skb_put(skb, size);
68 skb->next = NULL;
69 }
70 return skb;
71}
72
73void tipc_msg_init(u32 own_node, struct tipc_msg *m, u32 user, u32 type,
74 u32 hsize, u32 dnode)
51{ 75{
52 memset(m, 0, hsize); 76 memset(m, 0, hsize);
53 msg_set_version(m); 77 msg_set_version(m);
54 msg_set_user(m, user); 78 msg_set_user(m, user);
55 msg_set_hdr_sz(m, hsize); 79 msg_set_hdr_sz(m, hsize);
56 msg_set_size(m, hsize); 80 msg_set_size(m, hsize);
57 msg_set_prevnode(m, tipc_own_addr); 81 msg_set_prevnode(m, own_node);
58 msg_set_type(m, type); 82 msg_set_type(m, type);
59 if (hsize > SHORT_H_SIZE) { 83 if (hsize > SHORT_H_SIZE) {
60 msg_set_orignode(m, tipc_own_addr); 84 msg_set_orignode(m, own_node);
61 msg_set_destnode(m, destnode); 85 msg_set_destnode(m, dnode);
62 } 86 }
63} 87}
64 88
65struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, 89struct sk_buff *tipc_msg_create(uint user, uint type,
66 uint data_sz, u32 dnode, u32 onode, 90 uint hdr_sz, uint data_sz, u32 dnode,
67 u32 dport, u32 oport, int errcode) 91 u32 onode, u32 dport, u32 oport, int errcode)
68{ 92{
69 struct tipc_msg *msg; 93 struct tipc_msg *msg;
70 struct sk_buff *buf; 94 struct sk_buff *buf;
@@ -74,9 +98,8 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
74 return NULL; 98 return NULL;
75 99
76 msg = buf_msg(buf); 100 msg = buf_msg(buf);
77 tipc_msg_init(msg, user, type, hdr_sz, dnode); 101 tipc_msg_init(onode, msg, user, type, hdr_sz, dnode);
78 msg_set_size(msg, hdr_sz + data_sz); 102 msg_set_size(msg, hdr_sz + data_sz);
79 msg_set_prevnode(msg, onode);
80 msg_set_origport(msg, oport); 103 msg_set_origport(msg, oport);
81 msg_set_destport(msg, dport); 104 msg_set_destport(msg, dport);
82 msg_set_errcode(msg, errcode); 105 msg_set_errcode(msg, errcode);
@@ -91,7 +114,7 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
91 * @*headbuf: in: NULL for first frag, otherwise value returned from prev call 114 * @*headbuf: in: NULL for first frag, otherwise value returned from prev call
92 * out: set when successful non-complete reassembly, otherwise NULL 115 * out: set when successful non-complete reassembly, otherwise NULL
93 * @*buf: in: the buffer to append. Always defined 116 * @*buf: in: the buffer to append. Always defined
94 * out: head buf after sucessful complete reassembly, otherwise NULL 117 * out: head buf after successful complete reassembly, otherwise NULL
95 * Returns 1 when reassembly complete, otherwise 0 118 * Returns 1 when reassembly complete, otherwise 0
96 */ 119 */
97int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) 120int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
@@ -162,15 +185,15 @@ err:
162/** 185/**
163 * tipc_msg_build - create buffer chain containing specified header and data 186 * tipc_msg_build - create buffer chain containing specified header and data
164 * @mhdr: Message header, to be prepended to data 187 * @mhdr: Message header, to be prepended to data
165 * @iov: User data 188 * @m: User message
166 * @offset: Posision in iov to start copying from
167 * @dsz: Total length of user data 189 * @dsz: Total length of user data
168 * @pktmax: Max packet size that can be used 190 * @pktmax: Max packet size that can be used
169 * @chain: Buffer or chain of buffers to be returned to caller 191 * @list: Buffer or chain of buffers to be returned to caller
192 *
170 * Returns message data size or errno: -ENOMEM, -EFAULT 193 * Returns message data size or errno: -ENOMEM, -EFAULT
171 */ 194 */
172int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov, 195int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
173 int offset, int dsz, int pktmax , struct sk_buff **chain) 196 int offset, int dsz, int pktmax, struct sk_buff_head *list)
174{ 197{
175 int mhsz = msg_hdr_sz(mhdr); 198 int mhsz = msg_hdr_sz(mhdr);
176 int msz = mhsz + dsz; 199 int msz = mhsz + dsz;
@@ -179,43 +202,44 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
179 int pktrem = pktmax; 202 int pktrem = pktmax;
180 int drem = dsz; 203 int drem = dsz;
181 struct tipc_msg pkthdr; 204 struct tipc_msg pkthdr;
182 struct sk_buff *buf, *prev; 205 struct sk_buff *skb;
183 char *pktpos; 206 char *pktpos;
184 int rc; 207 int rc;
185 uint chain_sz = 0; 208
186 msg_set_size(mhdr, msz); 209 msg_set_size(mhdr, msz);
187 210
188 /* No fragmentation needed? */ 211 /* No fragmentation needed? */
189 if (likely(msz <= pktmax)) { 212 if (likely(msz <= pktmax)) {
190 buf = tipc_buf_acquire(msz); 213 skb = tipc_buf_acquire(msz);
191 *chain = buf; 214 if (unlikely(!skb))
192 if (unlikely(!buf))
193 return -ENOMEM; 215 return -ENOMEM;
194 skb_copy_to_linear_data(buf, mhdr, mhsz); 216 skb_orphan(skb);
195 pktpos = buf->data + mhsz; 217 __skb_queue_tail(list, skb);
196 TIPC_SKB_CB(buf)->chain_sz = 1; 218 skb_copy_to_linear_data(skb, mhdr, mhsz);
197 if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz)) 219 pktpos = skb->data + mhsz;
220 if (copy_from_iter(pktpos, dsz, &m->msg_iter) == dsz)
198 return dsz; 221 return dsz;
199 rc = -EFAULT; 222 rc = -EFAULT;
200 goto error; 223 goto error;
201 } 224 }
202 225
203 /* Prepare reusable fragment header */ 226 /* Prepare reusable fragment header */
204 tipc_msg_init(&pkthdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 227 tipc_msg_init(msg_prevnode(mhdr), &pkthdr, MSG_FRAGMENTER,
205 INT_H_SIZE, msg_destnode(mhdr)); 228 FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr));
206 msg_set_size(&pkthdr, pktmax); 229 msg_set_size(&pkthdr, pktmax);
207 msg_set_fragm_no(&pkthdr, pktno); 230 msg_set_fragm_no(&pkthdr, pktno);
208 231
209 /* Prepare first fragment */ 232 /* Prepare first fragment */
210 *chain = buf = tipc_buf_acquire(pktmax); 233 skb = tipc_buf_acquire(pktmax);
211 if (!buf) 234 if (!skb)
212 return -ENOMEM; 235 return -ENOMEM;
213 chain_sz = 1; 236 skb_orphan(skb);
214 pktpos = buf->data; 237 __skb_queue_tail(list, skb);
215 skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); 238 pktpos = skb->data;
239 skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
216 pktpos += INT_H_SIZE; 240 pktpos += INT_H_SIZE;
217 pktrem -= INT_H_SIZE; 241 pktrem -= INT_H_SIZE;
218 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz); 242 skb_copy_to_linear_data_offset(skb, INT_H_SIZE, mhdr, mhsz);
219 pktpos += mhsz; 243 pktpos += mhsz;
220 pktrem -= mhsz; 244 pktrem -= mhsz;
221 245
@@ -223,12 +247,11 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
223 if (drem < pktrem) 247 if (drem < pktrem)
224 pktrem = drem; 248 pktrem = drem;
225 249
226 if (memcpy_fromiovecend(pktpos, iov, offset, pktrem)) { 250 if (copy_from_iter(pktpos, pktrem, &m->msg_iter) != pktrem) {
227 rc = -EFAULT; 251 rc = -EFAULT;
228 goto error; 252 goto error;
229 } 253 }
230 drem -= pktrem; 254 drem -= pktrem;
231 offset += pktrem;
232 255
233 if (!drem) 256 if (!drem)
234 break; 257 break;
@@ -238,43 +261,42 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
238 pktsz = drem + INT_H_SIZE; 261 pktsz = drem + INT_H_SIZE;
239 else 262 else
240 pktsz = pktmax; 263 pktsz = pktmax;
241 prev = buf; 264 skb = tipc_buf_acquire(pktsz);
242 buf = tipc_buf_acquire(pktsz); 265 if (!skb) {
243 if (!buf) {
244 rc = -ENOMEM; 266 rc = -ENOMEM;
245 goto error; 267 goto error;
246 } 268 }
247 chain_sz++; 269 skb_orphan(skb);
248 prev->next = buf; 270 __skb_queue_tail(list, skb);
249 msg_set_type(&pkthdr, FRAGMENT); 271 msg_set_type(&pkthdr, FRAGMENT);
250 msg_set_size(&pkthdr, pktsz); 272 msg_set_size(&pkthdr, pktsz);
251 msg_set_fragm_no(&pkthdr, ++pktno); 273 msg_set_fragm_no(&pkthdr, ++pktno);
252 skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); 274 skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
253 pktpos = buf->data + INT_H_SIZE; 275 pktpos = skb->data + INT_H_SIZE;
254 pktrem = pktsz - INT_H_SIZE; 276 pktrem = pktsz - INT_H_SIZE;
255 277
256 } while (1); 278 } while (1);
257 TIPC_SKB_CB(*chain)->chain_sz = chain_sz; 279 msg_set_type(buf_msg(skb), LAST_FRAGMENT);
258 msg_set_type(buf_msg(buf), LAST_FRAGMENT);
259 return dsz; 280 return dsz;
260error: 281error:
261 kfree_skb_list(*chain); 282 __skb_queue_purge(list);
262 *chain = NULL; 283 __skb_queue_head_init(list);
263 return rc; 284 return rc;
264} 285}
265 286
266/** 287/**
267 * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one 288 * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
268 * @bbuf: the existing buffer ("bundle") 289 * @list: the buffer chain of the existing buffer ("bundle")
269 * @buf: buffer to be appended 290 * @skb: buffer to be appended
270 * @mtu: max allowable size for the bundle buffer 291 * @mtu: max allowable size for the bundle buffer
271 * Consumes buffer if successful 292 * Consumes buffer if successful
272 * Returns true if bundling could be performed, otherwise false 293 * Returns true if bundling could be performed, otherwise false
273 */ 294 */
274bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu) 295bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
275{ 296{
276 struct tipc_msg *bmsg = buf_msg(bbuf); 297 struct sk_buff *bskb = skb_peek_tail(list);
277 struct tipc_msg *msg = buf_msg(buf); 298 struct tipc_msg *bmsg = buf_msg(bskb);
299 struct tipc_msg *msg = buf_msg(skb);
278 unsigned int bsz = msg_size(bmsg); 300 unsigned int bsz = msg_size(bmsg);
279 unsigned int msz = msg_size(msg); 301 unsigned int msz = msg_size(msg);
280 u32 start = align(bsz); 302 u32 start = align(bsz);
@@ -289,35 +311,70 @@ bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
289 return false; 311 return false;
290 if (likely(msg_user(bmsg) != MSG_BUNDLER)) 312 if (likely(msg_user(bmsg) != MSG_BUNDLER))
291 return false; 313 return false;
292 if (likely(msg_type(bmsg) != BUNDLE_OPEN)) 314 if (likely(!TIPC_SKB_CB(bskb)->bundling))
293 return false; 315 return false;
294 if (unlikely(skb_tailroom(bbuf) < (pad + msz))) 316 if (unlikely(skb_tailroom(bskb) < (pad + msz)))
295 return false; 317 return false;
296 if (unlikely(max < (start + msz))) 318 if (unlikely(max < (start + msz)))
297 return false; 319 return false;
298 320
299 skb_put(bbuf, pad + msz); 321 skb_put(bskb, pad + msz);
300 skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz); 322 skb_copy_to_linear_data_offset(bskb, start, skb->data, msz);
301 msg_set_size(bmsg, start + msz); 323 msg_set_size(bmsg, start + msz);
302 msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1); 324 msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
303 bbuf->next = buf->next; 325 kfree_skb(skb);
304 kfree_skb(buf);
305 return true; 326 return true;
306} 327}
307 328
308/** 329/**
330 * tipc_msg_extract(): extract bundled inner packet from buffer
331 * @skb: linear outer buffer, to be extracted from.
332 * @iskb: extracted inner buffer, to be returned
333 * @pos: position of msg to be extracted. Returns with pointer of next msg
334 * Consumes outer buffer when last packet extracted
335 * Returns true when when there is an extracted buffer, otherwise false
336 */
337bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
338{
339 struct tipc_msg *msg = buf_msg(skb);
340 int imsz;
341 struct tipc_msg *imsg = (struct tipc_msg *)(msg_data(msg) + *pos);
342
343 /* Is there space left for shortest possible message? */
344 if (*pos > (msg_data_sz(msg) - SHORT_H_SIZE))
345 goto none;
346 imsz = msg_size(imsg);
347
348 /* Is there space left for current message ? */
349 if ((*pos + imsz) > msg_data_sz(msg))
350 goto none;
351 *iskb = tipc_buf_acquire(imsz);
352 if (!*iskb)
353 goto none;
354 skb_copy_to_linear_data(*iskb, imsg, imsz);
355 *pos += align(imsz);
356 return true;
357none:
358 kfree_skb(skb);
359 *iskb = NULL;
360 return false;
361}
362
363/**
309 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail 364 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
310 * @buf: buffer to be appended and replaced 365 * @list: the buffer chain
311 * @mtu: max allowable size for the bundle buffer, inclusive header 366 * @skb: buffer to be appended and replaced
367 * @mtu: max allowable size for the bundle buffer, inclusive header
312 * @dnode: destination node for message. (Not always present in header) 368 * @dnode: destination node for message. (Not always present in header)
313 * Replaces buffer if successful 369 * Replaces buffer if successful
314 * Returns true if sucess, otherwise false 370 * Returns true if success, otherwise false
315 */ 371 */
316bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode) 372bool tipc_msg_make_bundle(struct sk_buff_head *list,
373 struct sk_buff *skb, u32 mtu, u32 dnode)
317{ 374{
318 struct sk_buff *bbuf; 375 struct sk_buff *bskb;
319 struct tipc_msg *bmsg; 376 struct tipc_msg *bmsg;
320 struct tipc_msg *msg = buf_msg(*buf); 377 struct tipc_msg *msg = buf_msg(skb);
321 u32 msz = msg_size(msg); 378 u32 msz = msg_size(msg);
322 u32 max = mtu - INT_H_SIZE; 379 u32 max = mtu - INT_H_SIZE;
323 380
@@ -330,20 +387,20 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
330 if (msz > (max / 2)) 387 if (msz > (max / 2))
331 return false; 388 return false;
332 389
333 bbuf = tipc_buf_acquire(max); 390 bskb = tipc_buf_acquire(max);
334 if (!bbuf) 391 if (!bskb)
335 return false; 392 return false;
336 393
337 skb_trim(bbuf, INT_H_SIZE); 394 skb_trim(bskb, INT_H_SIZE);
338 bmsg = buf_msg(bbuf); 395 bmsg = buf_msg(bskb);
339 tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode); 396 tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0,
397 INT_H_SIZE, dnode);
340 msg_set_seqno(bmsg, msg_seqno(msg)); 398 msg_set_seqno(bmsg, msg_seqno(msg));
341 msg_set_ack(bmsg, msg_ack(msg)); 399 msg_set_ack(bmsg, msg_ack(msg));
342 msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); 400 msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
343 bbuf->next = (*buf)->next; 401 TIPC_SKB_CB(bskb)->bundling = true;
344 tipc_msg_bundle(bbuf, *buf, mtu); 402 __skb_queue_tail(list, bskb);
345 *buf = bbuf; 403 return tipc_msg_bundle(list, skb, mtu);
346 return true;
347} 404}
348 405
349/** 406/**
@@ -354,7 +411,8 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
354 * Consumes buffer if failure 411 * Consumes buffer if failure
355 * Returns true if success, otherwise false 412 * Returns true if success, otherwise false
356 */ 413 */
357bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err) 414bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
415 int err)
358{ 416{
359 struct tipc_msg *msg = buf_msg(buf); 417 struct tipc_msg *msg = buf_msg(buf);
360 uint imp = msg_importance(msg); 418 uint imp = msg_importance(msg);
@@ -375,7 +433,7 @@ bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err)
375 msg_set_errcode(msg, err); 433 msg_set_errcode(msg, err);
376 msg_set_origport(msg, msg_destport(&ohdr)); 434 msg_set_origport(msg, msg_destport(&ohdr));
377 msg_set_destport(msg, msg_origport(&ohdr)); 435 msg_set_destport(msg, msg_origport(&ohdr));
378 msg_set_prevnode(msg, tipc_own_addr); 436 msg_set_prevnode(msg, own_addr);
379 if (!msg_short(msg)) { 437 if (!msg_short(msg)) {
380 msg_set_orignode(msg, msg_destnode(&ohdr)); 438 msg_set_orignode(msg, msg_destnode(&ohdr));
381 msg_set_destnode(msg, msg_orignode(&ohdr)); 439 msg_set_destnode(msg, msg_orignode(&ohdr));
@@ -387,64 +445,65 @@ bool tipc_msg_reverse(struct sk_buff *buf, u32 *dnode, int err)
387 return true; 445 return true;
388exit: 446exit:
389 kfree_skb(buf); 447 kfree_skb(buf);
448 *dnode = 0;
390 return false; 449 return false;
391} 450}
392 451
393/** 452/**
394 * tipc_msg_eval: determine fate of message that found no destination 453 * tipc_msg_lookup_dest(): try to find new destination for named message
395 * @buf: the buffer containing the message. 454 * @skb: the buffer containing the message.
396 * @dnode: return value: next-hop node, if message to be forwarded 455 * @dnode: return value: next-hop node, if destination found
397 * @err: error code to use, if message to be rejected 456 * @err: return value: error code to use, if message to be rejected
398 *
399 * Does not consume buffer 457 * Does not consume buffer
400 * Returns 0 (TIPC_OK) if message ok and we can try again, -TIPC error 458 * Returns true if a destination is found, false otherwise
401 * code if message to be rejected
402 */ 459 */
403int tipc_msg_eval(struct sk_buff *buf, u32 *dnode) 460bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,
461 u32 *dnode, int *err)
404{ 462{
405 struct tipc_msg *msg = buf_msg(buf); 463 struct tipc_msg *msg = buf_msg(skb);
406 u32 dport; 464 u32 dport;
407 465
408 if (msg_type(msg) != TIPC_NAMED_MSG) 466 if (!msg_isdata(msg))
409 return -TIPC_ERR_NO_PORT; 467 return false;
410 if (skb_linearize(buf)) 468 if (!msg_named(msg))
411 return -TIPC_ERR_NO_NAME; 469 return false;
412 if (msg_data_sz(msg) > MAX_FORWARD_SIZE) 470 *err = -TIPC_ERR_NO_NAME;
413 return -TIPC_ERR_NO_NAME; 471 if (skb_linearize(skb))
472 return false;
414 if (msg_reroute_cnt(msg) > 0) 473 if (msg_reroute_cnt(msg) > 0)
415 return -TIPC_ERR_NO_NAME; 474 return false;
416 475 *dnode = addr_domain(net, msg_lookup_scope(msg));
417 *dnode = addr_domain(msg_lookup_scope(msg)); 476 dport = tipc_nametbl_translate(net, msg_nametype(msg),
418 dport = tipc_nametbl_translate(msg_nametype(msg), 477 msg_nameinst(msg), dnode);
419 msg_nameinst(msg),
420 dnode);
421 if (!dport) 478 if (!dport)
422 return -TIPC_ERR_NO_NAME; 479 return false;
423 msg_incr_reroute_cnt(msg); 480 msg_incr_reroute_cnt(msg);
424 msg_set_destnode(msg, *dnode); 481 msg_set_destnode(msg, *dnode);
425 msg_set_destport(msg, dport); 482 msg_set_destport(msg, dport);
426 return TIPC_OK; 483 *err = TIPC_OK;
484 return true;
427} 485}
428 486
429/* tipc_msg_reassemble() - clone a buffer chain of fragments and 487/* tipc_msg_reassemble() - clone a buffer chain of fragments and
430 * reassemble the clones into one message 488 * reassemble the clones into one message
431 */ 489 */
432struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain) 490struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
433{ 491{
434 struct sk_buff *buf = chain; 492 struct sk_buff *skb;
435 struct sk_buff *frag = buf; 493 struct sk_buff *frag = NULL;
436 struct sk_buff *head = NULL; 494 struct sk_buff *head = NULL;
437 int hdr_sz; 495 int hdr_sz;
438 496
439 /* Copy header if single buffer */ 497 /* Copy header if single buffer */
440 if (!buf->next) { 498 if (skb_queue_len(list) == 1) {
441 hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf)); 499 skb = skb_peek(list);
442 return __pskb_copy(buf, hdr_sz, GFP_ATOMIC); 500 hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
501 return __pskb_copy(skb, hdr_sz, GFP_ATOMIC);
443 } 502 }
444 503
445 /* Clone all fragments and reassemble */ 504 /* Clone all fragments and reassemble */
446 while (buf) { 505 skb_queue_walk(list, skb) {
447 frag = skb_clone(buf, GFP_ATOMIC); 506 frag = skb_clone(skb, GFP_ATOMIC);
448 if (!frag) 507 if (!frag)
449 goto error; 508 goto error;
450 frag->next = NULL; 509 frag->next = NULL;
@@ -452,7 +511,6 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
452 break; 511 break;
453 if (!head) 512 if (!head)
454 goto error; 513 goto error;
455 buf = buf->next;
456 } 514 }
457 return frag; 515 return frag;
458error: 516error: