aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/handler.c
blob: 0c70010a7dfe9671ab9f019940354de1de5c1672 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
 * net/tipc/handler.c: TIPC signal handling
 *
 * Copyright (c) 2000-2006, Ericsson AB
 * Copyright (c) 2005, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"

struct queue_item {
	struct list_head next_signal;
	void (*handler) (unsigned long);
	unsigned long data;
};

static struct kmem_cache *tipc_queue_item_cache;
static struct list_head signal_queue_head;
static DEFINE_SPINLOCK(qitem_lock);
static int handler_enabled = 0;

static void process_signal_queue(unsigned long dummy);

static DECLARE_TASKLET_DISABLED(tipc_tasklet, process_signal_queue, 0);


unsigned int tipc_k_signal(Handler routine, unsigned long argument)
{
	struct queue_item *item;

	if (!handler_enabled) {
		err("Signal request ignored by handler\n");
		return -ENOPROTOOPT;
	}

	spin_lock_bh(&qitem_lock);
	item = kmem_cache_alloc(tipc_queue_item_cache, GFP_ATOMIC);
	if (!item) {
		err("Signal queue out of memory\n");
		spin_unlock_bh(&qitem_lock);
		return -ENOMEM;
	}
	item->handler = routine;
	item->data = argument;
	list_add_tail(&item->next_signal, &signal_queue_head);
	spin_unlock_bh(&qitem_lock);
	tasklet_schedule(&tipc_tasklet);
	return 0;
}

static void process_signal_queue(unsigned long dummy)
{
	struct queue_item *__volatile__ item;
	struct list_head *l, *n;

	spin_lock_bh(&qitem_lock);
	list_for_each_safe(l, n, &signal_queue_head) {
		item = list_entry(l, struct queue_item, next_signal);
		list_del(&item->next_signal);
		spin_unlock_bh(&qitem_lock);
		item->handler(item->data);
		spin_lock_bh(&qitem_lock);
		kmem_cache_free(tipc_queue_item_cache, item);
	}
	spin_unlock_bh(&qitem_lock);
}

int tipc_handler_start(void)
{
	tipc_queue_item_cache =
		kmem_cache_create("tipc_queue_items", sizeof(struct queue_item),
				  0, SLAB_HWCACHE_ALIGN, NULL);
	if (!tipc_queue_item_cache)
		return -ENOMEM;

	INIT_LIST_HEAD(&signal_queue_head);
	tasklet_enable(&tipc_tasklet);
	handler_enabled = 1;
	return 0;
}

void tipc_handler_stop(void)
{
	struct list_head *l, *n;
	struct queue_item *item;

	if (!handler_enabled)
		return;

	handler_enabled = 0;
	tasklet_disable(&tipc_tasklet);
	tasklet_kill(&tipc_tasklet);

	spin_lock_bh(&qitem_lock);
	list_for_each_safe(l, n, &signal_queue_head) {
		item = list_entry(l, struct queue_item, next_signal);
		list_del(&item->next_signal);
		kmem_cache_free(tipc_queue_item_cache, item);
	}
	spin_unlock_bh(&qitem_lock);

	kmem_cache_destroy(tipc_queue_item_cache);
}

'>671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920
/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/pci.h>
#include <linux/dma-mapping.h>
#include <rdma/rdma_cm.h>

#include "rds.h"
#include "iw.h"

static struct kmem_cache *rds_iw_incoming_slab;
static struct kmem_cache *rds_iw_frag_slab;
static atomic_t	rds_iw_allocation = ATOMIC_INIT(0);

static void rds_iw_frag_drop_page(struct rds_page_frag *frag)
{
	rdsdebug("frag %p page %p\n", frag, frag->f_page);
	__free_page(frag->f_page);
	frag->f_page = NULL;
}

static void rds_iw_frag_free(struct rds_page_frag *frag)
{
	rdsdebug("frag %p page %p\n", frag, frag->f_page);
	BUG_ON(frag->f_page != NULL);
	kmem_cache_free(rds_iw_frag_slab, frag);
}

