* INTC Contribution to the O-RAN F Release for O-DU Low
[o-du/phy.git] / fhi_lib / lib / ethernet / ethdi.c
1 /******************************************************************************
2 *
3 *   Copyright (c) 2020 Intel.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 *
17 *******************************************************************************/
18
19 /**
20  * @brief This file has all definitions for the Ethernet Data Interface Layer
21  * @file ethdi.c
22  * @ingroup group_lte_source_auxlib
23  * @author Intel Corporation
24  **/
25
26 #define _GNU_SOURCE
27 #include <stdio.h>
28 #include <string.h>
29 #include <stdint.h>
30 #include <errno.h>
31 #include <sys/queue.h>
32 #include <err.h>
33 #include <assert.h>
34 #include <linux/limits.h>
35 #include <sys/types.h>
36 #include <stdlib.h>
37 #include <sys/time.h>
38 #include <time.h>
39 #include <unistd.h>
40 #include <immintrin.h>
41 #include <numa.h>
42 #include <rte_config.h>
43 #include <rte_common.h>
44 #include <rte_log.h>
45 #include <rte_memory.h>
46 #include <rte_malloc.h>
47 #include <rte_memzone.h>
48 #include <rte_eal.h>
49 #include <rte_per_lcore.h>
50 #include <rte_launch.h>
51 #include <rte_atomic.h>
52 #include <rte_cycles.h>
53 #include <rte_prefetch.h>
54 #include <rte_lcore.h>
55 #include <rte_per_lcore.h>
56 #include <rte_branch_prediction.h>
57 #include <rte_interrupts.h>
58 #include <rte_pci.h>
59 #include <rte_debug.h>
60 #include <rte_ethdev.h>
61 #include <rte_ring.h>
62 #include <rte_mbuf.h>
63 #include <rte_timer.h>
64
65 #include "ethernet.h"
66 #include "ethdi.h"
67 #include "xran_fh_o_du.h"
68 #include "xran_mlog_lnx.h"
69 #include "xran_printf.h"
70 #include "xran_common.h"
71
72 #include "xran_lib_mlog_tasks_id.h"
73
74 #define BURST_RX_IO_SIZE 48
75
76 //#define ORAN_OWD_DEBUG_TX_LOOP
77
78 struct xran_ethdi_ctx g_ethdi_ctx = { 0 };
79 enum xran_if_state xran_if_current_state = XRAN_STOPPED;
80
81 struct rte_mbuf *xran_ethdi_mbuf_alloc(void)
82 {
83     return rte_pktmbuf_alloc(_eth_mbuf_pool);
84 }
85
86 struct rte_mbuf *xran_ethdi_mbuf_indir_alloc(void)
87 {
88     return rte_pktmbuf_alloc(socket_indirect_pool);
89 }
90
91 int32_t xran_ethdi_mbuf_send(struct rte_mbuf *mb, uint16_t ethertype, uint16_t vf_id)
92 {
93     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
94     int res = 0;
95
96     mb->port = ctx->io_cfg.port[vf_id];
97     xran_add_eth_hdr_vlan(&ctx->entities[vf_id][ID_O_RU], ethertype, mb);
98
99     res = xran_enqueue_mbuf(mb, ctx->tx_ring[vf_id]);
100     return res;
101 }
102
103 int32_t xran_ethdi_mbuf_send_cp(struct rte_mbuf *mb, uint16_t ethertype, uint16_t vf_id)
104 {
105     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
106     int res = 0;
107
108     mb->port = ctx->io_cfg.port[vf_id];
109     xran_add_eth_hdr_vlan(&ctx->entities[vf_id][ID_O_RU], ethertype, mb);
110
111     res = xran_enqueue_mbuf(mb, ctx->tx_ring[vf_id]);
112     return res;
113 }
114
115 struct {
116     uint16_t ethertype;
117     ethertype_handler fn;
118 } xran_ethertype_handlers[] = {
119     { ETHER_TYPE_ECPRI, NULL },
120 };
121
122 int32_t xran_register_ethertype_handler(uint16_t ethertype, ethertype_handler callback)
123 {
124     int i;
125
126     for (i = 0; i < RTE_DIM(xran_ethertype_handlers); ++i)
127         if (xran_ethertype_handlers[i].ethertype == ethertype) {
128             xran_ethertype_handlers[i].fn = callback;
129
130             return 1;
131         }
132
133     print_err("support for ethertype %u not found", ethertype);
134
135     return 0;
136 }
137
138 int xran_handle_ether(uint16_t ethertype, struct rte_mbuf* pkt_q[], uint16_t xport_id, struct xran_eaxc_info *p_cid,  uint16_t num)
139 {
140     int i;
141
142     for (i = 0; i < RTE_DIM(xran_ethertype_handlers); ++i)
143         if (xran_ethertype_handlers[i].ethertype == ethertype)
144             if (xran_ethertype_handlers[i].fn){
145 //                rte_prefetch0(rte_pktmbuf_mtod(pkt, void *));
146                 return xran_ethertype_handlers[i].fn(pkt_q, xport_id, p_cid, num);
147             }
148
149     print_err("Packet with unrecognized ethertype '%.4X' dropped", ethertype);
150
151     return MBUF_FREE;
152 };
153
154
155 /* Process vlan tag. Cut the ethernet header. Call the etherype handlers. */
156 int xran_ethdi_filter_packet(struct rte_mbuf *pkt_q[], uint16_t vf_id, uint16_t q_id, uint16_t num)
157 {
158     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
159     uint16_t port_id = ctx->vf2xran_port[vf_id];
160     struct xran_eaxc_info *p_cid = &ctx->vf_and_q2cid[vf_id][q_id];
161
162     xran_handle_ether(ETHER_TYPE_ECPRI, pkt_q, port_id, p_cid, num);
163
164     return MBUF_FREE;
165 }
166
167 /* Check the link status of all ports in up to 9s, and print them finally */
168 static void check_port_link_status(uint8_t portid)
169 {
170 #define CHECK_INTERVAL 100 /* 100ms */
171 #define MAX_CHECK_TIME 90 /* 9s (90 * 100ms) in total */
172     uint8_t count, all_ports_up, print_flag = 0;
173     struct rte_eth_link link;
174
175     printf("\nChecking link status portid [%d]  ",  portid);
176     fflush(stdout);
177     for (count = 0; count <= MAX_CHECK_TIME; count++) {
178         all_ports_up = 1;
179         memset(&link, 0, sizeof(link));
180         rte_eth_link_get_nowait(portid, &link);
181
182         /* print link status if flag set */
183         if (print_flag == 1) {
184             if (link.link_status)
185                 printf("Port %d Link Up - speed %u "
186                         "Mbps - %s\n", (uint8_t)portid,
187                         (unsigned)link.link_speed,
188                         (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
189                         ("full-duplex") : ("half-duplex\n"));
190             else
191                 printf("Port %d Link Down\n",
192                         (uint8_t)portid);
193         }
194         /* clear all_ports_up flag if any link down */
195         if (link.link_status == ETH_LINK_DOWN) {
196             all_ports_up = 0;
197             break;
198         }
199         /* after finally printing all link status, get out */
200         if (print_flag == 1)
201             break;
202
203         if (all_ports_up == 0) {
204             printf(".");
205             fflush(stdout);
206             rte_delay_ms(CHECK_INTERVAL);
207         }
208
209         /* set the print_flag if all ports up or timeout */
210         if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
211             print_flag = 1;
212             printf(" ... done\n");
213         }
214     }
215 }
216
217 /**
218  * create a flow rule that sends packets with matching pc_id
219  * to selected queue.
220  *
221  * @param port_id
222  *   The selected port.
223  * @param rx_q
224  *   The selected target queue.
225  * @param pc_id_be
226  *   The value to apply to the pc_id.
227  * @param[out] error
228  *   Perform verbose error reporting if not NULL.
229  *
230  * @return
231  *   A flow if the rule could be created else return NULL.
232  */
233 struct rte_flow *
234 generate_ecpri_flow(uint16_t port_id, uint16_t rx_q, uint16_t pc_id_be, struct rte_flow_error *error)
235 {
236     struct rte_flow *flow = NULL;
237 #if (RTE_VER_YEAR >= 21)
238 #define MAX_PATTERN_NUM         3
239 #define MAX_ACTION_NUM          2
240     struct rte_flow_attr attr;
241     struct rte_flow_item pattern[MAX_PATTERN_NUM];
242     struct rte_flow_action action[MAX_ACTION_NUM];
243
244     struct rte_flow_action_queue queue = { .index = rx_q };
245     struct rte_flow_item_ecpri ecpri_spec;
246     struct rte_flow_item_ecpri ecpri_mask;
247
248     int res;
249     print_dbg("%s\n", __FUNCTION__);
250     memset(pattern, 0, sizeof(pattern));
251     memset(action, 0, sizeof(action));
252
253     /*
254      * set the rule attribute.
255      * in this case only ingress packets will be checked.
256      */
257     memset(&attr, 0, sizeof(struct rte_flow_attr));
258     attr.ingress = 1;
259
260     /*
261      * create the action sequence.
262      * one action only,  move packet to queue
263      */
264     action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE;
265     action[0].conf = &queue;
266     action[1].type = RTE_FLOW_ACTION_TYPE_END;
267
268     /*
269      * set the first level of the pattern (ETH).
270      * since in this example we just want to get the
271      * eCPRI we set this level to allow all.
272      */
273     pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH;
274
275     memset(&ecpri_spec, 0, sizeof(struct rte_flow_item_ecpri));
276     memset(&ecpri_mask, 0, sizeof(struct rte_flow_item_ecpri));
277
278     ecpri_spec.hdr.common.type = RTE_ECPRI_MSG_TYPE_IQ_DATA;
279     ecpri_spec.hdr.type0.pc_id = pc_id_be;
280
281     ecpri_mask.hdr.common.type  = 0xff;
282     ecpri_mask.hdr.type0.pc_id  = 0xffff;
283
284     ecpri_spec.hdr.common.u32   = rte_cpu_to_be_32(ecpri_spec.hdr.common.u32);
285
286     pattern[1].type = RTE_FLOW_ITEM_TYPE_ECPRI;
287     pattern[1].spec = &ecpri_spec;
288     pattern[1].mask = &ecpri_mask;
289
290     struct rte_flow_item_ecpri *pecpri_spec = (struct rte_flow_item_ecpri *)pattern[1].spec;
291     struct rte_flow_item_ecpri *pecpri_mask = (struct rte_flow_item_ecpri *)pattern[1].mask;
292     print_dbg("RTE_FLOW_ITEM_TYPE_ECPRI\n");
293     print_dbg("spec type %x pc_id %x\n", pecpri_spec->hdr.common.type, pecpri_spec->hdr.type0.pc_id);
294     print_dbg("mask type %x pc_id %x\n", pecpri_mask->hdr.common.type, pecpri_mask->hdr.type0.pc_id);
295
296     /* the final level must be always type end */
297     pattern[2].type = RTE_FLOW_ITEM_TYPE_END;
298
299     res = rte_flow_validate(port_id, &attr, pattern, action, error);
300     if (!res)
301         flow = rte_flow_create(port_id, &attr, pattern, action, error);
302     else {
303         rte_panic("Flow can't be created %d message: %s\n",
304                     error->type,
305                     error->message ? error->message : "(no stated reason)");
306     }
307 #endif
308     return flow;
309 }
310
311
312 int32_t
313 xran_ethdi_init_dpdk_io(char *name, const struct xran_io_cfg *io_cfg,
314     int *lcore_id, struct rte_ether_addr *p_o_du_addr,
315     struct rte_ether_addr *p_ru_addr,  uint32_t mtu)
316 {
317     uint16_t port[XRAN_VF_MAX];
318     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
319     int i,ivf;
320     char core_mask[64];
321     uint64_t c_mask         = 0L;
322     uint64_t c_mask_64_127  = 0L;
323     uint64_t nWorkerCore = 1;
324     uint32_t coreNum = sysconf(_SC_NPROCESSORS_CONF);
325     char bbdev_wdev[32]   = "";
326     char bbdev_vdev[32]   = "";
327     char iova_mode[32]    = "--iova-mode=pa";
328     char socket_mem[32]   = "--socket-mem=8192";
329     char socket_limit[32] = "--socket-limit=8192";
330     char ring_name[32]    = "";
331     int32_t xran_port = -1;
332     queueid_t qi = 0;
333     uint32_t cpu = 0;
334     uint32_t node = 0;
335
336     cpu = sched_getcpu();
337     node = numa_node_of_cpu(cpu);
338
339     char *argv[] = { name, core_mask, "-n2", iova_mode, socket_mem, socket_limit, "--proc-type=auto",
340         "--file-prefix", name, "-a0000:00:00.0", bbdev_wdev, bbdev_vdev};
341
342     if (io_cfg == NULL)
343         return 0;
344     if(io_cfg->bbdev_mode != XRAN_BBDEV_NOT_USED){
345         printf("BBDEV_FEC_ACCL_NR5G\n");
346         if (io_cfg->bbdev_mode == XRAN_BBDEV_MODE_HW_ON){
347             // hw-accelerated bbdev
348             printf("hw-accelerated bbdev %s\n", io_cfg->bbdev_dev[0]);
349             snprintf(bbdev_wdev, RTE_DIM(bbdev_wdev), "-a%s", io_cfg->bbdev_dev[0]);
350         } else if (io_cfg->bbdev_mode == XRAN_BBDEV_MODE_HW_OFF){
351             snprintf(bbdev_wdev, RTE_DIM(bbdev_wdev), "%s", "--vdev=baseband_turbo_sw");
352         } else if (io_cfg->bbdev_mode == XRAN_BBDEV_MODE_HW_SW){
353             printf("software and hw-accelerated bbdev %s\n", io_cfg->bbdev_dev[0]);
354             snprintf(bbdev_wdev, RTE_DIM(bbdev_wdev), "-a%s", io_cfg->bbdev_dev[0]);
355             snprintf(bbdev_vdev, RTE_DIM(bbdev_vdev), "%s", "--vdev=baseband_turbo_sw");
356         } else {
357             rte_panic("Cannot init DPDK incorrect [bbdev_mode %d]\n", io_cfg->bbdev_mode);
358         }
359     }
360
361     if (io_cfg->dpdkIoVaMode == 1){
362         snprintf(iova_mode, RTE_DIM(iova_mode), "%s", "--iova-mode=va");
363     }
364
365     if (io_cfg->dpdkMemorySize){
366         printf("node %d\n", node);
367         if (node == 1){
368             snprintf(socket_mem, RTE_DIM(socket_mem), "--socket-mem=0,%d", io_cfg->dpdkMemorySize);
369             snprintf(socket_limit, RTE_DIM(socket_limit), "--socket-limit=0,%d", io_cfg->dpdkMemorySize);
370         } else {
371             snprintf(socket_mem, RTE_DIM(socket_mem), "--socket-mem=%d,0", io_cfg->dpdkMemorySize);
372             snprintf(socket_limit, RTE_DIM(socket_limit), "--socket-limit=%d,0", io_cfg->dpdkMemorySize);
373         }
374     }
375
376     if (io_cfg->core < 64)
377         c_mask |= (long)(1L << io_cfg->core);
378     else
379         c_mask_64_127 |= (long)(1L << (io_cfg->core - 64));
380
381     if (io_cfg->system_core < 64)
382         c_mask |= (long)(1L << io_cfg->system_core);
383     else
384         c_mask_64_127 |= (long)(1L << (io_cfg->system_core - 64));
385
386     if (io_cfg->timing_core < 64)
387        c_mask |=  (long)(1L << io_cfg->timing_core);
388     else
389        c_mask_64_127 |= (long)(1L << (io_cfg->timing_core - 64));
390
391     nWorkerCore = 1L;
392     for (i = 0; i < coreNum && i < 64; i++) {
393         if (nWorkerCore & (uint64_t)io_cfg->pkt_proc_core) {
394             c_mask |= nWorkerCore;
395         }
396         nWorkerCore = nWorkerCore << 1;
397     }
398
399     nWorkerCore = 1L;
400     for (i = 64; i < coreNum && i < 128; i++) {
401         if (nWorkerCore & (uint64_t)io_cfg->pkt_proc_core_64_127) {
402             c_mask_64_127 |= nWorkerCore;
403         }
404         nWorkerCore = nWorkerCore << 1;
405     }
406
407     printf("total cores %d c_mask 0x%lx%016lx core %d [id] system_core %d [id] pkt_proc_core 0x%lx%016lx [mask] pkt_aux_core %d [id] timing_core %d [id]\n",
408         coreNum, c_mask_64_127, c_mask, io_cfg->core, io_cfg->system_core, io_cfg->pkt_proc_core_64_127, io_cfg->pkt_proc_core, io_cfg->pkt_aux_core, io_cfg->timing_core);
409
410     snprintf(core_mask, sizeof(core_mask), "-c 0x%lx%016lx",c_mask_64_127,c_mask);
411
412     ctx->io_cfg = *io_cfg;
413
414     for (ivf = 0; ivf < XRAN_VF_MAX; ivf++){
415         for (i = 0; i < ID_MAX; i++)     /* Initialize all as broadcast */
416             memset(&ctx->entities[ivf][i], 0xFF, sizeof(ctx->entities[0][0]));
417     }
418
419     printf("%s: Calling rte_eal_init:", __FUNCTION__);
420     for (i = 0; i < RTE_DIM(argv); i++)
421     {
422         printf("%s ", argv[i]);
423     }
424     printf("\n");
425
426     /* This will return on system_core, which is not necessarily the
427      * one we're on right now. */
428     if (rte_eal_init(RTE_DIM(argv), argv) < 0)
429         rte_panic("Cannot init EAL: %s\n", rte_strerror(rte_errno));
430
431     if (rte_eal_process_type() == RTE_PROC_SECONDARY)
432         rte_exit(EXIT_FAILURE,
433                 "Secondary process type not supported.\n");
434
435     xran_init_mbuf_pool(mtu);
436
437 #ifdef RTE_LIBRTE_PDUMP
438     /* initialize packet capture framework */
439     rte_pdump_init();
440 #endif
441
442     /* Timers. */
443     rte_timer_subsystem_init();
444
445     *lcore_id = rte_get_next_lcore(rte_lcore_id(), 0, 0);
446
447     PANIC_ON(*lcore_id == RTE_MAX_LCORE, "out of lcores for io_loop()");
448
449     for (i = 0; i < XRAN_VF_MAX; i++)
450         port[i] = 0xffff;
451
452     if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
453         for (i = 0; i < XRAN_VF_MAX && i < io_cfg->num_vfs; i++){
454             if(io_cfg->dpdk_dev[i]){
455                 struct rte_dev_iterator iterator;
456                 uint16_t port_id;
457
458                 if (rte_dev_probe(io_cfg->dpdk_dev[i]) != 0 ||
459                     rte_eth_dev_count_avail() == 0) {
460                         errx(1, "Network port doesn't exist\n");
461                 }
462
463                 RTE_ETH_FOREACH_MATCHING_DEV(port_id, io_cfg->dpdk_dev[i], &iterator){
464                         port[i] = port_id;
465                         xran_init_port(port[i], io_cfg->num_rxq, mtu);
466             }
467
468                 if(!(i & 1) || io_cfg->one_vf_cu_plane){
469                 snprintf(ring_name, RTE_DIM(ring_name), "%s_%d", "tx_ring_up", i);
470                     ctx->tx_ring[i] = rte_ring_create(ring_name, NUM_MBUFS_RING_TRX,
471                     rte_lcore_to_socket_id(*lcore_id), RING_F_SC_DEQ);
472                     PANIC_ON(ctx->tx_ring[i] == NULL, "failed to allocate tx ring");
473                     for(qi = 0; qi < io_cfg->num_rxq; qi++) {
474                         snprintf(ring_name, RTE_DIM(ring_name), "%s_%d_%d", "rx_ring_up", i, qi);
475                         ctx->rx_ring[i][qi] = rte_ring_create(ring_name, NUM_MBUFS_RING_TRX,
476                             rte_lcore_to_socket_id(*lcore_id), RING_F_SP_ENQ);
477                         PANIC_ON(ctx->rx_ring[i][qi] == NULL, "failed to allocate rx ring");
478                     }
479             }else {
480                 snprintf(ring_name, RTE_DIM(ring_name), "%s_%d", "tx_ring_cp", i);
481                     ctx->tx_ring[i] = rte_ring_create(ring_name, NUM_MBUFS_RING_TRX,
482                     rte_lcore_to_socket_id(*lcore_id), RING_F_SC_DEQ);
483                     PANIC_ON(ctx->tx_ring[i] == NULL, "failed to allocate rx ring");
484                     for(qi = 0; qi < io_cfg->num_rxq; qi++) {
485                         snprintf(ring_name, RTE_DIM(ring_name), "%s_%d_%d", "rx_ring_cp", i, qi);
486                         ctx->rx_ring[i][qi] = rte_ring_create(ring_name, NUM_MBUFS_RING_TRX,
487                             rte_lcore_to_socket_id(*lcore_id), RING_F_SP_ENQ);
488                         PANIC_ON(ctx->rx_ring[i][qi] == NULL, "failed to allocate rx ring");
489                     }
490                 }
491             } else {
492                 printf("no DPDK port provided\n");
493                 xran_init_port_mempool(i, mtu);
494             }
495
496             if(io_cfg->dpdk_dev[i]){
497                 check_port_link_status(port[i]);
498             }
499         }
500     } else {
501         rte_panic("ethdi_dpdk_io_loop() failed to start  with RTE_PROC_SECONDARY\n");
502     }
503
504     for (i = 0; i < XRAN_VF_MAX && i < io_cfg->num_vfs; i++){
505         ctx->io_cfg.port[i] = port[i];
506         print_dbg("port_id 0x%04x\n", ctx->io_cfg.port[i]);
507     }
508
509     for (i = 0; i < XRAN_VF_MAX; i++){
510         ctx->vf2xran_port[i] = 0xFFFF;
511         ctx->rxq_per_port[i] = 1;
512         for (qi = 0; qi < XRAN_VF_QUEUE_MAX; qi++){
513             ctx->vf_and_q2pc_id[i][qi] = 0xFFFF;
514
515             ctx->vf_and_q2cid[i][qi].cuPortId     = 0xFF;
516             ctx->vf_and_q2cid[i][qi].bandSectorId = 0xFF;
517             ctx->vf_and_q2cid[i][qi].ccId         = 0xFF;
518             ctx->vf_and_q2cid[i][qi].ruPortId     = 0xFF;
519         }
520     }
521
522     for (i = 0; i < XRAN_VF_MAX && i < io_cfg->num_vfs; i++){
523         if(io_cfg->dpdk_dev[i]){
524             struct rte_ether_addr *p_addr;
525
526             if(i % (io_cfg->nEthLinePerPort * (2 - 1*ctx->io_cfg.one_vf_cu_plane)) == 0) /* C-p and U-p VFs per line */
527                 xran_port +=1;
528
529             rte_eth_macaddr_get(port[i], &ctx->entities[i][io_cfg->id]);
530
531             p_addr = &ctx->entities[i][io_cfg->id];
532             printf("[%2d] vf %2u local  SRC MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
533                    " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
534                    (unsigned)xran_port,
535                    (unsigned)i,
536                    p_addr->addr_bytes[0], p_addr->addr_bytes[1], p_addr->addr_bytes[2],
537                    p_addr->addr_bytes[3], p_addr->addr_bytes[4], p_addr->addr_bytes[5]);
538
539             p_addr = &p_ru_addr[i];
540             printf("[%2d] vf %2u remote DST MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
541                    " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
542                    (unsigned)xran_port,
543                    (unsigned)i,
544                    p_addr->addr_bytes[0], p_addr->addr_bytes[1], p_addr->addr_bytes[2],
545                    p_addr->addr_bytes[3], p_addr->addr_bytes[4], p_addr->addr_bytes[5]);
546
547             rte_ether_addr_copy(&p_ru_addr[i],  &ctx->entities[i][ID_O_RU]);
548             ctx->vf2xran_port[i] = xran_port;
549             ctx->rxq_per_port[i] = io_cfg->num_rxq;
550         }
551     }
552
553     for(i = 0; i < xran_port + 1 && i < XRAN_PORTS_NUM; i++) {
554         snprintf(ring_name, RTE_DIM(ring_name), "%s_%d", "dl_gen_ring_up", i);
555         ctx->up_dl_pkt_gen_ring[i] = rte_ring_create(ring_name, NUM_MBUFS_RING,
556         rte_lcore_to_socket_id(*lcore_id), /*RING_F_SC_DEQ*/0);
557         PANIC_ON(ctx->up_dl_pkt_gen_ring[i] == NULL, "failed to allocate dl gen ring");
558         printf("created %s\n", ring_name);
559     }
560
561     return 1;
562 }
563
564 static inline uint16_t xran_tx_from_ring(int port, struct rte_ring *r)
565 {
566     struct rte_mbuf *mbufs[BURST_SIZE];
567     uint16_t dequeued, sent = 0;
568     uint32_t remaining;
569     long t1 = MLogXRANTick();
570
571     dequeued = rte_ring_dequeue_burst(r, (void **)mbufs, BURST_SIZE,
572             &remaining);
573     if (!dequeued)
574         return 0;   /* Nothing to send. */
575
576     while (1) {     /* When tx queue is full it is trying again till succeed */
577         sent += rte_eth_tx_burst(port, 0, &mbufs[sent], dequeued - sent);
578         if (sent == dequeued){
579             MLogXRANTask(PID_RADIO_ETH_TX_BURST, t1, MLogXRANTick());
580             return remaining;
581     }
582 }
583 }
584
585 int32_t process_dpdk_io(void* args)
586 {
587     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
588     struct xran_io_cfg * cfg = &(xran_ethdi_get_ctx()->io_cfg);
589     int32_t* port = &cfg->port[0];
590     int port_id = 0;
591     int qi      = 0;
592
593     rte_timer_manage();
594
595     for (port_id = 0; port_id < XRAN_VF_MAX && port_id < ctx->io_cfg.num_vfs; port_id++){
596         struct rte_mbuf *mbufs[BURST_RX_IO_SIZE];
597         if(port[port_id] == 0xFF)
598             return 0;
599
600         /* RX */
601         for(qi = 0; qi < ctx->rxq_per_port[port_id]; qi++) {
602             const uint16_t rxed = rte_eth_rx_burst(port[port_id], qi, mbufs, BURST_RX_IO_SIZE);
603         if (rxed != 0){
604             unsigned enq_n = 0;
605                 long t1 = MLogXRANTick();
606                 ctx->rx_vf_queue_cnt[port[port_id]][qi] += rxed;
607                 enq_n =  rte_ring_enqueue_burst(ctx->rx_ring[port_id][qi], (void*)mbufs, rxed, NULL);
608             if(rxed - enq_n)
609                 rte_panic("error enq\n");
610                 MLogXRANTask(PID_RADIO_RX_VALIDATE, t1, MLogXRANTick());
611         }
612         }
613
614         /* TX */
615
616         xran_tx_from_ring(port[port_id], ctx->tx_ring[port_id]);
617         /* One way Delay Measurements */
618         if ((cfg->eowd_cmn[cfg->id].owdm_enable != 0) && (cfg->eowd_cmn[cfg->id].measVf == port_id))
619         {
620           if (!xran_ecpri_port_update_required(cfg, (uint16_t)port_id))
621             {
622 #ifdef ORAN_OWD_DEBUG_TX_LOOP
623                 printf("going to owd tx for port %d\n", port_id);
624 #endif
625                 if (xran_ecpri_one_way_delay_measurement_transmitter((uint16_t) port_id, (void*)xran_dev_get_ctx()) != OK)
626                 {
627                     errx(1,"Exit pdio port_id %d", port_id);
628                 }
629             }
630         }
631
632         if (XRAN_STOPPED == xran_if_current_state)
633             return -1;
634     }
635
636     if (XRAN_STOPPED == xran_if_current_state)
637             return -1;
638
639     return 0;
640 }
641
642 int32_t process_dpdk_io_tx(void* args)
643 {
644     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
645     struct xran_io_cfg * cfg = &(xran_ethdi_get_ctx()->io_cfg);
646     int32_t* port = &cfg->port[0];
647     int port_id = 0;
648
649     //rte_timer_manage();
650
651     for (port_id = 0; port_id < XRAN_VF_MAX && port_id < ctx->io_cfg.num_vfs; port_id++){
652         if(port[port_id] == 0xFF)
653             return 0;
654         /* TX */
655         xran_tx_from_ring(port[port_id], ctx->tx_ring[port_id]);
656
657         if (XRAN_STOPPED == xran_if_current_state)
658             return -1;
659     }
660
661     if (XRAN_STOPPED == xran_if_current_state)
662             return -1;
663
664     return 0;
665 }
666
667 int32_t process_dpdk_io_rx(void* args)
668 {
669     struct xran_ethdi_ctx *ctx = xran_ethdi_get_ctx();
670     struct xran_io_cfg * cfg = &(xran_ethdi_get_ctx()->io_cfg);
671     int32_t* port = &cfg->port[0];
672     int port_id = 0;
673     int qi     = 0;
674
675     rte_timer_manage();
676
677     if (XRAN_RUNNING != xran_if_current_state)
678             return 0;
679
680     for (port_id = 0; port_id < XRAN_VF_MAX && port_id < ctx->io_cfg.num_vfs; port_id++){
681         struct rte_mbuf *mbufs[BURST_RX_IO_SIZE];
682         if(port[port_id] == 0xFF)
683             return 0;
684
685         /* RX */
686         for(qi = 0; qi < ctx->rxq_per_port[port_id]; qi++){
687             const uint16_t rxed = rte_eth_rx_burst(port[port_id], qi, mbufs, BURST_RX_IO_SIZE);
688             if (rxed != 0){
689                 unsigned enq_n = 0;
690                 long t1 = MLogXRANTick();
691                 ctx->rx_vf_queue_cnt[port[port_id]][qi] += rxed;
692                 enq_n =  rte_ring_enqueue_burst(ctx->rx_ring[port_id][qi], (void*)mbufs, rxed, NULL);
693                 if(rxed - enq_n)
694                     rte_panic("error enq\n");
695                 MLogXRANTask(PID_RADIO_RX_VALIDATE, t1, MLogXRANTick());
696             }
697         }
698         if (XRAN_STOPPED == xran_if_current_state)
699             return -1;
700     }
701
702     if (XRAN_STOPPED == xran_if_current_state)
703             return -1;
704
705     return 0;
706 }
707