From fcd1683c1acc6dc2e891b0a0173daaf2fed2adfc Mon Sep 17 00:00:00 2001 From: Sweta Vooda Date: Mon, 29 Apr 2024 05:37:16 +0000 Subject: [PATCH] handle null and zero vector insertion to pinecone --- src/remote/clients/pinecone/pinecone.c | 9 ++++++--- src/remote/clients/pinecone/pinecone_api.c | 4 ++-- src/remote/clients/pinecone/pinecone_api.h | 2 +- src/remote/clients/pinecone/pinecone_utils.c | 8 ++++++++ src/remote/remote_scan.c | 2 +- 5 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/remote/clients/pinecone/pinecone.c b/src/remote/clients/pinecone/pinecone.c index fd3f9bc1..be2edf03 100644 --- a/src/remote/clients/pinecone/pinecone.c +++ b/src/remote/clients/pinecone/pinecone.c @@ -86,14 +86,14 @@ char* pinecone_create_host_from_spec(int dimensions, VectorMetric metric, char* char* pinecone_index_name = palloc(20); sprintf(pinecone_index_name, "pgvr-%u", index->rd_id); // TODO: remote index name - create_response = remote_create_index(pinecone_api_key, pinecone_index_name, dimensions, remote_metric_name, spec_json); + create_response = pinecone_create_index(pinecone_api_key, pinecone_index_name, dimensions, remote_metric_name, spec_json); host = cJSON_GetStringValue(cJSON_GetObjectItemCaseSensitive(create_response, "host")); // now we wait until the remote index is done initializing // todo: timeout and error handling // we don't want to ping pinecone.io with describe_index because pinecone.io may be aware of the index before // the host is actually live. We poll the host for liveness with get_index_stats instead. index_stats = remote_get_index_stats(pinecone_api_key, host); - while (index_stats == NULL) { + while (index_stats == NULL || cJSON_GetObjectItemCaseSensitive(index_stats, "totalVectorCount")==NULL) { elog(DEBUG1, "Waiting for remote index to initialize..."); pg_usleep(1000000); // 1 second index_stats = remote_get_index_stats(pinecone_api_key, host); @@ -232,11 +232,13 @@ ItemPointerData* pinecone_query_with_fetch(char* host, int top_k, PreparedQuery int n_results; ItemPointerData* fetched_ctids = pinecone_extract_ctids_from_fetch_response(fetch_response, &n_results); // 2. return the best (first) checkpoint which is among the fetched - for (int i = 0; i < n_checkpoints; i++) { + bool br=false; + for (int i = 0; i < n_checkpoints && !br; i++) { RemoteCheckpoint checkpoint = checkpoints[i]; for (int j = 0; j < n_results; j++) { if (ItemPointerEquals(&checkpoint.tid, &fetched_ctids[j])) { *best_checkpoint_return = checkpoint; + br=true; break; } } @@ -268,6 +270,7 @@ void pinecone_append_prepare_bulk_insert(PreparedBulkInsert prepared_vectors, Tu cJSON* json_vectors = (cJSON*) prepared_vectors; char* vector_id = pinecone_id_from_heap_tid(*ctid); cJSON* json_vector = tuple_get_remote_vector(tupdesc, values, nulls, vector_id); + if(json_vector==NULL) return; cJSON_AddItemToArray(json_vectors, json_vector); } void pinecone_end_prepare_bulk_insert(PreparedBulkInsert prepared_vectors) { diff --git a/src/remote/clients/pinecone/pinecone_api.c b/src/remote/clients/pinecone/pinecone_api.c index ed5775da..2d6968e0 100644 --- a/src/remote/clients/pinecone/pinecone_api.c +++ b/src/remote/clients/pinecone/pinecone_api.c @@ -125,7 +125,7 @@ cJSON* describe_index(const char *api_key, const char *index_name) { cJSON* remote_get_index_stats(const char *api_key, const char *index_host) { cJSON* resp; char url[100] = "https://"; strcat(url, index_host); strcat(url, "/describe_index_stats"); - resp = generic_remote_request(api_key, url, "GET", NULL, true); + resp = generic_remote_request(api_key, url, "GET", NULL, false); return resp; } @@ -170,7 +170,7 @@ cJSON* remote_list_vectors(const char *api_key, const char *index_host, int limi * pod: environment, replicas, pod_type, pods, shards, metadata_config * Refer to https://docs.pinecone.io/reference/create_index */ -cJSON* remote_create_index(const char *api_key, const char *index_name, const int dimension, const char *metric, cJSON *spec) { +cJSON* pinecone_create_index(const char *api_key, const char *index_name, const int dimension, const char *metric, cJSON *spec) { cJSON *request = cJSON_CreateObject(); cJSON_AddItemToObject(request, "name", cJSON_CreateString(index_name)); cJSON_AddItemToObject(request, "dimension", cJSON_CreateNumber(dimension)); diff --git a/src/remote/clients/pinecone/pinecone_api.h b/src/remote/clients/pinecone/pinecone_api.h index 188ff7f2..dc632d2f 100644 --- a/src/remote/clients/pinecone/pinecone_api.h +++ b/src/remote/clients/pinecone/pinecone_api.h @@ -21,7 +21,7 @@ cJSON* remote_delete_vectors(const char *api_key, const char *index_host, cJSON cJSON* remote_delete_index(const char *api_key, const char *index_name); cJSON* remote_delete_all(const char *api_key, const char *index_host); cJSON* remote_list_vectors(const char *api_key, const char *index_host, int limit, char* pagination_token); -cJSON* remote_create_index(const char *api_key, const char *index_name, const int dimension, const char *metric, cJSON *spec); +cJSON* pinecone_create_index(const char *api_key, const char *index_name, const int dimension, const char *metric, cJSON *spec); cJSON** remote_query_with_fetch(const char *api_key, const char *index_host, cJSON* request_body, bool with_fetch, cJSON* fetch_ids); cJSON* remote_bulk_upsert(const char *api_key, const char *index_host, cJSON *vectors, int batch_size); CURL* get_remote_query_handle(const char *api_key, const char *index_host, cJSON* request_body, ResponseData* response_data); diff --git a/src/remote/clients/pinecone/pinecone_utils.c b/src/remote/clients/pinecone/pinecone_utils.c index f9c9fc89..10454a9d 100644 --- a/src/remote/clients/pinecone/pinecone_utils.c +++ b/src/remote/clients/pinecone/pinecone_utils.c @@ -34,7 +34,15 @@ cJSON* tuple_get_remote_vector(TupleDesc tup_desc, Datum *values, bool *isnull, cJSON *metadata = cJSON_CreateObject(); Vector *vector; cJSON *json_values; + if(isnull[0]){ + elog(DEBUG1, "Cannot insert NULL vectors to pinecone."); + return NULL; + } vector = DatumGetVector(values[0]); + if(vector_eq_zero_internal(vector)) { + elog(DEBUG1, "Cannot insert zero vectors to pinecone."); + return NULL; + } json_values = cJSON_CreateFloatArray(vector->x, vector->dim); // prepare metadata for (int i = 1; i < tup_desc->natts; i++) // skip the first column which is the vector diff --git a/src/remote/remote_scan.c b/src/remote/remote_scan.c index 9eb9619a..afd3e519 100644 --- a/src/remote/remote_scan.c +++ b/src/remote/remote_scan.c @@ -230,7 +230,7 @@ bool remote_gettuple(IndexScanDesc scan, ScanDirection dir) if (ItemPointerCompare(tid, new_tid) == 0) { // if the next tuple is the same as the current tuple, then we need to skip // N.B. we don't need a loop because the same tid appears at most twice in the sortstate - elog(DEBUG1, "Skipping duplicate tuple, found in both local buffer and remote index"); + elog(DEBUG1, "Skipping duplicate tuple %d:%d, found in both local buffer and remote index", ItemPointerGetBlockNumber(tid),ItemPointerGetOffsetNumber(tid)); so->more_tuples = tuplesort_gettupleslot(so->tid_sortstate, true, false, so->tid_sortslot, NULL); } }