aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZach Brown <zach.brown@oracle.com>2010-05-18 18:48:51 -0400
committerAndy Grover <andy.grover@oracle.com>2010-09-08 21:15:17 -0400
commit3e0249f9c05cb77b66f7f09644ca9ca208d991a9 (patch)
tree2f8aaddb51590d36a54b8f7044224073107d77d9
parent89bf9d4158b5a1b6bd00960eb2e47601ec8cc138 (diff)
RDS/IB: add refcount tracking to struct rds_ib_device
The RDS IB client .remove callback used to free the rds_ibdev for the given device unconditionally. This could race other users of the struct. This patch adds refcounting so that we only free the rds_ibdev once all of its users are done. Many rds_ibdev users are tied to connections. We give the connection a reference and change these users to reference the device in the connection instead of looking it up in the IB client data. The only user of the IB client data remaining is the first lookup of the device as connections are built up. Incrementing the reference count of a device found in the IB client data could race with final freeing so we use an RCU grace period to make sure that freeing won't happen until those lookups are done. MRs need the rds_ibdev to get at the pool that they're freed in to. They exist outside a connection and many MRs can reference different devices from one socket, so it was natural to have each MR hold a reference. MR refs can be dropped from interrupt handlers and final device teardown can block so we push it off to a work struct. Pool teardown had to be fixed to cancel its pending work instead of deadlocking waiting for all queued work, including itself, to finish. MRs get their reference from the global device list, which gets a reference. It is left unprotected by locks and remains racy. A simple global lock would be a significant bottleneck. More scalable (complicated) locking should be done carefully in a later patch. Signed-off-by: Zach Brown <zach.brown@oracle.com>
-rw-r--r--net/rds/ib.c129
-rw-r--r--net/rds/ib.h4
-rw-r--r--net/rds/ib_cm.c33
-rw-r--r--net/rds/ib_rdma.c14
4 files changed, 125 insertions, 55 deletions
diff --git a/net/rds/ib.c b/net/rds/ib.c
index 7d289d7985fe..1732f8effb59 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -59,6 +59,38 @@ struct list_head rds_ib_devices;
59DEFINE_SPINLOCK(ib_nodev_conns_lock); 59DEFINE_SPINLOCK(ib_nodev_conns_lock);
60LIST_HEAD(ib_nodev_conns); 60LIST_HEAD(ib_nodev_conns);
61 61
62/*
63 * rds_ib_destroy_mr_pool() blocks on a few things and mrs drop references
64 * from interrupt context so we push freeing off into a work struct in krdsd.
65 */
66static void rds_ib_dev_free(struct work_struct *work)
67{
68 struct rds_ib_ipaddr *i_ipaddr, *i_next;
69 struct rds_ib_device *rds_ibdev = container_of(work,
70 struct rds_ib_device, free_work);
71
72 if (rds_ibdev->mr_pool)
73 rds_ib_destroy_mr_pool(rds_ibdev->mr_pool);
74 if (rds_ibdev->mr)
75 ib_dereg_mr(rds_ibdev->mr);
76 if (rds_ibdev->pd)
77 ib_dealloc_pd(rds_ibdev->pd);
78
79 list_for_each_entry_safe(i_ipaddr, i_next, &rds_ibdev->ipaddr_list, list) {
80 list_del(&i_ipaddr->list);
81 kfree(i_ipaddr);
82 }
83
84 kfree(rds_ibdev);
85}
86
87void rds_ib_dev_put(struct rds_ib_device *rds_ibdev)
88{
89 BUG_ON(atomic_read(&rds_ibdev->refcount) <= 0);
90 if (atomic_dec_and_test(&rds_ibdev->refcount))
91 queue_work(rds_wq, &rds_ibdev->free_work);
92}
93
62void rds_ib_add_one(struct ib_device *device) 94void rds_ib_add_one(struct ib_device *device)
63{ 95{
64 struct rds_ib_device *rds_ibdev; 96 struct rds_ib_device *rds_ibdev;
@@ -77,11 +109,14 @@ void rds_ib_add_one(struct ib_device *device)
77 goto free_attr; 109 goto free_attr;
78 } 110 }
79 111
80 rds_ibdev = kmalloc_node(sizeof *rds_ibdev, GFP_KERNEL, ibdev_to_node(device)); 112 rds_ibdev = kzalloc_node(sizeof(struct rds_ib_device), GFP_KERNEL,
113 ibdev_to_node(device));
81 if (!rds_ibdev) 114 if (!rds_ibdev)
82 goto free_attr; 115 goto free_attr;
83 116
84 spin_lock_init(&rds_ibdev->spinlock); 117 spin_lock_init(&rds_ibdev->spinlock);
118 atomic_set(&rds_ibdev->refcount, 1);
119 INIT_WORK(&rds_ibdev->free_work, rds_ib_dev_free);
85 120
86 rds_ibdev->max_wrs = dev_attr->max_qp_wr; 121 rds_ibdev->max_wrs = dev_attr->max_qp_wr;
87 rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE); 122 rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE);
@@ -96,67 +131,93 @@ void rds_ib_add_one(struct ib_device *device)
96 131
97 rds_ibdev->dev = device; 132 rds_ibdev->dev = device;
98 rds_ibdev->pd = ib_alloc_pd(device); 133 rds_ibdev->pd = ib_alloc_pd(device);
99 if (IS_ERR(rds_ibdev->pd)) 134 if (IS_ERR(rds_ibdev->pd)) {
100 goto free_dev; 135 rds_ibdev->pd = NULL;
136 goto put_dev;
137 }
101 138
102 rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd, 139 rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd, IB_ACCESS_LOCAL_WRITE);
103 IB_ACCESS_LOCAL_WRITE); 140 if (IS_ERR(rds_ibdev->mr)) {
104 if (IS_ERR(rds_ibdev->mr)) 141 rds_ibdev->mr = NULL;
105 goto err_pd; 142 goto put_dev;
143 }
106 144
107 rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev); 145 rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev);
108 if (IS_ERR(rds_ibdev->mr_pool)) { 146 if (IS_ERR(rds_ibdev->mr_pool)) {
109 rds_ibdev->mr_pool = NULL; 147 rds_ibdev->mr_pool = NULL;
110 goto err_mr; 148 goto put_dev;
111 } 149 }
112 150
113 INIT_LIST_HEAD(&rds_ibdev->ipaddr_list); 151 INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
114 INIT_LIST_HEAD(&rds_ibdev->conn_list); 152 INIT_LIST_HEAD(&rds_ibdev->conn_list);
115 list_add_tail(&rds_ibdev->list, &rds_ib_devices); 153 list_add_tail(&rds_ibdev->list, &rds_ib_devices);
154 atomic_inc(&rds_ibdev->refcount);
116 155
117 ib_set_client_data(device, &rds_ib_client, rds_ibdev); 156 ib_set_client_data(device, &rds_ib_client, rds_ibdev);
157 atomic_inc(&rds_ibdev->refcount);
118 158
119 goto free_attr; 159put_dev:
120 160 rds_ib_dev_put(rds_ibdev);
121err_mr:
122 ib_dereg_mr(rds_ibdev->mr);
123err_pd:
124 ib_dealloc_pd(rds_ibdev->pd);
125free_dev:
126 kfree(rds_ibdev);
127free_attr: 161free_attr:
128 kfree(dev_attr); 162 kfree(dev_attr);
129} 163}
130 164
165/*
166 * New connections use this to find the device to associate with the
167 * connection. It's not in the fast path so we're not concerned about the
168 * performance of the IB call. (As of this writing, it uses an interrupt
169 * blocking spinlock to serialize walking a per-device list of all registered
170 * clients.)
171 *
172 * RCU is used to handle incoming connections racing with device teardown.
173 * Rather than use a lock to serialize removal from the client_data and
174 * getting a new reference, we use an RCU grace period. The destruction
175 * path removes the device from client_data and then waits for all RCU
176 * readers to finish.
177 *
178 * A new connection can get NULL from this if it's arriving on a
179 * device that is in the process of being removed.
180 */
181struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device)
182{
183 struct rds_ib_device *rds_ibdev;
184
185 rcu_read_lock();
186 rds_ibdev = ib_get_client_data(device, &rds_ib_client);
187 if (rds_ibdev)
188 atomic_inc(&rds_ibdev->refcount);
189 rcu_read_unlock();
190 return rds_ibdev;
191}
192
193/*
194 * The IB stack is letting us know that a device is going away. This can
195 * happen if the underlying HCA driver is removed or if PCI hotplug is removing
196 * the pci function, for example.
197 *
198 * This can be called at any time and can be racing with any other RDS path.
199 */
131void rds_ib_remove_one(struct ib_device *device) 200void rds_ib_remove_one(struct ib_device *device)
132{ 201{
133 struct rds_ib_device *rds_ibdev; 202 struct rds_ib_device *rds_ibdev;
134 struct rds_ib_ipaddr *i_ipaddr, *i_next;
135 203
136 rds_ibdev = ib_get_client_data(device, &rds_ib_client); 204 rds_ibdev = ib_get_client_data(device, &rds_ib_client);
137 if (!rds_ibdev) 205 if (!rds_ibdev)
138 return; 206 return;
139 207
140 synchronize_rcu();
141 list_for_each_entry_safe(i_ipaddr, i_next, &rds_ibdev->ipaddr_list, list) {
142 list_del(&i_ipaddr->list);
143 kfree(i_ipaddr);
144 }
145
146 rds_ib_destroy_conns(rds_ibdev); 208 rds_ib_destroy_conns(rds_ibdev);
147 209
148 if (rds_ibdev->mr_pool) 210 /*
149 rds_ib_destroy_mr_pool(rds_ibdev->mr_pool); 211 * prevent future connection attempts from getting a reference to this
150 212 * device and wait for currently racing connection attempts to finish
151 ib_dereg_mr(rds_ibdev->mr); 213 * getting their reference
152 214 */
153 while (ib_dealloc_pd(rds_ibdev->pd)) { 215 ib_set_client_data(device, &rds_ib_client, NULL);
154 rdsdebug("Failed to dealloc pd %p\n", rds_ibdev->pd); 216 synchronize_rcu();
155 msleep(1); 217 rds_ib_dev_put(rds_ibdev);
156 }
157 218
158 list_del(&rds_ibdev->list); 219 list_del(&rds_ibdev->list);
159 kfree(rds_ibdev); 220 rds_ib_dev_put(rds_ibdev);
160} 221}
161 222
162struct ib_client rds_ib_client = { 223struct ib_client rds_ib_client = {
@@ -190,7 +251,7 @@ static int rds_ib_conn_info_visitor(struct rds_connection *conn,
190 rdma_addr_get_sgid(dev_addr, (union ib_gid *) &iinfo->src_gid); 251 rdma_addr_get_sgid(dev_addr, (union ib_gid *) &iinfo->src_gid);
191 rdma_addr_get_dgid(dev_addr, (union ib_gid *) &iinfo->dst_gid); 252 rdma_addr_get_dgid(dev_addr, (union ib_gid *) &iinfo->dst_gid);
192 253
193 rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); 254 rds_ibdev = ic->rds_ibdev;
194 iinfo->max_send_wr = ic->i_send_ring.w_nr; 255 iinfo->max_send_wr = ic->i_send_ring.w_nr;
195 iinfo->max_recv_wr = ic->i_recv_ring.w_nr; 256 iinfo->max_recv_wr = ic->i_recv_ring.w_nr;
196 iinfo->max_send_sge = rds_ibdev->max_sge; 257 iinfo->max_send_sge = rds_ibdev->max_sge;
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 4bc3e2fba25a..282ec69fe282 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -167,6 +167,8 @@ struct rds_ib_device {
167 unsigned int max_initiator_depth; 167 unsigned int max_initiator_depth;
168 unsigned int max_responder_resources; 168 unsigned int max_responder_resources;
169 spinlock_t spinlock; /* protect the above */ 169 spinlock_t spinlock; /* protect the above */
170 atomic_t refcount;
171 struct work_struct free_work;
170}; 172};
171 173
172#define pcidev_to_node(pcidev) pcibus_to_node(pcidev->bus) 174#define pcidev_to_node(pcidev) pcibus_to_node(pcidev->bus)
@@ -251,6 +253,8 @@ static inline void rds_ib_dma_sync_sg_for_device(struct ib_device *dev,
251extern struct rds_transport rds_ib_transport; 253extern struct rds_transport rds_ib_transport;
252extern void rds_ib_add_one(struct ib_device *device); 254extern void rds_ib_add_one(struct ib_device *device);
253extern void rds_ib_remove_one(struct ib_device *device); 255extern void rds_ib_remove_one(struct ib_device *device);
256struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device);
257void rds_ib_dev_put(struct rds_ib_device *rds_ibdev);
254extern struct ib_client rds_ib_client; 258extern struct ib_client rds_ib_client;
255 259
256extern unsigned int fmr_pool_size; 260extern unsigned int fmr_pool_size;
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 73253f7c1fa3..a9fb917c00bb 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -95,7 +95,6 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
95{ 95{
96 const struct rds_ib_connect_private *dp = NULL; 96 const struct rds_ib_connect_private *dp = NULL;
97 struct rds_ib_connection *ic = conn->c_transport_data; 97 struct rds_ib_connection *ic = conn->c_transport_data;
98 struct rds_ib_device *rds_ibdev;
99 struct ib_qp_attr qp_attr; 98 struct ib_qp_attr qp_attr;
100 int err; 99 int err;
101 100
@@ -145,12 +144,11 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
145 if (err) 144 if (err)
146 printk(KERN_NOTICE "ib_modify_qp(IB_QP_STATE, RTS): err=%d\n", err); 145 printk(KERN_NOTICE "ib_modify_qp(IB_QP_STATE, RTS): err=%d\n", err);
147 146
148 /* update ib_device with this local ipaddr & conn */ 147 /* update ib_device with this local ipaddr */
149 rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); 148 err = rds_ib_update_ipaddr(ic->rds_ibdev, conn->c_laddr);
150 err = rds_ib_update_ipaddr(rds_ibdev, conn->c_laddr);
151 if (err) 149 if (err)
152 printk(KERN_ERR "rds_ib_update_ipaddr failed (%d)\n", err); 150 printk(KERN_ERR "rds_ib_update_ipaddr failed (%d)\n",
153 rds_ib_add_conn(rds_ibdev, conn); 151 err);
154 152
155 /* If the peer gave us the last packet it saw, process this as if 153 /* If the peer gave us the last packet it saw, process this as if
156 * we had received a regular ACK. */ 154 * we had received a regular ACK. */
@@ -168,12 +166,10 @@ static void rds_ib_cm_fill_conn_param(struct rds_connection *conn,
168 u32 max_initiator_depth) 166 u32 max_initiator_depth)
169{ 167{
170 struct rds_ib_connection *ic = conn->c_transport_data; 168 struct rds_ib_connection *ic = conn->c_transport_data;
171 struct rds_ib_device *rds_ibdev; 169 struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
172 170
173 memset(conn_param, 0, sizeof(struct rdma_conn_param)); 171 memset(conn_param, 0, sizeof(struct rdma_conn_param));
174 172
175 rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
176
177 conn_param->responder_resources = 173 conn_param->responder_resources =
178 min_t(u32, rds_ibdev->max_responder_resources, max_responder_resources); 174 min_t(u32, rds_ibdev->max_responder_resources, max_responder_resources);
179 conn_param->initiator_depth = 175 conn_param->initiator_depth =
@@ -241,18 +237,16 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
241 struct rds_ib_device *rds_ibdev; 237 struct rds_ib_device *rds_ibdev;
242 int ret; 238 int ret;
243 239
244 /* rds_ib_add_one creates a rds_ib_device object per IB device, 240 /*
245 * and allocates a protection domain, memory range and FMR pool 241 * It's normal to see a null device if an incoming connection races
246 * for each. If that fails for any reason, it will not register 242 * with device removal, so we don't print a warning.
247 * the rds_ibdev at all.
248 */ 243 */
249 rds_ibdev = ib_get_client_data(dev, &rds_ib_client); 244 rds_ibdev = rds_ib_get_client_data(dev);
250 if (!rds_ibdev) { 245 if (!rds_ibdev)
251 if (printk_ratelimit())
252 printk(KERN_NOTICE "RDS/IB: No client_data for device %s\n",
253 dev->name);
254 return -EOPNOTSUPP; 246 return -EOPNOTSUPP;
255 } 247
248 /* add the conn now so that connection establishment has the dev */
249 rds_ib_add_conn(rds_ibdev, conn);
256 250
257 if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1) 251 if (rds_ibdev->max_wrs < ic->i_send_ring.w_nr + 1)
258 rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1); 252 rds_ib_ring_resize(&ic->i_send_ring, rds_ibdev->max_wrs - 1);
@@ -371,6 +365,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
371 ic->i_send_cq, ic->i_recv_cq); 365 ic->i_send_cq, ic->i_recv_cq);
372 366
373out: 367out:
368 rds_ib_dev_put(rds_ibdev);
374 return ret; 369 return ret;
375} 370}
376 371
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 4ba01b9ffd44..64b5ede037c8 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -87,6 +87,7 @@ static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
87 rcu_read_lock(); 87 rcu_read_lock();
88 list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) { 88 list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) {
89 if (i_ipaddr->ipaddr == ipaddr) { 89 if (i_ipaddr->ipaddr == ipaddr) {
90 atomic_inc(&rds_ibdev->refcount);
90 rcu_read_unlock(); 91 rcu_read_unlock();
91 return rds_ibdev; 92 return rds_ibdev;
92 } 93 }
@@ -141,8 +142,10 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr)
141 struct rds_ib_device *rds_ibdev_old; 142 struct rds_ib_device *rds_ibdev_old;
142 143
143 rds_ibdev_old = rds_ib_get_device(ipaddr); 144 rds_ibdev_old = rds_ib_get_device(ipaddr);
144 if (rds_ibdev_old) 145 if (rds_ibdev_old) {
145 rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr); 146 rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr);
147 rds_ib_dev_put(rds_ibdev_old);
148 }
146 149
147 return rds_ib_add_ipaddr(rds_ibdev, ipaddr); 150 return rds_ib_add_ipaddr(rds_ibdev, ipaddr);
148} 151}
@@ -163,6 +166,7 @@ void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *con
163 spin_unlock_irq(&ib_nodev_conns_lock); 166 spin_unlock_irq(&ib_nodev_conns_lock);
164 167
165 ic->rds_ibdev = rds_ibdev; 168 ic->rds_ibdev = rds_ibdev;
169 atomic_inc(&rds_ibdev->refcount);
166} 170}
167 171
168void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn) 172void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn)
@@ -182,6 +186,7 @@ void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *
182 spin_unlock(&ib_nodev_conns_lock); 186 spin_unlock(&ib_nodev_conns_lock);
183 187
184 ic->rds_ibdev = NULL; 188 ic->rds_ibdev = NULL;
189 rds_ib_dev_put(rds_ibdev);
185} 190}
186 191
187void __rds_ib_destroy_conns(struct list_head *list, spinlock_t *list_lock) 192void __rds_ib_destroy_conns(struct list_head *list, spinlock_t *list_lock)
@@ -240,7 +245,7 @@ void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_co
240 245
241void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool) 246void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
242{ 247{
243 flush_workqueue(rds_wq); 248 cancel_work_sync(&pool->flush_worker);
244 rds_ib_flush_mr_pool(pool, 1); 249 rds_ib_flush_mr_pool(pool, 1);
245 WARN_ON(atomic_read(&pool->item_count)); 250 WARN_ON(atomic_read(&pool->item_count));
246 WARN_ON(atomic_read(&pool->free_pinned)); 251 WARN_ON(atomic_read(&pool->free_pinned));
@@ -597,6 +602,8 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
597 queue_work(rds_wq, &pool->flush_worker); 602 queue_work(rds_wq, &pool->flush_worker);
598 } 603 }
599 } 604 }
605
606 rds_ib_dev_put(rds_ibdev);
600} 607}
601 608
602void rds_ib_flush_mrs(void) 609void rds_ib_flush_mrs(void)
@@ -640,6 +647,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
640 printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret); 647 printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret);
641 648
642 ibmr->device = rds_ibdev; 649 ibmr->device = rds_ibdev;
650 rds_ibdev = NULL;
643 651
644 out: 652 out:
645 if (ret) { 653 if (ret) {
@@ -647,5 +655,7 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
647 rds_ib_free_mr(ibmr, 0); 655 rds_ib_free_mr(ibmr, 0);
648 ibmr = ERR_PTR(ret); 656 ibmr = ERR_PTR(ret);
649 } 657 }
658 if (rds_ibdev)
659 rds_ib_dev_put(rds_ibdev);
650 return ibmr; 660 return ibmr;
651} 661}