#include <pthread.h>
#include <signal.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <net/ethernet.h>
#include <dlb.h>
#include "dlb_test.h"
dlb_thread_args_t prod_args, cons_args, *work_args;
dlb_hdl_t dlb;
dlb_dev_cap_t cap;
int dev_id;
uint64_t num_events;
int num_workers;
dlb_domain_hdl_t domain;
int domain_id;
unsigned sns_per_queue;
static struct fwd_info fwd_info;
struct fwd_info *fwd = &fwd_info;
#define foreach_thd_lport(_t, _lp) \
for (int _i = 0; _i < _t->lport_cnt && (_lp = _t->lports[_i]); _i++, _lp = _t->lports[_i])
#define PKTDEV_USE_NON_AVX 1
#if PKTDEV_USE_NON_AVX
static inline void
swap_mac_addresses(void *data)
{
struct ether_header *eth = (struct ether_header *)data;
tmp = *src_addr;
*src_addr = *dst_addr;
*dst_addr = tmp;
}
#define MAC_SWAP swap_mac_addresses
#else
#define MAC_SWAP pktdev_mac_swap
#endif
static void
{
if (thd->priv_) {
int i;
for (i = 0; i < jcfg_num_lports(fwd->jinfo); i++) {
if (txbuffs[i])
txbuffs[i] = NULL;
}
free(thd->priv_);
thd->priv_ = NULL;
}
}
static int
{
struct fwd_port *pd;
pd = lport->priv_;
if (!txbuffs[idx])
return 0;
}
static int
{
if (thd->priv_)
CNE_ERR_RET(
"Expected thread's private data to be unused but it is %p\n", thd->priv_);
thd->priv_ = calloc(jcfg_num_lports(fwd->jinfo),
sizeof(
txbuff_t *));
if (!thd->priv_)
CNE_ERR_RET(
"Failed to allocate txbuff(s) for %d lport(s)\n", jcfg_num_lports(fwd->jinfo));
if (jcfg_lport_foreach(fwd->jinfo, _create_txbuff, thd->priv_)) {
destroy_per_thread_txbuff(thd);
}
foreach_thd_lport (thd, lport)
((struct fwd_port *)lport->priv_)->thd = thd;
return 0;
}
int
producer(void *arg)
{
int n_pkts, n_evts;
dlb_event_t events[MAX_BURST];
uint8_t sched_type;
int i, ret;
pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &thd->
group->
lcore_bitmap);
if (num_workers == 0)
sched_type = SCHED_DIRECTED;
else
sched_type = SCHED_ORDERED;
cne_printf(
" [blue]Producer Thread ID [red]%d [blue]on lcore [green]%d[]\n", thd->
tid,
cne_printf(
" [blue]Event Port ID [red]%d [blue]Enq Queue Id lcore [green]%d[]\n",
prod_args.port_id, prod_args.queue_id);
for (;;) {
foreach_thd_lport (thd, lport) {
goto leave;
struct fwd_port *pd = lport->priv_;
if (!pd)
continue;
if (n_pkts == PKTDEV_ADMIN_STATE_DOWN)
goto leave;
if (n_pkts == 0)
continue;
for (i = 0; i < n_pkts; i++) {
events[i].send.queue_id = prod_args.queue_id;
events[i].send.sched_type = sched_type;
events[i].send.priority = 0;
events[i].adv_send.udata64 = (uint64_t)pd->rx_mbufs[i];
}
ret = 0;
n_evts = 0;
for (i = 0; n_evts != n_pkts && i < RETRY_LIMIT; i++) {
ret = dlb_send(prod_args.port, n_pkts - n_evts, &events[n_evts]);
if (ret == -1)
break;
n_evts += ret;
}
if (n_evts != n_pkts)
CNE_ERR_RET(
"[%s()] Enqueued %d/%d packets!\n", __func__, n_evts, i);
prod_args.curr_evt_stats.enq += n_evts;
}
}
leave:
return 0;
}
int
consumer(void *arg)
{
dlb_event_t events[MAX_BURST];
int ret;
pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &thd->
group->
lcore_bitmap);
cne_printf(
" [blue]Consumer Thread ID [red]%d [blue]on lcore [green]%d[]\n", thd->
tid,
cne_printf(
" [blue]Event Port ID [red]%d [blue]Enq Queue Id lcore [green]%d[]\n",
cons_args.port_id, cons_args.queue_id);
if (create_per_thread_txbuff(thd))
cne_exit(
"Failed to create txbuff(s) for \"%s\" thread\n", thd->name);
for (;;) {
foreach_thd_lport (thd, lport) {
int i, num_deq;
goto leave;
struct fwd_port *pd = lport->priv_;
if (!pd)
continue;
for (i = 0, num_deq = 0; num_deq == 0 && i < RETRY_LIMIT; i++) {
ret = dlb_recv(cons_args.port, MAX_BURST - num_deq, POLL, &events[num_deq]);
if (ret == -1)
break;
num_deq += ret;
}
if (num_deq == 0)
continue;
cons_args.curr_evt_stats.deq += num_deq;
for (i = 0; i < num_deq; i++) {
}
}
}
leave:
return 0;
}
int
worker(void *arg)
{
dlb_thread_args_t *args = (dlb_thread_args_t *)arg;
dlb_event_t events[MAX_BURST];
int num_enq, num_deq;
int i, ret;
cne_printf(
" [blue]Worker Thread ID [red]%d [blue]on lcore [green]%d[]\n", gettid(),
cne_printf(
" [blue]Event Port ID [red]%d [blue]Enq Queue Id lcore [green]%d[]\n",
args->port_id, args->queue_id);
for (;;) {
for (i = 0, num_deq = 0; num_deq == 0 && i < RETRY_LIMIT; i++) {
ret = dlb_recv(args->port, MAX_BURST, POLL, events);
if (ret == -1)
break;
num_deq += ret;
}
if (num_deq == -1 && errno == EACCES)
break;
if (num_deq == 0)
continue;
args->curr_evt_stats.deq += num_deq;
for (i = 0; i < num_deq; i++) {
events[i].send.queue_id = args->queue_id;
events[i].send.sched_type = SCHED_DIRECTED;
}
ret = 0;
for (i = 0, num_enq = 0; num_enq < num_deq && i < RETRY_LIMIT; i++) {
ret = dlb_forward(args->port, num_deq - num_enq, &events[num_enq]);
if (ret == -1)
break;
num_enq += ret;
}
args->curr_evt_stats.enq += num_enq;
if (num_enq != num_deq)
CNE_ERR_RET(
"[%s()] Forwarded %d/%d packets!\n", __func__, num_enq, num_deq);
}
return 0;
}
static int
{
CNE_DEBUG("No lports attached to thread '%s'\n", thd->name);
return 0;
} else
CNE_DEBUG(
"Close %d lport%s for thread '%s'\n", thd->
lport_cnt,
(thd->
lport_cnt == 1) ?
"" :
"s", thd->name);
foreach_thd_lport (thd, lport) {
cne_printf(
">>> [blue]lport [red]%d[] - '[cyan]%s[]'\n", lport->
lpid, lport->name);
}
return 0;
}
static void
__on_exit(int val, void *arg, int exit_type)
{
switch (exit_type) {
if (val == SIGUSR1)
return;
cne_printf_pos(99, 1,
"\n>>> [cyan]Terminating with signal [green]%d[]\n", val);
fwd->timer_quit = 1;
break;
if (val)
cne_printf_pos(99, 1,
"\n>>> [cyan]Terminating with status [green]%d[]\n", val);
if (fwd) {
jcfg_thread_foreach(fwd->jinfo, _thread_quit, fwd);
fwd->timer_quit = 1;
}
break;
cne_printf_pos(99, 1,
"\n>>> [cyan]Terminating with signal [green]%d[]\n", val);
fwd->timer_quit = 1;
break;
default:
break;
}
fflush(stdout);
}
int
{
int signals[] = {SIGINT, SIGUSR1};
memset(&fwd_info, 0, sizeof(struct fwd_info));
goto err;
cne_on_exit(__on_exit, fwd, signals, cne_countof(signals));
if (parse_args(argc, argv))
goto err;
goto err;
cne_printf(
"\nMax threads: %d, Max lcores: %d, NUMA nodes: %d, Num Workers: %d\n",
fwd->timer_quit = 0;
for (;;) {
sleep(1);
if (fwd->timer_quit)
break;
print_port_stats_all();
print_dlb_stats();
}
dlb_remove();
cne_printf(
">>> [cyan]Main Application Exiting[]: [green]Bye![]\n");
return 0;
err:
cne_printf(
"\n*** [cyan]DLB Test Application[], [blue]PID[]: [green]%d[] failed\n", getpid());
return 0;
}
CNDP_API int cne_max_threads(void)
CNDP_API int cne_init(void)
CNDP_API int cne_on_exit(on_exit_fn_t exit_fn, void *arg, int *signals, int nb_signals)
#define cne_exit(format, args...)
CNDP_API int cne_printf_pos(int16_t r, int16_t c, const char *fmt,...)
CNDP_API int cne_printf(const char *fmt,...)
CNDP_API int cne_lcore_id(void)
CNDP_API unsigned int cne_max_lcores(void)
CNDP_API int cne_max_numa_nodes(void)
CNDP_API int thread_set_affinity(int cpu)
static uint16_t pktdev_rx_burst(uint16_t lport_id, pktmbuf_t **rx_pkts, const uint16_t nb_pkts)
CNDP_API int pktdev_close(uint16_t lport_id)
#define pktmbuf_mtod(m, t)
CNDP_API txbuff_t * txbuff_pktdev_create(uint16_t size, txbuff_error_fn cbfn, void *cb_arg, uint16_t lport_id)
CNDP_API uint16_t txbuff_add(txbuff_t *buffer, pktmbuf_t *tx_pkt)
CNDP_API void txbuff_free(txbuff_t *buffer)