summaryrefslogtreecommitdiffstats
path: root/net/smc
diff options
context:
space:
mode:
authorHans Wippel <hwippel@linux.ibm.com>2018-06-28 13:05:10 -0400
committerDavid S. Miller <davem@davemloft.net>2018-06-30 07:42:26 -0400
commitbe244f28d22f77d939ba2b973c102ad2b49d3496 (patch)
treec819462ffbf01974dcc81d7069050cb3a384efd0 /net/smc
parentc758dfddc1b5b1c9b8c64e5e4bb9bf24b74f4a59 (diff)
net/smc: add SMC-D support in data transfer
The data transfer and CDC message headers differ in SMC-R and SMC-D. This patch adds support for the SMC-D data transfer to the existing SMC code. It consists of the following: * SMC-D CDC support * SMC-D tx support * SMC-D rx support The CDC header is stored at the beginning of the receive buffer. Thus, an rx_offset variable is added for the CDC header offset within the buffer (0 for SMC-R). Signed-off-by: Hans Wippel <hwippel@linux.ibm.com> Signed-off-by: Ursula Braun <ubraun@linux.ibm.com> Suggested-by: Thomas Richter <tmricht@linux.ibm.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/smc')
-rw-r--r--net/smc/smc.h5
-rw-r--r--net/smc/smc_cdc.c86
-rw-r--r--net/smc/smc_cdc.h43
-rw-r--r--net/smc/smc_core.c25
-rw-r--r--net/smc/smc_ism.c8
-rw-r--r--net/smc/smc_rx.c2
-rw-r--r--net/smc/smc_tx.c193
-rw-r--r--net/smc/smc_tx.h2
8 files changed, 308 insertions, 56 deletions
diff --git a/net/smc/smc.h b/net/smc/smc.h
index 7c86f716a92e..8c6231011779 100644
--- a/net/smc/smc.h
+++ b/net/smc/smc.h
@@ -183,6 +183,11 @@ struct smc_connection {
183 spinlock_t acurs_lock; /* protect cursors */ 183 spinlock_t acurs_lock; /* protect cursors */
184#endif 184#endif
185 struct work_struct close_work; /* peer sent some closing */ 185 struct work_struct close_work; /* peer sent some closing */
186 struct tasklet_struct rx_tsklet; /* Receiver tasklet for SMC-D */
187 u8 rx_off; /* receive offset:
188 * 0 for SMC-R, 32 for SMC-D
189 */
190 u64 peer_token; /* SMC-D token of peer */
186}; 191};
187 192
188struct smc_sock { /* smc sock container */ 193struct smc_sock { /* smc sock container */
diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c
index a7e8d63fc8ae..621d8cca570b 100644
--- a/net/smc/smc_cdc.c
+++ b/net/smc/smc_cdc.c
@@ -117,7 +117,7 @@ int smc_cdc_msg_send(struct smc_connection *conn,
117 return rc; 117 return rc;
118} 118}
119 119
120int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn) 120static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
121{ 121{
122 struct smc_cdc_tx_pend *pend; 122 struct smc_cdc_tx_pend *pend;
123 struct smc_wr_buf *wr_buf; 123 struct smc_wr_buf *wr_buf;
@@ -130,6 +130,21 @@ int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
130 return smc_cdc_msg_send(conn, wr_buf, pend); 130 return smc_cdc_msg_send(conn, wr_buf, pend);
131} 131}
132 132
133int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
134{
135 int rc;
136
137 if (conn->lgr->is_smcd) {
138 spin_lock_bh(&conn->send_lock);
139 rc = smcd_cdc_msg_send(conn);
140 spin_unlock_bh(&conn->send_lock);
141 } else {
142 rc = smcr_cdc_get_slot_and_msg_send(conn);
143 }
144
145 return rc;
146}
147
133static bool smc_cdc_tx_filter(struct smc_wr_tx_pend_priv *tx_pend, 148static bool smc_cdc_tx_filter(struct smc_wr_tx_pend_priv *tx_pend,
134 unsigned long data) 149 unsigned long data)
135{ 150{
@@ -157,6 +172,45 @@ void smc_cdc_tx_dismiss_slots(struct smc_connection *conn)
157 (unsigned long)conn); 172 (unsigned long)conn);
158} 173}
159 174
175/* Send a SMC-D CDC header.
176 * This increments the free space available in our send buffer.
177 * Also update the confirmed receive buffer with what was sent to the peer.
178 */
179int smcd_cdc_msg_send(struct smc_connection *conn)
180{
181 struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
182 struct smcd_cdc_msg cdc;
183 int rc, diff;
184
185 memset(&cdc, 0, sizeof(cdc));
186 cdc.common.type = SMC_CDC_MSG_TYPE;
187 cdc.prod_wrap = conn->local_tx_ctrl.prod.wrap;
188 cdc.prod_count = conn->local_tx_ctrl.prod.count;
189
190 cdc.cons_wrap = conn->local_tx_ctrl.cons.wrap;
191 cdc.cons_count = conn->local_tx_ctrl.cons.count;
192 cdc.prod_flags = conn->local_tx_ctrl.prod_flags;
193 cdc.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
194 rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
195 if (rc)
196 return rc;
197 smc_curs_write(&conn->rx_curs_confirmed,
198 smc_curs_read(&conn->local_tx_ctrl.cons, conn), conn);
199 /* Calculate transmitted data and increment free send buffer space */
200 diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
201 &conn->tx_curs_sent);
202 /* increased by confirmed number of bytes */
203 smp_mb__before_atomic();
204 atomic_add(diff, &conn->sndbuf_space);
205 /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
206 smp_mb__after_atomic();
207 smc_curs_write(&conn->tx_curs_fin,
208 smc_curs_read(&conn->tx_curs_sent, conn), conn);
209
210 smc_tx_sndbuf_nonfull(smc);
211 return rc;
212}
213
160/********************************* receive ***********************************/ 214/********************************* receive ***********************************/
161 215
162static inline bool smc_cdc_before(u16 seq1, u16 seq2) 216static inline bool smc_cdc_before(u16 seq1, u16 seq2)
@@ -178,7 +232,7 @@ static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc,
178 if (!sock_flag(&smc->sk, SOCK_URGINLINE)) 232 if (!sock_flag(&smc->sk, SOCK_URGINLINE))
179 /* we'll skip the urgent byte, so don't account for it */ 233 /* we'll skip the urgent byte, so don't account for it */
180 (*diff_prod)--; 234 (*diff_prod)--;
181 base = (char *)conn->rmb_desc->cpu_addr; 235 base = (char *)conn->rmb_desc->cpu_addr + conn->rx_off;
182 if (conn->urg_curs.count) 236 if (conn->urg_curs.count)
183 conn->urg_rx_byte = *(base + conn->urg_curs.count - 1); 237 conn->urg_rx_byte = *(base + conn->urg_curs.count - 1);
184 else 238 else
@@ -276,6 +330,34 @@ static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
276 sock_put(&smc->sk); /* no free sk in softirq-context */ 330 sock_put(&smc->sk); /* no free sk in softirq-context */
277} 331}
278 332
333/* Schedule a tasklet for this connection. Triggered from the ISM device IRQ
334 * handler to indicate update in the DMBE.
335 *
336 * Context:
337 * - tasklet context
338 */
339static void smcd_cdc_rx_tsklet(unsigned long data)
340{
341 struct smc_connection *conn = (struct smc_connection *)data;
342 struct smcd_cdc_msg cdc;
343 struct smc_sock *smc;
344
345 if (!conn)
346 return;
347
348 memcpy(&cdc, conn->rmb_desc->cpu_addr, sizeof(cdc));
349 smc = container_of(conn, struct smc_sock, conn);
350 smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
351}
352
353/* Initialize receive tasklet. Called from ISM device IRQ handler to start
354 * receiver side.
355 */
356void smcd_cdc_rx_init(struct smc_connection *conn)
357{
358 tasklet_init(&conn->rx_tsklet, smcd_cdc_rx_tsklet, (unsigned long)conn);
359}
360
279/***************************** init, exit, misc ******************************/ 361/***************************** init, exit, misc ******************************/
280 362
281static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf) 363static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf)
diff --git a/net/smc/smc_cdc.h b/net/smc/smc_cdc.h
index f60082fee5b8..8fbce4fee3e4 100644
--- a/net/smc/smc_cdc.h
+++ b/net/smc/smc_cdc.h
@@ -50,6 +50,20 @@ struct smc_cdc_msg {
50 u8 reserved[18]; 50 u8 reserved[18];
51} __packed; /* format defined in RFC7609 */ 51} __packed; /* format defined in RFC7609 */
52 52
53/* CDC message for SMC-D */
54struct smcd_cdc_msg {
55 struct smc_wr_rx_hdr common; /* Type = 0xFE */
56 u8 res1[7];
57 u16 prod_wrap;
58 u32 prod_count;
59 u8 res2[2];
60 u16 cons_wrap;
61 u32 cons_count;
62 struct smc_cdc_producer_flags prod_flags;
63 struct smc_cdc_conn_state_flags conn_state_flags;
64 u8 res3[8];
65} __packed;
66
53static inline bool smc_cdc_rxed_any_close(struct smc_connection *conn) 67static inline bool smc_cdc_rxed_any_close(struct smc_connection *conn)
54{ 68{
55 return conn->local_rx_ctrl.conn_state_flags.peer_conn_abort || 69 return conn->local_rx_ctrl.conn_state_flags.peer_conn_abort ||
@@ -204,9 +218,9 @@ static inline void smc_cdc_cursor_to_host(union smc_host_cursor *local,
204 smc_curs_write(local, smc_curs_read(&temp, conn), conn); 218 smc_curs_write(local, smc_curs_read(&temp, conn), conn);
205} 219}
206 220
207static inline void smc_cdc_msg_to_host(struct smc_host_cdc_msg *local, 221static inline void smcr_cdc_msg_to_host(struct smc_host_cdc_msg *local,
208 struct smc_cdc_msg *peer, 222 struct smc_cdc_msg *peer,
209 struct smc_connection *conn) 223 struct smc_connection *conn)
210{ 224{
211 local->common.type = peer->common.type; 225 local->common.type = peer->common.type;
212 local->len = peer->len; 226 local->len = peer->len;
@@ -218,6 +232,27 @@ static inline void smc_cdc_msg_to_host(struct smc_host_cdc_msg *local,
218 local->conn_state_flags = peer->conn_state_flags; 232 local->conn_state_flags = peer->conn_state_flags;
219} 233}
220 234
235static inline void smcd_cdc_msg_to_host(struct smc_host_cdc_msg *local,
236 struct smcd_cdc_msg *peer)
237{
238 local->prod.wrap = peer->prod_wrap;
239 local->prod.count = peer->prod_count;
240 local->cons.wrap = peer->cons_wrap;
241 local->cons.count = peer->cons_count;
242 local->prod_flags = peer->prod_flags;
243 local->conn_state_flags = peer->conn_state_flags;
244}
245
246static inline void smc_cdc_msg_to_host(struct smc_host_cdc_msg *local,
247 struct smc_cdc_msg *peer,
248 struct smc_connection *conn)
249{
250 if (conn->lgr->is_smcd)
251 smcd_cdc_msg_to_host(local, (struct smcd_cdc_msg *)peer);
252 else
253 smcr_cdc_msg_to_host(local, peer, conn);
254}
255
221struct smc_cdc_tx_pend; 256struct smc_cdc_tx_pend;
222 257
223int smc_cdc_get_free_slot(struct smc_connection *conn, 258int smc_cdc_get_free_slot(struct smc_connection *conn,
@@ -227,6 +262,8 @@ void smc_cdc_tx_dismiss_slots(struct smc_connection *conn);
227int smc_cdc_msg_send(struct smc_connection *conn, struct smc_wr_buf *wr_buf, 262int smc_cdc_msg_send(struct smc_connection *conn, struct smc_wr_buf *wr_buf,
228 struct smc_cdc_tx_pend *pend); 263 struct smc_cdc_tx_pend *pend);
229int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn); 264int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn);
265int smcd_cdc_msg_send(struct smc_connection *conn);
230int smc_cdc_init(void) __init; 266int smc_cdc_init(void) __init;
267void smcd_cdc_rx_init(struct smc_connection *conn);
231 268
232#endif /* SMC_CDC_H */ 269#endif /* SMC_CDC_H */
diff --git a/net/smc/smc_core.c b/net/smc/smc_core.c
index daa88db1841a..434c028162a4 100644
--- a/net/smc/smc_core.c
+++ b/net/smc/smc_core.c
@@ -281,10 +281,12 @@ void smc_conn_free(struct smc_connection *conn)
281{ 281{
282 if (!conn->lgr) 282 if (!conn->lgr)
283 return; 283 return;
284 if (conn->lgr->is_smcd) 284 if (conn->lgr->is_smcd) {
285 smc_ism_unset_conn(conn); 285 smc_ism_unset_conn(conn);
286 else 286 tasklet_kill(&conn->rx_tsklet);
287 } else {
287 smc_cdc_tx_dismiss_slots(conn); 288 smc_cdc_tx_dismiss_slots(conn);
289 }
288 smc_lgr_unregister_conn(conn); 290 smc_lgr_unregister_conn(conn);
289 smc_buf_unuse(conn); 291 smc_buf_unuse(conn);
290} 292}
@@ -324,10 +326,13 @@ static void smcr_buf_free(struct smc_link_group *lgr, bool is_rmb,
324static void smcd_buf_free(struct smc_link_group *lgr, bool is_dmb, 326static void smcd_buf_free(struct smc_link_group *lgr, bool is_dmb,
325 struct smc_buf_desc *buf_desc) 327 struct smc_buf_desc *buf_desc)
326{ 328{
327 if (is_dmb) 329 if (is_dmb) {
330 /* restore original buf len */
331 buf_desc->len += sizeof(struct smcd_cdc_msg);
328 smc_ism_unregister_dmb(lgr->smcd, buf_desc); 332 smc_ism_unregister_dmb(lgr->smcd, buf_desc);
329 else 333 } else {
330 kfree(buf_desc->cpu_addr); 334 kfree(buf_desc->cpu_addr);
335 }
331 kfree(buf_desc); 336 kfree(buf_desc);
332} 337}
333 338
@@ -632,6 +637,10 @@ create:
632 conn->local_tx_ctrl.common.type = SMC_CDC_MSG_TYPE; 637 conn->local_tx_ctrl.common.type = SMC_CDC_MSG_TYPE;
633 conn->local_tx_ctrl.len = SMC_WR_TX_SIZE; 638 conn->local_tx_ctrl.len = SMC_WR_TX_SIZE;
634 conn->urg_state = SMC_URG_READ; 639 conn->urg_state = SMC_URG_READ;
640 if (is_smcd) {
641 conn->rx_off = sizeof(struct smcd_cdc_msg);
642 smcd_cdc_rx_init(conn); /* init tasklet for this conn */
643 }
635#ifndef KERNEL_HAS_ATOMIC64 644#ifndef KERNEL_HAS_ATOMIC64
636 spin_lock_init(&conn->acurs_lock); 645 spin_lock_init(&conn->acurs_lock);
637#endif 646#endif
@@ -776,8 +785,9 @@ static struct smc_buf_desc *smcd_new_buf_create(struct smc_link_group *lgr,
776 kfree(buf_desc); 785 kfree(buf_desc);
777 return ERR_PTR(-EAGAIN); 786 return ERR_PTR(-EAGAIN);
778 } 787 }
779 memset(buf_desc->cpu_addr, 0, bufsize); 788 buf_desc->pages = virt_to_page(buf_desc->cpu_addr);
780 buf_desc->len = bufsize; 789 /* CDC header stored in buf. So, pretend it was smaller */
790 buf_desc->len = bufsize - sizeof(struct smcd_cdc_msg);
781 } else { 791 } else {
782 buf_desc->cpu_addr = kzalloc(bufsize, GFP_KERNEL | 792 buf_desc->cpu_addr = kzalloc(bufsize, GFP_KERNEL |
783 __GFP_NOWARN | __GFP_NORETRY | 793 __GFP_NOWARN | __GFP_NORETRY |
@@ -854,7 +864,8 @@ static int __smc_buf_create(struct smc_sock *smc, bool is_smcd, bool is_rmb)
854 conn->rmbe_size_short = bufsize_short; 864 conn->rmbe_size_short = bufsize_short;
855 smc->sk.sk_rcvbuf = bufsize * 2; 865 smc->sk.sk_rcvbuf = bufsize * 2;
856 atomic_set(&conn->bytes_to_rcv, 0); 866 atomic_set(&conn->bytes_to_rcv, 0);
857 conn->rmbe_update_limit = smc_rmb_wnd_update_limit(bufsize); 867 conn->rmbe_update_limit =
868 smc_rmb_wnd_update_limit(buf_desc->len);
858 if (is_smcd) 869 if (is_smcd)
859 smc_ism_set_conn(conn); /* map RMB/smcd_dev to conn */ 870 smc_ism_set_conn(conn); /* map RMB/smcd_dev to conn */
860 } else { 871 } else {
diff --git a/net/smc/smc_ism.c b/net/smc/smc_ism.c
index f44e4dff244a..cfade7fdcc6d 100644
--- a/net/smc/smc_ism.c
+++ b/net/smc/smc_ism.c
@@ -302,5 +302,13 @@ EXPORT_SYMBOL_GPL(smcd_handle_event);
302 */ 302 */
303void smcd_handle_irq(struct smcd_dev *smcd, unsigned int dmbno) 303void smcd_handle_irq(struct smcd_dev *smcd, unsigned int dmbno)
304{ 304{
305 struct smc_connection *conn = NULL;
306 unsigned long flags;
307
308 spin_lock_irqsave(&smcd->lock, flags);
309 conn = smcd->conn[dmbno];
310 if (conn)
311 tasklet_schedule(&conn->rx_tsklet);
312 spin_unlock_irqrestore(&smcd->lock, flags);
305} 313}
306EXPORT_SYMBOL_GPL(smcd_handle_irq); 314EXPORT_SYMBOL_GPL(smcd_handle_irq);
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c
index 3d77b383cccd..b329803c8339 100644
--- a/net/smc/smc_rx.c
+++ b/net/smc/smc_rx.c
@@ -305,7 +305,7 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
305 target = sock_rcvlowat(sk, flags & MSG_WAITALL, len); 305 target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
306 306
307 /* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */ 307 /* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
308 rcvbuf_base = conn->rmb_desc->cpu_addr; 308 rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr;
309 309
310 do { /* while (read_remaining) */ 310 do { /* while (read_remaining) */
311 if (read_done >= target || (pipe && read_done)) 311 if (read_done >= target || (pipe && read_done))
diff --git a/net/smc/smc_tx.c b/net/smc/smc_tx.c
index f82886b7d1d8..142bcb134dd6 100644
--- a/net/smc/smc_tx.c
+++ b/net/smc/smc_tx.c
@@ -24,6 +24,7 @@
24#include "smc.h" 24#include "smc.h"
25#include "smc_wr.h" 25#include "smc_wr.h"
26#include "smc_cdc.h" 26#include "smc_cdc.h"
27#include "smc_ism.h"
27#include "smc_tx.h" 28#include "smc_tx.h"
28 29
29#define SMC_TX_WORK_DELAY HZ 30#define SMC_TX_WORK_DELAY HZ
@@ -250,6 +251,24 @@ out_err:
250 251
251/***************************** sndbuf consumer *******************************/ 252/***************************** sndbuf consumer *******************************/
252 253
254/* sndbuf consumer: actual data transfer of one target chunk with ISM write */
255int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len,
256 u32 offset, int signal)
257{
258 struct smc_ism_position pos;
259 int rc;
260
261 memset(&pos, 0, sizeof(pos));
262 pos.token = conn->peer_token;
263 pos.index = conn->peer_rmbe_idx;
264 pos.offset = conn->tx_off + offset;
265 pos.signal = signal;
266 rc = smc_ism_write(conn->lgr->smcd, &pos, data, len);
267 if (rc)
268 conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
269 return rc;
270}
271
253/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */ 272/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
254static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset, 273static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
255 int num_sges, struct ib_sge sges[]) 274 int num_sges, struct ib_sge sges[])
@@ -297,21 +316,104 @@ static inline void smc_tx_advance_cursors(struct smc_connection *conn,
297 smc_curs_add(conn->sndbuf_desc->len, sent, len); 316 smc_curs_add(conn->sndbuf_desc->len, sent, len);
298} 317}
299 318
319/* SMC-R helper for smc_tx_rdma_writes() */
320static int smcr_tx_rdma_writes(struct smc_connection *conn, size_t len,
321 size_t src_off, size_t src_len,
322 size_t dst_off, size_t dst_len)
323{
324 dma_addr_t dma_addr =
325 sg_dma_address(conn->sndbuf_desc->sgt[SMC_SINGLE_LINK].sgl);
326 struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK];
327 int src_len_sum = src_len, dst_len_sum = dst_len;
328 struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
329 int sent_count = src_off;
330 int srcchunk, dstchunk;
331 int num_sges;
332 int rc;
333
334 for (dstchunk = 0; dstchunk < 2; dstchunk++) {
335 num_sges = 0;
336 for (srcchunk = 0; srcchunk < 2; srcchunk++) {
337 sges[srcchunk].addr = dma_addr + src_off;
338 sges[srcchunk].length = src_len;
339 sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
340 num_sges++;
341
342 src_off += src_len;
343 if (src_off >= conn->sndbuf_desc->len)
344 src_off -= conn->sndbuf_desc->len;
345 /* modulo in send ring */
346 if (src_len_sum == dst_len)
347 break; /* either on 1st or 2nd iteration */
348 /* prepare next (== 2nd) iteration */
349 src_len = dst_len - src_len; /* remainder */
350 src_len_sum += src_len;
351 }
352 rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
353 if (rc)
354 return rc;
355 if (dst_len_sum == len)
356 break; /* either on 1st or 2nd iteration */
357 /* prepare next (== 2nd) iteration */
358 dst_off = 0; /* modulo offset in RMBE ring buffer */
359 dst_len = len - dst_len; /* remainder */
360 dst_len_sum += dst_len;
361 src_len = min_t(int, dst_len, conn->sndbuf_desc->len -
362 sent_count);
363 src_len_sum = src_len;
364 }
365 return 0;
366}
367
368/* SMC-D helper for smc_tx_rdma_writes() */
369static int smcd_tx_rdma_writes(struct smc_connection *conn, size_t len,
370 size_t src_off, size_t src_len,
371 size_t dst_off, size_t dst_len)
372{
373 int src_len_sum = src_len, dst_len_sum = dst_len;
374 int srcchunk, dstchunk;
375 int rc;
376
377 for (dstchunk = 0; dstchunk < 2; dstchunk++) {
378 for (srcchunk = 0; srcchunk < 2; srcchunk++) {
379 void *data = conn->sndbuf_desc->cpu_addr + src_off;
380
381 rc = smcd_tx_ism_write(conn, data, src_len, dst_off +
382 sizeof(struct smcd_cdc_msg), 0);
383 if (rc)
384 return rc;
385 dst_off += src_len;
386 src_off += src_len;
387 if (src_off >= conn->sndbuf_desc->len)
388 src_off -= conn->sndbuf_desc->len;
389 /* modulo in send ring */
390 if (src_len_sum == dst_len)
391 break; /* either on 1st or 2nd iteration */
392 /* prepare next (== 2nd) iteration */
393 src_len = dst_len - src_len; /* remainder */
394 src_len_sum += src_len;
395 }
396 if (dst_len_sum == len)
397 break; /* either on 1st or 2nd iteration */
398 /* prepare next (== 2nd) iteration */
399 dst_off = 0; /* modulo offset in RMBE ring buffer */
400 dst_len = len - dst_len; /* remainder */
401 dst_len_sum += dst_len;
402 src_len = min_t(int, dst_len, conn->sndbuf_desc->len - src_off);
403 src_len_sum = src_len;
404 }
405 return 0;
406}
407
300/* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit; 408/* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
301 * usable snd_wnd as max transmit 409 * usable snd_wnd as max transmit
302 */ 410 */
303static int smc_tx_rdma_writes(struct smc_connection *conn) 411static int smc_tx_rdma_writes(struct smc_connection *conn)
304{ 412{
305 size_t src_off, src_len, dst_off, dst_len; /* current chunk values */ 413 size_t len, src_len, dst_off, dst_len; /* current chunk values */
306 size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
307 union smc_host_cursor sent, prep, prod, cons; 414 union smc_host_cursor sent, prep, prod, cons;
308 struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
309 struct smc_link_group *lgr = conn->lgr;
310 struct smc_cdc_producer_flags *pflags; 415 struct smc_cdc_producer_flags *pflags;
311 int to_send, rmbespace; 416 int to_send, rmbespace;
312 struct smc_link *link;
313 dma_addr_t dma_addr;
314 int num_sges;
315 int rc; 417 int rc;
316 418
317 /* source: sndbuf */ 419 /* source: sndbuf */
@@ -341,7 +443,6 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
341 len = min(to_send, rmbespace); 443 len = min(to_send, rmbespace);
342 444
343 /* initialize variables for first iteration of subsequent nested loop */ 445 /* initialize variables for first iteration of subsequent nested loop */
344 link = &lgr->lnk[SMC_SINGLE_LINK];
345 dst_off = prod.count; 446 dst_off = prod.count;
346 if (prod.wrap == cons.wrap) { 447 if (prod.wrap == cons.wrap) {
347 /* the filled destination area is unwrapped, 448 /* the filled destination area is unwrapped,
@@ -358,8 +459,6 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
358 */ 459 */
359 dst_len = len; 460 dst_len = len;
360 } 461 }
361 dst_len_sum = dst_len;
362 src_off = sent.count;
363 /* dst_len determines the maximum src_len */ 462 /* dst_len determines the maximum src_len */
364 if (sent.count + dst_len <= conn->sndbuf_desc->len) { 463 if (sent.count + dst_len <= conn->sndbuf_desc->len) {
365 /* unwrapped src case: single chunk of entire dst_len */ 464 /* unwrapped src case: single chunk of entire dst_len */
@@ -368,38 +467,15 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
368 /* wrapped src case: 2 chunks of sum dst_len; start with 1st: */ 467 /* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
369 src_len = conn->sndbuf_desc->len - sent.count; 468 src_len = conn->sndbuf_desc->len - sent.count;
370 } 469 }
371 src_len_sum = src_len; 470
372 dma_addr = sg_dma_address(conn->sndbuf_desc->sgt[SMC_SINGLE_LINK].sgl); 471 if (conn->lgr->is_smcd)
373 for (dstchunk = 0; dstchunk < 2; dstchunk++) { 472 rc = smcd_tx_rdma_writes(conn, len, sent.count, src_len,
374 num_sges = 0; 473 dst_off, dst_len);
375 for (srcchunk = 0; srcchunk < 2; srcchunk++) { 474 else
376 sges[srcchunk].addr = dma_addr + src_off; 475 rc = smcr_tx_rdma_writes(conn, len, sent.count, src_len,
377 sges[srcchunk].length = src_len; 476 dst_off, dst_len);
378 sges[srcchunk].lkey = link->roce_pd->local_dma_lkey; 477 if (rc)
379 num_sges++; 478 return rc;
380 src_off += src_len;
381 if (src_off >= conn->sndbuf_desc->len)
382 src_off -= conn->sndbuf_desc->len;
383 /* modulo in send ring */
384 if (src_len_sum == dst_len)
385 break; /* either on 1st or 2nd iteration */
386 /* prepare next (== 2nd) iteration */
387 src_len = dst_len - src_len; /* remainder */
388 src_len_sum += src_len;
389 }
390 rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
391 if (rc)
392 return rc;
393 if (dst_len_sum == len)
394 break; /* either on 1st or 2nd iteration */
395 /* prepare next (== 2nd) iteration */
396 dst_off = 0; /* modulo offset in RMBE ring buffer */
397 dst_len = len - dst_len; /* remainder */
398 dst_len_sum += dst_len;
399 src_len = min_t(int,
400 dst_len, conn->sndbuf_desc->len - sent.count);
401 src_len_sum = src_len;
402 }
403 479
404 if (conn->urg_tx_pend && len == to_send) 480 if (conn->urg_tx_pend && len == to_send)
405 pflags->urg_data_present = 1; 481 pflags->urg_data_present = 1;
@@ -420,7 +496,7 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
420/* Wakeup sndbuf consumers from any context (IRQ or process) 496/* Wakeup sndbuf consumers from any context (IRQ or process)
421 * since there is more data to transmit; usable snd_wnd as max transmit 497 * since there is more data to transmit; usable snd_wnd as max transmit
422 */ 498 */
423int smc_tx_sndbuf_nonempty(struct smc_connection *conn) 499static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
424{ 500{
425 struct smc_cdc_producer_flags *pflags; 501 struct smc_cdc_producer_flags *pflags;
426 struct smc_cdc_tx_pend *pend; 502 struct smc_cdc_tx_pend *pend;
@@ -467,6 +543,37 @@ out_unlock:
467 return rc; 543 return rc;
468} 544}
469 545
546static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
547{
548 struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
549 int rc = 0;
550
551 spin_lock_bh(&conn->send_lock);
552 if (!pflags->urg_data_present)
553 rc = smc_tx_rdma_writes(conn);
554 if (!rc)
555 rc = smcd_cdc_msg_send(conn);
556
557 if (!rc && pflags->urg_data_present) {
558 pflags->urg_data_pending = 0;
559 pflags->urg_data_present = 0;
560 }
561 spin_unlock_bh(&conn->send_lock);
562 return rc;
563}
564
565int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
566{
567 int rc;
568
569 if (conn->lgr->is_smcd)
570 rc = smcd_tx_sndbuf_nonempty(conn);
571 else
572 rc = smcr_tx_sndbuf_nonempty(conn);
573
574 return rc;
575}
576
470/* Wakeup sndbuf consumers from process context 577/* Wakeup sndbuf consumers from process context
471 * since there is more data to transmit 578 * since there is more data to transmit
472 */ 579 */
diff --git a/net/smc/smc_tx.h b/net/smc/smc_tx.h
index 9d2238909fa0..b22bdc5694c4 100644
--- a/net/smc/smc_tx.h
+++ b/net/smc/smc_tx.h
@@ -33,5 +33,7 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len);
33int smc_tx_sndbuf_nonempty(struct smc_connection *conn); 33int smc_tx_sndbuf_nonempty(struct smc_connection *conn);
34void smc_tx_sndbuf_nonfull(struct smc_sock *smc); 34void smc_tx_sndbuf_nonfull(struct smc_sock *smc);
35void smc_tx_consumer_update(struct smc_connection *conn, bool force); 35void smc_tx_consumer_update(struct smc_connection *conn, bool force);
36int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len,
37 u32 offset, int signal);
36 38
37#endif /* SMC_TX_H */ 39#endif /* SMC_TX_H */