/*
 * We map a page at a time.  Its fragments are posted in order.  This
 * is called in fragment order as the fragments get send completion events.
 * Only the last frag in the page performs the unmapping.
 *
 * It's OK for ring cleanup to call this in whatever order it likes because
 * DMA is not in flight and so we can unmap while other ring entries still
 * hold page references in their frags.
 */
static void rds_iw_recv_unmap_page(struct rds_iw_connection *ic,
				   struct rds_iw_recv_work *recv)
{
	struct rds_page_frag *frag = recv->r_frag;

	rdsdebug("recv %p frag %p page %p\n", recv, frag, frag->f_page);
	if (frag->f_mapped)
		ib_dma_unmap_page(ic->i_cm_id->device,
			       frag->f_mapped,
			       RDS_FRAG_SIZE, DMA_FROM_DEVICE);
	frag->f_mapped = 0;
}

void rds_iw_recv_init_ring(struct rds_iw_connection *ic)
{
	struct rds_iw_recv_work *recv;
	u32 i;

	for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
		struct ib_sge *sge;

		recv->r_iwinc = NULL;
		recv->r_frag = NULL;

		recv->r_wr.next = NULL;
		recv->r_wr.wr_id = i;
		recv->r_wr.sg_list = recv->r_sge;
		recv->r_wr.num_sge = RDS_IW_RECV_SGE;

		sge = rds_iw_data_sge(ic, recv->r_sge);
		sge->addr = 0;
		sge->length = RDS_FRAG_SIZE;
		sge->lkey = 0;

		sge = rds_iw_header_sge(ic, recv->r_sge);
		sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
		sge->length = sizeof(struct rds_header);
		sge->lkey = 0;
	}
}

static void rds_iw_recv_clear_one(struct rds_iw_connection *ic,
				  struct rds_iw_recv_work *recv)
{
	if (recv->r_iwinc) {
		rds_inc_put(&recv->r_iwinc->ii_inc);
		recv->r_iwinc = NULL;
	}
	if (recv->r_frag) {
		rds_iw_recv_unmap_page(ic, recv);
		if (recv->r_frag->f_page)
			rds_iw_frag_drop_page(recv->r_frag);
		rds_iw_frag_free(recv->r_frag);
		recv->r_frag = NULL;
	}
}

void rds_iw_recv_clear_ring(struct rds_iw_connection *ic)
{
	u32 i;

	for (i = 0; i < ic->i_recv_ring.w_nr; i++)
		rds_iw_recv_clear_one(ic, &ic->i_recvs[i]);

	if (ic->i_frag.f_page)
		rds_iw_frag_drop_page(&ic->i_frag);
}

static int rds_iw_recv_refill_one(struct rds_connection *conn,
				  struct rds_iw_recv_work *recv,
				  gfp_t kptr_gfp, gfp_t page_gfp)
{
	struct rds_iw_connection *ic = conn->c_transport_data;
	dma_addr_t dma_addr;
	struct ib_sge *sge;
	int ret = -ENOMEM;

	if (recv->r_iwinc == NULL) {
		if (!atomic_add_unless(&rds_iw_allocation, 1, rds_iw_sysctl_max_recv_allocation)) {
			rds_iw_stats_inc(s_iw_rx_alloc_limit);
			goto out;
		}
		recv->r_iwinc = kmem_cache_alloc(rds_iw_incoming_slab,
						 kptr_gfp);
		if (recv->r_iwinc == NULL) {
			atomic_dec(&rds_iw_allocation);
			goto out;
		}
		INIT_LIST_HEAD(&recv->r_iwinc->ii_frags);
		rds_inc_init(&recv->r_iwinc->ii_inc, conn, conn->c_faddr);
	}

	if (recv->r_frag == NULL) {
		recv->r_frag = kmem_cache_alloc(rds_iw_frag_slab, kptr_gfp);
		if (recv->r_frag == NULL)
			goto out;
		INIT_LIST_HEAD(&recv->r_frag->f_item);
		recv->r_frag->f_page = NULL;
	}

	if (ic->i_frag.f_page == NULL) {
		ic->i_frag.f_page = alloc_page(page_gfp);
		if (ic->i_frag.f_page == NULL)
			goto out;
		ic->i_frag.f_offset = 0;
	}

	dma_addr = ib_dma_map_page(ic->i_cm_id->device,
				  ic->i_frag.f_page,
				  ic->i_frag.f_offset,
				  RDS_FRAG_SIZE,
				  DMA_FROM_DEVICE);
	if (ib_dma_mapping_error(ic->i_cm_id->device, dma_addr))
		goto out;

	/*
	 * Once we get the RDS_PAGE_LAST_OFF frag then rds_iw_frag_unmap()
	 * must be called on this recv.  This happens as completions hit
	 * in order or on connection shutdown.
	 */
	recv->r_frag->f_page = ic->i_frag.f_page;
	recv->r_frag->f_offset = ic->i_frag.f_offset;
	recv->r_frag->f_mapped = dma_addr;

	sge = rds_iw_data_sge(ic, recv->r_sge);
	sge->addr = dma_addr;
	sge->length = RDS_FRAG_SIZE;

	sge = rds_iw_header_sge(ic, recv->r_sge);
	sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header);
	sge->length = sizeof(struct rds_header);

	get_page(recv->r_frag->f_page);

	if (ic->i_frag.f_offset < RDS_PAGE_LAST_OFF) {
		ic->i_frag.f_offset += RDS_FRAG_SIZE;
	} else {
		put_page(ic->i_frag.f_page);
		ic->i_frag.f_page = NULL;
		ic->i_frag.f_offset = 0;
	}

	ret = 0;
