diff --git a/Server/src/redis/MsgBusImpl_redis.cpp b/Server/src/redis/MsgBusImpl_redis.cpp index 1e00203..092054e 100644 --- a/Server/src/redis/MsgBusImpl_redis.cpp +++ b/Server/src/redis/MsgBusImpl_redis.cpp @@ -6,7 +6,7 @@ #include #include - +#include #include #include @@ -102,92 +102,211 @@ void MsgBusImpl_redis::update_Peer(obj_bgp_peer &peer, obj_peer_up_event *up, ob /** * Abstract method Implementation - See MsgBusInterface.hpp for details */ -void MsgBusImpl_redis::update_unicastPrefix(obj_bgp_peer &peer, vector &rib, - obj_path_attr *attr, unicast_prefix_action_code code) { - if (attr == NULL) +void MsgBusImpl_redis::update_unicastPrefix(obj_bgp_peer &peer, + std::vector &rib, + obj_path_attr *attr, + unicast_prefix_action_code code) +{ + // Correlate concurrent invocations in logs + static std::atomic call_seq{0}; + const uint64_t call_id = ++call_seq; + + // Only enforce attr for ADD; DEL should proceed without it. + if (code == UNICAST_PREFIX_ACTION_ADD && attr == nullptr) { + LOG_INFO("MsgBusImpl_redis[%llu] update_unicastPrefix: ADD requested but attr==NULL (peer=%s) — ignoring", + (unsigned long long)call_id, peer.peer_addr); return; + } + + const char* actionStr = (code == UNICAST_PREFIX_ACTION_ADD) ? "add" : "del"; + + // Timestamp + hashes (to match Kafka line contents) + std::string ts, path_hash_str, peer_hash_str, router_hash_str; + getTimestamp(peer.timestamp_secs, peer.timestamp_us, ts); + if (attr) hash_toStr(attr->hash_id, path_hash_str); + hash_toStr(peer.hash_id, peer_hash_str); + hash_toStr(peer.router_hash_id, router_hash_str); + + LOG_INFO("MsgBusImpl_redis[%llu] update_unicastPrefix[start] action=%s peer=%s as=%u ts=%s rib_size=%zu isAdjIn=%d isPrePolicy=%d", + (unsigned long long)call_id, actionStr, peer.peer_addr, peer.peer_as, ts.c_str(), rib.size(), + (int)peer.isAdjIn, (int)peer.isPrePolicy); + + std::vector del_keys; + del_keys.reserve(rib.size()); + + size_t add_count = 0, del_count = 0; - vector del_keys; - string neigh = peer.peer_addr; - - for (size_t i = 0; i < rib.size(); i++) { - // Loop through the vector array of rib entries - vector addFieldValues; - addFieldValues.reserve(MAX_ATTRIBUTES_COUNT); - - // rib table schema as BGP_RIB_OUT_TABLE|192.181.168.0/25|10.0.0.59 - vector keys; - string redisMgr_pfx = rib[i].prefix; - redisMgr_pfx += "/"; - redisMgr_pfx += to_string(rib[i].prefix_len); - keys.reserve(MAX_ATTRIBUTES_COUNT); - keys.emplace_back(redisMgr_pfx); + // Small helper to keep previews short in logs + auto cut = [](const std::string& s)->std::string { + return s.size() > 160 ? (s.substr(0,160) + "...") : s; + }; + + for (size_t i = 0; i < rib.size(); ++i) { + // Compose "prefix/len" + std::string pfx_len = std::string(rib[i].prefix) + "/" + std::to_string(rib[i].prefix_len); + + // Redis key parts (prefix/len, neighbor) + std::vector keys; + keys.reserve(2); + keys.emplace_back(pfx_len); keys.emplace_back(peer.peer_addr); - switch (code) { - - case UNICAST_PREFIX_ACTION_ADD: - { - addFieldValues.emplace_back(make_pair("origin", string(attr->origin))); - addFieldValues.emplace_back(make_pair("as_path", attr->as_path)); - stringstream as_path_count; - as_path_count << attr->as_path_count; - addFieldValues.emplace_back(make_pair("as_path_count", as_path_count.str())); - stringstream origin_as; - origin_as << attr->origin_as; - addFieldValues.emplace_back(make_pair("origin_as", origin_as.str())); - addFieldValues.emplace_back(make_pair("next_hop", string(attr->next_hop))); - stringstream local_pref; - local_pref << attr->local_pref; - addFieldValues.emplace_back(make_pair("local_pref", local_pref.str())); - addFieldValues.emplace_back(make_pair("community_list", attr->community_list)); - addFieldValues.emplace_back(make_pair("ext_community_list", attr->ext_community_list)); - addFieldValues.emplace_back(make_pair("large_community_list", attr->large_community_list)); - addFieldValues.emplace_back(make_pair("originator_id", string(attr->originator_id))); - - for (const auto& fieldValue : addFieldValues) { - const std::string& field = std::get<0>(fieldValue); - const std::string& value = std::get<1>(fieldValue); - DEBUG("MsgBusImpl_redis update_unicastPrefix field = %s, value = %s", field.c_str(), value.c_str()); - } - if(peer.isAdjIn) - { - redisMgr_.WriteBMPTable(BMP_TABLE_RIB_IN, keys, addFieldValues); - } - else - { - redisMgr_.WriteBMPTable(BMP_TABLE_RIB_OUT, keys, addFieldValues); - } + // RIB hash (already computed by producer of obj_rib) + std::string rib_hash_str; hash_toStr(rib[i].hash_id, rib_hash_str); + + LOG_INFO("MsgBusImpl_redis[%llu] RIB[%zu] prefix=%s/%d isIPv4=%d path_id=%u labels=%s rib_hash=%s", + (unsigned long long)call_id, i, rib[i].prefix, rib[i].prefix_len, (int)rib[i].isIPv4, + (unsigned)rib[i].path_id, rib[i].labels, rib_hash_str.c_str()); + + if (code == UNICAST_PREFIX_ACTION_ADD) { + // Convert all fields to std::string safely (works for char[] or std::string sources) + const std::string origin = attr->origin; + const std::string as_path = attr->as_path; + const uint32_t as_path_count = attr->as_path_count; + const uint32_t origin_as = attr->origin_as; + const std::string next_hop = attr->next_hop; + const uint32_t med = attr->med; + const uint32_t local_pref = attr->local_pref; + const std::string aggregator = attr->aggregator; + const std::string community_list = attr->community_list; + const std::string ext_comm_list = attr->ext_community_list; + const std::string cluster_list = attr->cluster_list; + const int atomic_agg = (int)attr->atomic_agg; + const int nexthop_isIPv4 = (int)attr->nexthop_isIPv4; + const std::string originator_id = attr->originator_id; + const uint32_t path_id = rib[i].path_id; + const std::string labels = rib[i].labels; + const int isPrePolicy = (int)peer.isPrePolicy; + const int isAdjIn = (int)peer.isAdjIn; + const std::string large_comm_list = attr->large_community_list; + + // Kafka-parity RAW_LINE (router_ip removed; peer_addr included after peer_hash) + char raw[4096]; + int n = std::snprintf( + raw, sizeof(raw), + // action, seq, rib_hash, router_hash, path_hash, peer_hash, peer_addr, peer_as, ts, + // prefix, prefix_len, isIPv4, origin, as_path, as_path_count, origin_as, next_hop, + // med, local_pref, aggregator, community_list, ext_comm_list, cluster_list, + // atomic_agg, nexthop_isIPv4, originator_id, path_id, labels, isPrePolicy, isAdjIn, large_comm_list + "%s\t%llu\t%s\t%s\t%s\t%s\t%s\t%u\t%s\t%s\t%d\t%d\t%s\t%s\t%u\t%u\t%s\t%u\t%u\t%s\t%s\t%s\t%s\t%d\t%d\t%s\t%u\t%s\t%d\t%d\t%s", + actionStr, + (unsigned long long)0, /* seq if you track one */ + rib_hash_str.c_str(), + router_hash_str.c_str(), + path_hash_str.c_str(), + peer_hash_str.c_str(), + peer.peer_addr, + (unsigned)peer.peer_as, + ts.c_str(), + rib[i].prefix, + rib[i].prefix_len, + (int)rib[i].isIPv4, + origin.c_str(), + as_path.c_str(), + (unsigned)as_path_count, + (unsigned)origin_as, + next_hop.c_str(), + (unsigned)med, + (unsigned)local_pref, + aggregator.c_str(), + community_list.c_str(), + ext_comm_list.c_str(), + cluster_list.c_str(), + atomic_agg, + nexthop_isIPv4, + originator_id.c_str(), + (unsigned)path_id, + labels.c_str(), + isPrePolicy, + isAdjIn, + large_comm_list.c_str()); + + if (n < 0) { + LOG_INFO("MsgBusImpl_redis[%llu] RAW_LINE format error", (unsigned long long)call_id); + } else if ((size_t)n >= sizeof(raw)) { + LOG_INFO("MsgBusImpl_redis[%llu] RAW_LINE truncated (len=%d cap=%zu)", + (unsigned long long)call_id, n, sizeof(raw)); } - break; - - case UNICAST_PREFIX_ACTION_DEL: - { - string com_key; - if(peer.isAdjIn) - { - com_key = BMP_TABLE_RIB_IN; - } - else - { - com_key = BMP_TABLE_RIB_OUT; - } - com_key += redisMgr_.GetKeySeparator(); - com_key += redisMgr_pfx; - com_key += redisMgr_.GetKeySeparator(); - com_key += neigh; - del_keys.push_back(com_key); - } - break; + LOG_INFO("MsgBusImpl_redis[%llu] RAW_LINE[%zu]=%s", (unsigned long long)call_id, i, raw); + + // Compact preview (resilient to syslog truncation) + LOG_INFO("MsgBusImpl_redis[%llu] ADD preview origin='%s' as_path.len=%zu next_hop='%s' comm.len=%zu ext_comm.len=%zu cluster.len=%zu " + "labels.len=%zu large_comm.len=%zu", + (unsigned long long)call_id, + cut(origin).c_str(), as_path.size(), cut(next_hop).c_str(), + community_list.size(), ext_comm_list.size(), cluster_list.size(), + labels.size(), large_comm_list.size()); + + // Prepare Redis fields (parity with logged values) + std::vector fvs; + fvs.reserve(32); + fvs.emplace_back(std::make_pair("origin", origin)); + fvs.emplace_back(std::make_pair("as_path", as_path)); + fvs.emplace_back(std::make_pair("as_path_count", std::to_string(as_path_count))); + fvs.emplace_back(std::make_pair("origin_as", std::to_string(origin_as))); + fvs.emplace_back(std::make_pair("next_hop", next_hop)); + fvs.emplace_back(std::make_pair("med", std::to_string(med))); + fvs.emplace_back(std::make_pair("local_pref", std::to_string(local_pref))); + fvs.emplace_back(std::make_pair("aggregator", aggregator)); + fvs.emplace_back(std::make_pair("community_list", community_list)); + fvs.emplace_back(std::make_pair("ext_community_list", ext_comm_list)); + fvs.emplace_back(std::make_pair("cluster_list", cluster_list)); + fvs.emplace_back(std::make_pair("atomic_agg", std::to_string(atomic_agg))); + fvs.emplace_back(std::make_pair("nexthop_isIPv4", std::to_string(nexthop_isIPv4))); + fvs.emplace_back(std::make_pair("originator_id", originator_id)); + fvs.emplace_back(std::make_pair("large_community_list", large_comm_list)); + fvs.emplace_back(std::make_pair("peer_addr", peer.peer_addr)); + + // RIB/peer context + hashes + timestamp + fvs.emplace_back(std::make_pair("path_id", std::to_string(path_id))); + fvs.emplace_back(std::make_pair("labels", labels)); + fvs.emplace_back(std::make_pair("isIPv4", std::to_string((int)rib[i].isIPv4))); + fvs.emplace_back(std::make_pair("peer_as", std::to_string(peer.peer_as))); + fvs.emplace_back(std::make_pair("isPrePolicy", std::to_string(isPrePolicy))); + fvs.emplace_back(std::make_pair("isAdjIn", std::to_string(isAdjIn))); + fvs.emplace_back(std::make_pair("rib_hash", rib_hash_str)); + fvs.emplace_back(std::make_pair("path_hash", path_hash_str)); + fvs.emplace_back(std::make_pair("peer_hash", peer_hash_str)); + fvs.emplace_back(std::make_pair("router_hash", router_hash_str)); + fvs.emplace_back(std::make_pair("ts", ts)); + + const char* table = peer.isAdjIn ? BMP_TABLE_RIB_IN : BMP_TABLE_RIB_OUT; + LOG_INFO("MsgBusImpl_redis[%llu] ADD -> table=%s key=%s|%s fields_count=%zu", + (unsigned long long)call_id, table, pfx_len.c_str(), peer.peer_addr, fvs.size()); + LOG_INFO("MsgBusImpl_redis[%llu] ADD key_prefix=%s neighbor=%s rib_hash=%s path_hash=%s peer_hash=%s router_hash=%s", + (unsigned long long)call_id, pfx_len.c_str(), peer.peer_addr, + rib_hash_str.c_str(), path_hash_str.c_str(), peer_hash_str.c_str(), router_hash_str.c_str()); + + if (peer.isAdjIn) + redisMgr_.WriteBMPTable(BMP_TABLE_RIB_IN, keys, fvs); + else + redisMgr_.WriteBMPTable(BMP_TABLE_RIB_OUT, keys, fvs); + + ++add_count; + } else { + std::string com_key = peer.isAdjIn ? BMP_TABLE_RIB_IN : BMP_TABLE_RIB_OUT; + com_key += redisMgr_.GetKeySeparator() + pfx_len; + com_key += redisMgr_.GetKeySeparator() + peer.peer_addr; + LOG_INFO("MsgBusImpl_redis[%llu] DEL -> key=%s", (unsigned long long)call_id, com_key.c_str()); + del_keys.push_back(com_key); + ++del_count; } } if (!del_keys.empty()) { + LOG_INFO("MsgBusImpl_redis[%llu] RemoveEntityFromBMPTable count=%zu", + (unsigned long long)call_id, del_keys.size()); redisMgr_.RemoveEntityFromBMPTable(del_keys); } + + LOG_INFO("MsgBusImpl_redis[%llu] update_unicastPrefix[done] action=%s adds=%zu dels=%zu", + (unsigned long long)call_id, actionStr, add_count, del_count); } + + + /** * Abstract method Implementation - See MsgBusInterface.hpp for details */