From 5b8f22b6f8fdf1ff574270709a28f79428bbbef5 Mon Sep 17 00:00:00 2001 From: Tuomas Taipale Date: Thu, 16 Apr 2026 12:48:15 +0000 Subject: [PATCH] example: classifier: add queue and cos configurability Add new `-q`, `--cos_queue_param` option for configuring the CoS/queue that is defined with the policy destination queue parameter (`-p `). For a queue, priority, synchronization and a single packet aggregator with timeout and vector size can be configured. For the related CoS, aggregator enqueue profile and optional parameter for the custom profile can be configured. These now enable the testing of event aggregation in the context of classification with varying CoS aggregator enqueuing profiles. Signed-off-by: Tuomas Taipale --- example/classifier/odp_classifier.c | 393 +++++++++++++++++++++++----- 1 file changed, 326 insertions(+), 67 deletions(-) diff --git a/example/classifier/odp_classifier.c b/example/classifier/odp_classifier.c index 49be25381b8..e23452d5f6f 100644 --- a/example/classifier/odp_classifier.c +++ b/example/classifier/odp_classifier.c @@ -12,6 +12,10 @@ * @cond _ODP_HIDE_FROM_DOXYGEN_ */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + #include #include #include @@ -87,7 +91,6 @@ typedef struct { char value[DISPLAY_STRING_LEN]; /**< Display string for value */ char mask[DISPLAY_STRING_LEN]; /**< Display string for mask */ int has_src_cos; - } global_statistics; typedef struct { @@ -95,13 +98,24 @@ typedef struct { uint64_t count; } ci_pass_counters; +typedef struct { + char cos_name[ODP_COS_NAME_LEN]; + odp_aggr_enq_profile_t cos_enq_prof; + odp_event_aggr_config_t q_aggr; + odp_schedule_prio_t q_prio; + odp_schedule_sync_t q_sync; + uint32_t num_aggr; +} cos_q_param_t; + typedef struct { odp_pktout_queue_t pktout[MAX_WORKERS]; int num_pktout; global_statistics stats[MAX_PMR_COUNT]; ci_pass_counters ci_pass_rules[MAX_PMR_COUNT]; + cos_q_param_t cos_q_param[MAX_PMR_COUNT]; int policy_count; /**< global policy count */ int num_ci_pass_rules; /**< ci pass count */ + int num_cos_q_param; int appl_mode; /**< application mode */ odp_atomic_u64_t total_packets; /**< total received packets */ unsigned int cpu_count; /**< Number of CPUs to use */ @@ -116,6 +130,7 @@ typedef struct { int cos_pools; int pool_size; int burst_size; + uint32_t vector_size; } appl_args_t; enum packet_mode { @@ -354,23 +369,83 @@ static odp_pktio_t create_pktio(const char *dev, odp_pool_t pool) return pktio; } +static void process_packets(odp_packet_t pkts[], int num, odp_queue_t in_q, + odp_pktout_queue_t out_q, appl_args_t *appl, int thr) +{ + odp_pool_t pool; + int i, j, dropped, sent; + global_statistics *stats; + unsigned long err_cnt = 0; + + if (odp_unlikely(appl->verbose)) { + for (j = 0; j < num; j++) { + odp_queue_info_t info; + uint32_t len = odp_packet_len(pkts[j]); + + if (odp_queue_info(in_q, &info) == 0) + printf("Queue: %s\n", info.name); + + if (len > 96) + len = 96; + + odp_packet_print_data(pkts[j], 0, len); + } + } + + /* Total packets received */ + odp_atomic_add_u64(&appl->total_packets, num); + + /* Drop packets with errors */ + dropped = drop_err_pkts(pkts, num); + if (odp_unlikely(dropped)) { + num -= dropped; + err_cnt += dropped; + ODPH_ERR("Drop frame - err_cnt:%lu\n", err_cnt); + } + + for (j = 0; j < num; j++) { + pool = odp_packet_pool(pkts[j]); + + for (i = 0; i < MAX_PMR_COUNT; i++) { + stats = &appl->stats[i]; + if (in_q == stats->queue) + odp_atomic_inc_u64(&stats->queue_pkt_count); + if (pool == stats->pool) + odp_atomic_inc_u64(&stats->pool_pkt_count); + } + } + + if (appl->appl_mode == APPL_MODE_DROP) { + odp_packet_free_multi(pkts, num); + return; + } + + /* Swap Eth MACs and possibly IP-addrs before sending back */ + swap_pkt_addrs(pkts, num); + + sent = odp_pktout_send(out_q, pkts, num); + sent = sent < 0 ? 0 : sent; + + if (sent != num) { + ODPH_ERR(" [%i] Packet send failed\n", thr); + odp_packet_free_multi(pkts + sent, num - sent); + } +} + /** * Worker threads to receive the packet * */ static int pktio_receive_thread(void *arg) { - int thr; - odp_packet_t pkt[MAX_PKT_BURST]; - odp_pool_t pool; - odp_event_t ev[MAX_PKT_BURST]; + const int thr = odp_thread_id(); + odp_packet_t vector_pkts[MAX_PKT_BURST], single_pkts[MAX_PKT_BURST]; + odp_event_t ev, evs[MAX_PKT_BURST], *ev_tbl; + odp_event_vector_t evv; + int num_recv, num_pkts, vector_size; odp_queue_t queue; - int i, j, num, dropped, sent; - global_statistics *stats; - unsigned long err_cnt = 0; - thr = odp_thread_id(); appl_args_t *appl = (appl_args_t *)arg; - uint64_t wait_time = odp_schedule_wait_time(100 * ODP_TIME_MSEC_IN_NS); + const uint64_t wait_time = odp_schedule_wait_time(100 * ODP_TIME_MSEC_IN_NS); odp_pktout_queue_t pktout = appl_args_gbl->pktout[thr % appl_args_gbl->num_pktout]; /* Loop packets */ @@ -379,70 +454,71 @@ static int pktio_receive_thread(void *arg) break; /* Use schedule to get buf from any input queue */ - num = odp_schedule_multi(&queue, wait_time, ev, appl_args_gbl->burst_size); + num_recv = odp_schedule_multi(&queue, wait_time, evs, appl_args_gbl->burst_size); /* Loop back to receive packets incase of invalid event */ - if (odp_unlikely(!num)) + if (odp_unlikely(!num_recv)) continue; - odp_packet_from_event_multi(pkt, ev, num); - - if (odp_unlikely(appl->verbose)) { - for (j = 0; j < num; j++) { - odp_queue_info_t info; - uint32_t len = odp_packet_len(pkt[j]); - - if (odp_queue_info(queue, &info) == 0) - printf("Queue: %s\n", info.name); + num_pkts = 0; - if (len > 96) - len = 96; + for (int i = 0; i < num_recv; i++) { + ev = evs[i]; - odp_packet_print_data(pkt[j], 0, len); + if (odp_event_type(ev) == ODP_EVENT_VECTOR) { + evv = odp_event_vector_from_event(ev); + odp_event_vector_tbl(evv, &ev_tbl); + vector_size = odp_event_vector_size(evv); + odp_packet_from_event_multi(vector_pkts, ev_tbl, vector_size); + process_packets(vector_pkts, vector_size, queue, pktout, appl, + thr); + odp_event_vector_free(evv); + continue; } - } - - /* Total packets received */ - odp_atomic_add_u64(&appl->total_packets, num); - /* Drop packets with errors */ - dropped = drop_err_pkts(pkt, num); - if (odp_unlikely(dropped)) { - num -= dropped; - err_cnt += dropped; - ODPH_ERR("Drop frame - err_cnt:%lu\n", err_cnt); + single_pkts[num_pkts++] = odp_packet_from_event(ev); } - for (j = 0; j < num; j++) { - pool = odp_packet_pool(pkt[j]); - - for (i = 0; i < MAX_PMR_COUNT; i++) { - stats = &appl->stats[i]; - if (queue == stats->queue) - odp_atomic_inc_u64(&stats->queue_pkt_count); - if (pool == stats->pool) - odp_atomic_inc_u64(&stats->pool_pkt_count); - } - } + if (num_pkts > 0) + process_packets(single_pkts, num_pkts, queue, pktout, appl, thr); + } - if (appl->appl_mode == APPL_MODE_DROP) { - odp_packet_free_multi(pkt, num); - continue; - } + return 0; +} - /* Swap Eth MACs and possibly IP-addrs before sending back */ - swap_pkt_addrs(pkt, num); +static odp_pool_t create_vector_pool(appl_args_t *args) +{ + /* For now, singleton pool */ + static odp_pool_t pool = ODP_POOL_INVALID; + odp_pool_param_t param; - sent = odp_pktout_send(pktout, pkt, num); - sent = sent < 0 ? 0 : sent; + if (pool != ODP_POOL_INVALID) + return pool; - if (sent != num) { - ODPH_ERR(" [%i] Packet send failed\n", thr); - odp_packet_free_multi(pkt + sent, num - sent); - } + odp_pool_param_init(¶m); + param.event_vector.num = args->pool_size; + param.event_vector.max_size = args->vector_size; + param.type = ODP_POOL_EVENT_VECTOR; + pool = odp_pool_create("vector_pool", ¶m); + + if (pool == ODP_POOL_INVALID) { + ODPH_ERR("Error: failed to create event vector pool\n"); + exit(EXIT_FAILURE); } - return 0; + return pool; +} + +static void destroy_vector_pool(odp_pool_t pool) +{ + /* Destroy the singleton pool once */ + static odp_bool_t is_destroyed; + + if (is_destroyed) + return; + + odp_pool_destroy(pool); + is_destroyed = true; } static odp_pool_t pool_create(const char *name) @@ -522,6 +598,18 @@ static odp_cos_t configure_default_cos(odp_pktio_t pktio, appl_args_t *args) return cos_default; } +static cos_q_param_t *find_cos_q_param(appl_args_t *args, const char *name) +{ + int i; + + for (i = 0; i < args->num_cos_q_param; i++) { + if (strcmp(args->cos_q_param[i].cos_name, name) == 0) + return &args->cos_q_param[i]; + } + + return NULL; +} + static int find_cos(appl_args_t *args, const char *name, odp_cos_t *cos) { global_statistics *stats; @@ -546,18 +634,29 @@ static void configure_cos(odp_cos_t default_cos, appl_args_t *args) const char *queue_name; odp_cls_cos_param_t cls_param; int i; + cos_q_param_t *cq_param; global_statistics *stats; odp_queue_param_t qparam; for (i = 0; i < args->policy_count - 1; i++) { stats = &args->stats[i]; - + queue_name = stats->cos_name; odp_queue_param_init(&qparam); - qparam.type = ODP_QUEUE_TYPE_SCHED; - qparam.sched.sync = ODP_SCHED_SYNC_PARALLEL; + qparam.type = ODP_QUEUE_TYPE_SCHED; qparam.sched.group = ODP_SCHED_GROUP_ALL; + odp_cls_cos_param_init(&cls_param); + cq_param = find_cos_q_param(args, queue_name); + + if (cq_param != NULL) { + qparam.sched.sync = cq_param->q_sync; + qparam.sched.prio = cq_param->q_prio; + qparam.num_aggr = cq_param->num_aggr; + qparam.aggr = &cq_param->q_aggr; + cls_param.aggr_enq_profile = cq_param->cos_enq_prof; + } else { + qparam.sched.sync = ODP_SCHED_SYNC_PARALLEL; + } - queue_name = args->stats[i].cos_name; stats->queue = odp_queue_create(queue_name, &qparam); if (ODP_QUEUE_INVALID == stats->queue) { ODPH_ERR("odp_queue_create failed\n"); @@ -569,14 +668,22 @@ static void configure_cos(odp_cos_t default_cos, appl_args_t *args) snprintf(cos_name, sizeof(cos_name), "CoS%s", stats->cos_name); - odp_cls_cos_param_init(&cls_param); cls_param.pool = pool_create(pool_name); if (appl_args_gbl->cos_pools) stats->pool = cls_param.pool; - cls_param.queue = stats->queue; + + if (cq_param != NULL && cq_param->num_aggr > 0) + cls_param.queue = odp_queue_aggr(stats->queue, 0); + else + cls_param.queue = stats->queue; stats->cos = odp_cls_cos_create(cos_name, &cls_param); + if (ODP_COS_INVALID == stats->cos) { + ODPH_ERR("odp_cls_cos_create failed\n"); + exit(EXIT_FAILURE); + } + odp_atomic_init_u64(&stats->queue_pkt_count, 0); odp_atomic_init_u64(&stats->pool_pkt_count, 0); } @@ -706,6 +813,9 @@ int main(int argc, char *argv[]) /* Create packet pool */ pool = pool_create("packet_pool"); + for (i = 0; i < args->num_cos_q_param; i++) + args->cos_q_param[i].q_aggr.pool = create_vector_pool(args); + /* Configure scheduler */ odp_schedule_config(NULL); @@ -722,6 +832,7 @@ int main(int argc, char *argv[]) printf("\n"); odp_pool_print_all(); + odp_queue_print_all(); odp_cls_print_all(); if (odp_pktio_start(pktio)) { @@ -780,6 +891,14 @@ int main(int argc, char *argv[]) if (odp_pktio_close(pktio)) ODPH_ERR("err: close pktio error\n"); + + for (i = 0; i < args->num_cos_q_param; i++) { + cos_q_param_t *cos_q_param = &args->cos_q_param[i]; + + if (cos_q_param->q_aggr.pool != ODP_POOL_INVALID) + destroy_vector_pool(cos_q_param->q_aggr.pool); + } + if (odp_pool_destroy(pool)) ODPH_ERR("err: odp_pool_destroy error\n"); @@ -1111,6 +1230,118 @@ static int parse_pmr_policy(appl_args_t *appl_args, char *optarg) return -1; } +static int parse_cos_q_param(appl_args_t *appl_args, char *optarg) +{ + char *tmp_str = strdup(optarg), *tmp; + cos_q_param_t *cos_q_param; + int ret = 0; + + if (tmp_str == NULL) + ODPH_ABORT("Failed to allocate memory, aborting.\n"); + + if (appl_args->num_cos_q_param >= MAX_PMR_COUNT) { + ODPH_ERR("Too many queue CoS configurations. Max count is %i.\n", MAX_PMR_COUNT); + ret = -1; + goto out; + } + + cos_q_param = &appl_args->cos_q_param[appl_args->num_cos_q_param]; + tmp = strtok(tmp_str, ":"); + + if (tmp == NULL) { + ODPH_ERR("CoS queue name missing.\n"); + ret = -1; + goto out; + } + + odph_strcpy(cos_q_param->cos_name, tmp, ODP_COS_NAME_LEN); + tmp = strtok(NULL, ":"); + + if (tmp == NULL) { + ODPH_ERR("CoS queue priority missing.\n"); + ret = -1; + goto out; + } + + cos_q_param->q_prio = strcmp(tmp, "default") == 0 ? + odp_schedule_default_prio() : atoi(tmp); + tmp = strtok(NULL, ":"); + + if (tmp == NULL) { + ODPH_ERR("CoS queue synchronization type missing.\n"); + ret = -1; + goto out; + } + + if (strcmp(tmp, "ODP_SCHED_SYNC_PARALLEL") == 0) { + cos_q_param->q_sync = ODP_SCHED_SYNC_PARALLEL; + } else if (strcmp(tmp, "ODP_SCHED_SYNC_ATOMIC") == 0) { + cos_q_param->q_sync = ODP_SCHED_SYNC_ATOMIC; + } else if (strcmp(tmp, "ODP_SCHED_SYNC_ORDERED") == 0) { + cos_q_param->q_sync = ODP_SCHED_SYNC_ORDERED; + } else { + ODPH_ERR("Invalid queue synchronization type: \"%s\".\n", tmp); + ret = -1; + goto out; + } + + tmp = strtok(NULL, ":"); + + if (tmp == NULL) { + ODPH_ERR("CoS queue aggregator enqueue profile type missing.\n"); + ret = -1; + goto out; + } + + if (strcmp(tmp, "ODP_AEP_TYPE_NONE") == 0) { + cos_q_param->cos_enq_prof.type = ODP_AEP_TYPE_NONE; + } else if (strcmp(tmp, "ODP_AEP_TYPE_IPV4_FRAG") == 0) { + cos_q_param->cos_enq_prof.type = ODP_AEP_TYPE_IPV4_FRAG; + } else if (strcmp(tmp, "ODP_AEP_TYPE_IPV6_FRAG") == 0) { + cos_q_param->cos_enq_prof.type = ODP_AEP_TYPE_IPV6_FRAG; + } else if (strcmp(tmp, "ODP_AEP_TYPE_CUSTOM") == 0) { + cos_q_param->cos_enq_prof.type = ODP_AEP_TYPE_CUSTOM; + tmp = strtok(NULL, ":"); + + if (tmp == NULL) { + ODPH_ERR("Custom aggregator enqueue profile param missing.\n"); + ret = -1; + goto out; + } + + cos_q_param->cos_enq_prof.param = (uintptr_t)strtoll(tmp, NULL, 16); + } else { + ODPH_ERR("Invalid queue aggregator enqueue profile type: \"%s\".\n", tmp); + ret = -1; + goto out; + } + + tmp = strtok(NULL, ":"); + + if (tmp != NULL) { + cos_q_param->q_aggr.max_tmo_ns = atoll(tmp); + tmp = strtok(NULL, ":"); + + if (tmp == NULL) { + ODPH_ERR("Malformed aggregator parameter format.\n"); + ret = -1; + goto out; + } + + cos_q_param->q_aggr.max_size = atoi(tmp); + cos_q_param->q_aggr.event_type = ODP_EVENT_PACKET; + cos_q_param->num_aggr = 1; + } + + appl_args->num_cos_q_param++; + appl_args->vector_size = ODPH_MAX(appl_args->vector_size, cos_q_param->q_aggr.max_size); + +out: + free(tmp_str); + + return ret; +} + static int parse_policy_ci_pass_count(appl_args_t *appl_args, char *optarg) { int num_ci_pass_rules; @@ -1170,6 +1401,7 @@ static int parse_args(int argc, char *argv[], appl_args_t *appl_args) {"count", required_argument, NULL, 'c'}, {"interface", required_argument, NULL, 'i'}, {"policy", required_argument, NULL, 'p'}, + {"cos_queue_param", required_argument, NULL, 'q'}, {"mode", required_argument, NULL, 'm'}, {"time", required_argument, NULL, 't'}, {"ci_pass", required_argument, NULL, 'C'}, @@ -1184,7 +1416,7 @@ static int parse_args(int argc, char *argv[], appl_args_t *appl_args) {NULL, 0, NULL, 0} }; - static const char *shortopts = "+c:t:i:p:m:t:C:Pvhe:l:d:s:b:"; + static const char *shortopts = "+c:t:i:p:q:m:t:C:Pvhe:l:d:s:b:"; appl_args->cpu_count = 1; /* Use one worker by default */ appl_args->verbose = 0; @@ -1212,6 +1444,12 @@ static int parse_args(int argc, char *argv[], appl_args_t *appl_args) break; } break; + case 'q': + if (parse_cos_q_param(appl_args, optarg)) { + ret = -1; + break; + } + break; case 't': appl_args->time = atoi(optarg); break; @@ -1316,7 +1554,9 @@ static void usage(void) "ODP Classifier example.\n" "Usage: odp_classifier OPTIONS\n" " E.g. odp_classifier -i eth1 -m 0 -p \"ODP_PMR_SIP_ADDR:10.10.10.0:0xFFFFFF00:queue1\" \\\n" - " -p \"ODP_PMR_SIP_ADDR:10.10.10.10:0xFFFFFFFF:queue1:queue2\"\n" + " -p \"ODP_PMR_SIP_ADDR:10.10.10.10:0xFFFFFFFF:queue1:queue2\" \\\n" + " -q \"queue1:1:ODP_SCHED_SYNC_PARALLEL:ODP_AEP_TYPE_NONE\" \\\n" + " -q \"queue2:3:ODP_SCHED_SYNC_ATOMIC:ODP_AEP_TYPE_CUSTOM:1234:0:4\" \\\n" "\n" "The above example would classify:\n" " 1) Packets from source IP address 10.10.10.0/24 to queue1, except ...\n" @@ -1339,7 +1579,26 @@ static void usage(void) " this is not defined.\n" " Name of the destination queue (CoS).\n" "\n" - " -c, --count CPU count, 0=all available, default=1\n" + " -q, --cos_queue_param ::::::\n" + "\n" + " Configuration for a queue (CoS) defined with '--policy' 'dst queue'.\n" + " CoS queue priority. Use priority value or \"default\" for default priority.\n" + " CoS queue synchronization. Use ODP_SCHED_SYNC_PARALLEL,\n" + " ODP_SCHED_SYNC_ATOMIC or ODP_SCHED_SYNC_ORDERED.\n" + " \n" + " Aggregator enqueue profile type for CoS. Use odp_aggr_enq_profile_t::type\n" + " names.\n" + " \n" + " If ODP_AEP_TYPE_CUSTOM aggregator enqueue profile was used,\n" + " pass a hex value (without '0x') for odp_aggr_enq_profile_t::param.\n" + " \n" + " Aggregator configuration parameter timeout in nanoseconds. Optional,\n" + " if not passed, aggregation for queue disabled.\n" + " \n" + " Aggregator configuration parameter vector size. Mandatory if 'optional tmo ns'\n" + " passed, optional otherwise.\n" + "\n"); + printf(" -c, --count CPU count, 0=all available, default=1\n" "\n" " -m, --mode 0: Packet Drop mode. Received packets will be dropped\n" " !0: Echo mode. Received packets will be sent back\n"