diff --git a/example/classifier/odp_classifier.c b/example/classifier/odp_classifier.c index 49be25381b8..e23452d5f6f 100644 --- a/example/classifier/odp_classifier.c +++ b/example/classifier/odp_classifier.c @@ -12,6 +12,10 @@ * @cond _ODP_HIDE_FROM_DOXYGEN_ */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + #include #include #include @@ -87,7 +91,6 @@ typedef struct { char value[DISPLAY_STRING_LEN]; /**< Display string for value */ char mask[DISPLAY_STRING_LEN]; /**< Display string for mask */ int has_src_cos; - } global_statistics; typedef struct { @@ -95,13 +98,24 @@ typedef struct { uint64_t count; } ci_pass_counters; +typedef struct { + char cos_name[ODP_COS_NAME_LEN]; + odp_aggr_enq_profile_t cos_enq_prof; + odp_event_aggr_config_t q_aggr; + odp_schedule_prio_t q_prio; + odp_schedule_sync_t q_sync; + uint32_t num_aggr; +} cos_q_param_t; + typedef struct { odp_pktout_queue_t pktout[MAX_WORKERS]; int num_pktout; global_statistics stats[MAX_PMR_COUNT]; ci_pass_counters ci_pass_rules[MAX_PMR_COUNT]; + cos_q_param_t cos_q_param[MAX_PMR_COUNT]; int policy_count; /**< global policy count */ int num_ci_pass_rules; /**< ci pass count */ + int num_cos_q_param; int appl_mode; /**< application mode */ odp_atomic_u64_t total_packets; /**< total received packets */ unsigned int cpu_count; /**< Number of CPUs to use */ @@ -116,6 +130,7 @@ typedef struct { int cos_pools; int pool_size; int burst_size; + uint32_t vector_size; } appl_args_t; enum packet_mode { @@ -354,23 +369,83 @@ static odp_pktio_t create_pktio(const char *dev, odp_pool_t pool) return pktio; } +static void process_packets(odp_packet_t pkts[], int num, odp_queue_t in_q, + odp_pktout_queue_t out_q, appl_args_t *appl, int thr) +{ + odp_pool_t pool; + int i, j, dropped, sent; + global_statistics *stats; + unsigned long err_cnt = 0; + + if (odp_unlikely(appl->verbose)) { + for (j = 0; j < num; j++) { + odp_queue_info_t info; + uint32_t len = odp_packet_len(pkts[j]); + + if (odp_queue_info(in_q, &info) == 0) + printf("Queue: %s\n", info.name); + + if (len > 96) + len = 96; + + odp_packet_print_data(pkts[j], 0, len); + } + } + + /* Total packets received */ + odp_atomic_add_u64(&appl->total_packets, num); + + /* Drop packets with errors */ + dropped = drop_err_pkts(pkts, num); + if (odp_unlikely(dropped)) { + num -= dropped; + err_cnt += dropped; + ODPH_ERR("Drop frame - err_cnt:%lu\n", err_cnt); + } + + for (j = 0; j < num; j++) { + pool = odp_packet_pool(pkts[j]); + + for (i = 0; i < MAX_PMR_COUNT; i++) { + stats = &appl->stats[i]; + if (in_q == stats->queue) + odp_atomic_inc_u64(&stats->queue_pkt_count); + if (pool == stats->pool) + odp_atomic_inc_u64(&stats->pool_pkt_count); + } + } + + if (appl->appl_mode == APPL_MODE_DROP) { + odp_packet_free_multi(pkts, num); + return; + } + + /* Swap Eth MACs and possibly IP-addrs before sending back */ + swap_pkt_addrs(pkts, num); + + sent = odp_pktout_send(out_q, pkts, num); + sent = sent < 0 ? 0 : sent; + + if (sent != num) { + ODPH_ERR(" [%i] Packet send failed\n", thr); + odp_packet_free_multi(pkts + sent, num - sent); + } +} + /** * Worker threads to receive the packet * */ static int pktio_receive_thread(void *arg) { - int thr; - odp_packet_t pkt[MAX_PKT_BURST]; - odp_pool_t pool; - odp_event_t ev[MAX_PKT_BURST]; + const int thr = odp_thread_id(); + odp_packet_t vector_pkts[MAX_PKT_BURST], single_pkts[MAX_PKT_BURST]; + odp_event_t ev, evs[MAX_PKT_BURST], *ev_tbl; + odp_event_vector_t evv; + int num_recv, num_pkts, vector_size; odp_queue_t queue; - int i, j, num, dropped, sent; - global_statistics *stats; - unsigned long err_cnt = 0; - thr = odp_thread_id(); appl_args_t *appl = (appl_args_t *)arg; - uint64_t wait_time = odp_schedule_wait_time(100 * ODP_TIME_MSEC_IN_NS); + const uint64_t wait_time = odp_schedule_wait_time(100 * ODP_TIME_MSEC_IN_NS); odp_pktout_queue_t pktout = appl_args_gbl->pktout[thr % appl_args_gbl->num_pktout]; /* Loop packets */ @@ -379,70 +454,71 @@ static int pktio_receive_thread(void *arg) break; /* Use schedule to get buf from any input queue */ - num = odp_schedule_multi(&queue, wait_time, ev, appl_args_gbl->burst_size); + num_recv = odp_schedule_multi(&queue, wait_time, evs, appl_args_gbl->burst_size); /* Loop back to receive packets incase of invalid event */ - if (odp_unlikely(!num)) + if (odp_unlikely(!num_recv)) continue; - odp_packet_from_event_multi(pkt, ev, num); - - if (odp_unlikely(appl->verbose)) { - for (j = 0; j < num; j++) { - odp_queue_info_t info; - uint32_t len = odp_packet_len(pkt[j]); - - if (odp_queue_info(queue, &info) == 0) - printf("Queue: %s\n", info.name); + num_pkts = 0; - if (len > 96) - len = 96; + for (int i = 0; i < num_recv; i++) { + ev = evs[i]; - odp_packet_print_data(pkt[j], 0, len); + if (odp_event_type(ev) == ODP_EVENT_VECTOR) { + evv = odp_event_vector_from_event(ev); + odp_event_vector_tbl(evv, &ev_tbl); + vector_size = odp_event_vector_size(evv); + odp_packet_from_event_multi(vector_pkts, ev_tbl, vector_size); + process_packets(vector_pkts, vector_size, queue, pktout, appl, + thr); + odp_event_vector_free(evv); + continue; } - } - - /* Total packets received */ - odp_atomic_add_u64(&appl->total_packets, num); - /* Drop packets with errors */ - dropped = drop_err_pkts(pkt, num); - if (odp_unlikely(dropped)) { - num -= dropped; - err_cnt += dropped; - ODPH_ERR("Drop frame - err_cnt:%lu\n", err_cnt); + single_pkts[num_pkts++] = odp_packet_from_event(ev); } - for (j = 0; j < num; j++) { - pool = odp_packet_pool(pkt[j]); - - for (i = 0; i < MAX_PMR_COUNT; i++) { - stats = &appl->stats[i]; - if (queue == stats->queue) - odp_atomic_inc_u64(&stats->queue_pkt_count); - if (pool == stats->pool) - odp_atomic_inc_u64(&stats->pool_pkt_count); - } - } + if (num_pkts > 0) + process_packets(single_pkts, num_pkts, queue, pktout, appl, thr); + } - if (appl->appl_mode == APPL_MODE_DROP) { - odp_packet_free_multi(pkt, num); - continue; - } + return 0; +} - /* Swap Eth MACs and possibly IP-addrs before sending back */ - swap_pkt_addrs(pkt, num); +static odp_pool_t create_vector_pool(appl_args_t *args) +{ + /* For now, singleton pool */ + static odp_pool_t pool = ODP_POOL_INVALID; + odp_pool_param_t param; - sent = odp_pktout_send(pktout, pkt, num); - sent = sent < 0 ? 0 : sent; + if (pool != ODP_POOL_INVALID) + return pool; - if (sent != num) { - ODPH_ERR(" [%i] Packet send failed\n", thr); - odp_packet_free_multi(pkt + sent, num - sent); - } + odp_pool_param_init(¶m); + param.event_vector.num = args->pool_size; + param.event_vector.max_size = args->vector_size; + param.type = ODP_POOL_EVENT_VECTOR; + pool = odp_pool_create("vector_pool", ¶m); + + if (pool == ODP_POOL_INVALID) { + ODPH_ERR("Error: failed to create event vector pool\n"); + exit(EXIT_FAILURE); } - return 0; + return pool; +} + +static void destroy_vector_pool(odp_pool_t pool) +{ + /* Destroy the singleton pool once */ + static odp_bool_t is_destroyed; + + if (is_destroyed) + return; + + odp_pool_destroy(pool); + is_destroyed = true; } static odp_pool_t pool_create(const char *name) @@ -522,6 +598,18 @@ static odp_cos_t configure_default_cos(odp_pktio_t pktio, appl_args_t *args) return cos_default; } +static cos_q_param_t *find_cos_q_param(appl_args_t *args, const char *name) +{ + int i; + + for (i = 0; i < args->num_cos_q_param; i++) { + if (strcmp(args->cos_q_param[i].cos_name, name) == 0) + return &args->cos_q_param[i]; + } + + return NULL; +} + static int find_cos(appl_args_t *args, const char *name, odp_cos_t *cos) { global_statistics *stats; @@ -546,18 +634,29 @@ static void configure_cos(odp_cos_t default_cos, appl_args_t *args) const char *queue_name; odp_cls_cos_param_t cls_param; int i; + cos_q_param_t *cq_param; global_statistics *stats; odp_queue_param_t qparam; for (i = 0; i < args->policy_count - 1; i++) { stats = &args->stats[i]; - + queue_name = stats->cos_name; odp_queue_param_init(&qparam); - qparam.type = ODP_QUEUE_TYPE_SCHED; - qparam.sched.sync = ODP_SCHED_SYNC_PARALLEL; + qparam.type = ODP_QUEUE_TYPE_SCHED; qparam.sched.group = ODP_SCHED_GROUP_ALL; + odp_cls_cos_param_init(&cls_param); + cq_param = find_cos_q_param(args, queue_name); + + if (cq_param != NULL) { + qparam.sched.sync = cq_param->q_sync; + qparam.sched.prio = cq_param->q_prio; + qparam.num_aggr = cq_param->num_aggr; + qparam.aggr = &cq_param->q_aggr; + cls_param.aggr_enq_profile = cq_param->cos_enq_prof; + } else { + qparam.sched.sync = ODP_SCHED_SYNC_PARALLEL; + } - queue_name = args->stats[i].cos_name; stats->queue = odp_queue_create(queue_name, &qparam); if (ODP_QUEUE_INVALID == stats->queue) { ODPH_ERR("odp_queue_create failed\n"); @@ -569,14 +668,22 @@ static void configure_cos(odp_cos_t default_cos, appl_args_t *args) snprintf(cos_name, sizeof(cos_name), "CoS%s", stats->cos_name); - odp_cls_cos_param_init(&cls_param); cls_param.pool = pool_create(pool_name); if (appl_args_gbl->cos_pools) stats->pool = cls_param.pool; - cls_param.queue = stats->queue; + + if (cq_param != NULL && cq_param->num_aggr > 0) + cls_param.queue = odp_queue_aggr(stats->queue, 0); + else + cls_param.queue = stats->queue; stats->cos = odp_cls_cos_create(cos_name, &cls_param); + if (ODP_COS_INVALID == stats->cos) { + ODPH_ERR("odp_cls_cos_create failed\n"); + exit(EXIT_FAILURE); + } + odp_atomic_init_u64(&stats->queue_pkt_count, 0); odp_atomic_init_u64(&stats->pool_pkt_count, 0); } @@ -706,6 +813,9 @@ int main(int argc, char *argv[]) /* Create packet pool */ pool = pool_create("packet_pool"); + for (i = 0; i < args->num_cos_q_param; i++) + args->cos_q_param[i].q_aggr.pool = create_vector_pool(args); + /* Configure scheduler */ odp_schedule_config(NULL); @@ -722,6 +832,7 @@ int main(int argc, char *argv[]) printf("\n"); odp_pool_print_all(); + odp_queue_print_all(); odp_cls_print_all(); if (odp_pktio_start(pktio)) { @@ -780,6 +891,14 @@ int main(int argc, char *argv[]) if (odp_pktio_close(pktio)) ODPH_ERR("err: close pktio error\n"); + + for (i = 0; i < args->num_cos_q_param; i++) { + cos_q_param_t *cos_q_param = &args->cos_q_param[i]; + + if (cos_q_param->q_aggr.pool != ODP_POOL_INVALID) + destroy_vector_pool(cos_q_param->q_aggr.pool); + } + if (odp_pool_destroy(pool)) ODPH_ERR("err: odp_pool_destroy error\n"); @@ -1111,6 +1230,118 @@ static int parse_pmr_policy(appl_args_t *appl_args, char *optarg) return -1; } +static int parse_cos_q_param(appl_args_t *appl_args, char *optarg) +{ + char *tmp_str = strdup(optarg), *tmp; + cos_q_param_t *cos_q_param; + int ret = 0; + + if (tmp_str == NULL) + ODPH_ABORT("Failed to allocate memory, aborting.\n"); + + if (appl_args->num_cos_q_param >= MAX_PMR_COUNT) { + ODPH_ERR("Too many queue CoS configurations. Max count is %i.\n", MAX_PMR_COUNT); + ret = -1; + goto out; + } + + cos_q_param = &appl_args->cos_q_param[appl_args->num_cos_q_param]; + tmp = strtok(tmp_str, ":"); + + if (tmp == NULL) { + ODPH_ERR("CoS queue name missing.\n"); + ret = -1; + goto out; + } + + odph_strcpy(cos_q_param->cos_name, tmp, ODP_COS_NAME_LEN); + tmp = strtok(NULL, ":"); + + if (tmp == NULL) { + ODPH_ERR("CoS queue priority missing.\n"); + ret = -1; + goto out; + } + + cos_q_param->q_prio = strcmp(tmp, "default") == 0 ? + odp_schedule_default_prio() : atoi(tmp); + tmp = strtok(NULL, ":"); + + if (tmp == NULL) { + ODPH_ERR("CoS queue synchronization type missing.\n"); + ret = -1; + goto out; + } + + if (strcmp(tmp, "ODP_SCHED_SYNC_PARALLEL") == 0) { + cos_q_param->q_sync = ODP_SCHED_SYNC_PARALLEL; + } else if (strcmp(tmp, "ODP_SCHED_SYNC_ATOMIC") == 0) { + cos_q_param->q_sync = ODP_SCHED_SYNC_ATOMIC; + } else if (strcmp(tmp, "ODP_SCHED_SYNC_ORDERED") == 0) { + cos_q_param->q_sync = ODP_SCHED_SYNC_ORDERED; + } else { + ODPH_ERR("Invalid queue synchronization type: \"%s\".\n", tmp); + ret = -1; + goto out; + } + + tmp = strtok(NULL, ":"); + + if (tmp == NULL) { + ODPH_ERR("CoS queue aggregator enqueue profile type missing.\n"); + ret = -1; + goto out; + } + + if (strcmp(tmp, "ODP_AEP_TYPE_NONE") == 0) { + cos_q_param->cos_enq_prof.type = ODP_AEP_TYPE_NONE; + } else if (strcmp(tmp, "ODP_AEP_TYPE_IPV4_FRAG") == 0) { + cos_q_param->cos_enq_prof.type = ODP_AEP_TYPE_IPV4_FRAG; + } else if (strcmp(tmp, "ODP_AEP_TYPE_IPV6_FRAG") == 0) { + cos_q_param->cos_enq_prof.type = ODP_AEP_TYPE_IPV6_FRAG; + } else if (strcmp(tmp, "ODP_AEP_TYPE_CUSTOM") == 0) { + cos_q_param->cos_enq_prof.type = ODP_AEP_TYPE_CUSTOM; + tmp = strtok(NULL, ":"); + + if (tmp == NULL) { + ODPH_ERR("Custom aggregator enqueue profile param missing.\n"); + ret = -1; + goto out; + } + + cos_q_param->cos_enq_prof.param = (uintptr_t)strtoll(tmp, NULL, 16); + } else { + ODPH_ERR("Invalid queue aggregator enqueue profile type: \"%s\".\n", tmp); + ret = -1; + goto out; + } + + tmp = strtok(NULL, ":"); + + if (tmp != NULL) { + cos_q_param->q_aggr.max_tmo_ns = atoll(tmp); + tmp = strtok(NULL, ":"); + + if (tmp == NULL) { + ODPH_ERR("Malformed aggregator parameter format.\n"); + ret = -1; + goto out; + } + + cos_q_param->q_aggr.max_size = atoi(tmp); + cos_q_param->q_aggr.event_type = ODP_EVENT_PACKET; + cos_q_param->num_aggr = 1; + } + + appl_args->num_cos_q_param++; + appl_args->vector_size = ODPH_MAX(appl_args->vector_size, cos_q_param->q_aggr.max_size); + +out: + free(tmp_str); + + return ret; +} + static int parse_policy_ci_pass_count(appl_args_t *appl_args, char *optarg) { int num_ci_pass_rules; @@ -1170,6 +1401,7 @@ static int parse_args(int argc, char *argv[], appl_args_t *appl_args) {"count", required_argument, NULL, 'c'}, {"interface", required_argument, NULL, 'i'}, {"policy", required_argument, NULL, 'p'}, + {"cos_queue_param", required_argument, NULL, 'q'}, {"mode", required_argument, NULL, 'm'}, {"time", required_argument, NULL, 't'}, {"ci_pass", required_argument, NULL, 'C'}, @@ -1184,7 +1416,7 @@ static int parse_args(int argc, char *argv[], appl_args_t *appl_args) {NULL, 0, NULL, 0} }; - static const char *shortopts = "+c:t:i:p:m:t:C:Pvhe:l:d:s:b:"; + static const char *shortopts = "+c:t:i:p:q:m:t:C:Pvhe:l:d:s:b:"; appl_args->cpu_count = 1; /* Use one worker by default */ appl_args->verbose = 0; @@ -1212,6 +1444,12 @@ static int parse_args(int argc, char *argv[], appl_args_t *appl_args) break; } break; + case 'q': + if (parse_cos_q_param(appl_args, optarg)) { + ret = -1; + break; + } + break; case 't': appl_args->time = atoi(optarg); break; @@ -1316,7 +1554,9 @@ static void usage(void) "ODP Classifier example.\n" "Usage: odp_classifier OPTIONS\n" " E.g. odp_classifier -i eth1 -m 0 -p \"ODP_PMR_SIP_ADDR:10.10.10.0:0xFFFFFF00:queue1\" \\\n" - " -p \"ODP_PMR_SIP_ADDR:10.10.10.10:0xFFFFFFFF:queue1:queue2\"\n" + " -p \"ODP_PMR_SIP_ADDR:10.10.10.10:0xFFFFFFFF:queue1:queue2\" \\\n" + " -q \"queue1:1:ODP_SCHED_SYNC_PARALLEL:ODP_AEP_TYPE_NONE\" \\\n" + " -q \"queue2:3:ODP_SCHED_SYNC_ATOMIC:ODP_AEP_TYPE_CUSTOM:1234:0:4\" \\\n" "\n" "The above example would classify:\n" " 1) Packets from source IP address 10.10.10.0/24 to queue1, except ...\n" @@ -1339,7 +1579,26 @@ static void usage(void) " this is not defined.\n" " Name of the destination queue (CoS).\n" "\n" - " -c, --count CPU count, 0=all available, default=1\n" + " -q, --cos_queue_param ::::::\n" + "\n" + " Configuration for a queue (CoS) defined with '--policy' 'dst queue'.\n" + " CoS queue priority. Use priority value or \"default\" for default priority.\n" + " CoS queue synchronization. Use ODP_SCHED_SYNC_PARALLEL,\n" + " ODP_SCHED_SYNC_ATOMIC or ODP_SCHED_SYNC_ORDERED.\n" + " \n" + " Aggregator enqueue profile type for CoS. Use odp_aggr_enq_profile_t::type\n" + " names.\n" + " \n" + " If ODP_AEP_TYPE_CUSTOM aggregator enqueue profile was used,\n" + " pass a hex value (without '0x') for odp_aggr_enq_profile_t::param.\n" + " \n" + " Aggregator configuration parameter timeout in nanoseconds. Optional,\n" + " if not passed, aggregation for queue disabled.\n" + " \n" + " Aggregator configuration parameter vector size. Mandatory if 'optional tmo ns'\n" + " passed, optional otherwise.\n" + "\n"); + printf(" -c, --count CPU count, 0=all available, default=1\n" "\n" " -m, --mode 0: Packet Drop mode. Received packets will be dropped\n" " !0: Echo mode. Received packets will be sent back\n"