Skip to content

Commit 6459e05

Browse files
Authored — Merge pull request #337 from Meesho/main
Version bump
2 parents d7fb9b7 + 7d50829 commit 6459e05

9 files changed

Lines changed: 127 additions & 30 deletions

File tree

horizon/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v1.3.0
1+
v1.4.0

horizon/internal/skye/handler/skye.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ func (s *skyeConfig) RegisterVariant(request VariantRegisterRequest) (RequestSta
469469
return fmt.Errorf("model with name '%s' does not exist for entity '%s'", variantPayload.Model, variantPayload.Entity)
470470
}
471471

472-
if models.Models[variantPayload.Model].ModelType != enums.ModelType(enums.DELTA) && variantPayload.OTDTrainingDataPath == "" {
472+
if models.Models[variantPayload.Model].ModelType == enums.ModelType(enums.DELTA) && variantPayload.OTDTrainingDataPath == "" {
473473
return fmt.Errorf("otd_training_data_path is required for DELTA model type")
474474
}
475475

quick-start/docker-compose.yml

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ services:
461461
# Variants list
462462
- VARIANTS_LIST=ads_gold,ads_mall,ads_new_catalog,ads,ct_gst,ct_high_asp,ct_non_gst,organic_gold,organic,organic_gst,organic_high_asp,organic_mall,organic_melp,organic_non_gst,widget_ads,organic_gst_pd
463463
# MQ configuration
464-
- MQ_ID_TOPICS_MAPPING=2450:skye.embeddings-1
464+
- MQ_ID_TOPICS_MAPPING=2450:skye.embedding
465465
# Horizon to Skye ScyllaDB configuration
466466
- HORIZON_TO_SKYE_SCYLLA_CONF_ID_MAP=2:1
467467
- SCYLLA_2_CONTACT_POINTS=scylla
@@ -651,9 +651,14 @@ services:
651651
onfs-network:
652652
ipv4_address: 172.18.0.20
653653
depends_on:
654-
- etcd
655-
- kafka
656-
- qdrant
654+
etcd:
655+
condition: service_healthy
656+
qdrant:
657+
condition: service_healthy
658+
kafka:
659+
condition: service_healthy
660+
kafka-init:
661+
condition: service_completed_successfully
657662
restart: unless-stopped
658663

659664
skye-admin-healthcheck:
@@ -714,18 +719,25 @@ services:
714719
# Etcd
715720
- ETCD_SERVER=etcd:2379
716721
- ETCD_WATCHER_ENABLED=true
722+
# Admin HTTP client (used by embedding consumer for e.g. trigger indexing)
723+
- ADMIN_HOST=skye-admin
724+
- ADMIN_PORT=8092
725+
- ADMIN_TIMEOUT_IN_MS=30000
726+
- ADMIN_MAX_IDLE_CONNS=100
727+
- ADMIN_MAX_IDLE_CONNS_PER_HOST=100
728+
- ADMIN_IDLE_CONN_TIMEOUT_IN_MS=30000
717729
# Embedding consumer (ID=2)
718730
- EMBEDDING_CONSUMER_KAFKA_IDS=2
719731
- KAFKA_2_TOPICS=skye.embedding
720732
- KAFKA_2_BOOTSTRAP_SERVERS=broker:29092
721733
- KAFKA_2_BASIC_AUTH_CREDENTIAL_SOURCE=NONE
722-
- KAFKA_2_GROUP_ID=skye-embedding-consumer
734+
- KAFKA_2_GROUP_ID=skye-embedding-consumer-v2
723735
- KAFKA_2_AUTO_OFFSET_RESET=earliest
724736
- KAFKA_2_AUTO_COMMIT_INTERVAL_MS=5000
725737
- KAFKA_2_ENABLE_AUTO_COMMIT=false
726738
- KAFKA_2_LISTENER_CONCURRENCY=1
727-
- KAFKA_2_CLIENT_ID=skye-embedding-consumer
728-
- KAFKA_2_BATCH_SIZE=10
739+
- KAFKA_2_CLIENT_ID=skye-embedding-consumer-v2
740+
- KAFKA_2_BATCH_SIZE=1
729741
- KAFKA_2_POLL_TIMEOUT=1000
730742
# Embedding sequence consumer (ID=3)
731743
- EMBEDDING_CONSUMER_SEQUENCE_KAFKA_IDS=3
@@ -776,6 +788,11 @@ services:
776788
- KAFKA_7_CLIENT_ID=skye-realtime-delta-consumer
777789
- KAFKA_7_BATCH_SIZE=10
778790
- KAFKA_7_POLL_TIMEOUT=1000
791+
# Aggregator DB (Scylla) - required for embedding consumer when entity store uses conf_id=1
792+
- STORAGE_AGGREGATOR_DB_COUNT=1
793+
- STORAGE_AGGREGATOR_DB_1_CONTACT_POINTS=scylla
794+
- STORAGE_AGGREGATOR_DB_1_PORT=9042
795+
- STORAGE_AGGREGATOR_DB_1_KEYSPACE=skye
779796
networks:
780797
onfs-network:
781798
ipv4_address: 172.18.0.23
@@ -788,6 +805,10 @@ services:
788805
condition: service_healthy
789806
kafka-init:
790807
condition: service_completed_successfully
808+
skye-admin:
809+
condition: service_started
810+
scylla:
811+
condition: service_healthy
791812
restart: unless-stopped
792813

793814
skye-consumers-healthcheck:

quick-start/stop.sh

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,24 @@ remove_containers() {
4848
echo "✅ Containers removed"
4949
}
5050

51+
clean_skye_etcd() {
52+
echo "🧹 Clearing Skye config from etcd..."
53+
if docker ps -q -f name=^etcd$ | grep -q . 2>/dev/null && docker network ls -q | grep -q "onfs-network" 2>/dev/null; then
54+
if docker run --rm --network onfs-network quay.io/coreos/etcd:v3.5.12 etcdctl --endpoints=http://etcd:2379 del --prefix "/config/skye" 2>/dev/null; then
55+
echo "✅ Skye etcd keys removed"
56+
else
57+
echo "⚠️ Skye etcd cleanup skipped (etcd not reachable or no keys)"
58+
fi
59+
else
60+
echo "⚠️ etcd not running or network missing, skipping Skye etcd cleanup"
61+
fi
62+
}
63+
5164
remove_volumes() {
5265
echo "💾 Removing persistent volumes..."
5366

54-
# Remove named volumes
55-
VOLUMES=("scylla-data" "mysql-data" "kafka-data")
67+
# Remove named volumes (includes Skye: mysql Horizon/Skye tables, scylla skye keyspace, qdrant)
68+
VOLUMES=("scylla-data" "mysql-data" "kafka-data" "qdrant-data")
5669
for volume in "${VOLUMES[@]}"; do
5770
if docker volume ls -q | grep -q "^${volume}$"; then
5871
echo "🗑️ Removing volume: $volume"
@@ -120,7 +133,7 @@ show_status() {
120133
fi
121134

122135
# Check for volumes
123-
EXISTING_VOLUMES=$(docker volume ls -q | grep -E "scylla-data|mysql-data|kafka-data" 2>/dev/null || true)
136+
EXISTING_VOLUMES=$(docker volume ls -q | grep -E "scylla-data|mysql-data|kafka-data|qdrant-data" 2>/dev/null || true)
124137
if [ -n "$EXISTING_VOLUMES" ]; then
125138
echo "💾 Existing volumes:"
126139
echo "$EXISTING_VOLUMES" | sed 's/^/ • /'
@@ -144,6 +157,7 @@ if [ "$PURGE_FLAG" = "--purge" ]; then
144157
echo "⏳ Starting in 3 seconds... (Ctrl+C to cancel)"
145158
sleep 3
146159

160+
clean_skye_etcd
147161
stop_services
148162
remove_containers
149163
remove_images

quick-start/test-skye.sh

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,21 @@
33
# Skye End-to-End Test Script
44
# =============================================================================
55
# Tests the full Skye flow:
6-
# 1. Health checks on all Skye services
6+
# 1. Health checks on all Skye services (admin, serving, qdrant, consumers)
77
# 2. Register store, frequency, entity, model, variant via skye-admin
88
# 3. Create Qdrant collection directly
99
# 4. Insert test vectors into Qdrant
10-
# 5. Query similar candidates via skye-serving (gRPC)
10+
# 5. Send 3 embedding events to Kafka (skye-consumers consume them)
11+
# 6. Verify Qdrant search
12+
# 7. Query similar candidates via skye-serving (gRPC)
1113
# =============================================================================
1214

1315
ADMIN_URL="http://localhost:8092"
1416
SERVING_URL="localhost:8094"
1517
QDRANT_URL="http://localhost:6333"
18+
CONSUMERS_URL="http://localhost:8093"
19+
BROKER_CONTAINER="${BROKER_CONTAINER:-broker}"
20+
SKYE_EMBEDDING_TOPIC="${SKYE_EMBEDDING_TOPIC:-skye.embedding}"
1621

1722
GREEN='\033[0;32m'
1823
RED='\033[0;31m'
@@ -52,6 +57,7 @@ info "Checking service health..."
5257
curl -sf "${ADMIN_URL}/health" > /dev/null && pass "skye-admin is healthy" || fail "skye-admin is not reachable at ${ADMIN_URL}"
5358
curl -sf "http://${SERVING_URL}/health/self" > /dev/null && pass "skye-serving is healthy" || fail "skye-serving is not reachable at ${SERVING_URL}"
5459
curl -sf "${QDRANT_URL}/healthz" > /dev/null && pass "Qdrant is healthy" || fail "Qdrant is not reachable at ${QDRANT_URL}"
60+
curl -sf "${CONSUMERS_URL}/health" > /dev/null && pass "skye-consumers is healthy" || fail "skye-consumers is not reachable at ${CONSUMERS_URL}"
5561

5662
echo ""
5763

@@ -94,7 +100,7 @@ admin_post "Register model" "/api/v1/model/register-model" '{
94100
},
95101
"model_type": "RESET",
96102
"kafka_id": 0,
97-
"training_data_path": "",
103+
"training_data_path": "gs://test",
98104
"metadata": {
99105
"entity": "test-products",
100106
"key-type": "product_id"
@@ -114,8 +120,8 @@ admin_post "Register variant" "/api/v1/model/register-variant" '{
114120
"variant": "v1",
115121
"vector_db_type": "QDRANT",
116122
"vector_db_config": {
117-
"read_host": "qdrant:6334",
118-
"write_host": "qdrant:6334",
123+
"read_host": "172.18.0.3",
124+
"write_host": "172.18.0.3",
119125
"port": "6334",
120126
"http2config": {
121127
"deadline": 5000,
@@ -182,7 +188,31 @@ echo " Response: ${RESP}"
182188
pass "Test vectors inserted (upserted)"
183189

184190
# ---------------------------------------------------------------------------
185-
# Step 8: Verify vectors via Qdrant search (sanity check)
191+
# Step 8: Send 3 embedding events to Kafka (skye-consumers will consume them)
192+
# ---------------------------------------------------------------------------
193+
info "Sending 3 embedding events to topic ${SKYE_EMBEDDING_TOPIC}..."
194+
# Minimal valid event JSON (one line each) for entity=test-products, model=product-embeddings, vector_dim=4
195+
EVT1='{"candidate_id":"1","entity":"test-products","model_name":"product-embeddings","environment":"local","embedding_store_version":1,"partition":"","index_space":{"embedding":[0.1,0.2,0.3,0.4],"variants_version_map":{"v1":1},"variants_index_map":{"v1":true},"operation":"ADD","payload":{"portfolio_id":"0"}},"search_space":{"embedding":[0.1,0.2,0.3,0.4]}}'
196+
EVT2='{"candidate_id":"2","entity":"test-products","model_name":"product-embeddings","environment":"local","embedding_store_version":1,"partition":"","index_space":{"embedding":[0.2,0.3,0.4,0.5],"variants_version_map":{"v1":1},"variants_index_map":{"v1":true},"operation":"ADD","payload":{"portfolio_id":"0"}},"search_space":{"embedding":[0.2,0.3,0.4,0.5]}}'
197+
EVT3='{"candidate_id":"3","entity":"test-products","model_name":"product-embeddings","environment":"local","embedding_store_version":1,"partition":"","index_space":{"embedding":[0.9,0.8,0.7,0.6],"variants_version_map":{"v1":1},"variants_index_map":{"v1":true},"operation":"ADD","payload":{"portfolio_id":"0"}},"search_space":{"embedding":[0.9,0.8,0.7,0.6]}}'
198+
199+
if docker exec -i "${BROKER_CONTAINER}" /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server broker:29092 --topic "${SKYE_EMBEDDING_TOPIC}" 2>/dev/null <<EOF
200+
${EVT1}
201+
${EVT2}
202+
${EVT3}
203+
EOF
204+
then
205+
pass "Produced 3 messages to ${SKYE_EMBEDDING_TOPIC}"
206+
else
207+
fail "Failed to produce messages (is container ${BROKER_CONTAINER} running?)"
208+
fi
209+
210+
info "Waiting 5s for skye-consumers to process..."
211+
sleep 5
212+
pass "Check skye-consumers logs for 'Processing N embedding events' to confirm consumption"
213+
214+
# ---------------------------------------------------------------------------
215+
# Step 9: Verify vectors via Qdrant search (sanity check)
186216
# ---------------------------------------------------------------------------
187217
info "Verifying Qdrant search works..."
188218
RESP=$(curl -s -X POST "${QDRANT_URL}/collections/${COLLECTION_NAME}/points/search" \
@@ -196,11 +226,10 @@ echo " Response: ${RESP}"
196226
pass "Qdrant search verified"
197227

198228
# ---------------------------------------------------------------------------
199-
# Step 9: Query via skye-serving gRPC
229+
# Step 10: Query via skye-serving gRPC
200230
# ---------------------------------------------------------------------------
201231
info "Querying similar candidates via skye-serving gRPC..."
202232

203-
# Check if grpcurl is available
204233
if command -v grpcurl &> /dev/null; then
205234
RESP=$(grpcurl -plaintext \
206235
-H "skye-caller-id: test-script" \
@@ -217,9 +246,7 @@ if command -v grpcurl &> /dev/null; then
217246
echo " Response: ${RESP}"
218247
pass "gRPC query completed"
219248
else
220-
echo -e "${YELLOW} ⚠️ grpcurl not installed. Install it to test gRPC:${NC}"
221-
echo " brew install grpcurl"
222-
echo ""
249+
warn "grpcurl not installed. Install it to test gRPC: brew install grpcurl"
223250
echo " Then run manually:"
224251
echo ' grpcurl -plaintext -H "skye-caller-id: test" -H "skye-auth-token: test" -d '"'"'{"entity":"test-products","modelName":"product-embeddings","variant":"v1","limit":3,"embeddings":[{"embedding":[0.1,0.2,0.3,0.4]}]}'"'"' localhost:8094 SkyeSimilarCandidateService/getSimilarCandidates'
225252
fi

skye/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v1.0.0
1+
v1.1.0

skye/internal/consumers/listener/embedding.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func ProcessEmbeddingEvents(record []skafka.ConsumerRecord[string, []byte], c *k
2828
"environment", event.Environment})
2929
events = append(events, event)
3030
}
31+
log.Info().Msgf("Processing %d embedding events", len(events))
3132

3233
err := embeddingConsumer.Process(events)
3334
if err != nil {

skye/internal/consumers/listener/embedding/embedding.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ func (e *EmbeddingConsumer) produceFailureEvents(failedEvents []Event) {
6565
continue
6666
}
6767
failureProducerKafkaId := modelConf.FailureProducerKafkaId
68+
if failureProducerKafkaId == 0 {
69+
log.Debug().Msg("Skipping failure topic produce (failure_producer_kafka_id=0)")
70+
continue
71+
}
6872
skafka.InitProducer(failureProducerKafkaId) // idempotent — ensures producer exists for this dynamic ID
6973
jsonBytes, err := json.Marshal(failedEvent)
7074
if err != nil {
@@ -301,15 +305,23 @@ func (e *EmbeddingConsumer) shouldIndex(event Event, variant string, aggregatorD
301305
log.Error().Msgf("Error getting variant config for entity %s, model %s, variant %s: %v", event.Entity, event.Model, variant, err)
302306
return false, err
303307
}
308+
if variantConfig == nil {
309+
return false, fmt.Errorf("variant config is nil for entity %s model %s variant %s", event.Entity, event.Model, variant)
310+
}
304311
aggregatorFilters := variantConfig.Filter
305312
if aggregatorFilters == nil {
306313
return true, nil
307314
}
315+
if aggregatorData == nil {
316+
aggregatorData = make(map[string]interface{})
317+
}
308318
for _, criteria := range aggregatorFilters {
309319
for _, filter := range criteria {
310320
filterData := filter.DefaultValue
311-
if dataValue, exists := aggregatorData[filter.ColumnName]; exists {
312-
filterData = dataValue.(string)
321+
if dataValue, exists := aggregatorData[filter.ColumnName]; exists && dataValue != nil {
322+
if s, ok := dataValue.(string); ok {
323+
filterData = s
324+
}
313325
}
314326
if filterData != filter.FilterValue {
315327
return false, nil
@@ -325,6 +337,9 @@ func (e *EmbeddingConsumer) preparePayloadIndexMap(event Event, rtColumns map[st
325337
log.Error().Msgf("Error getting variant config for entity %s, model %s, variant %s: %v", event.Entity, event.Model, variant, err)
326338
return nil, err
327339
}
340+
if variantConfig == nil || variantConfig.VectorDbConfig.Payload == nil {
341+
return make(map[string]interface{}), nil
342+
}
328343
variantPayload := variantConfig.VectorDbConfig.Payload
329344
payloadIndexMap := make(map[string]interface{})
330345
for key, variantPayloadValue := range variantPayload {

skye/pkg/kafka/kafka.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ const (
2828
clientId = "client.id"
2929
)
3030

31+
// splitAndTrimTopics splits a comma-separated topic list and trims spaces (e.g. "a, b" -> ["a", "b"]).
32+
func splitAndTrimTopics(topicsStr string) []string {
33+
parts := strings.Split(topicsStr, ",")
34+
out := make([]string, 0, len(parts))
35+
for _, p := range parts {
36+
if t := strings.TrimSpace(p); t != "" {
37+
out = append(out, t)
38+
}
39+
}
40+
return out
41+
}
42+
3143
// BatchHandler processes a batch of raw Kafka messages.
3244
// Return nil on success (processBatch will commit); return error to trigger seek-back.
3345
type BatchHandler func(msgs []*kafka.Message, c *kafka.Consumer) error
@@ -53,6 +65,8 @@ func StartConsumers(kafkaIds string, consumerName string, handler BatchHandler)
5365
log.Error().Err(err).Msgf("Failed to build kafka config for %s (kafkaId=%s)", consumerName, kafkaId)
5466
continue
5567
}
68+
log.Info().Str("topic", cfg.Topics).Str("bootstrap", cfg.BootstrapURLs).Str("group", cfg.GroupID).
69+
Msgf("Starting %s consumer kafkaId=%s (subscribe to topic)", consumerName, kafkaId)
5670
kl := NewKafkaListener(cfg, handler)
5771
kl.Init()
5872
kl.Consume()
@@ -95,9 +109,13 @@ func (k *KafkaListener) Init() {
95109
if err != nil {
96110
log.Panic().Err(err).Msg("Failed to create Kafka consumer.")
97111
}
98-
err = consumer.SubscribeTopics([]string{k.kafkaConfig.Topics}, nil)
112+
topics := splitAndTrimTopics(k.kafkaConfig.Topics)
113+
if len(topics) == 0 {
114+
topics = []string{strings.TrimSpace(k.kafkaConfig.Topics)}
115+
}
116+
err = consumer.SubscribeTopics(topics, nil)
99117
if err != nil {
100-
log.Panic().Err(err).Msgf("Failed to subscribe to topic %s", k.kafkaConfig.Topics)
118+
log.Panic().Err(err).Msgf("Failed to subscribe to topics %v", topics)
101119
}
102120
k.consumers = append(k.consumers, consumer)
103121
}
@@ -147,7 +165,7 @@ func (k *KafkaListener) Consume() {
147165

148166
case <-flushTimer.C:
149167
if msgCount > 0 {
150-
log.Debug().Msgf("Processing %d messages due to timeout", msgCount)
168+
log.Info().Int("msgCount", msgCount).Msg("Flushing batch due to timeout")
151169
k.processBatch(consumer, messages)
152170
msgCount = 0
153171
messages = messages[:0]
@@ -165,12 +183,13 @@ func (k *KafkaListener) Consume() {
165183
"group:" + k.kafkaConfig.GroupID,
166184
"client:" + k.kafkaConfig.ClientID,
167185
})
186+
log.Info().Str("topic", *e.TopicPartition.Topic).Int32("partition", e.TopicPartition.Partition).Msg("Kafka message received")
168187

169188
messages = append(messages, e)
170189
msgCount++
171190

172191
if msgCount == k.kafkaConfig.BatchSize {
173-
log.Debug().Msgf("Processing batch of %d messages", msgCount)
192+
log.Info().Int("msgCount", msgCount).Msg("Processing batch (batch full)")
174193
k.processBatch(consumer, messages)
175194
msgCount = 0
176195
messages = messages[:0]

0 commit comments

Comments (0)