aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-07-30 18:24:16 -0400
committerDavid S. Miller <davem@davemloft.net>2015-07-30 20:25:13 -0400
commit6144a996a65199480eed7521c1c50590c282e78e (patch)
tree7580d28456da45dbec681e3c3ae710b14f4f3c09 /net/tipc
parentcbeb83ca68dcedf69b336fd1c5263658cbe5b51e (diff)
tipc: move all link_reset() calls to link aggregation level
In line with our effort to let the node level have full control over its links, we want to move all link reset calls from link.c to node.c. Some of the calls can be moved by simply moving the calling function, when this is the right thing to do. For the remaining calls we use the now established technique of returning a TIPC_LINK_DOWN_EVT flag from tipc_link_rcv(), whereafter we perform the reset call when the call returns. This change serves as a preparation for the coming commits. Tested-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bearer.c4
-rw-r--r--net/tipc/link.c81
-rw-r--r--net/tipc/link.h3
-rw-r--r--net/tipc/node.c84
-rw-r--r--net/tipc/node.h1
5 files changed, 104 insertions, 69 deletions
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index eae58a6b121c..ce9f7bfc0b92 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -343,7 +343,7 @@ restart:
343static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b_ptr) 343static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b_ptr)
344{ 344{
345 pr_info("Resetting bearer <%s>\n", b_ptr->name); 345 pr_info("Resetting bearer <%s>\n", b_ptr->name);
346 tipc_link_delete_list(net, b_ptr->identity); 346 tipc_node_delete_links(net, b_ptr->identity);
347 tipc_disc_reset(net, b_ptr); 347 tipc_disc_reset(net, b_ptr);
348 return 0; 348 return 0;
349} 349}
@@ -361,7 +361,7 @@ static void bearer_disable(struct net *net, struct tipc_bearer *b_ptr)
361 pr_info("Disabling bearer <%s>\n", b_ptr->name); 361 pr_info("Disabling bearer <%s>\n", b_ptr->name);
362 b_ptr->media->disable_media(b_ptr); 362 b_ptr->media->disable_media(b_ptr);
363 363
364 tipc_link_delete_list(net, b_ptr->identity); 364 tipc_node_delete_links(net, b_ptr->identity);
365 if (b_ptr->link_req) 365 if (b_ptr->link_req)
366 tipc_disc_delete(b_ptr->link_req); 366 tipc_disc_delete(b_ptr->link_req);
367 367
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 05837ba7b68c..8c81db7b17f9 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -137,9 +137,9 @@ static void link_print(struct tipc_link *l_ptr, const char *str);
137static void tipc_link_build_bcast_sync_msg(struct tipc_link *l, 137static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
138 struct sk_buff_head *xmitq); 138 struct sk_buff_head *xmitq);
139static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 139static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
140static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb); 140static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
141static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb); 141static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
142static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb); 142static int tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
143 143
144/* 144/*
145 * Simple link routines 145 * Simple link routines
@@ -258,34 +258,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
258 return l_ptr; 258 return l_ptr;
259} 259}
260 260
261/**
262 * tipc_link_delete - Delete a link
263 * @l: link to be deleted
264 */
265void tipc_link_delete(struct tipc_link *l)
266{
267 tipc_link_reset(l);
268 tipc_link_reset_fragments(l);
269 tipc_node_detach_link(l->owner, l);
270}
271
272void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
273{
274 struct tipc_net *tn = net_generic(net, tipc_net_id);
275 struct tipc_link *link;
276 struct tipc_node *node;
277
278 rcu_read_lock();
279 list_for_each_entry_rcu(node, &tn->node_list, list) {
280 tipc_node_lock(node);
281 link = node->links[bearer_id].link;
282 if (link)
283 tipc_link_delete(link);
284 tipc_node_unlock(node);
285 }
286 rcu_read_unlock();
287}
288
289/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. 261/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints.
290 * 262 *
291 * Give a newly added peer node the sequence number where it should 263 * Give a newly added peer node the sequence number where it should
@@ -875,26 +847,6 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
875 l->snd_nxt = seqno; 847 l->snd_nxt = seqno;
876} 848}
877 849
878void tipc_link_reset_all(struct tipc_node *node)
879{
880 char addr_string[16];
881 u32 i;
882
883 tipc_node_lock(node);
884
885 pr_warn("Resetting all links to %s\n",
886 tipc_addr_string_fill(addr_string, node->addr));
887
888 for (i = 0; i < MAX_BEARERS; i++) {
889 if (node->links[i].link) {
890 link_print(node->links[i].link, "Resetting link\n");
891 tipc_link_reset(node->links[i].link);
892 }
893 }
894
895 tipc_node_unlock(node);
896}
897
898static void link_retransmit_failure(struct tipc_link *l_ptr, 850static void link_retransmit_failure(struct tipc_link *l_ptr,
899 struct sk_buff *buf) 851 struct sk_buff *buf)
900{ 852{
@@ -911,7 +863,6 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
911 msg_errcode(msg)); 863 msg_errcode(msg));
912 pr_info("sqno %u, prev: %x, src: %x\n", 864 pr_info("sqno %u, prev: %x, src: %x\n",
913 msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg)); 865 msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
914 tipc_link_reset(l_ptr);
915 } else { 866 } else {
916 /* Handle failure on broadcast link */ 867 /* Handle failure on broadcast link */
917 struct tipc_node *n_ptr; 868 struct tipc_node *n_ptr;
@@ -987,6 +938,7 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
987 l->stale_count = 1; 938 l->stale_count = 1;
988 } else if (++l->stale_count > 100) { 939 } else if (++l->stale_count > 100) {
989 link_retransmit_failure(l, skb); 940 link_retransmit_failure(l, skb);
941 l->exec_mode = TIPC_LINK_BLOCKED;
990 return TIPC_LINK_DOWN_EVT; 942 return TIPC_LINK_DOWN_EVT;
991 } 943 }
992 skb_queue_walk(&l->transmq, skb) { 944 skb_queue_walk(&l->transmq, skb) {
@@ -1079,12 +1031,13 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
1079 * Consumes buffer 1031 * Consumes buffer
1080 * Node lock must be held 1032 * Node lock must be held
1081 */ 1033 */
1082static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb) 1034static int tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1083{ 1035{
1084 struct tipc_node *node = link->owner; 1036 struct tipc_node *node = link->owner;
1085 struct tipc_msg *msg = buf_msg(skb); 1037 struct tipc_msg *msg = buf_msg(skb);
1086 struct sk_buff *iskb; 1038 struct sk_buff *iskb;
1087 int pos = 0; 1039 int pos = 0;
1040 int rc = 0;
1088 1041
1089 switch (msg_user(msg)) { 1042 switch (msg_user(msg)) {
1090 case TUNNEL_PROTOCOL: 1043 case TUNNEL_PROTOCOL:
@@ -1094,7 +1047,8 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1094 kfree_skb(skb); 1047 kfree_skb(skb);
1095 break; 1048 break;
1096 } 1049 }
1097 if (!tipc_link_failover_rcv(link, &skb)) 1050 rc |= tipc_link_failover_rcv(link, &skb);
1051 if (!skb)
1098 break; 1052 break;
1099 if (msg_user(buf_msg(skb)) != MSG_BUNDLER) { 1053 if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
1100 tipc_data_input(link, skb); 1054 tipc_data_input(link, skb);
@@ -1113,7 +1067,8 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1113 link->stats.recv_fragmented++; 1067 link->stats.recv_fragmented++;
1114 tipc_data_input(link, skb); 1068 tipc_data_input(link, skb);
1115 } else if (!link->reasm_buf) { 1069 } else if (!link->reasm_buf) {
1116 tipc_link_reset(link); 1070 link->exec_mode = TIPC_LINK_BLOCKED;
1071 rc |= TIPC_LINK_DOWN_EVT;
1117 } 1072 }
1118 break; 1073 break;
1119 case BCAST_PROTOCOL: 1074 case BCAST_PROTOCOL:
@@ -1122,6 +1077,7 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1122 default: 1077 default:
1123 break; 1078 break;
1124 }; 1079 };
1080 return rc;
1125} 1081}
1126 1082
1127static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked) 1083static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
@@ -1215,7 +1171,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
1215 l->rcv_nxt++; 1171 l->rcv_nxt++;
1216 l->stats.recv_info++; 1172 l->stats.recv_info++;
1217 if (unlikely(!tipc_data_input(l, skb))) 1173 if (unlikely(!tipc_data_input(l, skb)))
1218 tipc_link_input(l, skb); 1174 rc |= tipc_link_input(l, skb);
1219 1175
1220 /* Ack at regular intervals */ 1176 /* Ack at regular intervals */
1221 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) { 1177 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
@@ -1504,14 +1460,15 @@ tunnel_queue:
1504/* tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet 1460/* tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet
1505 * Owner node is locked. 1461 * Owner node is locked.
1506 */ 1462 */
1507static bool tipc_link_failover_rcv(struct tipc_link *link, 1463static int tipc_link_failover_rcv(struct tipc_link *link,
1508 struct sk_buff **skb) 1464 struct sk_buff **skb)
1509{ 1465{
1510 struct tipc_msg *msg = buf_msg(*skb); 1466 struct tipc_msg *msg = buf_msg(*skb);
1511 struct sk_buff *iskb = NULL; 1467 struct sk_buff *iskb = NULL;
1512 struct tipc_link *pl = NULL; 1468 struct tipc_link *pl = NULL;
1513 int bearer_id = msg_bearer_id(msg); 1469 int bearer_id = msg_bearer_id(msg);
1514 int pos = 0; 1470 int pos = 0;
1471 int rc = 0;
1515 1472
1516 if (msg_type(msg) != FAILOVER_MSG) { 1473 if (msg_type(msg) != FAILOVER_MSG) {
1517 pr_warn("%sunknown tunnel pkt received\n", link_co_err); 1474 pr_warn("%sunknown tunnel pkt received\n", link_co_err);
@@ -1524,8 +1481,6 @@ static bool tipc_link_failover_rcv(struct tipc_link *link,
1524 goto exit; 1481 goto exit;
1525 1482
1526 pl = link->owner->links[bearer_id].link; 1483 pl = link->owner->links[bearer_id].link;
1527 if (pl && tipc_link_is_up(pl))
1528 tipc_link_reset(pl);
1529 1484
1530 if (link->failover_pkts == FIRST_FAILOVER) 1485 if (link->failover_pkts == FIRST_FAILOVER)
1531 link->failover_pkts = msg_msgcnt(msg); 1486 link->failover_pkts = msg_msgcnt(msg);
@@ -1550,14 +1505,18 @@ static bool tipc_link_failover_rcv(struct tipc_link *link,
1550 } 1505 }
1551 if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) { 1506 if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) {
1552 link->stats.recv_fragments++; 1507 link->stats.recv_fragments++;
1553 tipc_buf_append(&link->failover_skb, &iskb); 1508 if (!tipc_buf_append(&link->failover_skb, &iskb) &&
1509 !link->failover_skb) {
1510 link->exec_mode = TIPC_LINK_BLOCKED;
1511 rc |= TIPC_LINK_DOWN_EVT;
1512 }
1554 } 1513 }
1555exit: 1514exit:
1556 if (!link->failover_pkts && pl) 1515 if (!link->failover_pkts && pl)
1557 pl->exec_mode = TIPC_LINK_OPEN; 1516 pl->exec_mode = TIPC_LINK_OPEN;
1558 kfree_skb(*skb); 1517 kfree_skb(*skb);
1559 *skb = iskb; 1518 *skb = iskb;
1560 return *skb; 1519 return rc;
1561} 1520}
1562 1521
1563/* tipc_link_proto_rcv(): receive link level protocol message : 1522/* tipc_link_proto_rcv(): receive link level protocol message :
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 279196d6baac..bb1378b7cb59 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -212,8 +212,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n,
212 const struct tipc_media_addr *maddr, 212 const struct tipc_media_addr *maddr,
213 struct sk_buff_head *inputq, 213 struct sk_buff_head *inputq,
214 struct sk_buff_head *namedq); 214 struct sk_buff_head *namedq);
215void tipc_link_delete(struct tipc_link *link);
216void tipc_link_delete_list(struct net *net, unsigned int bearer_id);
217void tipc_link_failover_send_queue(struct tipc_link *l_ptr); 215void tipc_link_failover_send_queue(struct tipc_link *l_ptr);
218void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *dest); 216void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *dest);
219void tipc_link_reset_fragments(struct tipc_link *l_ptr); 217void tipc_link_reset_fragments(struct tipc_link *l_ptr);
@@ -221,7 +219,6 @@ int tipc_link_is_up(struct tipc_link *l_ptr);
221int tipc_link_is_active(struct tipc_link *l_ptr); 219int tipc_link_is_active(struct tipc_link *l_ptr);
222void tipc_link_purge_queues(struct tipc_link *l_ptr); 220void tipc_link_purge_queues(struct tipc_link *l_ptr);
223void tipc_link_purge_backlog(struct tipc_link *l); 221void tipc_link_purge_backlog(struct tipc_link *l);
224void tipc_link_reset_all(struct tipc_node *node);
225void tipc_link_reset(struct tipc_link *l_ptr); 222void tipc_link_reset(struct tipc_link *l_ptr);
226int __tipc_link_xmit(struct net *net, struct tipc_link *link, 223int __tipc_link_xmit(struct net *net, struct tipc_link *link,
227 struct sk_buff_head *list); 224 struct sk_buff_head *list);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 558df25a7fc6..6a0680ba98a9 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -407,6 +407,44 @@ bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b,
407 return true; 407 return true;
408} 408}
409 409
410void tipc_node_delete_links(struct net *net, int bearer_id)
411{
412 struct tipc_net *tn = net_generic(net, tipc_net_id);
413 struct tipc_link *l;
414 struct tipc_node *n;
415
416 rcu_read_lock();
417 list_for_each_entry_rcu(n, &tn->node_list, list) {
418 tipc_node_lock(n);
419 l = n->links[bearer_id].link;
420 if (l) {
421 tipc_link_reset(l);
422 n->links[bearer_id].link = NULL;
423 n->link_cnt--;
424 }
425 tipc_node_unlock(n);
426 kfree(l);
427 }
428 rcu_read_unlock();
429}
430
431static void tipc_node_reset_links(struct tipc_node *n)
432{
433 char addr_string[16];
434 u32 i;
435
436 tipc_node_lock(n);
437
438 pr_warn("Resetting all links to %s\n",
439 tipc_addr_string_fill(addr_string, n->addr));
440
441 for (i = 0; i < MAX_BEARERS; i++) {
442 if (n->links[i].link)
443 tipc_link_reset(n->links[i].link);
444 }
445 tipc_node_unlock(n);
446}
447
410void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) 448void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
411{ 449{
412 n_ptr->links[l_ptr->bearer_id].link = l_ptr; 450 n_ptr->links[l_ptr->bearer_id].link = l_ptr;
@@ -721,7 +759,7 @@ void tipc_node_unlock(struct tipc_node *node)
721 tipc_bclink_input(net); 759 tipc_bclink_input(net);
722 760
723 if (flags & TIPC_BCAST_RESET) 761 if (flags & TIPC_BCAST_RESET)
724 tipc_link_reset_all(node); 762 tipc_node_reset_links(node);
725} 763}
726 764
727/* Caller should hold node lock for the passed node */ 765/* Caller should hold node lock for the passed node */
@@ -836,6 +874,40 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
836 return 0; 874 return 0;
837} 875}
838 876
877/* tipc_node_tnl_init(): handle a received TUNNEL_PROTOCOL packet,
878 * in order to control parallel link failover or synchronization
879 */
880static void tipc_node_tnl_init(struct tipc_node *n, int bearer_id,
881 struct sk_buff *skb)
882{
883 struct tipc_link *tnl, *pl;
884 struct tipc_msg *hdr = buf_msg(skb);
885 u16 oseqno = msg_seqno(hdr);
886 int pb_id = msg_bearer_id(hdr);
887
888 if (pb_id >= MAX_BEARERS)
889 return;
890
891 tnl = n->links[bearer_id].link;
892 if (!tnl)
893 return;
894
895 /* Ignore if duplicate */
896 if (less(oseqno, tnl->rcv_nxt))
897 return;
898
899 pl = n->links[pb_id].link;
900 if (!pl)
901 return;
902
903 if (msg_type(hdr) == FAILOVER_MSG) {
904 if (tipc_link_is_up(pl)) {
905 tipc_link_reset(pl);
906 pl->exec_mode = TIPC_LINK_BLOCKED;
907 }
908 }
909}
910
839/** 911/**
840 * tipc_rcv - process TIPC packets/messages arriving from off-node 912 * tipc_rcv - process TIPC packets/messages arriving from off-node
841 * @net: the applicable net namespace 913 * @net: the applicable net namespace
@@ -854,6 +926,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
854 struct tipc_media_addr *maddr; 926 struct tipc_media_addr *maddr;
855 int bearer_id = b->identity; 927 int bearer_id = b->identity;
856 int rc = 0; 928 int rc = 0;
929 int usr;
857 930
858 __skb_queue_head_init(&xmitq); 931 __skb_queue_head_init(&xmitq);
859 932
@@ -863,8 +936,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
863 936
864 /* Handle arrival of a non-unicast link packet */ 937 /* Handle arrival of a non-unicast link packet */
865 hdr = buf_msg(skb); 938 hdr = buf_msg(skb);
939 usr = msg_user(hdr);
866 if (unlikely(msg_non_seq(hdr))) { 940 if (unlikely(msg_non_seq(hdr))) {
867 if (msg_user(hdr) == LINK_CONFIG) 941 if (usr == LINK_CONFIG)
868 tipc_disc_rcv(net, skb, b); 942 tipc_disc_rcv(net, skb, b);
869 else 943 else
870 tipc_bclink_rcv(net, skb); 944 tipc_bclink_rcv(net, skb);
@@ -877,6 +951,10 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
877 goto discard; 951 goto discard;
878 tipc_node_lock(n); 952 tipc_node_lock(n);
879 953
954 /* Prepare links for tunneled reception if applicable */
955 if (unlikely(usr == TUNNEL_PROTOCOL))
956 tipc_node_tnl_init(n, bearer_id, skb);
957
880 /* Locate link endpoint that should handle packet */ 958 /* Locate link endpoint that should handle packet */
881 l = n->links[bearer_id].link; 959 l = n->links[bearer_id].link;
882 if (unlikely(!l)) 960 if (unlikely(!l))
@@ -887,7 +965,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
887 if (!tipc_node_filter_skb(n, l, hdr)) 965 if (!tipc_node_filter_skb(n, l, hdr))
888 goto unlock; 966 goto unlock;
889 967
890 if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) 968 if (unlikely(usr == LINK_PROTOCOL))
891 tipc_bclink_sync_state(n, hdr); 969 tipc_bclink_sync_state(n, hdr);
892 970
893 /* Release acked broadcast messages */ 971 /* Release acked broadcast messages */
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 5e7016802077..49df0e934a65 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -171,6 +171,7 @@ void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *bearer,
171 struct tipc_media_addr *maddr); 171 struct tipc_media_addr *maddr);
172bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *bearer, 172bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *bearer,
173 struct tipc_media_addr *maddr); 173 struct tipc_media_addr *maddr);
174void tipc_node_delete_links(struct net *net, int bearer_id);
174void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr); 175void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
175void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr); 176void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
176void tipc_node_link_down(struct tipc_node *n_ptr, int bearer_id); 177void tipc_node_link_down(struct tipc_node *n_ptr, int bearer_id);