/* CNDP 22.08.0 — cthread_queue.h */
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2019-2022 Intel Corporation
3  */
4 
5 /*
6  * Some portions of this software is derived from the producer
7  * consumer queues described by Dmitry Vyukov and published here
8  * http://www.1024cores.net
9  *
10  * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions
13  * are met:
14  *
15  * 1. Redistributions of source code must retain the above copyright notice,
16  * this list of conditions and the following disclaimer.
17  *
18  * 2. Redistributions in binary form must reproduce the above copyright notice,
19  * this list of conditions and the following disclaimer in the documentation
20  * and/or other materials provided with the distribution.
21  *
22  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
23  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
24  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
25  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
26  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
27  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
28  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
29  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
30  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
31  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
32  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  *
34  * The views and conclusions contained in the software and documentation are
35  * those of the authors and should not be interpreted as representing official
36  * policies, either expressed or implied, of Dmitry Vyukov.
37  */
38 
39 #ifndef _CTHREAD_QUEUE_H_
40 #define _CTHREAD_QUEUE_H_
41 
42 #include <string.h>
43 
44 #include <cne_prefetch.h>
45 #include <cne_per_thread.h>
46 
47 #include "cthread_int.h"
48 #include "cthread.h"
49 #include "cthread_pool.h"
50 
51 #ifdef __cplusplus
52 extern "C" {
53 #endif
54 
55 /*
56  * This file implements an unbounded FIFO queue based on a lock free
57  * linked list.
58  *
59  * The queue is non-intrusive in that it uses intermediate nodes, and does
60  * not require these nodes to be inserted into the object being placed
61  * in the queue.
62  *
63  * This is slightly more efficient than the very similar queue in cthread_pool
64  * in that it does not have to swing a stub node as the queue becomes empty.
65  *
66  * The queue access functions allocate and free intermediate node
67  * transparently from/to a per scheduler pool ( see cthread_pool.h ).
68  *
69  * The queue provides both MPSC and SPSC insert methods
70  */
71 
72 /*
73  * define a queue of cthread nodes
74  */
75 struct cthread_queue {
76  struct qnode *head;
77  struct qnode *tail __cne_cache_aligned;
78  char name[CTHREAD_NAME_SIZE];
80 
89 static inline struct cthread_queue *
90 _cthread_queue_create(const char *name)
91 {
92  struct qnode *stub;
93  struct cthread_queue *new_queue;
94 
95  new_queue = calloc(1, sizeof(struct cthread_queue));
96  if (!new_queue)
97  return NULL;
98 
99  /* allocated stub node */
100  stub = _qnode_alloc();
101  CNE_ASSERT(stub);
102  if (stub == NULL) {
103  free(new_queue);
104  return NULL;
105  }
106 
107  strlcpy(new_queue->name, (name) ? name : "Unknown", sizeof(new_queue->name));
108 
109  /* initialize queue as empty */
110  stub->next = NULL;
111  new_queue->head = stub;
112  new_queue->tail = stub;
113 
114  return new_queue;
115 }
116 
125 static __attribute__((always_inline)) inline int
126 _cthread_queue_empty(struct cthread_queue *q)
127 {
128  return q->tail == q->head;
129 }
130 
139 static inline int
140 _cthread_queue_destroy(struct cthread_queue *q)
141 {
142  if (!q)
143  return 0;
144 
145  if (!_cthread_queue_empty(q))
146  return -1;
147 
148  _qnode_free(q->head);
149  free(q);
150  return 0;
151 }
152 
153 CNE_DECLARE_PER_THREAD(struct cthread_sched *, this_sched);
154 
165 static __attribute__((always_inline)) inline struct qnode *
166 _cthread_queue_insert_mp(struct cthread_queue *q, void *data)
167 {
168  struct qnode *prev;
169  struct qnode *n = _qnode_alloc();
170 
171  if (!n)
172  return NULL;
173 
174  /* set object in node */
175  n->data = data;
176  n->next = NULL;
177 
178  /* this is an MPSC method, perform a locked update */
179  prev = n;
180  prev = (struct qnode *)__sync_lock_test_and_set((uint64_t *)&(q)->head, (uint64_t)prev);
181  /* there is a window of inconsistency until prev next is set,
182  * which is why remove must retry
183  */
184  prev->next = n;
185 
186  return n;
187 }
188 
200 static __attribute__((always_inline)) inline struct qnode *
201 _cthread_queue_insert_sp(struct cthread_queue *q, void *data)
202 {
203  /* allocate a queue node */
204  struct qnode *prev;
205  struct qnode *n = _qnode_alloc();
206 
207  if (n == NULL)
208  return NULL;
209 
210  /* set data in node */
211  n->data = data;
212  n->next = NULL;
213 
214  /* this is an SPSC method, no need for locked exchange operation */
215  prev = q->head;
216  prev->next = q->head = n;
217 
218  return n;
219 }
220 
/**
 * Try to remove one item from the queue (single consumer only).
 *
 * Returns the dequeued data pointer, or NULL if the queue appears
 * empty.  NULL may be returned transiently while a producer is
 * mid-insert (head swung but prev->next not yet linked), so callers
 * that need a definitive answer must retry (see _cthread_queue_remove).
 */
static __attribute__((always_inline)) inline void *
_cthread_queue_poll(struct cthread_queue *q)
{
    void *data = NULL;
    struct qnode *tail = q->tail;
    struct qnode *next = (struct qnode *)tail->next;

    /*
     * There is a small window of inconsistency between producer and
     * consumer whereby the queue may appear empty if consumer and
     * producer access it at the same time.
     * The consumer must handle this by retrying
     */

    if (likely(next != NULL)) {
        /*
         * Vyukov-style dequeue: advance tail to the next node, copy
         * next's payload into the old tail node, and free the old tail.
         * The next node becomes the new stub, so no stub swinging is
         * needed when the queue drains.
         */
        q->tail = next;
        tail->data = next->data;
        data = tail->data;

        /* free the node */
        _qnode_free(tail);

        return data;
    }
    return NULL;
}
255 
264 static __attribute__((always_inline)) inline void *
265 _cthread_queue_remove(struct cthread_queue *q)
266 {
267  void *data = NULL;
268 
269  /*
270  * There is a small window of inconsistency between producer and
271  * consumer whereby the queue may appear empty if consumer and
272  * producer access it at the same time. We handle this by retrying
273  */
274  do {
275  data = _cthread_queue_poll(q);
276 
277  if (likely(data != NULL))
278  return data;
280  } while (unlikely(!_cthread_queue_empty(q)));
281  return NULL;
282 }
283 
/**
 * Remove a specific item (given) from the queue (single consumer only).
 *
 * Drains the queue into a temporary holding queue until the given item
 * is found (or the queue is empty), then moves the drained items back.
 * Relative order of the remaining items is preserved.
 *
 * Returns the given pointer if it was found and removed, NULL otherwise
 * (including when the temporary queue cannot be allocated).
 *
 * NOTE(review): if _cthread_queue_insert_sp() or _cthread_queue_insert_mp()
 * fail to allocate a node below, the corresponding item is silently
 * dropped — confirm callers can tolerate this under memory pressure.
 */
static __attribute__((always_inline)) inline void *
_cthread_queue_remove_given(struct cthread_queue *q, void *given)
{
    void *data = NULL;
    struct cthread_queue *saved;

    /* temporary queue to hold items polled off while searching for given */
    saved = _cthread_queue_create(NULL);
    if (saved == NULL)
        return NULL;

    /*
     * There is a small window of inconsistency between producer and
     * consumer whereby the queue may appear empty if consumer and
     * producer access it at the same time. We handle this by retrying
     */
    do {
        data = _cthread_queue_poll(q);

        if (likely(data != NULL)) {
            if (data == given)
                break;
            /* not the one we want — park it on the holding queue */
            _cthread_queue_insert_sp(saved, data);
            data = NULL;
        }
    } while (unlikely(!_cthread_queue_empty(q)));

    /* move the parked items back onto the original queue */
    while (!_cthread_queue_empty(saved)) {
        given = _cthread_queue_remove(saved);
        _cthread_queue_insert_mp(q, given);
    }
    _cthread_queue_destroy(saved);

    return data;
}
329 
330 #ifdef __cplusplus
331 }
332 #endif
333 
334 #endif /* _CTHREAD_QUEUE_H_ */
/*
 * likely()/unlikely(), cne_compiler_barrier(), __cne_cache_aligned and
 * CNE_DECLARE_PER_THREAD() are provided by cne_common.h and
 * cne_per_thread.h (see those headers for their definitions).
 */