out:
	return ret;
}

/*
 * This tries to allocate and post unused work requests after making sure that
 * they have all the allocations they need to queue received fragments into
 * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
 * pairs don't go unmatched.
 *
 * -1 is returned if posting fails due to temporary resource exhaustion.
 */
int rds_iw_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
		       gfp_t page_gfp, int prefill)
{
	struct rds_iw_connection *ic = conn->c_transport_data;
	struct rds_iw_recv_work *recv;
	struct ib_recv_wr *failed_wr;
	unsigned int posted = 0;
	int ret = 0;
	u32 pos;

	while ((prefill || rds_conn_up(conn)) &&
	       rds_iw_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
		if (pos >= ic->i_recv_ring.w_nr) {
			printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
					pos);
			ret = -EINVAL;
			break;
		}

		recv = &ic->i_recvs[pos];
		ret = rds_iw_recv_refill_one(conn, recv, kptr_gfp, page_gfp);
		if (ret) {
			ret = -1;
			break;
		}

		/* XXX when can this fail? */
		ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
		rdsdebug("recv %p iwinc %p page %p addr %lu ret %d\n", recv,
			 recv->r_iwinc, recv->r_frag->f_page,
			 (long) recv->r_frag->f_mapped, ret);
		if (ret) {
			rds_iw_conn_error(conn, "recv post on "
			       "%pI4 returned %d, disconnecting and "
			       "reconnecting\n", &conn->c_faddr,
			       ret);
			ret = -1;
			break;
		}

		posted++;
	}

	/* We're doing flow control - update the window. */
	if (ic->i_flowctl && posted)
		rds_iw_advertise_credits(conn, posted);

	if (ret)
		rds_iw_ring_unalloc(&ic->i_recv_ring, 1);
	return ret;
}

void rds_iw_inc_purge(struct rds_incoming *inc)
{
	struct rds_iw_incoming *iwinc;
	struct rds_page_frag *frag;
	struct rds_page_frag *pos;

	iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
	rdsdebug("purging iwinc %p inc %p\n", iwinc, inc);

	list_for_each_entry_safe(frag, pos, &iwinc->ii_frags, f_item) {
		list_del_init(&frag->f_item);
		rds_iw_frag_drop_page(frag);
		rds_iw_frag_free(frag);
	}
}

void rds_iw_inc_free(struct rds_incoming *inc)
{
	struct rds_iw_incoming *iwinc;

	iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);

	rds_iw_inc_purge(inc);
	rdsdebug("freeing iwinc %p inc %p\n", iwinc, inc);
	BUG_ON(!list_empty(&iwinc->ii_frags));