aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndy Grover <andy.grover@oracle.com>2009-04-01 04:20:20 -0400
committerDavid S. Miller <davem@davemloft.net>2009-04-02 03:52:22 -0400
commit8cbd9606a6367c221a7bbcc47f3ab1a8c31b6437 (patch)
tree9ff2adf88be32383fd59dc133f1cd6670b9ee815
parent745cbccac3fe8cead529a1b3358e1e86a1505bfa (diff)
RDS: Use spinlock to protect 64b value update on 32b archs
We have a 64bit value that needs to be set atomically. This is easy and quick on all 64bit archs, and can also be done on x86/32 with set_64bit() (uses cmpxchg8b). However other 32b archs don't have this. I actually changed this to the current state in preparation for mainline because the old way (using a spinlock on 32b) resulted in unsightly #ifdefs in the code. But obviously, being correct takes precedence. Signed-off-by: Andy Grover <andy.grover@oracle.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/rds/ib.h14
-rw-r--r--net/rds/ib_cm.c9
-rw-r--r--net/rds/ib_recv.c37
-rw-r--r--net/rds/iw.h14
-rw-r--r--net/rds/iw_cm.c9
-rw-r--r--net/rds/iw_recv.c37
-rw-r--r--net/rds/rds.h4
7 files changed, 100 insertions, 24 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h
index c08ffffb3164..069206cae733 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -108,7 +108,12 @@ struct rds_ib_connection {
108 108
109 /* sending acks */ 109 /* sending acks */
110 unsigned long i_ack_flags; 110 unsigned long i_ack_flags;
111#ifdef KERNEL_HAS_ATOMIC64
112 atomic64_t i_ack_next; /* next ACK to send */
113#else
114 spinlock_t i_ack_lock; /* protect i_ack_next */
111 u64 i_ack_next; /* next ACK to send */ 115 u64 i_ack_next; /* next ACK to send */
116#endif
112 struct rds_header *i_ack; 117 struct rds_header *i_ack;
113 struct ib_send_wr i_ack_wr; 118 struct ib_send_wr i_ack_wr;
114 struct ib_sge i_ack_sge; 119 struct ib_sge i_ack_sge;
@@ -363,13 +368,4 @@ rds_ib_data_sge(struct rds_ib_connection *ic, struct ib_sge *sge)
363 return &sge[1]; 368 return &sge[1];
364} 369}
365 370
366static inline void rds_ib_set_64bit(u64 *ptr, u64 val)
367{
368#if BITS_PER_LONG == 64
369 *ptr = val;
370#else
371 set_64bit(ptr, val);
372#endif
373}
374
375#endif 371#endif
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 889ab0441359..f8e40e1a6038 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -636,7 +636,11 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
636 636
637 /* Clear the ACK state */ 637 /* Clear the ACK state */
638 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags); 638 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
639 rds_ib_set_64bit(&ic->i_ack_next, 0); 639#ifdef KERNEL_HAS_ATOMIC64
640 atomic64_set(&ic->i_ack_next, 0);
641#else
642 ic->i_ack_next = 0;
643#endif
640 ic->i_ack_recv = 0; 644 ic->i_ack_recv = 0;
641 645
642 /* Clear flow control state */ 646 /* Clear flow control state */
@@ -669,6 +673,9 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
669 673
670 INIT_LIST_HEAD(&ic->ib_node); 674 INIT_LIST_HEAD(&ic->ib_node);
671 mutex_init(&ic->i_recv_mutex); 675 mutex_init(&ic->i_recv_mutex);
676#ifndef KERNEL_HAS_ATOMIC64
677 spin_lock_init(&ic->i_ack_lock);
678#endif
672 679
673 /* 680 /*
674 * rds_ib_conn_shutdown() waits for these to be emptied so they 681 * rds_ib_conn_shutdown() waits for these to be emptied so they
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 5061b5502162..36d931573ff4 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -395,10 +395,37 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
395 * room for it beyond the ring size. Send completion notices its special 395 * room for it beyond the ring size. Send completion notices its special
396 * wr_id and avoids working with the ring in that case. 396 * wr_id and avoids working with the ring in that case.
397 */ 397 */
398#ifndef KERNEL_HAS_ATOMIC64
398static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, 399static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
399 int ack_required) 400 int ack_required)
400{ 401{
401 rds_ib_set_64bit(&ic->i_ack_next, seq); 402 unsigned long flags;
403
404 spin_lock_irqsave(&ic->i_ack_lock, flags);
405 ic->i_ack_next = seq;
406 if (ack_required)
407 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
408 spin_unlock_irqrestore(&ic->i_ack_lock, flags);
409}
410
411static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
412{
413 unsigned long flags;
414 u64 seq;
415
416 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
417
418 spin_lock_irqsave(&ic->i_ack_lock, flags);
419 seq = ic->i_ack_next;
420 spin_unlock_irqrestore(&ic->i_ack_lock, flags);
421
422 return seq;
423}
424#else
425static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
426 int ack_required)
427{
428 atomic64_set(&ic->i_ack_next, seq);
402 if (ack_required) { 429 if (ack_required) {
403 smp_mb__before_clear_bit(); 430 smp_mb__before_clear_bit();
404 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); 431 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
@@ -410,8 +437,10 @@ static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
410 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); 437 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
411 smp_mb__after_clear_bit(); 438 smp_mb__after_clear_bit();
412 439
413 return ic->i_ack_next; 440 return atomic64_read(&ic->i_ack_next);
414} 441}
442#endif
443
415 444
416static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits) 445static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credits)
417{ 446{
@@ -464,6 +493,10 @@ static void rds_ib_send_ack(struct rds_ib_connection *ic, unsigned int adv_credi
464 * - i_ack_next, which is the last sequence number we received 493 * - i_ack_next, which is the last sequence number we received
465 * 494 *
466 * Potentially, send queue and receive queue handlers can run concurrently. 495 * Potentially, send queue and receive queue handlers can run concurrently.
496 * It would be nice to not have to use a spinlock to synchronize things,
497 * but the one problem that rules this out is that 64bit updates are
498 * not atomic on all platforms. Things would be a lot simpler if
499 * we had atomic64 or maybe cmpxchg64 everywhere.
467 * 500 *
468 * Reconnecting complicates this picture just slightly. When we 501 * Reconnecting complicates this picture just slightly. When we
469 * reconnect, we may be seeing duplicate packets. The peer 502 * reconnect, we may be seeing duplicate packets. The peer
diff --git a/net/rds/iw.h b/net/rds/iw.h
index 70eb948f42f4..b4fb27252895 100644
--- a/net/rds/iw.h
+++ b/net/rds/iw.h
@@ -131,7 +131,12 @@ struct rds_iw_connection {
131 131
132 /* sending acks */ 132 /* sending acks */
133 unsigned long i_ack_flags; 133 unsigned long i_ack_flags;
134#ifdef KERNEL_HAS_ATOMIC64
135 atomic64_t i_ack_next; /* next ACK to send */
136#else
137 spinlock_t i_ack_lock; /* protect i_ack_next */
134 u64 i_ack_next; /* next ACK to send */ 138 u64 i_ack_next; /* next ACK to send */
139#endif
135 struct rds_header *i_ack; 140 struct rds_header *i_ack;
136 struct ib_send_wr i_ack_wr; 141 struct ib_send_wr i_ack_wr;
137 struct ib_sge i_ack_sge; 142 struct ib_sge i_ack_sge;
@@ -391,13 +396,4 @@ rds_iw_data_sge(struct rds_iw_connection *ic, struct ib_sge *sge)
391 return &sge[1]; 396 return &sge[1];
392} 397}
393 398
394static inline void rds_iw_set_64bit(u64 *ptr, u64 val)
395{
396#if BITS_PER_LONG == 64
397 *ptr = val;
398#else
399 set_64bit(ptr, val);
400#endif
401}
402
403#endif 399#endif
diff --git a/net/rds/iw_cm.c b/net/rds/iw_cm.c
index 0ffaa3e97ad6..a416b0d492b1 100644
--- a/net/rds/iw_cm.c
+++ b/net/rds/iw_cm.c
@@ -659,7 +659,11 @@ void rds_iw_conn_shutdown(struct rds_connection *conn)
659 659
660 /* Clear the ACK state */ 660 /* Clear the ACK state */
661 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags); 661 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
662 rds_iw_set_64bit(&ic->i_ack_next, 0); 662#ifdef KERNEL_HAS_ATOMIC64
663 atomic64_set(&ic->i_ack_next, 0);
664#else
665 ic->i_ack_next = 0;
666#endif
663 ic->i_ack_recv = 0; 667 ic->i_ack_recv = 0;
664 668
665 /* Clear flow control state */ 669 /* Clear flow control state */
@@ -693,6 +697,9 @@ int rds_iw_conn_alloc(struct rds_connection *conn, gfp_t gfp)
693 697
694 INIT_LIST_HEAD(&ic->iw_node); 698 INIT_LIST_HEAD(&ic->iw_node);
695 mutex_init(&ic->i_recv_mutex); 699 mutex_init(&ic->i_recv_mutex);
700#ifndef KERNEL_HAS_ATOMIC64
701 spin_lock_init(&ic->i_ack_lock);
702#endif
696 703
697 /* 704 /*
698 * rds_iw_conn_shutdown() waits for these to be emptied so they 705 * rds_iw_conn_shutdown() waits for these to be emptied so they
diff --git a/net/rds/iw_recv.c b/net/rds/iw_recv.c
index a1931f0027a2..fde470fa50d5 100644
--- a/net/rds/iw_recv.c
+++ b/net/rds/iw_recv.c
@@ -395,10 +395,37 @@ void rds_iw_recv_init_ack(struct rds_iw_connection *ic)
395 * room for it beyond the ring size. Send completion notices its special 395 * room for it beyond the ring size. Send completion notices its special
396 * wr_id and avoids working with the ring in that case. 396 * wr_id and avoids working with the ring in that case.
397 */ 397 */
398#ifndef KERNEL_HAS_ATOMIC64
398static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq, 399static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
399 int ack_required) 400 int ack_required)
400{ 401{
401 rds_iw_set_64bit(&ic->i_ack_next, seq); 402 unsigned long flags;
403
404 spin_lock_irqsave(&ic->i_ack_lock, flags);
405 ic->i_ack_next = seq;
406 if (ack_required)
407 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
408 spin_unlock_irqrestore(&ic->i_ack_lock, flags);
409}
410
411static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
412{
413 unsigned long flags;
414 u64 seq;
415
416 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
417
418 spin_lock_irqsave(&ic->i_ack_lock, flags);
419 seq = ic->i_ack_next;
420 spin_unlock_irqrestore(&ic->i_ack_lock, flags);
421
422 return seq;
423}
424#else
425static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
426 int ack_required)
427{
428 atomic64_set(&ic->i_ack_next, seq);
402 if (ack_required) { 429 if (ack_required) {
403 smp_mb__before_clear_bit(); 430 smp_mb__before_clear_bit();
404 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); 431 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
@@ -410,8 +437,10 @@ static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
410 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); 437 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
411 smp_mb__after_clear_bit(); 438 smp_mb__after_clear_bit();
412 439
413 return ic->i_ack_next; 440 return atomic64_read(&ic->i_ack_next);
414} 441}
442#endif
443
415 444
416static void rds_iw_send_ack(struct rds_iw_connection *ic, unsigned int adv_credits) 445static void rds_iw_send_ack(struct rds_iw_connection *ic, unsigned int adv_credits)
417{ 446{
@@ -464,6 +493,10 @@ static void rds_iw_send_ack(struct rds_iw_connection *ic, unsigned int adv_credi
464 * - i_ack_next, which is the last sequence number we received 493 * - i_ack_next, which is the last sequence number we received
465 * 494 *
466 * Potentially, send queue and receive queue handlers can run concurrently. 495 * Potentially, send queue and receive queue handlers can run concurrently.
496 * It would be nice to not have to use a spinlock to synchronize things,
497 * but the one problem that rules this out is that 64bit updates are
498 * not atomic on all platforms. Things would be a lot simpler if
499 * we had atomic64 or maybe cmpxchg64 everywhere.
467 * 500 *
468 * Reconnecting complicates this picture just slightly. When we 501 * Reconnecting complicates this picture just slightly. When we
469 * reconnect, we may be seeing duplicate packets. The peer 502 * reconnect, we may be seeing duplicate packets. The peer
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 060400704979..619f0a30a4e5 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -28,6 +28,10 @@
28 */ 28 */
29#define RDS_PORT 18634 29#define RDS_PORT 18634
30 30
31#ifdef ATOMIC64_INIT
32#define KERNEL_HAS_ATOMIC64
33#endif
34
31#ifdef DEBUG 35#ifdef DEBUG
32#define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args) 36#define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
33#else 37#else