aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/port.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/port.c')
-rw-r--r--net/tipc/port.c351
1 files changed, 175 insertions, 176 deletions
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 067bab2a0b98..c68dc956a423 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -2,7 +2,7 @@
2 * net/tipc/port.c: TIPC port code 2 * net/tipc/port.c: TIPC port code
3 * 3 *
4 * Copyright (c) 1992-2007, Ericsson AB 4 * Copyright (c) 1992-2007, Ericsson AB
5 * Copyright (c) 2004-2008, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -54,33 +54,19 @@ static DEFINE_SPINLOCK(queue_lock);
54 54
55static LIST_HEAD(ports); 55static LIST_HEAD(ports);
56static void port_handle_node_down(unsigned long ref); 56static void port_handle_node_down(unsigned long ref);
57static struct sk_buff *port_build_self_abort_msg(struct port *, u32 err); 57static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
58static struct sk_buff *port_build_peer_abort_msg(struct port *, u32 err); 58static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
59static void port_timeout(unsigned long ref); 59static void port_timeout(unsigned long ref);
60 60
61 61
62static u32 port_peernode(struct port *p_ptr) 62static u32 port_peernode(struct tipc_port *p_ptr)
63{ 63{
64 return msg_destnode(&p_ptr->publ.phdr); 64 return msg_destnode(&p_ptr->phdr);
65} 65}
66 66
67static u32 port_peerport(struct port *p_ptr) 67static u32 port_peerport(struct tipc_port *p_ptr)
68{ 68{
69 return msg_destport(&p_ptr->publ.phdr); 69 return msg_destport(&p_ptr->phdr);
70}
71
72static u32 port_out_seqno(struct port *p_ptr)
73{
74 return msg_transp_seqno(&p_ptr->publ.phdr);
75}
76
77static void port_incr_out_seqno(struct port *p_ptr)
78{
79 struct tipc_msg *m = &p_ptr->publ.phdr;
80
81 if (likely(!msg_routed(m)))
82 return;
83 msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
84} 70}
85 71
86/** 72/**
@@ -88,13 +74,14 @@ static void port_incr_out_seqno(struct port *p_ptr)
88 */ 74 */
89 75
90int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, 76int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
91 u32 num_sect, struct iovec const *msg_sect) 77 u32 num_sect, struct iovec const *msg_sect,
78 unsigned int total_len)
92{ 79{
93 struct tipc_msg *hdr; 80 struct tipc_msg *hdr;
94 struct sk_buff *buf; 81 struct sk_buff *buf;
95 struct sk_buff *ibuf = NULL; 82 struct sk_buff *ibuf = NULL;
96 struct port_list dports = {0, NULL, }; 83 struct port_list dports = {0, NULL, };
97 struct port *oport = tipc_port_deref(ref); 84 struct tipc_port *oport = tipc_port_deref(ref);
98 int ext_targets; 85 int ext_targets;
99 int res; 86 int res;
100 87
@@ -103,13 +90,16 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
103 90
104 /* Create multicast message */ 91 /* Create multicast message */
105 92
106 hdr = &oport->publ.phdr; 93 hdr = &oport->phdr;
107 msg_set_type(hdr, TIPC_MCAST_MSG); 94 msg_set_type(hdr, TIPC_MCAST_MSG);
95 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
96 msg_set_destport(hdr, 0);
97 msg_set_destnode(hdr, 0);
108 msg_set_nametype(hdr, seq->type); 98 msg_set_nametype(hdr, seq->type);
109 msg_set_namelower(hdr, seq->lower); 99 msg_set_namelower(hdr, seq->lower);
110 msg_set_nameupper(hdr, seq->upper); 100 msg_set_nameupper(hdr, seq->upper);
111 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 101 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
112 res = tipc_msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 102 res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
113 !oport->user_port, &buf); 103 !oport->user_port, &buf);
114 if (unlikely(!buf)) 104 if (unlikely(!buf))
115 return res; 105 return res;
@@ -175,6 +165,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
175 /* Deliver a copy of message to each destination port */ 165 /* Deliver a copy of message to each destination port */
176 166
177 if (dp->count != 0) { 167 if (dp->count != 0) {
168 msg_set_destnode(msg, tipc_own_addr);
178 if (dp->count == 1) { 169 if (dp->count == 1) {
179 msg_set_destport(msg, dp->ports[0]); 170 msg_set_destport(msg, dp->ports[0]);
180 tipc_port_recv_msg(buf); 171 tipc_port_recv_msg(buf);
@@ -211,7 +202,7 @@ struct tipc_port *tipc_createport_raw(void *usr_handle,
211 void (*wakeup)(struct tipc_port *), 202 void (*wakeup)(struct tipc_port *),
212 const u32 importance) 203 const u32 importance)
213{ 204{
214 struct port *p_ptr; 205 struct tipc_port *p_ptr;
215 struct tipc_msg *msg; 206 struct tipc_msg *msg;
216 u32 ref; 207 u32 ref;
217 208
@@ -220,21 +211,19 @@ struct tipc_port *tipc_createport_raw(void *usr_handle,
220 warn("Port creation failed, no memory\n"); 211 warn("Port creation failed, no memory\n");
221 return NULL; 212 return NULL;
222 } 213 }
223 ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock); 214 ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
224 if (!ref) { 215 if (!ref) {
225 warn("Port creation failed, reference table exhausted\n"); 216 warn("Port creation failed, reference table exhausted\n");
226 kfree(p_ptr); 217 kfree(p_ptr);
227 return NULL; 218 return NULL;
228 } 219 }
229 220
230 p_ptr->publ.usr_handle = usr_handle; 221 p_ptr->usr_handle = usr_handle;
231 p_ptr->publ.max_pkt = MAX_PKT_DEFAULT; 222 p_ptr->max_pkt = MAX_PKT_DEFAULT;
232 p_ptr->publ.ref = ref; 223 p_ptr->ref = ref;
233 msg = &p_ptr->publ.phdr; 224 msg = &p_ptr->phdr;
234 tipc_msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0); 225 tipc_msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0);
235 msg_set_origport(msg, ref); 226 msg_set_origport(msg, ref);
236 p_ptr->last_in_seqno = 41;
237 p_ptr->sent = 1;
238 INIT_LIST_HEAD(&p_ptr->wait_list); 227 INIT_LIST_HEAD(&p_ptr->wait_list);
239 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 228 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
240 p_ptr->dispatcher = dispatcher; 229 p_ptr->dispatcher = dispatcher;
@@ -246,12 +235,12 @@ struct tipc_port *tipc_createport_raw(void *usr_handle,
246 INIT_LIST_HEAD(&p_ptr->port_list); 235 INIT_LIST_HEAD(&p_ptr->port_list);
247 list_add_tail(&p_ptr->port_list, &ports); 236 list_add_tail(&p_ptr->port_list, &ports);
248 spin_unlock_bh(&tipc_port_list_lock); 237 spin_unlock_bh(&tipc_port_list_lock);
249 return &(p_ptr->publ); 238 return p_ptr;
250} 239}
251 240
252int tipc_deleteport(u32 ref) 241int tipc_deleteport(u32 ref)
253{ 242{
254 struct port *p_ptr; 243 struct tipc_port *p_ptr;
255 struct sk_buff *buf = NULL; 244 struct sk_buff *buf = NULL;
256 245
257 tipc_withdraw(ref, 0, NULL); 246 tipc_withdraw(ref, 0, NULL);
@@ -263,7 +252,7 @@ int tipc_deleteport(u32 ref)
263 tipc_port_unlock(p_ptr); 252 tipc_port_unlock(p_ptr);
264 253
265 k_cancel_timer(&p_ptr->timer); 254 k_cancel_timer(&p_ptr->timer);
266 if (p_ptr->publ.connected) { 255 if (p_ptr->connected) {
267 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 256 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
268 tipc_nodesub_unsubscribe(&p_ptr->subscription); 257 tipc_nodesub_unsubscribe(&p_ptr->subscription);
269 } 258 }
@@ -279,14 +268,14 @@ int tipc_deleteport(u32 ref)
279 return 0; 268 return 0;
280} 269}
281 270
282static int port_unreliable(struct port *p_ptr) 271static int port_unreliable(struct tipc_port *p_ptr)
283{ 272{
284 return msg_src_droppable(&p_ptr->publ.phdr); 273 return msg_src_droppable(&p_ptr->phdr);
285} 274}
286 275
287int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 276int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
288{ 277{
289 struct port *p_ptr; 278 struct tipc_port *p_ptr;
290 279
291 p_ptr = tipc_port_lock(ref); 280 p_ptr = tipc_port_lock(ref);
292 if (!p_ptr) 281 if (!p_ptr)
@@ -298,24 +287,24 @@ int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
298 287
299int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 288int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
300{ 289{
301 struct port *p_ptr; 290 struct tipc_port *p_ptr;
302 291
303 p_ptr = tipc_port_lock(ref); 292 p_ptr = tipc_port_lock(ref);
304 if (!p_ptr) 293 if (!p_ptr)
305 return -EINVAL; 294 return -EINVAL;
306 msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0)); 295 msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
307 tipc_port_unlock(p_ptr); 296 tipc_port_unlock(p_ptr);
308 return 0; 297 return 0;
309} 298}
310 299
311static int port_unreturnable(struct port *p_ptr) 300static int port_unreturnable(struct tipc_port *p_ptr)
312{ 301{
313 return msg_dest_droppable(&p_ptr->publ.phdr); 302 return msg_dest_droppable(&p_ptr->phdr);
314} 303}
315 304
316int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 305int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
317{ 306{
318 struct port *p_ptr; 307 struct tipc_port *p_ptr;
319 308
320 p_ptr = tipc_port_lock(ref); 309 p_ptr = tipc_port_lock(ref);
321 if (!p_ptr) 310 if (!p_ptr)
@@ -327,12 +316,12 @@ int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
327 316
328int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 317int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
329{ 318{
330 struct port *p_ptr; 319 struct tipc_port *p_ptr;
331 320
332 p_ptr = tipc_port_lock(ref); 321 p_ptr = tipc_port_lock(ref);
333 if (!p_ptr) 322 if (!p_ptr)
334 return -EINVAL; 323 return -EINVAL;
335 msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0)); 324 msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
336 tipc_port_unlock(p_ptr); 325 tipc_port_unlock(p_ptr);
337 return 0; 326 return 0;
338} 327}
@@ -345,7 +334,7 @@ int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
345static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode, 334static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
346 u32 origport, u32 orignode, 335 u32 origport, u32 orignode,
347 u32 usr, u32 type, u32 err, 336 u32 usr, u32 type, u32 err,
348 u32 seqno, u32 ack) 337 u32 ack)
349{ 338{
350 struct sk_buff *buf; 339 struct sk_buff *buf;
351 struct tipc_msg *msg; 340 struct tipc_msg *msg;
@@ -358,7 +347,6 @@ static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
358 msg_set_destport(msg, destport); 347 msg_set_destport(msg, destport);
359 msg_set_origport(msg, origport); 348 msg_set_origport(msg, origport);
360 msg_set_orignode(msg, orignode); 349 msg_set_orignode(msg, orignode);
361 msg_set_transp_seqno(msg, seqno);
362 msg_set_msgcnt(msg, ack); 350 msg_set_msgcnt(msg, ack);
363 } 351 }
364 return buf; 352 return buf;
@@ -413,10 +401,10 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
413 /* send self-abort message when rejecting on a connected port */ 401 /* send self-abort message when rejecting on a connected port */
414 if (msg_connected(msg)) { 402 if (msg_connected(msg)) {
415 struct sk_buff *abuf = NULL; 403 struct sk_buff *abuf = NULL;
416 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 404 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
417 405
418 if (p_ptr) { 406 if (p_ptr) {
419 if (p_ptr->publ.connected) 407 if (p_ptr->connected)
420 abuf = port_build_self_abort_msg(p_ptr, err); 408 abuf = port_build_self_abort_msg(p_ptr, err);
421 tipc_port_unlock(p_ptr); 409 tipc_port_unlock(p_ptr);
422 } 410 }
@@ -429,14 +417,14 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
429 return data_sz; 417 return data_sz;
430} 418}
431 419
432int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr, 420int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
433 struct iovec const *msg_sect, u32 num_sect, 421 struct iovec const *msg_sect, u32 num_sect,
434 int err) 422 unsigned int total_len, int err)
435{ 423{
436 struct sk_buff *buf; 424 struct sk_buff *buf;
437 int res; 425 int res;
438 426
439 res = tipc_msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 427 res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
440 !p_ptr->user_port, &buf); 428 !p_ptr->user_port, &buf);
441 if (!buf) 429 if (!buf)
442 return res; 430 return res;
@@ -446,13 +434,13 @@ int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
446 434
447static void port_timeout(unsigned long ref) 435static void port_timeout(unsigned long ref)
448{ 436{
449 struct port *p_ptr = tipc_port_lock(ref); 437 struct tipc_port *p_ptr = tipc_port_lock(ref);
450 struct sk_buff *buf = NULL; 438 struct sk_buff *buf = NULL;
451 439
452 if (!p_ptr) 440 if (!p_ptr)
453 return; 441 return;
454 442
455 if (!p_ptr->publ.connected) { 443 if (!p_ptr->connected) {
456 tipc_port_unlock(p_ptr); 444 tipc_port_unlock(p_ptr);
457 return; 445 return;
458 } 446 }
@@ -463,14 +451,12 @@ static void port_timeout(unsigned long ref)
463 } else { 451 } else {
464 buf = port_build_proto_msg(port_peerport(p_ptr), 452 buf = port_build_proto_msg(port_peerport(p_ptr),
465 port_peernode(p_ptr), 453 port_peernode(p_ptr),
466 p_ptr->publ.ref, 454 p_ptr->ref,
467 tipc_own_addr, 455 tipc_own_addr,
468 CONN_MANAGER, 456 CONN_MANAGER,
469 CONN_PROBE, 457 CONN_PROBE,
470 TIPC_OK, 458 TIPC_OK,
471 port_out_seqno(p_ptr),
472 0); 459 0);
473 port_incr_out_seqno(p_ptr);
474 p_ptr->probing_state = PROBING; 460 p_ptr->probing_state = PROBING;
475 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 461 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
476 } 462 }
@@ -481,7 +467,7 @@ static void port_timeout(unsigned long ref)
481 467
482static void port_handle_node_down(unsigned long ref) 468static void port_handle_node_down(unsigned long ref)
483{ 469{
484 struct port *p_ptr = tipc_port_lock(ref); 470 struct tipc_port *p_ptr = tipc_port_lock(ref);
485 struct sk_buff *buf = NULL; 471 struct sk_buff *buf = NULL;
486 472
487 if (!p_ptr) 473 if (!p_ptr)
@@ -492,73 +478,71 @@ static void port_handle_node_down(unsigned long ref)
492} 478}
493 479
494 480
495static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err) 481static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
496{ 482{
497 u32 imp = msg_importance(&p_ptr->publ.phdr); 483 u32 imp = msg_importance(&p_ptr->phdr);
498 484
499 if (!p_ptr->publ.connected) 485 if (!p_ptr->connected)
500 return NULL; 486 return NULL;
501 if (imp < TIPC_CRITICAL_IMPORTANCE) 487 if (imp < TIPC_CRITICAL_IMPORTANCE)
502 imp++; 488 imp++;
503 return port_build_proto_msg(p_ptr->publ.ref, 489 return port_build_proto_msg(p_ptr->ref,
504 tipc_own_addr, 490 tipc_own_addr,
505 port_peerport(p_ptr), 491 port_peerport(p_ptr),
506 port_peernode(p_ptr), 492 port_peernode(p_ptr),
507 imp, 493 imp,
508 TIPC_CONN_MSG, 494 TIPC_CONN_MSG,
509 err, 495 err,
510 p_ptr->last_in_seqno + 1,
511 0); 496 0);
512} 497}
513 498
514 499
515static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err) 500static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
516{ 501{
517 u32 imp = msg_importance(&p_ptr->publ.phdr); 502 u32 imp = msg_importance(&p_ptr->phdr);
518 503
519 if (!p_ptr->publ.connected) 504 if (!p_ptr->connected)
520 return NULL; 505 return NULL;
521 if (imp < TIPC_CRITICAL_IMPORTANCE) 506 if (imp < TIPC_CRITICAL_IMPORTANCE)
522 imp++; 507 imp++;
523 return port_build_proto_msg(port_peerport(p_ptr), 508 return port_build_proto_msg(port_peerport(p_ptr),
524 port_peernode(p_ptr), 509 port_peernode(p_ptr),
525 p_ptr->publ.ref, 510 p_ptr->ref,
526 tipc_own_addr, 511 tipc_own_addr,
527 imp, 512 imp,
528 TIPC_CONN_MSG, 513 TIPC_CONN_MSG,
529 err, 514 err,
530 port_out_seqno(p_ptr),
531 0); 515 0);
532} 516}
533 517
534void tipc_port_recv_proto_msg(struct sk_buff *buf) 518void tipc_port_recv_proto_msg(struct sk_buff *buf)
535{ 519{
536 struct tipc_msg *msg = buf_msg(buf); 520 struct tipc_msg *msg = buf_msg(buf);
537 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 521 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
538 u32 err = TIPC_OK; 522 u32 err = TIPC_OK;
539 struct sk_buff *r_buf = NULL; 523 struct sk_buff *r_buf = NULL;
540 struct sk_buff *abort_buf = NULL; 524 struct sk_buff *abort_buf = NULL;
541 525
542 if (!p_ptr) { 526 if (!p_ptr) {
543 err = TIPC_ERR_NO_PORT; 527 err = TIPC_ERR_NO_PORT;
544 } else if (p_ptr->publ.connected) { 528 } else if (p_ptr->connected) {
545 if ((port_peernode(p_ptr) != msg_orignode(msg)) || 529 if ((port_peernode(p_ptr) != msg_orignode(msg)) ||
546 (port_peerport(p_ptr) != msg_origport(msg))) { 530 (port_peerport(p_ptr) != msg_origport(msg))) {
547 err = TIPC_ERR_NO_PORT; 531 err = TIPC_ERR_NO_PORT;
548 } else if (msg_type(msg) == CONN_ACK) { 532 } else if (msg_type(msg) == CONN_ACK) {
549 int wakeup = tipc_port_congested(p_ptr) && 533 int wakeup = tipc_port_congested(p_ptr) &&
550 p_ptr->publ.congested && 534 p_ptr->congested &&
551 p_ptr->wakeup; 535 p_ptr->wakeup;
552 p_ptr->acked += msg_msgcnt(msg); 536 p_ptr->acked += msg_msgcnt(msg);
553 if (tipc_port_congested(p_ptr)) 537 if (tipc_port_congested(p_ptr))
554 goto exit; 538 goto exit;
555 p_ptr->publ.congested = 0; 539 p_ptr->congested = 0;
556 if (!wakeup) 540 if (!wakeup)
557 goto exit; 541 goto exit;
558 p_ptr->wakeup(&p_ptr->publ); 542 p_ptr->wakeup(p_ptr);
559 goto exit; 543 goto exit;
560 } 544 }
561 } else if (p_ptr->publ.published) { 545 } else if (p_ptr->published) {
562 err = TIPC_ERR_NO_PORT; 546 err = TIPC_ERR_NO_PORT;
563 } 547 }
564 if (err) { 548 if (err) {
@@ -569,7 +553,6 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
569 TIPC_HIGH_IMPORTANCE, 553 TIPC_HIGH_IMPORTANCE,
570 TIPC_CONN_MSG, 554 TIPC_CONN_MSG,
571 err, 555 err,
572 0,
573 0); 556 0);
574 goto exit; 557 goto exit;
575 } 558 }
@@ -583,11 +566,9 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
583 CONN_MANAGER, 566 CONN_MANAGER,
584 CONN_PROBE_REPLY, 567 CONN_PROBE_REPLY,
585 TIPC_OK, 568 TIPC_OK,
586 port_out_seqno(p_ptr),
587 0); 569 0);
588 } 570 }
589 p_ptr->probing_state = CONFIRMED; 571 p_ptr->probing_state = CONFIRMED;
590 port_incr_out_seqno(p_ptr);
591exit: 572exit:
592 if (p_ptr) 573 if (p_ptr)
593 tipc_port_unlock(p_ptr); 574 tipc_port_unlock(p_ptr);
@@ -596,29 +577,29 @@ exit:
596 buf_discard(buf); 577 buf_discard(buf);
597} 578}
598 579
599static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id) 580static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id)
600{ 581{
601 struct publication *publ; 582 struct publication *publ;
602 583
603 if (full_id) 584 if (full_id)
604 tipc_printf(buf, "<%u.%u.%u:%u>:", 585 tipc_printf(buf, "<%u.%u.%u:%u>:",
605 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 586 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
606 tipc_node(tipc_own_addr), p_ptr->publ.ref); 587 tipc_node(tipc_own_addr), p_ptr->ref);
607 else 588 else
608 tipc_printf(buf, "%-10u:", p_ptr->publ.ref); 589 tipc_printf(buf, "%-10u:", p_ptr->ref);
609 590
610 if (p_ptr->publ.connected) { 591 if (p_ptr->connected) {
611 u32 dport = port_peerport(p_ptr); 592 u32 dport = port_peerport(p_ptr);
612 u32 destnode = port_peernode(p_ptr); 593 u32 destnode = port_peernode(p_ptr);
613 594
614 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 595 tipc_printf(buf, " connected to <%u.%u.%u:%u>",
615 tipc_zone(destnode), tipc_cluster(destnode), 596 tipc_zone(destnode), tipc_cluster(destnode),
616 tipc_node(destnode), dport); 597 tipc_node(destnode), dport);
617 if (p_ptr->publ.conn_type != 0) 598 if (p_ptr->conn_type != 0)
618 tipc_printf(buf, " via {%u,%u}", 599 tipc_printf(buf, " via {%u,%u}",
619 p_ptr->publ.conn_type, 600 p_ptr->conn_type,
620 p_ptr->publ.conn_instance); 601 p_ptr->conn_instance);
621 } else if (p_ptr->publ.published) { 602 } else if (p_ptr->published) {
622 tipc_printf(buf, " bound to"); 603 tipc_printf(buf, " bound to");
623 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 604 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
624 if (publ->lower == publ->upper) 605 if (publ->lower == publ->upper)
@@ -639,7 +620,7 @@ struct sk_buff *tipc_port_get_ports(void)
639 struct sk_buff *buf; 620 struct sk_buff *buf;
640 struct tlv_desc *rep_tlv; 621 struct tlv_desc *rep_tlv;
641 struct print_buf pb; 622 struct print_buf pb;
642 struct port *p_ptr; 623 struct tipc_port *p_ptr;
643 int str_len; 624 int str_len;
644 625
645 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 626 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
@@ -650,9 +631,9 @@ struct sk_buff *tipc_port_get_ports(void)
650 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 631 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
651 spin_lock_bh(&tipc_port_list_lock); 632 spin_lock_bh(&tipc_port_list_lock);
652 list_for_each_entry(p_ptr, &ports, port_list) { 633 list_for_each_entry(p_ptr, &ports, port_list) {
653 spin_lock_bh(p_ptr->publ.lock); 634 spin_lock_bh(p_ptr->lock);
654 port_print(p_ptr, &pb, 0); 635 port_print(p_ptr, &pb, 0);
655 spin_unlock_bh(p_ptr->publ.lock); 636 spin_unlock_bh(p_ptr->lock);
656 } 637 }
657 spin_unlock_bh(&tipc_port_list_lock); 638 spin_unlock_bh(&tipc_port_list_lock);
658 str_len = tipc_printbuf_validate(&pb); 639 str_len = tipc_printbuf_validate(&pb);
@@ -665,12 +646,12 @@ struct sk_buff *tipc_port_get_ports(void)
665 646
666void tipc_port_reinit(void) 647void tipc_port_reinit(void)
667{ 648{
668 struct port *p_ptr; 649 struct tipc_port *p_ptr;
669 struct tipc_msg *msg; 650 struct tipc_msg *msg;
670 651
671 spin_lock_bh(&tipc_port_list_lock); 652 spin_lock_bh(&tipc_port_list_lock);
672 list_for_each_entry(p_ptr, &ports, port_list) { 653 list_for_each_entry(p_ptr, &ports, port_list) {
673 msg = &p_ptr->publ.phdr; 654 msg = &p_ptr->phdr;
674 if (msg_orignode(msg) == tipc_own_addr) 655 if (msg_orignode(msg) == tipc_own_addr)
675 break; 656 break;
676 msg_set_prevnode(msg, tipc_own_addr); 657 msg_set_prevnode(msg, tipc_own_addr);
@@ -695,7 +676,7 @@ static void port_dispatcher_sigh(void *dummy)
695 spin_unlock_bh(&queue_lock); 676 spin_unlock_bh(&queue_lock);
696 677
697 while (buf) { 678 while (buf) {
698 struct port *p_ptr; 679 struct tipc_port *p_ptr;
699 struct user_port *up_ptr; 680 struct user_port *up_ptr;
700 struct tipc_portid orig; 681 struct tipc_portid orig;
701 struct tipc_name_seq dseq; 682 struct tipc_name_seq dseq;
@@ -720,8 +701,8 @@ static void port_dispatcher_sigh(void *dummy)
720 orig.node = msg_orignode(msg); 701 orig.node = msg_orignode(msg);
721 up_ptr = p_ptr->user_port; 702 up_ptr = p_ptr->user_port;
722 usr_handle = up_ptr->usr_handle; 703 usr_handle = up_ptr->usr_handle;
723 connected = p_ptr->publ.connected; 704 connected = p_ptr->connected;
724 published = p_ptr->publ.published; 705 published = p_ptr->published;
725 706
726 if (unlikely(msg_errcode(msg))) 707 if (unlikely(msg_errcode(msg)))
727 goto err; 708 goto err;
@@ -732,6 +713,7 @@ static void port_dispatcher_sigh(void *dummy)
732 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 713 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
733 u32 peer_port = port_peerport(p_ptr); 714 u32 peer_port = port_peerport(p_ptr);
734 u32 peer_node = port_peernode(p_ptr); 715 u32 peer_node = port_peernode(p_ptr);
716 u32 dsz;
735 717
736 tipc_port_unlock(p_ptr); 718 tipc_port_unlock(p_ptr);
737 if (unlikely(!cb)) 719 if (unlikely(!cb))
@@ -742,13 +724,14 @@ static void port_dispatcher_sigh(void *dummy)
742 } else if ((msg_origport(msg) != peer_port) || 724 } else if ((msg_origport(msg) != peer_port) ||
743 (msg_orignode(msg) != peer_node)) 725 (msg_orignode(msg) != peer_node))
744 goto reject; 726 goto reject;
745 if (unlikely(++p_ptr->publ.conn_unacked >= 727 dsz = msg_data_sz(msg);
746 TIPC_FLOW_CONTROL_WIN)) 728 if (unlikely(dsz &&
729 (++p_ptr->conn_unacked >=
730 TIPC_FLOW_CONTROL_WIN)))
747 tipc_acknowledge(dref, 731 tipc_acknowledge(dref,
748 p_ptr->publ.conn_unacked); 732 p_ptr->conn_unacked);
749 skb_pull(buf, msg_hdr_sz(msg)); 733 skb_pull(buf, msg_hdr_sz(msg));
750 cb(usr_handle, dref, &buf, msg_data(msg), 734 cb(usr_handle, dref, &buf, msg_data(msg), dsz);
751 msg_data_sz(msg));
752 break; 735 break;
753 } 736 }
754 case TIPC_DIRECT_MSG:{ 737 case TIPC_DIRECT_MSG:{
@@ -872,7 +855,7 @@ static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
872 855
873static void port_wakeup_sh(unsigned long ref) 856static void port_wakeup_sh(unsigned long ref)
874{ 857{
875 struct port *p_ptr; 858 struct tipc_port *p_ptr;
876 struct user_port *up_ptr; 859 struct user_port *up_ptr;
877 tipc_continue_event cb = NULL; 860 tipc_continue_event cb = NULL;
878 void *uh = NULL; 861 void *uh = NULL;
@@ -898,14 +881,14 @@ static void port_wakeup(struct tipc_port *p_ptr)
898 881
899void tipc_acknowledge(u32 ref, u32 ack) 882void tipc_acknowledge(u32 ref, u32 ack)
900{ 883{
901 struct port *p_ptr; 884 struct tipc_port *p_ptr;
902 struct sk_buff *buf = NULL; 885 struct sk_buff *buf = NULL;
903 886
904 p_ptr = tipc_port_lock(ref); 887 p_ptr = tipc_port_lock(ref);
905 if (!p_ptr) 888 if (!p_ptr)
906 return; 889 return;
907 if (p_ptr->publ.connected) { 890 if (p_ptr->connected) {
908 p_ptr->publ.conn_unacked -= ack; 891 p_ptr->conn_unacked -= ack;
909 buf = port_build_proto_msg(port_peerport(p_ptr), 892 buf = port_build_proto_msg(port_peerport(p_ptr),
910 port_peernode(p_ptr), 893 port_peernode(p_ptr),
911 ref, 894 ref,
@@ -913,7 +896,6 @@ void tipc_acknowledge(u32 ref, u32 ack)
913 CONN_MANAGER, 896 CONN_MANAGER,
914 CONN_ACK, 897 CONN_ACK,
915 TIPC_OK, 898 TIPC_OK,
916 port_out_seqno(p_ptr),
917 ack); 899 ack);
918 } 900 }
919 tipc_port_unlock(p_ptr); 901 tipc_port_unlock(p_ptr);
@@ -936,14 +918,14 @@ int tipc_createport(void *usr_handle,
936 u32 *portref) 918 u32 *portref)
937{ 919{
938 struct user_port *up_ptr; 920 struct user_port *up_ptr;
939 struct port *p_ptr; 921 struct tipc_port *p_ptr;
940 922
941 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 923 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
942 if (!up_ptr) { 924 if (!up_ptr) {
943 warn("Port creation failed, no memory\n"); 925 warn("Port creation failed, no memory\n");
944 return -ENOMEM; 926 return -ENOMEM;
945 } 927 }
946 p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher, 928 p_ptr = (struct tipc_port *)tipc_createport_raw(NULL, port_dispatcher,
947 port_wakeup, importance); 929 port_wakeup, importance);
948 if (!p_ptr) { 930 if (!p_ptr) {
949 kfree(up_ptr); 931 kfree(up_ptr);
@@ -952,7 +934,7 @@ int tipc_createport(void *usr_handle,
952 934
953 p_ptr->user_port = up_ptr; 935 p_ptr->user_port = up_ptr;
954 up_ptr->usr_handle = usr_handle; 936 up_ptr->usr_handle = usr_handle;
955 up_ptr->ref = p_ptr->publ.ref; 937 up_ptr->ref = p_ptr->ref;
956 up_ptr->err_cb = error_cb; 938 up_ptr->err_cb = error_cb;
957 up_ptr->named_err_cb = named_error_cb; 939 up_ptr->named_err_cb = named_error_cb;
958 up_ptr->conn_err_cb = conn_error_cb; 940 up_ptr->conn_err_cb = conn_error_cb;
@@ -960,26 +942,26 @@ int tipc_createport(void *usr_handle,
960 up_ptr->named_msg_cb = named_msg_cb; 942 up_ptr->named_msg_cb = named_msg_cb;
961 up_ptr->conn_msg_cb = conn_msg_cb; 943 up_ptr->conn_msg_cb = conn_msg_cb;
962 up_ptr->continue_event_cb = continue_event_cb; 944 up_ptr->continue_event_cb = continue_event_cb;
963 *portref = p_ptr->publ.ref; 945 *portref = p_ptr->ref;
964 tipc_port_unlock(p_ptr); 946 tipc_port_unlock(p_ptr);
965 return 0; 947 return 0;
966} 948}
967 949
968int tipc_portimportance(u32 ref, unsigned int *importance) 950int tipc_portimportance(u32 ref, unsigned int *importance)
969{ 951{
970 struct port *p_ptr; 952 struct tipc_port *p_ptr;
971 953
972 p_ptr = tipc_port_lock(ref); 954 p_ptr = tipc_port_lock(ref);
973 if (!p_ptr) 955 if (!p_ptr)
974 return -EINVAL; 956 return -EINVAL;
975 *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr); 957 *importance = (unsigned int)msg_importance(&p_ptr->phdr);
976 tipc_port_unlock(p_ptr); 958 tipc_port_unlock(p_ptr);
977 return 0; 959 return 0;
978} 960}
979 961
980int tipc_set_portimportance(u32 ref, unsigned int imp) 962int tipc_set_portimportance(u32 ref, unsigned int imp)
981{ 963{
982 struct port *p_ptr; 964 struct tipc_port *p_ptr;
983 965
984 if (imp > TIPC_CRITICAL_IMPORTANCE) 966 if (imp > TIPC_CRITICAL_IMPORTANCE)
985 return -EINVAL; 967 return -EINVAL;
@@ -987,7 +969,7 @@ int tipc_set_portimportance(u32 ref, unsigned int imp)
987 p_ptr = tipc_port_lock(ref); 969 p_ptr = tipc_port_lock(ref);
988 if (!p_ptr) 970 if (!p_ptr)
989 return -EINVAL; 971 return -EINVAL;
990 msg_set_importance(&p_ptr->publ.phdr, (u32)imp); 972 msg_set_importance(&p_ptr->phdr, (u32)imp);
991 tipc_port_unlock(p_ptr); 973 tipc_port_unlock(p_ptr);
992 return 0; 974 return 0;
993} 975}
@@ -995,7 +977,7 @@ int tipc_set_portimportance(u32 ref, unsigned int imp)
995 977
996int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 978int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
997{ 979{
998 struct port *p_ptr; 980 struct tipc_port *p_ptr;
999 struct publication *publ; 981 struct publication *publ;
1000 u32 key; 982 u32 key;
1001 int res = -EINVAL; 983 int res = -EINVAL;
@@ -1004,7 +986,7 @@ int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1004 if (!p_ptr) 986 if (!p_ptr)
1005 return -EINVAL; 987 return -EINVAL;
1006 988
1007 if (p_ptr->publ.connected) 989 if (p_ptr->connected)
1008 goto exit; 990 goto exit;
1009 if (seq->lower > seq->upper) 991 if (seq->lower > seq->upper)
1010 goto exit; 992 goto exit;
@@ -1016,11 +998,11 @@ int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1016 goto exit; 998 goto exit;
1017 } 999 }
1018 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 1000 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
1019 scope, p_ptr->publ.ref, key); 1001 scope, p_ptr->ref, key);
1020 if (publ) { 1002 if (publ) {
1021 list_add(&publ->pport_list, &p_ptr->publications); 1003 list_add(&publ->pport_list, &p_ptr->publications);
1022 p_ptr->pub_count++; 1004 p_ptr->pub_count++;
1023 p_ptr->publ.published = 1; 1005 p_ptr->published = 1;
1024 res = 0; 1006 res = 0;
1025 } 1007 }
1026exit: 1008exit:
@@ -1030,7 +1012,7 @@ exit:
1030 1012
1031int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1013int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1032{ 1014{
1033 struct port *p_ptr; 1015 struct tipc_port *p_ptr;
1034 struct publication *publ; 1016 struct publication *publ;
1035 struct publication *tpubl; 1017 struct publication *tpubl;
1036 int res = -EINVAL; 1018 int res = -EINVAL;
@@ -1063,37 +1045,37 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1063 } 1045 }
1064 } 1046 }
1065 if (list_empty(&p_ptr->publications)) 1047 if (list_empty(&p_ptr->publications))
1066 p_ptr->publ.published = 0; 1048 p_ptr->published = 0;
1067 tipc_port_unlock(p_ptr); 1049 tipc_port_unlock(p_ptr);
1068 return res; 1050 return res;
1069} 1051}
1070 1052
1071int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1053int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1072{ 1054{
1073 struct port *p_ptr; 1055 struct tipc_port *p_ptr;
1074 struct tipc_msg *msg; 1056 struct tipc_msg *msg;
1075 int res = -EINVAL; 1057 int res = -EINVAL;
1076 1058
1077 p_ptr = tipc_port_lock(ref); 1059 p_ptr = tipc_port_lock(ref);
1078 if (!p_ptr) 1060 if (!p_ptr)
1079 return -EINVAL; 1061 return -EINVAL;
1080 if (p_ptr->publ.published || p_ptr->publ.connected) 1062 if (p_ptr->published || p_ptr->connected)
1081 goto exit; 1063 goto exit;
1082 if (!peer->ref) 1064 if (!peer->ref)
1083 goto exit; 1065 goto exit;
1084 1066
1085 msg = &p_ptr->publ.phdr; 1067 msg = &p_ptr->phdr;
1086 msg_set_destnode(msg, peer->node); 1068 msg_set_destnode(msg, peer->node);
1087 msg_set_destport(msg, peer->ref); 1069 msg_set_destport(msg, peer->ref);
1088 msg_set_orignode(msg, tipc_own_addr); 1070 msg_set_orignode(msg, tipc_own_addr);
1089 msg_set_origport(msg, p_ptr->publ.ref); 1071 msg_set_origport(msg, p_ptr->ref);
1090 msg_set_transp_seqno(msg, 42);
1091 msg_set_type(msg, TIPC_CONN_MSG); 1072 msg_set_type(msg, TIPC_CONN_MSG);
1073 msg_set_lookup_scope(msg, 0);
1092 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1074 msg_set_hdr_sz(msg, SHORT_H_SIZE);
1093 1075
1094 p_ptr->probing_interval = PROBING_INTERVAL; 1076 p_ptr->probing_interval = PROBING_INTERVAL;
1095 p_ptr->probing_state = CONFIRMED; 1077 p_ptr->probing_state = CONFIRMED;
1096 p_ptr->publ.connected = 1; 1078 p_ptr->connected = 1;
1097 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1079 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1098 1080
1099 tipc_nodesub_subscribe(&p_ptr->subscription, peer->node, 1081 tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
@@ -1102,7 +1084,7 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1102 res = 0; 1084 res = 0;
1103exit: 1085exit:
1104 tipc_port_unlock(p_ptr); 1086 tipc_port_unlock(p_ptr);
1105 p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1087 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1106 return res; 1088 return res;
1107} 1089}
1108 1090
@@ -1120,7 +1102,7 @@ int tipc_disconnect_port(struct tipc_port *tp_ptr)
1120 tp_ptr->connected = 0; 1102 tp_ptr->connected = 0;
1121 /* let timer expire on it's own to avoid deadlock! */ 1103 /* let timer expire on it's own to avoid deadlock! */
1122 tipc_nodesub_unsubscribe( 1104 tipc_nodesub_unsubscribe(
1123 &((struct port *)tp_ptr)->subscription); 1105 &((struct tipc_port *)tp_ptr)->subscription);
1124 res = 0; 1106 res = 0;
1125 } else { 1107 } else {
1126 res = -ENOTCONN; 1108 res = -ENOTCONN;
@@ -1135,7 +1117,7 @@ int tipc_disconnect_port(struct tipc_port *tp_ptr)
1135 1117
1136int tipc_disconnect(u32 ref) 1118int tipc_disconnect(u32 ref)
1137{ 1119{
1138 struct port *p_ptr; 1120 struct tipc_port *p_ptr;
1139 int res; 1121 int res;
1140 1122
1141 p_ptr = tipc_port_lock(ref); 1123 p_ptr = tipc_port_lock(ref);
@@ -1151,15 +1133,15 @@ int tipc_disconnect(u32 ref)
1151 */ 1133 */
1152int tipc_shutdown(u32 ref) 1134int tipc_shutdown(u32 ref)
1153{ 1135{
1154 struct port *p_ptr; 1136 struct tipc_port *p_ptr;
1155 struct sk_buff *buf = NULL; 1137 struct sk_buff *buf = NULL;
1156 1138
1157 p_ptr = tipc_port_lock(ref); 1139 p_ptr = tipc_port_lock(ref);
1158 if (!p_ptr) 1140 if (!p_ptr)
1159 return -EINVAL; 1141 return -EINVAL;
1160 1142
1161 if (p_ptr->publ.connected) { 1143 if (p_ptr->connected) {
1162 u32 imp = msg_importance(&p_ptr->publ.phdr); 1144 u32 imp = msg_importance(&p_ptr->phdr);
1163 if (imp < TIPC_CRITICAL_IMPORTANCE) 1145 if (imp < TIPC_CRITICAL_IMPORTANCE)
1164 imp++; 1146 imp++;
1165 buf = port_build_proto_msg(port_peerport(p_ptr), 1147 buf = port_build_proto_msg(port_peerport(p_ptr),
@@ -1169,7 +1151,6 @@ int tipc_shutdown(u32 ref)
1169 imp, 1151 imp,
1170 TIPC_CONN_MSG, 1152 TIPC_CONN_MSG,
1171 TIPC_CONN_SHUTDOWN, 1153 TIPC_CONN_SHUTDOWN,
1172 port_out_seqno(p_ptr),
1173 0); 1154 0);
1174 } 1155 }
1175 tipc_port_unlock(p_ptr); 1156 tipc_port_unlock(p_ptr);
@@ -1182,13 +1163,14 @@ int tipc_shutdown(u32 ref)
1182 * message for this node. 1163 * message for this node.
1183 */ 1164 */
1184 1165
1185static int tipc_port_recv_sections(struct port *sender, unsigned int num_sect, 1166static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
1186 struct iovec const *msg_sect) 1167 struct iovec const *msg_sect,
1168 unsigned int total_len)
1187{ 1169{
1188 struct sk_buff *buf; 1170 struct sk_buff *buf;
1189 int res; 1171 int res;
1190 1172
1191 res = tipc_msg_build(&sender->publ.phdr, msg_sect, num_sect, 1173 res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
1192 MAX_MSG_SIZE, !sender->user_port, &buf); 1174 MAX_MSG_SIZE, !sender->user_port, &buf);
1193 if (likely(buf)) 1175 if (likely(buf))
1194 tipc_port_recv_msg(buf); 1176 tipc_port_recv_msg(buf);
@@ -1199,36 +1181,37 @@ static int tipc_port_recv_sections(struct port *sender, unsigned int num_sect,
1199 * tipc_send - send message sections on connection 1181 * tipc_send - send message sections on connection
1200 */ 1182 */
1201 1183
1202int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect) 1184int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
1185 unsigned int total_len)
1203{ 1186{
1204 struct port *p_ptr; 1187 struct tipc_port *p_ptr;
1205 u32 destnode; 1188 u32 destnode;
1206 int res; 1189 int res;
1207 1190
1208 p_ptr = tipc_port_deref(ref); 1191 p_ptr = tipc_port_deref(ref);
1209 if (!p_ptr || !p_ptr->publ.connected) 1192 if (!p_ptr || !p_ptr->connected)
1210 return -EINVAL; 1193 return -EINVAL;
1211 1194
1212 p_ptr->publ.congested = 1; 1195 p_ptr->congested = 1;
1213 if (!tipc_port_congested(p_ptr)) { 1196 if (!tipc_port_congested(p_ptr)) {
1214 destnode = port_peernode(p_ptr); 1197 destnode = port_peernode(p_ptr);
1215 if (likely(destnode != tipc_own_addr)) 1198 if (likely(destnode != tipc_own_addr))
1216 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1199 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1217 destnode); 1200 total_len, destnode);
1218 else 1201 else
1219 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1202 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1203 total_len);
1220 1204
1221 if (likely(res != -ELINKCONG)) { 1205 if (likely(res != -ELINKCONG)) {
1222 port_incr_out_seqno(p_ptr); 1206 p_ptr->congested = 0;
1223 p_ptr->publ.congested = 0; 1207 if (res > 0)
1224 p_ptr->sent++; 1208 p_ptr->sent++;
1225 return res; 1209 return res;
1226 } 1210 }
1227 } 1211 }
1228 if (port_unreliable(p_ptr)) { 1212 if (port_unreliable(p_ptr)) {
1229 p_ptr->publ.congested = 0; 1213 p_ptr->congested = 0;
1230 /* Just calculate msg length and return */ 1214 return total_len;
1231 return tipc_msg_calc_data_size(msg_sect, num_sect);
1232 } 1215 }
1233 return -ELINKCONG; 1216 return -ELINKCONG;
1234} 1217}
@@ -1238,19 +1221,20 @@ int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
1238 */ 1221 */
1239 1222
1240int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain, 1223int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1241 unsigned int num_sect, struct iovec const *msg_sect) 1224 unsigned int num_sect, struct iovec const *msg_sect,
1225 unsigned int total_len)
1242{ 1226{
1243 struct port *p_ptr; 1227 struct tipc_port *p_ptr;
1244 struct tipc_msg *msg; 1228 struct tipc_msg *msg;
1245 u32 destnode = domain; 1229 u32 destnode = domain;
1246 u32 destport; 1230 u32 destport;
1247 int res; 1231 int res;
1248 1232
1249 p_ptr = tipc_port_deref(ref); 1233 p_ptr = tipc_port_deref(ref);
1250 if (!p_ptr || p_ptr->publ.connected) 1234 if (!p_ptr || p_ptr->connected)
1251 return -EINVAL; 1235 return -EINVAL;
1252 1236
1253 msg = &p_ptr->publ.phdr; 1237 msg = &p_ptr->phdr;
1254 msg_set_type(msg, TIPC_NAMED_MSG); 1238 msg_set_type(msg, TIPC_NAMED_MSG);
1255 msg_set_orignode(msg, tipc_own_addr); 1239 msg_set_orignode(msg, tipc_own_addr);
1256 msg_set_origport(msg, ref); 1240 msg_set_origport(msg, ref);
@@ -1263,21 +1247,25 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1263 msg_set_destport(msg, destport); 1247 msg_set_destport(msg, destport);
1264 1248
1265 if (likely(destport)) { 1249 if (likely(destport)) {
1266 p_ptr->sent++;
1267 if (likely(destnode == tipc_own_addr)) 1250 if (likely(destnode == tipc_own_addr))
1268 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1251 res = tipc_port_recv_sections(p_ptr, num_sect,
1269 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1252 msg_sect, total_len);
1270 destnode); 1253 else
1271 if (likely(res != -ELINKCONG)) 1254 res = tipc_link_send_sections_fast(p_ptr, msg_sect,
1255 num_sect, total_len,
1256 destnode);
1257 if (likely(res != -ELINKCONG)) {
1258 if (res > 0)
1259 p_ptr->sent++;
1272 return res; 1260 return res;
1261 }
1273 if (port_unreliable(p_ptr)) { 1262 if (port_unreliable(p_ptr)) {
1274 /* Just calculate msg length and return */ 1263 return total_len;
1275 return tipc_msg_calc_data_size(msg_sect, num_sect);
1276 } 1264 }
1277 return -ELINKCONG; 1265 return -ELINKCONG;
1278 } 1266 }
1279 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1267 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1280 TIPC_ERR_NO_NAME); 1268 total_len, TIPC_ERR_NO_NAME);
1281} 1269}
1282 1270
1283/** 1271/**
@@ -1285,32 +1273,39 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1285 */ 1273 */
1286 1274
1287int tipc_send2port(u32 ref, struct tipc_portid const *dest, 1275int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1288 unsigned int num_sect, struct iovec const *msg_sect) 1276 unsigned int num_sect, struct iovec const *msg_sect,
1277 unsigned int total_len)
1289{ 1278{
1290 struct port *p_ptr; 1279 struct tipc_port *p_ptr;
1291 struct tipc_msg *msg; 1280 struct tipc_msg *msg;
1292 int res; 1281 int res;
1293 1282
1294 p_ptr = tipc_port_deref(ref); 1283 p_ptr = tipc_port_deref(ref);
1295 if (!p_ptr || p_ptr->publ.connected) 1284 if (!p_ptr || p_ptr->connected)
1296 return -EINVAL; 1285 return -EINVAL;
1297 1286
1298 msg = &p_ptr->publ.phdr; 1287 msg = &p_ptr->phdr;
1299 msg_set_type(msg, TIPC_DIRECT_MSG); 1288 msg_set_type(msg, TIPC_DIRECT_MSG);
1289 msg_set_lookup_scope(msg, 0);
1300 msg_set_orignode(msg, tipc_own_addr); 1290 msg_set_orignode(msg, tipc_own_addr);
1301 msg_set_origport(msg, ref); 1291 msg_set_origport(msg, ref);
1302 msg_set_destnode(msg, dest->node); 1292 msg_set_destnode(msg, dest->node);
1303 msg_set_destport(msg, dest->ref); 1293 msg_set_destport(msg, dest->ref);
1304 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1294 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1305 p_ptr->sent++; 1295
1306 if (dest->node == tipc_own_addr) 1296 if (dest->node == tipc_own_addr)
1307 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1297 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1308 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node); 1298 total_len);
1309 if (likely(res != -ELINKCONG)) 1299 else
1300 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1301 total_len, dest->node);
1302 if (likely(res != -ELINKCONG)) {
1303 if (res > 0)
1304 p_ptr->sent++;
1310 return res; 1305 return res;
1306 }
1311 if (port_unreliable(p_ptr)) { 1307 if (port_unreliable(p_ptr)) {
1312 /* Just calculate msg length and return */ 1308 return total_len;
1313 return tipc_msg_calc_data_size(msg_sect, num_sect);
1314 } 1309 }
1315 return -ELINKCONG; 1310 return -ELINKCONG;
1316} 1311}
@@ -1322,15 +1317,15 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1322int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest, 1317int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1323 struct sk_buff *buf, unsigned int dsz) 1318 struct sk_buff *buf, unsigned int dsz)
1324{ 1319{
1325 struct port *p_ptr; 1320 struct tipc_port *p_ptr;
1326 struct tipc_msg *msg; 1321 struct tipc_msg *msg;
1327 int res; 1322 int res;
1328 1323
1329 p_ptr = (struct port *)tipc_ref_deref(ref); 1324 p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
1330 if (!p_ptr || p_ptr->publ.connected) 1325 if (!p_ptr || p_ptr->connected)
1331 return -EINVAL; 1326 return -EINVAL;
1332 1327
1333 msg = &p_ptr->publ.phdr; 1328 msg = &p_ptr->phdr;
1334 msg_set_type(msg, TIPC_DIRECT_MSG); 1329 msg_set_type(msg, TIPC_DIRECT_MSG);
1335 msg_set_orignode(msg, tipc_own_addr); 1330 msg_set_orignode(msg, tipc_own_addr);
1336 msg_set_origport(msg, ref); 1331 msg_set_origport(msg, ref);
@@ -1343,12 +1338,16 @@ int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1343 1338
1344 skb_push(buf, DIR_MSG_H_SIZE); 1339 skb_push(buf, DIR_MSG_H_SIZE);
1345 skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE); 1340 skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE);
1346 p_ptr->sent++; 1341
1347 if (dest->node == tipc_own_addr) 1342 if (dest->node == tipc_own_addr)
1348 return tipc_port_recv_msg(buf); 1343 res = tipc_port_recv_msg(buf);
1349 res = tipc_send_buf_fast(buf, dest->node); 1344 else
1350 if (likely(res != -ELINKCONG)) 1345 res = tipc_send_buf_fast(buf, dest->node);
1346 if (likely(res != -ELINKCONG)) {
1347 if (res > 0)
1348 p_ptr->sent++;
1351 return res; 1349 return res;
1350 }
1352 if (port_unreliable(p_ptr)) 1351 if (port_unreliable(p_ptr))
1353 return dsz; 1352 return dsz;
1354 return -ELINKCONG; 1353 return -ELINKCONG;