path: root/fs/cifs/smbdirect.c
author    Long Li <longli@microsoft.com>    2017-11-22 19:38:44 -0500
committer Steve French <smfrench@gmail.com>    2018-01-24 20:49:06 -0500
commit    c7398583340a6d82b8bb7f7f21edcde27dc6a898 (patch)
tree      051a58595ea6dc336697fa5b0383d69f8e61099d /fs/cifs/smbdirect.c
parent    9762c2d080926b7c292cb7c64ca6030e88d6a6e4 (diff)
CIFS: SMBD: Implement RDMA memory registration
Memory registration is used for transferring payload via RDMA read or
write. After I/O is done, memory registrations are recovered and reused.
This process can be time consuming and is done in a work queue.

Signed-off-by: Long Li <longli@microsoft.com>
Reviewed-by: Pavel Shilovsky <pshilov@microsoft.com>
Reviewed-by: Ronnie Sahlberg <lsahlber@redhat.com>
Signed-off-by: Steve French <smfrench@gmail.com>
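The API pair introduced here, smbd_register_mr()/smbd_deregister_mr(), brackets the RDMA transfer of a large payload. A minimal caller sketch follows, assuming a hypothetical SMB read path that already holds the destination pages; the function name, error handling, and the SMB2 request plumbing are illustrative only and are not part of this patch.

/*
 * Hypothetical caller sketch (not in this patch): register the pages the
 * server will RDMA-write into (an SMB read), carry the rkey in the request,
 * then deregister once the response arrives.
 */
static int example_smb_read_rdma(struct smbd_connection *info,
				 struct page *pages[], int num_pages,
				 int tailsz)
{
	struct smbd_mr *mr;

	/* writing=true: the peer performs an RDMA write into these pages */
	mr = smbd_register_mr(info, pages, num_pages, tailsz,
			      true /* writing */, true /* need_invalidate */);
	if (!mr)
		return -EAGAIN;

	/*
	 * ... build and send the SMB2 READ request describing the buffer
	 * with mr->mr->rkey and its length, then wait for the response ...
	 */

	/* Locally invalidate if needed and hand the MR back for recovery */
	return smbd_deregister_mr(mr);
}

The rdma_readwrite_threshold negotiated below (capped at max_fragmented_send_size) is presumably what later decides when callers take this RDMA read/write path instead of the regular send/receive path.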
Diffstat (limited to 'fs/cifs/smbdirect.c')
-rw-r--r--  fs/cifs/smbdirect.c  421
1 file changed, 421 insertions(+)
diff --git a/fs/cifs/smbdirect.c b/fs/cifs/smbdirect.c
index 3351873db93f..731577d4317f 100644
--- a/fs/cifs/smbdirect.c
+++ b/fs/cifs/smbdirect.c
@@ -48,6 +48,9 @@ static int smbd_post_send_page(struct smbd_connection *info,
 		struct page *page, unsigned long offset,
 		size_t size, int remaining_data_length);
 
+static void destroy_mr_list(struct smbd_connection *info);
+static int allocate_mr_list(struct smbd_connection *info);
+
 /* SMBD version number */
 #define SMBD_V1	0x0100
 
@@ -198,6 +201,12 @@ static void smbd_destroy_rdma_work(struct work_struct *work)
 	wait_event(info->wait_send_payload_pending,
 		atomic_read(&info->send_payload_pending) == 0);
 
+	log_rdma_event(INFO, "freeing mr list\n");
+	wake_up_interruptible_all(&info->wait_mr);
+	wait_event(info->wait_for_mr_cleanup,
+		atomic_read(&info->mr_used_count) == 0);
+	destroy_mr_list(info);
+
 	/* It's not possible for upper layer to get to reassembly */
 	log_rdma_event(INFO, "drain the reassembly queue\n");
 	do {
@@ -453,6 +462,16 @@ static bool process_negotiation_response(
 	}
 	info->max_fragmented_send_size =
 		le32_to_cpu(packet->max_fragmented_size);
+	info->rdma_readwrite_threshold =
+		rdma_readwrite_threshold > info->max_fragmented_send_size ?
+		info->max_fragmented_send_size :
+		rdma_readwrite_threshold;
+
+
+	info->max_readwrite_size = min_t(u32,
+			le32_to_cpu(packet->max_readwrite_size),
+			info->max_frmr_depth * PAGE_SIZE);
+	info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 
 	return true;
 }
@@ -748,6 +767,12 @@ static int smbd_ia_open(
 		rc = -EPROTONOSUPPORT;
 		goto out2;
 	}
+	info->max_frmr_depth = min_t(int,
+		smbd_max_frmr_depth,
+		info->id->device->attrs.max_fast_reg_page_list_len);
+	info->mr_type = IB_MR_TYPE_MEM_REG;
+	if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
+		info->mr_type = IB_MR_TYPE_SG_GAPS;
 
 	info->pd = ib_alloc_pd(info->id->device, 0);
 	if (IS_ERR(info->pd)) {
@@ -1582,6 +1607,8 @@ struct smbd_connection *_smbd_get_connection(
 	struct rdma_conn_param conn_param;
 	struct ib_qp_init_attr qp_attr;
 	struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
+	struct ib_port_immutable port_immutable;
+	u32 ird_ord_hdr[2];
 
 	info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
 	if (!info)
@@ -1670,6 +1697,28 @@ struct smbd_connection *_smbd_get_connection(
 	memset(&conn_param, 0, sizeof(conn_param));
 	conn_param.initiator_depth = 0;
 
+	conn_param.responder_resources =
+		info->id->device->attrs.max_qp_rd_atom
+			< SMBD_CM_RESPONDER_RESOURCES ?
+		info->id->device->attrs.max_qp_rd_atom :
+		SMBD_CM_RESPONDER_RESOURCES;
+	info->responder_resources = conn_param.responder_resources;
+	log_rdma_mr(INFO, "responder_resources=%d\n",
+		info->responder_resources);
+
+	/* Need to send IRD/ORD in private data for iWARP */
+	info->id->device->get_port_immutable(
+		info->id->device, info->id->port_num, &port_immutable);
+	if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
+		ird_ord_hdr[0] = info->responder_resources;
+		ird_ord_hdr[1] = 1;
+		conn_param.private_data = ird_ord_hdr;
+		conn_param.private_data_len = sizeof(ird_ord_hdr);
+	} else {
+		conn_param.private_data = NULL;
+		conn_param.private_data_len = 0;
+	}
+
 	conn_param.retry_count = SMBD_CM_RETRY;
 	conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
 	conn_param.flow_control = 0;
@@ -1734,8 +1783,19 @@ struct smbd_connection *_smbd_get_connection(
 		goto negotiation_failed;
 	}
 
+	rc = allocate_mr_list(info);
+	if (rc) {
+		log_rdma_mr(ERR, "memory registration allocation failed\n");
+		goto allocate_mr_failed;
+	}
+
 	return info;
 
+allocate_mr_failed:
+	/* At this point, need to do a full transport shutdown */
+	smbd_destroy(info);
+	return NULL;
+
 negotiation_failed:
 	cancel_delayed_work_sync(&info->idle_timer_work);
 	destroy_caches_and_workqueue(info);
@@ -2189,3 +2249,364 @@ done:
 
 	return rc;
 }
+
+static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct smbd_mr *mr;
+	struct ib_cqe *cqe;
+
+	if (wc->status) {
+		log_rdma_mr(ERR, "status=%d\n", wc->status);
+		cqe = wc->wr_cqe;
+		mr = container_of(cqe, struct smbd_mr, cqe);
+		smbd_disconnect_rdma_connection(mr->conn);
+	}
+}
+
+/*
+ * The work queue function that recovers MRs
+ * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
+ * again. Both calls are slow, so finish them in a workqueue. This will not
+ * block the I/O path.
+ * There is one workqueue that recovers MRs, there is no need to lock as the
+ * I/O requests calling smbd_register_mr will never update the links in the
+ * mr_list.
+ */
+static void smbd_mr_recovery_work(struct work_struct *work)
+{
+	struct smbd_connection *info =
+		container_of(work, struct smbd_connection, mr_recovery_work);
+	struct smbd_mr *smbdirect_mr;
+	int rc;
+
+	list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
+		if (smbdirect_mr->state == MR_INVALIDATED ||
+			smbdirect_mr->state == MR_ERROR) {
+
+			if (smbdirect_mr->state == MR_INVALIDATED) {
+				ib_dma_unmap_sg(
+					info->id->device, smbdirect_mr->sgl,
+					smbdirect_mr->sgl_count,
+					smbdirect_mr->dir);
+				smbdirect_mr->state = MR_READY;
+			} else if (smbdirect_mr->state == MR_ERROR) {
+
+				/* recover this MR entry */
+				rc = ib_dereg_mr(smbdirect_mr->mr);
+				if (rc) {
+					log_rdma_mr(ERR,
+						"ib_dereg_mr failed rc=%x\n",
+						rc);
+					smbd_disconnect_rdma_connection(info);
+				}
+
+				smbdirect_mr->mr = ib_alloc_mr(
+					info->pd, info->mr_type,
+					info->max_frmr_depth);
+				if (IS_ERR(smbdirect_mr->mr)) {
+					log_rdma_mr(ERR,
+						"ib_alloc_mr failed mr_type=%x "
+						"max_frmr_depth=%x\n",
+						info->mr_type,
+						info->max_frmr_depth);
+					smbd_disconnect_rdma_connection(info);
+				}
+
+				smbdirect_mr->state = MR_READY;
+			}
+			/* smbdirect_mr->state is updated by this function
+			 * and is read and updated by I/O issuing CPUs trying
+			 * to get an MR, the call to atomic_inc_return
+			 * implies a memory barrier and guarantees this
+			 * value is updated before waking up any calls to
+			 * get_mr() from the I/O issuing CPUs
+			 */
+			if (atomic_inc_return(&info->mr_ready_count) == 1)
+				wake_up_interruptible(&info->wait_mr);
+		}
+	}
+}
+
+static void destroy_mr_list(struct smbd_connection *info)
+{
+	struct smbd_mr *mr, *tmp;
+
+	cancel_work_sync(&info->mr_recovery_work);
+	list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
+		if (mr->state == MR_INVALIDATED)
+			ib_dma_unmap_sg(info->id->device, mr->sgl,
+				mr->sgl_count, mr->dir);
+		ib_dereg_mr(mr->mr);
+		kfree(mr->sgl);
+		kfree(mr);
+	}
+}
+
+/*
+ * Allocate MRs used for RDMA read/write
+ * The number of MRs will not exceed hardware capability in responder_resources
+ * All MRs are kept in mr_list. The MR can be recovered after it's used
+ * Recovery is done in smbd_mr_recovery_work. The content of list entry changes
+ * as MRs are used and recovered for I/O, but the list links will not change
+ */
+static int allocate_mr_list(struct smbd_connection *info)
+{
+	int i;
+	struct smbd_mr *smbdirect_mr, *tmp;
+
+	INIT_LIST_HEAD(&info->mr_list);
+	init_waitqueue_head(&info->wait_mr);
+	spin_lock_init(&info->mr_list_lock);
+	atomic_set(&info->mr_ready_count, 0);
+	atomic_set(&info->mr_used_count, 0);
+	init_waitqueue_head(&info->wait_for_mr_cleanup);
+	/* Allocate more MRs (2x) than hardware responder_resources */
+	for (i = 0; i < info->responder_resources * 2; i++) {
+		smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
+		if (!smbdirect_mr)
+			goto out;
+		smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
+			info->max_frmr_depth);
+		if (IS_ERR(smbdirect_mr->mr)) {
+			log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x "
+				"max_frmr_depth=%x\n",
+				info->mr_type, info->max_frmr_depth);
+			goto out;
+		}
+		smbdirect_mr->sgl = kcalloc(
+			info->max_frmr_depth,
+			sizeof(struct scatterlist),
+			GFP_KERNEL);
+		if (!smbdirect_mr->sgl) {
+			log_rdma_mr(ERR, "failed to allocate sgl\n");
+			ib_dereg_mr(smbdirect_mr->mr);
+			goto out;
+		}
+		smbdirect_mr->state = MR_READY;
+		smbdirect_mr->conn = info;
+
+		list_add_tail(&smbdirect_mr->list, &info->mr_list);
+		atomic_inc(&info->mr_ready_count);
+	}
+	INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
+	return 0;
+
+out:
+	kfree(smbdirect_mr);
+
+	list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
+		ib_dereg_mr(smbdirect_mr->mr);
+		kfree(smbdirect_mr->sgl);
+		kfree(smbdirect_mr);
+	}
+	return -ENOMEM;
+}
+
+/*
+ * Get an MR from mr_list. This function waits until there is at least one
+ * MR available in the list. It may access the list while the
+ * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
+ * as they never modify the same places. However, there may be several CPUs
+ * issuing I/O trying to get an MR at the same time, mr_list_lock is used to
+ * protect this situation.
+ */
+static struct smbd_mr *get_mr(struct smbd_connection *info)
+{
+	struct smbd_mr *ret;
+	int rc;
+again:
+	rc = wait_event_interruptible(info->wait_mr,
+		atomic_read(&info->mr_ready_count) ||
+		info->transport_status != SMBD_CONNECTED);
+	if (rc) {
+		log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
+		return NULL;
+	}
+
+	if (info->transport_status != SMBD_CONNECTED) {
+		log_rdma_mr(ERR, "info->transport_status=%x\n",
+			info->transport_status);
+		return NULL;
+	}
+
+	spin_lock(&info->mr_list_lock);
+	list_for_each_entry(ret, &info->mr_list, list) {
+		if (ret->state == MR_READY) {
+			ret->state = MR_REGISTERED;
+			spin_unlock(&info->mr_list_lock);
+			atomic_dec(&info->mr_ready_count);
+			atomic_inc(&info->mr_used_count);
+			return ret;
+		}
+	}
+
+	spin_unlock(&info->mr_list_lock);
+	/*
+	 * It is possible that we could fail to get an MR because other processes
+	 * may try to acquire an MR at the same time. If this is the case, retry it.
+	 */
+	goto again;
+}
+
+/*
+ * Register memory for RDMA read/write
+ * pages[]: the list of pages to register memory with
+ * num_pages: the number of pages to register
+ * tailsz: if non-zero, the bytes to register in the last page
+ * writing: true if this is a RDMA write (SMB read), false for RDMA read
+ * need_invalidate: true if this MR needs to be locally invalidated after I/O
+ * return value: the MR registered, NULL if failed.
+ */
+struct smbd_mr *smbd_register_mr(
+	struct smbd_connection *info, struct page *pages[], int num_pages,
+	int tailsz, bool writing, bool need_invalidate)
+{
+	struct smbd_mr *smbdirect_mr;
+	int rc, i;
+	enum dma_data_direction dir;
+	struct ib_reg_wr *reg_wr;
+	struct ib_send_wr *bad_wr;
+
+	if (num_pages > info->max_frmr_depth) {
+		log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
+			num_pages, info->max_frmr_depth);
+		return NULL;
+	}
+
+	smbdirect_mr = get_mr(info);
+	if (!smbdirect_mr) {
+		log_rdma_mr(ERR, "get_mr returning NULL\n");
+		return NULL;
+	}
+	smbdirect_mr->need_invalidate = need_invalidate;
+	smbdirect_mr->sgl_count = num_pages;
+	sg_init_table(smbdirect_mr->sgl, num_pages);
+
+	for (i = 0; i < num_pages - 1; i++)
+		sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
+
+	sg_set_page(&smbdirect_mr->sgl[i], pages[i],
+		tailsz ? tailsz : PAGE_SIZE, 0);
+
+	dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
+	smbdirect_mr->dir = dir;
+	rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
+	if (!rc) {
+		log_rdma_mr(INFO, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
+			num_pages, dir, rc);
+		goto dma_map_error;
+	}
+
+	rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
+		NULL, PAGE_SIZE);
+	if (rc != num_pages) {
+		log_rdma_mr(INFO,
+			"ib_map_mr_sg failed rc = %x num_pages = %x\n",
+			rc, num_pages);
+		goto map_mr_error;
+	}
+
+	ib_update_fast_reg_key(smbdirect_mr->mr,
+		ib_inc_rkey(smbdirect_mr->mr->rkey));
+	reg_wr = &smbdirect_mr->wr;
+	reg_wr->wr.opcode = IB_WR_REG_MR;
+	smbdirect_mr->cqe.done = register_mr_done;
+	reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
+	reg_wr->wr.num_sge = 0;
+	reg_wr->wr.send_flags = IB_SEND_SIGNALED;
+	reg_wr->mr = smbdirect_mr->mr;
+	reg_wr->key = smbdirect_mr->mr->rkey;
+	reg_wr->access = writing ?
+		IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
+		IB_ACCESS_REMOTE_READ;
+
+	/*
+	 * There is no need to wait for completion of ib_post_send
+	 * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
+	 * on the next ib_post_send when we actually send I/O to remote peer
+	 */
+	rc = ib_post_send(info->id->qp, &reg_wr->wr, &bad_wr);
+	if (!rc)
+		return smbdirect_mr;
+
+	log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
+		rc, reg_wr->key);
+
+	/* If all failed, attempt to recover this MR by setting it to MR_ERROR */
+map_mr_error:
+	ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
+		smbdirect_mr->sgl_count, smbdirect_mr->dir);
+
+dma_map_error:
+	smbdirect_mr->state = MR_ERROR;
+	if (atomic_dec_and_test(&info->mr_used_count))
+		wake_up(&info->wait_for_mr_cleanup);
+
+	return NULL;
+}
+
+static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct smbd_mr *smbdirect_mr;
+	struct ib_cqe *cqe;
+
+	cqe = wc->wr_cqe;
+	smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
+	smbdirect_mr->state = MR_INVALIDATED;
+	if (wc->status != IB_WC_SUCCESS) {
+		log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
+		smbdirect_mr->state = MR_ERROR;
+	}
+	complete(&smbdirect_mr->invalidate_done);
+}
+
+/*
+ * Deregister an MR after I/O is done
+ * This function may wait if remote invalidation is not used
+ * and we have to locally invalidate the buffer to prevent data from being
+ * modified by the remote peer after the upper layer consumes it
+ */
+int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
+{
+	struct ib_send_wr *wr, *bad_wr;
+	struct smbd_connection *info = smbdirect_mr->conn;
+	int rc = 0;
+
+	if (smbdirect_mr->need_invalidate) {
+		/* Need to finish local invalidation before returning */
+		wr = &smbdirect_mr->inv_wr;
+		wr->opcode = IB_WR_LOCAL_INV;
+		smbdirect_mr->cqe.done = local_inv_done;
+		wr->wr_cqe = &smbdirect_mr->cqe;
+		wr->num_sge = 0;
+		wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
+		wr->send_flags = IB_SEND_SIGNALED;
+
+		init_completion(&smbdirect_mr->invalidate_done);
+		rc = ib_post_send(info->id->qp, wr, &bad_wr);
+		if (rc) {
+			log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
+			smbd_disconnect_rdma_connection(info);
+			goto done;
+		}
+		wait_for_completion(&smbdirect_mr->invalidate_done);
+		smbdirect_mr->need_invalidate = false;
+	} else
+		/*
+		 * For remote invalidation, just set it to MR_INVALIDATED
+		 * and defer to mr_recovery_work to recover the MR for next use
+		 */
+		smbdirect_mr->state = MR_INVALIDATED;
+
+	/*
+	 * Schedule the work to do MR recovery for future I/Os
+	 * MR recovery is slow and we don't want it to block the current I/O
+	 */
+	queue_work(info->workqueue, &info->mr_recovery_work);
+
+done:
+	if (atomic_dec_and_test(&info->mr_used_count))
+		wake_up(&info->wait_for_mr_cleanup);
+
+	return rc;
+}