O-RAN E Maintenance Release contribution for ODULOW
[o-du/phy.git] / fhi_lib / lib / ethernet / ethdi.c
1 /******************************************************************************
2 *
3 *   Copyright (c) 2020 Intel.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 *
17 *******************************************************************************/
18
19 /**
20  * @brief This file has all definitions for the Ethernet Data Interface Layer
21  * @file ethdi.c
22  * @ingroup group_lte_source_auxlib
23  * @author Intel Corporation
24  **/
25
26 #define _GNU_SOURCE
27 #include <stdio.h>
28 #include <string.h>
29 #include <stdint.h>
30 #include <errno.h>
31 #include <sys/queue.h>
32 #include <err.h>
33 #include <assert.h>
34 #include <linux/limits.h>
35 #include <sys/types.h>
36 #include <stdlib.h>
37 #include <sys/time.h>
38 #include <time.h>
39 #include <unistd.h>
40 #include <immintrin.h>
41 #include <rte_config.h>
42 #include <rte_common.h>
43 #include <rte_log.h>
44 #include <rte_memory.h>
45 #include <rte_malloc.h>
46 #include <rte_memzone.h>
47 #include <rte_eal.h>
48 #include <rte_per_lcore.h>
49 #include <rte_launch.h>
50 #include <rte_atomic.h>
51 #include <rte_cycles.h>
52 #include <rte_prefetch.h>
53 #include <rte_lcore.h>
54 #include <rte_per_lcore.h>
55 #include <rte_branch_prediction.h>
56 #include <rte_interrupts.h>
57 #include <rte_pci.h>
58 #include <rte_debug.h>
59 #include <rte_ethdev.h>
60 #include <rte_ring.h>
61 #include <rte_mbuf.h>
62 #include <rte_timer.h>
63
64 #include "ethernet.h"
65 #include "ethdi.h"
66 #include "xran_fh_o_du.h"
67 #include "xran_mlog_lnx.h"
68 #include "xran_printf.h"
69 #include "xran_common.h"
70
71 #include "xran_lib_mlog_tasks_id.h"
72
73 #define BURST_RX_IO_SIZE 48
74
75 //#define ORAN_OWD_DEBUG_TX_LOOP
76
77 struct xran_ethdi_ctx g_ethdi_ctx = { 0 };
78 enum xran_if_state xran_if_current_state = XRAN_STOPPED;
79
80 struct rte_mbuf *xran_ethdi_mbuf_alloc(void)
81 {
82     return rte_pktmbuf_alloc(_eth_mbuf_pool);
83 }
84
85 struct rte_mbuf *xran_ethdi_mbuf_indir_alloc(void)
86 {
87     return rte_pktmbuf_alloc(socket_indirect_pool);
88 }
89
90 int32_t xran_ethdi_mbuf_send(struct rte_mbuf *mb, uint16_t ethertype, uint16_t vf_id)
91 {
92     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
93     int res = 0;
94
95     mb->port = ctx->io_cfg.port[vf_id];
96     xran_add_eth_hdr_vlan(&ctx->entities[vf_id][ID_O_RU], ethertype, mb);
97
98     res = xran_enqueue_mbuf(mb, ctx->tx_ring[vf_id]);
99     return res;
100 }
101
102 int32_t xran_ethdi_mbuf_send_cp(struct rte_mbuf *mb, uint16_t ethertype, uint16_t vf_id)
103 {
104     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
105     int res = 0;
106
107     mb->port = ctx->io_cfg.port[vf_id];
108     xran_add_eth_hdr_vlan(&ctx->entities[vf_id][ID_O_RU], ethertype, mb);
109
110     res = xran_enqueue_mbuf(mb, ctx->tx_ring[vf_id]);
111     return res;
112 }
113
114 struct {
115     uint16_t ethertype;
116     ethertype_handler fn;
117 } xran_ethertype_handlers[] = {
118     { ETHER_TYPE_ECPRI, NULL },
119 };
120
121 int32_t xran_register_ethertype_handler(uint16_t ethertype, ethertype_handler callback)
122 {
123     int i;
124
125     for (i = 0; i < RTE_DIM(xran_ethertype_handlers); ++i)
126         if (xran_ethertype_handlers[i].ethertype == ethertype) {
127             xran_ethertype_handlers[i].fn = callback;
128
129             return 1;
130         }
131
132     print_err("support for ethertype %u not found", ethertype);
133
134     return 0;
135 }
136
137 int xran_handle_ether(uint16_t ethertype, struct rte_mbuf* pkt_q[], uint16_t xport_id, struct xran_eaxc_info *p_cid,  uint16_t num)
138 {
139     int i;
140
141     for (i = 0; i < RTE_DIM(xran_ethertype_handlers); ++i)
142         if (xran_ethertype_handlers[i].ethertype == ethertype)
143             if (xran_ethertype_handlers[i].fn){
144 //                rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
145                 return xran_ethertype_handlers[i].fn(pkt_q, xport_id, p_cid, num);
146             }
147
148     print_err("Packet with unrecognized ethertype '%.4X' dropped", ethertype);
149
150     return MBUF_FREE;
151 };
152
153
154 /* Process vlan tag. Cut the ethernet header. Call the etherype handlers. */
155 int xran_ethdi_filter_packet(struct rte_mbuf *pkt_q[], uint16_t vf_id, uint16_t q_id, uint16_t num)
156 {
157     int ret;
158     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
159     struct rte_ether_hdr* eth_hdr;
160     uint16_t port_id = ctx->vf2xran_port[vf_id];
161     struct xran_eaxc_info *p_cid = &ctx->vf_and_q2cid[vf_id][q_id];
162
163     ret = xran_handle_ether(ETHER_TYPE_ECPRI, pkt_q, port_id, p_cid, num);
164
165     return MBUF_FREE;
166 }
167
168 /* Check the link status of all ports in up to 9s, and print them finally */
169 static void check_port_link_status(uint8_t portid)
170 {
171 #define CHECK_INTERVAL 100 /* 100ms */
172 #define MAX_CHECK_TIME 90 /* 9s (90 * 100ms) in total */
173     uint8_t count, all_ports_up, print_flag = 0;
174     struct rte_eth_link link;
175
176     printf("\nChecking link status portid [%d]  ",  portid);
177     fflush(stdout);
178     for (count = 0; count <= MAX_CHECK_TIME; count++) {
179         all_ports_up = 1;
180         memset(&link, 0, sizeof(link));
181         rte_eth_link_get_nowait(portid, &link);
182
183         /* print link status if flag set */
184         if (print_flag == 1) {
185             if (link.link_status)
186                 printf("Port %d Link Up - speed %u "
187                         "Mbps - %s\n", (uint8_t)portid,
188                         (unsigned)link.link_speed,
189                         (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
190                         ("full-duplex") : ("half-duplex\n"));
191             else
192                 printf("Port %d Link Down\n",
193                         (uint8_t)portid);
194         }
195         /* clear all_ports_up flag if any link down */
196         if (link.link_status == ETH_LINK_DOWN) {
197             all_ports_up = 0;
198             break;
199         }
200         /* after finally printing all link status, get out */
201         if (print_flag == 1)
202             break;
203
204         if (all_ports_up == 0) {
205             printf(".");
206             fflush(stdout);
207             rte_delay_ms(CHECK_INTERVAL);
208         }
209
210         /* set the print_flag if all ports up or timeout */
211         if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
212             print_flag = 1;
213             printf(" ... done\n");
214         }
215     }
216 }
217
218 /**
219  * create a flow rule that sends packets with matching pc_id
220  * to selected queue.
221  *
222  * @param port_id
223  *   The selected port.
224  * @param rx_q
225  *   The selected target queue.
226  * @param pc_id_be
227  *   The value to apply to the pc_id.
228  * @param[out] error
229  *   Perform verbose error reporting if not NULL.
230  *
231  * @return
232  *   A flow if the rule could be created else return NULL.
233  */
234 struct rte_flow *
235 generate_ecpri_flow(uint16_t port_id, uint16_t rx_q, uint16_t pc_id_be, struct rte_flow_error *error)
236 {
237     struct rte_flow *flow = NULL;
238 #if (RTE_VER_YEAR >= 21)
239 #define MAX_PATTERN_NUM         3
240 #define MAX_ACTION_NUM          2
241     struct rte_flow_attr attr;
242     struct rte_flow_item pattern[MAX_PATTERN_NUM];
243     struct rte_flow_action action[MAX_ACTION_NUM];
244
245     struct rte_flow_action_queue queue = { .index = rx_q };
246     struct rte_flow_item_ecpri ecpri_spec;
247     struct rte_flow_item_ecpri ecpri_mask;
248
249     int res;
250     print_dbg("%s\n", __FUNCTION__);
251     memset(pattern, 0, sizeof(pattern));
252     memset(action, 0, sizeof(action));
253
254     /*
255      * set the rule attribute.
256      * in this case only ingress packets will be checked.
257      */
258     memset(&attr, 0, sizeof(struct rte_flow_attr));
259     attr.ingress = 1;
260
261     /*
262      * create the action sequence.
263      * one action only,  move packet to queue
264      */
265     action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE;
266     action[0].conf = &queue;
267     action[1].type = RTE_FLOW_ACTION_TYPE_END;
268
269     /*
270      * set the first level of the pattern (ETH).
271      * since in this example we just want to get the
272      * eCPRI we set this level to allow all.
273      */
274     pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH;
275
276     memset(&ecpri_spec, 0, sizeof(struct rte_flow_item_ecpri));
277     memset(&ecpri_mask, 0, sizeof(struct rte_flow_item_ecpri));
278
279     ecpri_spec.hdr.common.type = RTE_ECPRI_MSG_TYPE_IQ_DATA;
280     ecpri_spec.hdr.type0.pc_id = pc_id_be;
281
282     ecpri_mask.hdr.common.type  = 0xff;
283     ecpri_mask.hdr.type0.pc_id  = 0xffff;
284
285     ecpri_spec.hdr.common.u32   = rte_cpu_to_be_32(ecpri_spec.hdr.common.u32);
286
287     pattern[1].type = RTE_FLOW_ITEM_TYPE_ECPRI;
288     pattern[1].spec = &ecpri_spec;
289     pattern[1].mask = &ecpri_mask;
290
291     struct rte_flow_item_ecpri *pecpri_spec = (struct rte_flow_item_ecpri *)pattern[1].spec;
292     struct rte_flow_item_ecpri *pecpri_mask = (struct rte_flow_item_ecpri *)pattern[1].mask;
293     print_dbg("RTE_FLOW_ITEM_TYPE_ECPRI\n");
294     print_dbg("spec type %x pc_id %x\n", pecpri_spec->hdr.common.type, pecpri_spec->hdr.type0.pc_id);
295     print_dbg("mask type %x pc_id %x\n", pecpri_mask->hdr.common.type, pecpri_mask->hdr.type0.pc_id);
296
297     /* the final level must be always type end */
298     pattern[2].type = RTE_FLOW_ITEM_TYPE_END;
299
300     res = rte_flow_validate(port_id, &attr, pattern, action, error);
301     if (!res)
302         flow = rte_flow_create(port_id, &attr, pattern, action, error);
303     else {
304         rte_panic("Flow can't be created %d message: %s\n",
305                     error->type,
306                     error->message ? error->message : "(no stated reason)");
307     }
308 #endif
309     return flow;
310 }
311
312
313 int32_t
314 xran_ethdi_init_dpdk_io(char *name, const struct xran_io_cfg *io_cfg,
315     int *lcore_id, struct rte_ether_addr *p_o_du_addr,
316     struct rte_ether_addr *p_ru_addr,  uint32_t mtu)
317 {
318     uint16_t port[XRAN_VF_MAX];
319     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
320     int i,ivf;
321     char core_mask[64];
322     uint64_t c_mask         = 0L;
323     uint64_t c_mask_64_127  = 0L;
324     uint64_t nWorkerCore = 1;
325     uint32_t coreNum = sysconf(_SC_NPROCESSORS_CONF);
326     char bbdev_wdev[32]   = "";
327     char bbdev_vdev[32]   = "";
328     char iova_mode[32]    = "--iova-mode=pa";
329     char socket_mem[32]   = "--socket-mem=8192";
330     char socket_limit[32] = "--socket-limit=8192";
331     char ring_name[32]    = "";
332     int32_t xran_port = -1;
333     portid_t port_id;
334     queueid_t qi = 0;
335     uint16_t count;
336
337     char *argv[] = { name, core_mask, "-n2", iova_mode, socket_mem, socket_limit, "--proc-type=auto",
338         "--file-prefix", name, "-a0000:00:00.0", bbdev_wdev, bbdev_vdev};
339
340
341     if (io_cfg == NULL)
342         return 0;
343     if(io_cfg->bbdev_mode != XRAN_BBDEV_NOT_USED){
344         printf("BBDEV_FEC_ACCL_NR5G\n");
345         if (io_cfg->bbdev_mode == XRAN_BBDEV_MODE_HW_ON){
346             // hw-accelerated bbdev
347             printf("hw-accelerated bbdev %s\n", io_cfg->bbdev_dev[0]);
348
349             snprintf(bbdev_wdev, RTE_DIM(bbdev_wdev), "-a%s", io_cfg->bbdev_dev[0]);
350
351         } else if (io_cfg->bbdev_mode == XRAN_BBDEV_MODE_HW_OFF){
352
353             snprintf(bbdev_wdev, RTE_DIM(bbdev_wdev), "%s", "--vdev=baseband_turbo_sw");
354         } else if (io_cfg->bbdev_mode == XRAN_BBDEV_MODE_HW_SW){
355             printf("software and hw-accelerated bbdev %s\n", io_cfg->bbdev_dev[0]);
356
357             snprintf(bbdev_wdev, RTE_DIM(bbdev_wdev), "-a%s", io_cfg->bbdev_dev[0]);
358
359             snprintf(bbdev_vdev, RTE_DIM(bbdev_vdev), "%s", "--vdev=baseband_turbo_sw");
360         } else {
361             rte_panic("Cannot init DPDK incorrect [bbdev_mode %d]\n", io_cfg->bbdev_mode);
362         }
363     }
364
365     if (io_cfg->dpdkIoVaMode == 1){
366         snprintf(iova_mode, RTE_DIM(iova_mode), "%s", "--iova-mode=va");
367     }
368
369     if (io_cfg->dpdkMemorySize){
370         snprintf(socket_mem, RTE_DIM(socket_mem), "--socket-mem=%d", io_cfg->dpdkMemorySize);
371         snprintf(socket_limit, RTE_DIM(socket_limit), "--socket-limit=%d", io_cfg->dpdkMemorySize);
372     }
373
374     if (io_cfg->core < 64)
375         c_mask |= (long)(1L << io_cfg->core);
376     else
377         c_mask_64_127 |= (long)(1L << (io_cfg->core - 64));
378
379     if (io_cfg->system_core < 64)
380         c_mask |= (long)(1L << io_cfg->system_core);
381     else
382         c_mask_64_127 |= (long)(1L << (io_cfg->system_core - 64));
383
384     if (io_cfg->timing_core < 64)
385        c_mask |=  (long)(1L << io_cfg->timing_core);
386     else
387        c_mask_64_127 |= (long)(1L << (io_cfg->timing_core - 64));
388
389     nWorkerCore = 1L;
390     for (i = 0; i < coreNum && i < 64; i++) {
391         if (nWorkerCore & (uint64_t)io_cfg->pkt_proc_core) {
392             c_mask |= nWorkerCore;
393         }
394         nWorkerCore = nWorkerCore << 1;
395     }
396
397     nWorkerCore = 1L;
398     for (i = 64; i < coreNum && i < 128; i++) {
399         if (nWorkerCore & (uint64_t)io_cfg->pkt_proc_core_64_127) {
400             c_mask_64_127 |= nWorkerCore;
401         }
402         nWorkerCore = nWorkerCore << 1;
403     }
404
405     printf("total cores %d c_mask 0x%lx%016lx core %d [id] system_core %d [id] pkt_proc_core 0x%lx%016lx [mask] pkt_aux_core %d [id] timing_core %d [id]\n",
406         coreNum, c_mask_64_127, c_mask, io_cfg->core, io_cfg->system_core, io_cfg->pkt_proc_core_64_127, io_cfg->pkt_proc_core, io_cfg->pkt_aux_core, io_cfg->timing_core);
407
408     snprintf(core_mask, sizeof(core_mask), "-c 0x%lx%016lx",c_mask_64_127,c_mask);
409
410     ctx->io_cfg = *io_cfg;
411
412     for (ivf = 0; ivf < XRAN_VF_MAX; ivf++){
413         for (i = 0; i < ID_MAX; i++)     /* Initialize all as broadcast */
414             memset(&ctx->entities[ivf][i], 0xFF, sizeof(ctx->entities[0][0]));
415     }
416
417     printf("%s: Calling rte_eal_init:", __FUNCTION__);
418     for (i = 0; i < RTE_DIM(argv); i++)
419     {
420         printf("%s ", argv[i]);
421     }
422     printf("\n");
423
424     /* This will return on system_core, which is not necessarily the
425      * one we're on right now. */
426     if (rte_eal_init(RTE_DIM(argv), argv) < 0)
427         rte_panic("Cannot init EAL: %s\n", rte_strerror(rte_errno));
428
429     if (rte_eal_process_type() == RTE_PROC_SECONDARY)
430         rte_exit(EXIT_FAILURE,
431                 "Secondary process type not supported.\n");
432
433     xran_init_mbuf_pool(mtu);
434
435 #ifdef RTE_LIBRTE_PDUMP
436     /* initialize packet capture framework */
437     rte_pdump_init();
438 #endif
439
440     /* Timers. */
441     rte_timer_subsystem_init();
442
443     *lcore_id = rte_get_next_lcore(rte_lcore_id(), 0, 0);
444
445     PANIC_ON(*lcore_id == RTE_MAX_LCORE, "out of lcores for io_loop()");
446
447     for (i = 0; i < XRAN_VF_MAX; i++)
448         port[i] = 0xffff;
449
450     if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
451         for (i = 0; i < XRAN_VF_MAX && i < io_cfg->num_vfs; i++){
452             if(io_cfg->dpdk_dev[i]){
453                 struct rte_dev_iterator iterator;
454                 uint16_t port_id;
455
456                 if (rte_dev_probe(io_cfg->dpdk_dev[i]) != 0 ||
457                     rte_eth_dev_count_avail() == 0) {
458                         errx(1, "Network port doesn't exist\n");
459                 }
460
461                 RTE_ETH_FOREACH_MATCHING_DEV(port_id, io_cfg->dpdk_dev[i], &iterator){
462                         port[i] = port_id;
463                         xran_init_port(port[i], io_cfg->num_rxq, mtu);
464             }
465
466                 if(!(i & 1) || io_cfg->one_vf_cu_plane){
467                 snprintf(ring_name, RTE_DIM(ring_name), "%s_%d", "tx_ring_up", i);
468                     ctx->tx_ring[i] = rte_ring_create(ring_name, NUM_MBUFS_RING_TRX,
469                     rte_lcore_to_socket_id(*lcore_id), RING_F_SC_DEQ);
470                     PANIC_ON(ctx->tx_ring[i] == NULL, "failed to allocate tx ring");
471                     for(qi = 0; qi < io_cfg->num_rxq; qi++) {
472                         snprintf(ring_name, RTE_DIM(ring_name), "%s_%d_%d", "rx_ring_up", i, qi);
473                         ctx->rx_ring[i][qi] = rte_ring_create(ring_name, NUM_MBUFS_RING_TRX,
474                             rte_lcore_to_socket_id(*lcore_id), RING_F_SP_ENQ);
475                         PANIC_ON(ctx->rx_ring[i][qi] == NULL, "failed to allocate rx ring");
476                     }
477             }else {
478                 snprintf(ring_name, RTE_DIM(ring_name), "%s_%d", "tx_ring_cp", i);
479                     ctx->tx_ring[i] = rte_ring_create(ring_name, NUM_MBUFS_RING_TRX,
480                     rte_lcore_to_socket_id(*lcore_id), RING_F_SC_DEQ);
481                     PANIC_ON(ctx->tx_ring[i] == NULL, "failed to allocate rx ring");
482                     for(qi = 0; qi < io_cfg->num_rxq; qi++) {
483                         snprintf(ring_name, RTE_DIM(ring_name), "%s_%d_%d", "rx_ring_cp", i, qi);
484                         ctx->rx_ring[i][qi] = rte_ring_create(ring_name, NUM_MBUFS_RING_TRX,
485                             rte_lcore_to_socket_id(*lcore_id), RING_F_SP_ENQ);
486                         PANIC_ON(ctx->rx_ring[i][qi] == NULL, "failed to allocate rx ring");
487                     }
488                 }
489             } else {
490                 printf("no DPDK port provided\n");
491                 xran_init_port_mempool(i, mtu);
492             }
493
494             if(io_cfg->dpdk_dev[i]){
495                 check_port_link_status(port[i]);
496             }
497         }
498     } else {
499         rte_panic("ethdi_dpdk_io_loop() failed to start  with RTE_PROC_SECONDARY\n");
500     }
501
502     for (i = 0; i < XRAN_VF_MAX && i < io_cfg->num_vfs; i++){
503         ctx->io_cfg.port[i] = port[i];
504         print_dbg("port_id 0x%04x\n", ctx->io_cfg.port[i]);
505     }
506
507     for (i = 0; i < XRAN_VF_MAX; i++){
508         ctx->vf2xran_port[i] = 0xFFFF;
509         ctx->rxq_per_port[i] = 1;
510         for (qi = 0; qi < XRAN_VF_QUEUE_MAX; qi++){
511             ctx->vf_and_q2pc_id[i][qi] = 0xFFFF;
512
513             ctx->vf_and_q2cid[i][qi].cuPortId     = 0xFF;
514             ctx->vf_and_q2cid[i][qi].bandSectorId = 0xFF;
515             ctx->vf_and_q2cid[i][qi].ccId         = 0xFF;
516             ctx->vf_and_q2cid[i][qi].ruPortId     = 0xFF;
517         }
518     }
519
520     for (i = 0; i < XRAN_VF_MAX && i < io_cfg->num_vfs; i++){
521         if(io_cfg->dpdk_dev[i]){
522             struct rte_ether_addr *p_addr;
523
524             if(i % (io_cfg->nEthLinePerPort * (2 - 1*ctx->io_cfg.one_vf_cu_plane)) == 0) /* C-p and U-p VFs per line */
525                 xran_port +=1;
526
527             rte_eth_macaddr_get(port[i], &ctx->entities[i][io_cfg->id]);
528
529             p_addr = &ctx->entities[i][io_cfg->id];
530             printf("[%2d] vf %2u local  SRC MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
531                    " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
532                    (unsigned)xran_port,
533                    (unsigned)i,
534                    p_addr->addr_bytes[0], p_addr->addr_bytes[1], p_addr->addr_bytes[2],
535                    p_addr->addr_bytes[3], p_addr->addr_bytes[4], p_addr->addr_bytes[5]);
536
537             p_addr = &p_ru_addr[i];
538             printf("[%2d] vf %2u remote DST MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
539                    " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
540                    (unsigned)xran_port,
541                    (unsigned)i,
542                    p_addr->addr_bytes[0], p_addr->addr_bytes[1], p_addr->addr_bytes[2],
543                    p_addr->addr_bytes[3], p_addr->addr_bytes[4], p_addr->addr_bytes[5]);
544
545             rte_ether_addr_copy(&p_ru_addr[i],  &ctx->entities[i][ID_O_RU]);
546             ctx->vf2xran_port[i] = xran_port;
547             ctx->rxq_per_port[i] = io_cfg->num_rxq;
548         }
549     }
550
551     for(i = 0; i < xran_port + 1 && i < XRAN_PORTS_NUM; i++) {
552         snprintf(ring_name, RTE_DIM(ring_name), "%s_%d", "dl_gen_ring_up", i);
553         ctx->up_dl_pkt_gen_ring[i] = rte_ring_create(ring_name, NUM_MBUFS_RING,
554         rte_lcore_to_socket_id(*lcore_id), /*RING_F_SC_DEQ*/0);
555         PANIC_ON(ctx->up_dl_pkt_gen_ring[i] == NULL, "failed to allocate dl gen ring");
556         printf("created %s\n", ring_name);
557     }
558
559     return 1;
560 }
561
562 static inline uint16_t xran_tx_from_ring(int port, struct rte_ring *r)
563 {
564     struct rte_mbuf *mbufs[BURST_SIZE];
565     uint16_t dequeued, sent = 0;
566     uint32_t remaining;
567     int i;
568     long t1 = MLogTick();
569
570     dequeued = rte_ring_dequeue_burst(r, (void **)mbufs, BURST_SIZE,
571             &remaining);
572     if (!dequeued)
573         return 0;   /* Nothing to send. */
574
575     while (1) {     /* When tx queue is full it is trying again till succeed */
576         sent += rte_eth_tx_burst(port, 0, &mbufs[sent], dequeued - sent);
577         if (sent == dequeued){
578         MLogTask(PID_RADIO_ETH_TX_BURST, t1, MLogTick());
579             return remaining;
580     }
581 }
582 }
583
584 int32_t process_dpdk_io(void* args)
585 {
586     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
587     struct xran_io_cfg * cfg = &(xran_ethdi_get_ctx()->io_cfg);
588     int32_t* port = &cfg->port[0];
589     int port_id = 0;
590     int qi      = 0;
591
592     rte_timer_manage();
593
594     for (port_id = 0; port_id < XRAN_VF_MAX && port_id < ctx->io_cfg.num_vfs; port_id++){
595         struct rte_mbuf *mbufs[BURST_RX_IO_SIZE];
596         if(port[port_id] == 0xFF)
597             return 0;
598
599         /* RX */
600         for(qi = 0; qi < ctx->rxq_per_port[port_id]; qi++) {
601             const uint16_t rxed = rte_eth_rx_burst(port[port_id], qi, mbufs, BURST_RX_IO_SIZE);
602         if (rxed != 0){
603             unsigned enq_n = 0;
604             long t1 = MLogTick();
605                 ctx->rx_vf_queue_cnt[port[port_id]][qi] += rxed;
606                 enq_n =  rte_ring_enqueue_burst(ctx->rx_ring[port_id][qi], (void*)mbufs, rxed, NULL);
607             if(rxed - enq_n)
608                 rte_panic("error enq\n");
609             MLogTask(PID_RADIO_RX_VALIDATE, t1, MLogTick());
610         }
611         }
612
613         /* TX */
614
615         const uint16_t sent = xran_tx_from_ring(port[port_id], ctx->tx_ring[port_id]);
616         /* One way Delay Measurements */
617         if ((cfg->eowd_cmn[cfg->id].owdm_enable != 0) && (cfg->eowd_cmn[cfg->id].measVf == port_id))
618         {
619           if (!xran_ecpri_port_update_required(cfg, (uint16_t)port_id))
620             {
621 #ifdef ORAN_OWD_DEBUG_TX_LOOP
622                 printf("going to owd tx for port %d\n", port_id);
623 #endif
624                 if (xran_ecpri_one_way_delay_measurement_transmitter((uint16_t) port_id, (void*)xran_dev_get_ctx()) != OK)
625                 {
626                     errx(1,"Exit pdio port_id %d", port_id);
627                 }
628             }
629         }
630
631         if (XRAN_STOPPED == xran_if_current_state)
632             return -1;
633     }
634
635     if (XRAN_STOPPED == xran_if_current_state)
636             return -1;
637
638     return 0;
639 }
640
641 int32_t process_dpdk_io_tx(void* args)
642 {
643     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
644     struct xran_io_cfg * cfg = &(xran_ethdi_get_ctx()->io_cfg);
645     int32_t* port = &cfg->port[0];
646     int port_id = 0;
647
648     //rte_timer_manage();
649
650     for (port_id = 0; port_id < XRAN_VF_MAX && port_id < ctx->io_cfg.num_vfs; port_id++){
651         struct rte_mbuf *mbufs[BURST_RX_IO_SIZE];
652         if(port[port_id] == 0xFF)
653             return 0;
654         /* TX */
655         const uint16_t sent = xran_tx_from_ring(port[port_id], ctx->tx_ring[port_id]);
656
657         if (XRAN_STOPPED == xran_if_current_state)
658             return -1;
659     }
660
661     if (XRAN_STOPPED == xran_if_current_state)
662             return -1;
663
664     return 0;
665 }
666
667 int32_t process_dpdk_io_rx(void* args)
668 {
669     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
670     struct xran_io_cfg * cfg = &(xran_ethdi_get_ctx()->io_cfg);
671     int32_t* port = &cfg->port[0];
672     int port_id = 0;
673     int qi     = 0;
674
675     rte_timer_manage();
676
677     for (port_id = 0; port_id < XRAN_VF_MAX && port_id < ctx->io_cfg.num_vfs; port_id++){
678         struct rte_mbuf *mbufs[BURST_RX_IO_SIZE];
679         if(port[port_id] == 0xFF)
680             return 0;
681
682         /* RX */
683         for(qi = 0; qi < ctx->rxq_per_port[port_id]; qi++){
684             const uint16_t rxed = rte_eth_rx_burst(port[port_id], qi, mbufs, BURST_RX_IO_SIZE);
685             if (rxed != 0){
686                 unsigned enq_n = 0;
687                 long t1 = MLogTick();
688                 ctx->rx_vf_queue_cnt[port[port_id]][qi] += rxed;
689                 enq_n =  rte_ring_enqueue_burst(ctx->rx_ring[port_id][qi], (void*)mbufs, rxed, NULL);
690                 if(rxed - enq_n)
691                     rte_panic("error enq\n");
692                 MLogTask(PID_RADIO_RX_VALIDATE, t1, MLogTick());
693             }
694         }
695         if (XRAN_STOPPED == xran_if_current_state)
696             return -1;
697     }
698
699     if (XRAN_STOPPED == xran_if_current_state)
700             return -1;
701
702     return 0;
703 }
704