CNDP  22.08.0
cne_ring_generic.h
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2019-2022 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */
#include <sched.h>
#include <cne_atomic.h>

#ifndef _CNE_RING_GENERIC_H_
#define _CNE_RING_GENERIC_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <errno.h>
#include <cne_branch_prediction.h>

#include "ring_private.h"
#include "cne_ring.h"

// clang-format off
/*
 * The actual enqueue of pointers on the ring.
 * Placed here since identical code needed in both
 * single and multi producer enqueue functions
 */
#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
    unsigned int i; \
    const uint32_t size = (r)->size; \
    uint32_t idx = prod_head & (r)->mask; \
    obj_type *ring = (obj_type *)ring_start; \
    if (likely(idx + n < size)) { \
        for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
            ring[idx] = obj_table[i]; \
            ring[idx+1] = obj_table[i+1]; \
            ring[idx+2] = obj_table[i+2]; \
            ring[idx+3] = obj_table[i+3]; \
        } \
        switch (n & 0x3) { \
        case 3: \
            ring[idx++] = obj_table[i++]; /* fallthrough */ \
        case 2: \
            ring[idx++] = obj_table[i++]; /* fallthrough */ \
        case 1: \
            ring[idx++] = obj_table[i++]; \
        } \
    } else { \
        for (i = 0; idx < size; i++, idx++) \
            ring[idx] = obj_table[i]; \
        for (idx = 0; i < n; i++, idx++) \
            ring[idx] = obj_table[i]; \
    } \
} while (0)

/* The actual copy of pointers on the ring to obj_table.
 * Placed here since identical code needed in both
 * single and multi consumer dequeue functions
 */
#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
    unsigned int i; \
    uint32_t idx = cons_head & r->mask; \
    const uint32_t size = r->size; \
    obj_type *ring = (obj_type *)ring_start; \
    if (likely(idx + n < size)) { \
        for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) { \
            obj_table[i] = ring[idx]; \
            obj_table[i+1] = ring[idx+1]; \
            obj_table[i+2] = ring[idx+2]; \
            obj_table[i+3] = ring[idx+3]; \
        } \
        switch (n & 0x3) { \
        case 3: \
            obj_table[i++] = ring[idx++]; /* fallthrough */ \
        case 2: \
            obj_table[i++] = ring[idx++]; /* fallthrough */ \
        case 1: \
            obj_table[i++] = ring[idx++]; \
        } \
    } else { \
        for (i = 0; idx < size; i++, idx++) \
            obj_table[i] = ring[idx]; \
        for (idx = 0; i < n; i++, idx++) \
            obj_table[i] = ring[idx]; \
    } \
} while (0)
// clang-format on
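
/*
 * Worked example of the index math above (values chosen for illustration):
 * with size = 8, mask = 7, prod_head = 14 and n = 4, ENQUEUE_PTRS computes
 * idx = 14 & 7 = 6. Because idx + n >= size, the wrap branch runs:
 * obj_table[0..1] are written to ring[6..7] and obj_table[2..3] wrap around
 * into ring[0..1]. DEQUEUE_PTRS applies the same index math in the opposite
 * direction when copying out to obj_table.
 */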

static __cne_always_inline void
update_tail(struct cne_ring_headtail *ht, uint32_t old_val, uint32_t new_val, uint32_t single)
{
    /*
     * If there are other enqueues/dequeues in progress that preceded us,
     * we need to wait for them to complete
     */
    if (!single) {
        uint64_t timo = 1000;
        /* Need another implementation of this here. Not preemptible. */
        while (unlikely(atomic_load_explicit(&ht->tail, CNE_MEMORY_ORDER(relaxed)) != old_val)) {
            if (--timo == 0) {
                timo = 1000;
                sched_yield();
            }
        }
    }

    atomic_store_explicit(&ht->tail, new_val, CNE_MEMORY_ORDER(release));
}

static __cne_always_inline unsigned int
__cne_ring_move_prod_head(struct cne_ring *r, unsigned int is_sp, unsigned int n,
                          enum cne_ring_queue_behavior behavior, uint32_t *old_head,
                          uint32_t *new_head, uint32_t *free_entries)
{
    const uint32_t capacity = r->capacity;
    unsigned int max = n;
    uint32_t cons_tail;
    _Bool success;

    *old_head = atomic_load_explicit(&r->prod.head, CNE_MEMORY_ORDER(relaxed));
    do {
        /* Reset n to the initial burst count */
        n = max;

        /* Add a rmb barrier to avoid load/load reordering in the weak
         * memory model. It is a noop on x86.
         */
        atomic_thread_fence(CNE_MEMORY_ORDER(acquire));

        /* This load-acquire synchronizes with the store-release of ht->tail
         * in update_tail.
         */
        cons_tail = atomic_load_explicit(&r->cons.tail, CNE_MEMORY_ORDER(acquire));

        /*
         * The subtraction is done between two unsigned 32-bit values
         * (the result is always modulo 32 bits even if we have
         * *old_head > cons_tail). So 'free_entries' is always between 0
         * and capacity (which is < size).
         */
        *free_entries = (capacity + cons_tail - *old_head);

        /* check that we have enough room in ring */
        if (unlikely(n > *free_entries))
            n = (behavior == CNE_RING_QUEUE_FIXED_ITEMS) ? 0 : *free_entries;

        if (n == 0)
            return 0;

        *new_head = *old_head + n;
        if (is_sp) {
            r->prod.head = *new_head;
            success = 1;
        } else {
            success = atomic_compare_exchange_strong_explicit(&r->prod.head, old_head, *new_head,
                                                              CNE_MEMORY_ORDER(relaxed),
                                                              CNE_MEMORY_ORDER(relaxed));
        }
    } while (unlikely(success == 0));
    return n;
}
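
/*
 * Worked example of the wrap-safe arithmetic above (values chosen for
 * illustration): with capacity = 16, cons.tail = 0xFFFFFFFE and
 * prod.head = 0x00000002 (both 32-bit indices have wrapped), 4 slots are in
 * use, and free_entries = (16 + 0xFFFFFFFE - 0x00000002) modulo 2^32 = 12,
 * i.e. capacity minus the 4 used slots. The same reasoning applies to
 * 'entries' in __cne_ring_move_cons_head() below.
 */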

static __cne_always_inline unsigned int
__cne_ring_move_cons_head(struct cne_ring *r, unsigned int is_sc, unsigned int n,
                          enum cne_ring_queue_behavior behavior, uint32_t *old_head,
                          uint32_t *new_head, uint32_t *entries)
{
    unsigned int max = n;
    uint32_t prod_tail;
    _Bool success;

    /* move cons.head atomically */
    *old_head = atomic_load_explicit(&r->cons.head, CNE_MEMORY_ORDER(relaxed));
    do {
        /* Restore n as it may change every loop */
        n = max;

        /* Add a rmb barrier to avoid load/load reordering in the weak
         * memory model. It is a noop on x86.
         */
        atomic_thread_fence(CNE_MEMORY_ORDER(acquire));

        /* This load-acquire synchronizes with the store-release of ht->tail
         * in update_tail.
         */
        prod_tail = atomic_load_explicit(&r->prod.tail, CNE_MEMORY_ORDER(acquire));

        /* The subtraction is done between two unsigned 32-bit values
         * (the result is always modulo 32 bits even if we have
         * cons_head > prod_tail). So 'entries' is always between 0
         * and size(ring)-1.
         */
        *entries = (prod_tail - *old_head);

        /* Set the actual entries for dequeue */
        if (n > *entries)
            n = (behavior == CNE_RING_QUEUE_FIXED_ITEMS) ? 0 : *entries;

        if (unlikely(n == 0))
            return 0;

        *new_head = *old_head + n;
        if (is_sc) {
            r->cons.head = *new_head;
            success = 1;
        } else {
            success = atomic_compare_exchange_strong_explicit(&r->cons.head, old_head, *new_head,
                                                              CNE_MEMORY_ORDER(relaxed),
                                                              CNE_MEMORY_ORDER(relaxed));
        }
    } while (unlikely(success == 0));
    return n;
}

static __cne_always_inline unsigned int
__cne_ring_do_enqueue(struct cne_ring *r, void *const *obj_table, unsigned int n,
                      enum cne_ring_queue_behavior behavior, unsigned int is_sp,
                      unsigned int *free_space)
{
    uint32_t prod_head, prod_next;
    uint32_t free_entries;

    n = __cne_ring_move_prod_head(r, is_sp, n, behavior, &prod_head, &prod_next, &free_entries);
    if (n == 0)
        goto end;

    ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);

    update_tail(&r->prod, prod_head, prod_next, is_sp);
end:
    if (free_space != NULL)
        *free_space = free_entries - n;
    return n;
}

static __cne_always_inline unsigned int
__cne_ring_do_dequeue(struct cne_ring *r, void **obj_table, unsigned int n,
                      enum cne_ring_queue_behavior behavior, unsigned int is_sc,
                      unsigned int *available)
{
    uint32_t cons_head, cons_next;
    uint32_t entries;

    n = __cne_ring_move_cons_head(r, (int)is_sc, n, behavior, &cons_head, &cons_next, &entries);
    if (n == 0)
        goto end;

    DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);

    update_tail(&r->cons, cons_head, cons_next, is_sc);

end:
    if (available != NULL)
        *available = entries - n;
    return n;
}

#ifdef __cplusplus
}
#endif

#endif /* _CNE_RING_GENERIC_H_ */
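
For orientation, the sketch below shows the single-producer/single-consumer core of the head/tail scheme that the helpers in this file generalize with a CAS loop and fixed/variable burst behavior. It is a self-contained toy, not part of CNDP: the names toy_ring, toy_enqueue and toy_dequeue are invented for illustration, and it omits the multi-producer/multi-consumer handling, the real ring's capacity accounting, and the unrolled copy macros.

#include <stdio.h>
#include <stdint.h>
#include <stdatomic.h>

#define TOY_RING_SIZE 8                   /* must be a power of two */
#define TOY_RING_MASK (TOY_RING_SIZE - 1)

struct toy_ring {
    _Atomic uint32_t prod_head, prod_tail;
    _Atomic uint32_t cons_head, cons_tail;
    void *slots[TOY_RING_SIZE];
};

/* Single-producer enqueue: reserve by advancing prod_head, copy the
 * pointers, then publish them to the consumer with a release store to
 * prod_tail (the toy counterpart of update_tail()).
 */
static unsigned int
toy_enqueue(struct toy_ring *r, void *const *objs, unsigned int n)
{
    const uint32_t capacity = TOY_RING_SIZE - 1; /* one slot kept unused */
    uint32_t head         = atomic_load_explicit(&r->prod_head, memory_order_relaxed);
    uint32_t cons_tail    = atomic_load_explicit(&r->cons_tail, memory_order_acquire);
    uint32_t free_entries = capacity + cons_tail - head; /* modulo-2^32 math */

    if (n > free_entries)
        n = free_entries;
    for (unsigned int i = 0; i < n; i++)
        r->slots[(head + i) & TOY_RING_MASK] = objs[i];
    atomic_store_explicit(&r->prod_head, head + n, memory_order_relaxed);
    atomic_store_explicit(&r->prod_tail, head + n, memory_order_release);
    return n;
}

/* Single-consumer dequeue: mirror image of the enqueue path. */
static unsigned int
toy_dequeue(struct toy_ring *r, void **objs, unsigned int n)
{
    uint32_t head      = atomic_load_explicit(&r->cons_head, memory_order_relaxed);
    uint32_t prod_tail = atomic_load_explicit(&r->prod_tail, memory_order_acquire);
    uint32_t entries   = prod_tail - head; /* modulo-2^32 math */

    if (n > entries)
        n = entries;
    for (unsigned int i = 0; i < n; i++)
        objs[i] = r->slots[(head + i) & TOY_RING_MASK];
    atomic_store_explicit(&r->cons_head, head + n, memory_order_relaxed);
    atomic_store_explicit(&r->cons_tail, head + n, memory_order_release);
    return n;
}

int
main(void)
{
    struct toy_ring r = {0};
    int a = 1, b = 2, c = 3;
    void *in[3]  = {&a, &b, &c};
    void *out[3] = {0};

    unsigned int nq = toy_enqueue(&r, in, 3);
    unsigned int dq = toy_dequeue(&r, out, 3);
    printf("enqueued %u, dequeued %u, first value %d\n", nq, dq, *(int *)out[0]);
    return 0;
}

The release store to prod_tail paired with the acquire load in toy_dequeue is the same publish/observe pairing that update_tail() and the acquire loads in __cne_ring_move_prod_head()/__cne_ring_move_cons_head() implement above.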