
Commit bbb4172

Merge branch 'load-acquire/store-release barriers for'
Björn Töpel says:

====================

This two-patch series introduces load-acquire/store-release barriers for
the AF_XDP rings.

For most contemporary architectures, this is more effective than a SPSC
ring based on smp_{r,w,}mb() barriers. More importantly,
load-acquire/store-release semantics make the ring code easier to follow.

This is effectively the change done in commit 6c43c09 ("documentation:
Update circular buffer for load-acquire/store-release"), but for the
AF_XDP rings.

Both libbpf and the kernel-side are updated.

Full details are outlined in the commits!

Thanks to the LKMM-folks (Paul/Alan/Will) for helping me out in this
complicated matter!

Changelog v1 [1] -> v2:
* Expanded the commit message for patch 1, and included the LKMM litmus
  tests. Hopefully this clears things up. (Daniel)
* Clarified why the smp_mb()/smp_load_acquire() is not needed in (A);
  control dependency with load to store. (Toke)

[1] https://lore.kernel.org/bpf/[email protected]/

Thanks,
Björn

====================

Signed-off-by: Andrii Nakryiko <[email protected]>
2 parents 299194a + 291471d commit bbb4172

File tree: 3 files changed, +68/-51 lines
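At its core, the series replaces each pair of a plain READ_ONCE()/WRITE_ONCE() on a ring index plus a separate smp_rmb()/smp_wmb()/smp_mb() barrier with a single smp_load_acquire() or smp_store_release(). The sketch below is not taken from the patches; it only illustrates the shape of that conversion, using C11 atomics as stand-ins for the kernel primitives, and the names ring_publish_old/ring_publish_new/ring_refresh are made up for illustration.

#include <stdatomic.h>
#include <stdint.h>

struct ring_idx {
	_Atomic uint32_t producer;
};

/* Before: an explicit write barrier followed by a plain store of the
 * producer index (the smp_wmb() + WRITE_ONCE() pattern). */
static void ring_publish_old(struct ring_idx *r, uint32_t idx)
{
	atomic_thread_fence(memory_order_release);	/* roughly smp_wmb() */
	atomic_store_explicit(&r->producer, idx, memory_order_relaxed);
}

/* After: one store-release orders all earlier stores before the index
 * update (the smp_store_release() pattern). */
static void ring_publish_new(struct ring_idx *r, uint32_t idx)
{
	atomic_store_explicit(&r->producer, idx, memory_order_release);
}

/* Consumer side: READ_ONCE() + smp_rmb() becomes one load-acquire. */
static uint32_t ring_refresh(struct ring_idx *r)
{
	return atomic_load_explicit(&r->producer, memory_order_acquire);
}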

net/xdp/xsk_queue.h

Lines changed: 13 additions & 17 deletions

@@ -47,19 +47,18 @@ struct xsk_queue {
 	u64 queue_empty_descs;
 };
 
-/* The structure of the shared state of the rings are the same as the
- * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
- * ring, the kernel is the producer and user space is the consumer. For
- * the Tx and fill rings, the kernel is the consumer and user space is
- * the producer.
+/* The structure of the shared state of the rings are a simple
+ * circular buffer, as outlined in
+ * Documentation/core-api/circular-buffers.rst. For the Rx and
+ * completion ring, the kernel is the producer and user space is the
+ * consumer. For the Tx and fill rings, the kernel is the consumer and
+ * user space is the producer.
  *
  * producer                         consumer
  *
- * if (LOAD ->consumer) {           LOAD ->producer
- *                    (A)           smp_rmb()       (C)
+ * if (LOAD ->consumer) {  (A)      LOAD.acq ->producer  (C)
  *    STORE $data                   LOAD $data
- *    smp_wmb()       (B)           smp_mb()        (D)
- *    STORE ->producer              STORE ->consumer
+ *    STORE.rel ->producer  (B)     STORE.rel ->consumer  (D)
  * }
  *
  * (A) pairs with (D), and (B) pairs with (C).
@@ -78,7 +77,8 @@ struct xsk_queue {
  *
  * (A) is a control dependency that separates the load of ->consumer
  * from the stores of $data. In case ->consumer indicates there is no
- * room in the buffer to store $data we do not. So no barrier is needed.
+ * room in the buffer to store $data we do not. The dependency will
+ * order both of the stores after the loads. So no barrier is needed.
  *
  * (D) protects the load of the data to be observed to happen after the
  * store of the consumer pointer. If we did not have this memory
@@ -227,15 +227,13 @@ static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
 
 static inline void __xskq_cons_release(struct xsk_queue *q)
 {
-	smp_mb(); /* D, matches A */
-	WRITE_ONCE(q->ring->consumer, q->cached_cons);
+	smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matchees A */
 }
 
 static inline void __xskq_cons_peek(struct xsk_queue *q)
 {
 	/* Refresh the local pointer */
-	q->cached_prod = READ_ONCE(q->ring->producer);
-	smp_rmb(); /* C, matches B */
+	q->cached_prod = smp_load_acquire(&q->ring->producer);  /* C, matches B */
 }
 
 static inline void xskq_cons_get_entries(struct xsk_queue *q)
@@ -397,9 +395,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 
 static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
 {
-	smp_wmb(); /* B, matches C */
-
-	WRITE_ONCE(q->ring->producer, idx);
+	smp_store_release(&q->ring->producer, idx); /* B, matches C */
 }
 
 static inline void xskq_prod_submit(struct xsk_queue *q)
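The rewritten comment above is also what the v2 changelog note about (A) refers to: the producer only stores $data when the load of ->consumer shows there is room, so that store is control-dependent on the load and needs no extra barrier or acquire. Below is a stand-alone sketch of the four labelled points; it is not the kernel code, it uses C11 atomics, and it uses an acquire load at (A) because plain C11 does not give control dependencies the ordering guarantees the LKMM does.

#include <stdatomic.h>
#include <stdint.h>

#define RING_SIZE 64u		/* power of two, illustrative only */

static uint32_t ring_data[RING_SIZE];
static _Atomic uint32_t producer;	/* written by producer, read by consumer */
static _Atomic uint32_t consumer;	/* written by consumer, read by producer */

/* Producer: (A) the store of data only happens if the consumer index
 * says there is room; (B) the store-release publishes the data. */
static int produce(uint32_t prod_idx, uint32_t value)
{
	uint32_t cons = atomic_load_explicit(&consumer,
					     memory_order_acquire);	/* (A) */

	if (prod_idx - cons < RING_SIZE) {
		ring_data[prod_idx & (RING_SIZE - 1)] = value;
		atomic_store_explicit(&producer, prod_idx + 1,
				      memory_order_release);		/* (B) */
		return 0;
	}
	return -1;	/* ring full */
}

/* Consumer: (C) the load-acquire of the producer index pairs with (B);
 * (D) the store-release of the consumer index pairs with (A). */
static int consume(uint32_t cons_idx, uint32_t *value)
{
	uint32_t prod = atomic_load_explicit(&producer,
					     memory_order_acquire);	/* (C) */

	if (cons_idx != prod) {
		*value = ring_data[cons_idx & (RING_SIZE - 1)];
		atomic_store_explicit(&consumer, cons_idx + 1,
				      memory_order_release);		/* (D) */
		return 0;
	}
	return -1;	/* ring empty */
}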

tools/lib/bpf/libbpf_util.h

Lines changed: 50 additions & 22 deletions

@@ -5,6 +5,7 @@
 #define __LIBBPF_LIBBPF_UTIL_H
 
 #include <stdbool.h>
+#include <linux/compiler.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -15,29 +16,56 @@ extern "C" {
  * application that uses libbpf.
  */
 #if defined(__i386__) || defined(__x86_64__)
-# define libbpf_smp_rmb() asm volatile("" : : : "memory")
-# define libbpf_smp_wmb() asm volatile("" : : : "memory")
-# define libbpf_smp_mb() \
-	asm volatile("lock; addl $0,-4(%%rsp)" : : : "memory", "cc")
-/* Hinders stores to be observed before older loads. */
-# define libbpf_smp_rwmb() asm volatile("" : : : "memory")
+# define libbpf_smp_store_release(p, v)				\
+	do {							\
+		asm volatile("" : : : "memory");		\
+		WRITE_ONCE(*p, v);				\
+	} while (0)
+# define libbpf_smp_load_acquire(p)				\
+	({							\
+		typeof(*p) ___p1 = READ_ONCE(*p);		\
+		asm volatile("" : : : "memory");		\
+		___p1;						\
+	})
 #elif defined(__aarch64__)
-# define libbpf_smp_rmb() asm volatile("dmb ishld" : : : "memory")
-# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
-# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
-# define libbpf_smp_rwmb() libbpf_smp_mb()
-#elif defined(__arm__)
-/* These are only valid for armv7 and above */
-# define libbpf_smp_rmb() asm volatile("dmb ish" : : : "memory")
-# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
-# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
-# define libbpf_smp_rwmb() libbpf_smp_mb()
-#else
-/* Architecture missing native barrier functions. */
-# define libbpf_smp_rmb() __sync_synchronize()
-# define libbpf_smp_wmb() __sync_synchronize()
-# define libbpf_smp_mb() __sync_synchronize()
-# define libbpf_smp_rwmb() __sync_synchronize()
+# define libbpf_smp_store_release(p, v)				\
+		asm volatile ("stlr %w1, %0" : "=Q" (*p) : "r" (v) : "memory")
+# define libbpf_smp_load_acquire(p)				\
+	({							\
+		typeof(*p) ___p1;				\
+		asm volatile ("ldar %w0, %1"			\
+			      : "=r" (___p1) : "Q" (*p) : "memory"); \
+		___p1;						\
+	})
+#elif defined(__riscv)
+# define libbpf_smp_store_release(p, v)				\
+	do {							\
+		asm volatile ("fence rw,w" : : : "memory");	\
+		WRITE_ONCE(*p, v);				\
+	} while (0)
+# define libbpf_smp_load_acquire(p)				\
+	({							\
+		typeof(*p) ___p1 = READ_ONCE(*p);		\
+		asm volatile ("fence r,rw" : : : "memory");	\
+		___p1;						\
+	})
+#endif
+
+#ifndef libbpf_smp_store_release
+#define libbpf_smp_store_release(p, v)				\
+	do {							\
+		__sync_synchronize();				\
+		WRITE_ONCE(*p, v);				\
+	} while (0)
+#endif
+
+#ifndef libbpf_smp_load_acquire
+#define libbpf_smp_load_acquire(p)				\
+	({							\
+		typeof(*p) ___p1 = READ_ONCE(*p);		\
+		__sync_synchronize();				\
+		___p1;						\
+	})
 #endif
 
 #ifdef __cplusplus
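The new macros give libbpf a single acquire/release primitive per direction on every architecture, with a __sync_synchronize()-based fallback where no hand-written asm is provided. As a mental model only (not what the patch installs, since it prefers per-architecture inline asm for these fast paths), the GCC/Clang __atomic builtins express the same contract for the aligned 32-bit ring indices involved; the model_* names below are made up for illustration.

/* Semantic model only, not the libbpf implementation. */
#define model_smp_store_release(p, v)	__atomic_store_n((p), (v), __ATOMIC_RELEASE)
#define model_smp_load_acquire(p)	__atomic_load_n((p), __ATOMIC_ACQUIRE)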

tools/lib/bpf/xsk.h

Lines changed: 5 additions & 12 deletions

@@ -96,7 +96,8 @@ static inline __u32 xsk_prod_nb_free(struct xsk_ring_prod *r, __u32 nb)
 	 * this function. Without this optimization it whould have been
 	 * free_entries = r->cached_prod - r->cached_cons + r->size.
 	 */
-	r->cached_cons = *r->consumer + r->size;
+	r->cached_cons = libbpf_smp_load_acquire(r->consumer);
+	r->cached_cons += r->size;
 
 	return r->cached_cons - r->cached_prod;
 }
@@ -106,7 +107,7 @@ static inline __u32 xsk_cons_nb_avail(struct xsk_ring_cons *r, __u32 nb)
 	__u32 entries = r->cached_prod - r->cached_cons;
 
 	if (entries == 0) {
-		r->cached_prod = *r->producer;
+		r->cached_prod = libbpf_smp_load_acquire(r->producer);
 		entries = r->cached_prod - r->cached_cons;
 	}
 
@@ -129,21 +130,14 @@ static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, __u32 nb)
 	/* Make sure everything has been written to the ring before indicating
 	 * this to the kernel by writing the producer pointer.
 	 */
-	libbpf_smp_wmb();
-
-	*prod->producer += nb;
+	libbpf_smp_store_release(prod->producer, *prod->producer + nb);
 }
 
 static inline __u32 xsk_ring_cons__peek(struct xsk_ring_cons *cons, __u32 nb, __u32 *idx)
 {
 	__u32 entries = xsk_cons_nb_avail(cons, nb);
 
 	if (entries > 0) {
-		/* Make sure we do not speculatively read the data before
-		 * we have received the packet buffers from the ring.
-		 */
-		libbpf_smp_rmb();
-
 		*idx = cons->cached_cons;
 		cons->cached_cons += entries;
 	}
@@ -161,9 +155,8 @@ static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, __u32 nb)
 	/* Make sure data has been read before indicating we are done
 	 * with the entries by updating the consumer pointer.
 	 */
-	libbpf_smp_rwmb();
+	libbpf_smp_store_release(cons->consumer, *cons->consumer + nb);
 
-	*cons->consumer += nb;
 }
 
 static inline void *xsk_umem__get_data(void *umem_area, __u64 addr)
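Since the ordering now lives inside the helpers, application ring loops are unchanged: xsk_ring_cons__peek() performs the load-acquire (via xsk_cons_nb_avail()) and xsk_ring_cons__release() the store-release. The following is a hedged sketch of a typical RX drain loop over these public helpers; handle_frame() is an assumed application callback, and socket/umem setup, the fill ring and error handling are omitted.

#include <bpf/xsk.h>

void handle_frame(void *pkt, __u32 len);	/* assumed application callback */

/* Drain up to 'budget' descriptors from an AF_XDP RX ring. */
static void rx_drain(struct xsk_ring_cons *rx, void *umem_area,
		     unsigned int budget)
{
	__u32 idx = 0;
	__u32 rcvd = xsk_ring_cons__peek(rx, budget, &idx);	/* load-acquire inside */

	for (__u32 i = 0; i < rcvd; i++) {
		const struct xdp_desc *desc = xsk_ring_cons__rx_desc(rx, idx + i);
		void *pkt = xsk_umem__get_data(umem_area, desc->addr);

		handle_frame(pkt, desc->len);
	}

	if (rcvd)
		xsk_ring_cons__release(rx, rcvd);		/* store-release inside */
}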
