82 changes: 36 additions & 46 deletions fulltext/search.py
@@ -6,45 +6,20 @@

import pandas as pd
import requests
from datasets import Dataset, concatenate_datasets, load_dataset
from tqdm import tqdm
from datasets import Dataset, load_dataset


def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("--input_dataset", type=str, default="HuggingFaceTB/bisac_expanded_final")
parser.add_argument("--n_topics", type=int, default=1000)
parser.add_argument("--n_pages", type=int, default=20)
parser.add_argument("--save_interval", type=int, default=1000)
parser.add_argument("--target_datadet_prefix", type=str, default="HuggingFaceTB/search_default")
parser.add_argument("--n_topics", type=int, default=-1)
parser.add_argument("--n_pages", type=int, default=1000)
parser.add_argument("--save_interval", type=int, default=50)
parser.add_argument("--target_datadet_prefix", type=str, default="bisac_boosted_new_index")
parser.add_argument("--shuffle_seed", type=int, default=0)
return parser.parse_args()


def load_and_merge(directory_path, id, args):
"""To avoid datasets pyarrow overflow:
1- load each json chunk in a pandas dataframe
2- explode the topic_hits column
3- convert each chunk to datasets and concatenate the subsets"""
json_files = [file for file in os.listdir(directory_path) if file.endswith('.json')]
print(f"Found {len(json_files)} chunks (={args.n_topics / args.save_interval})")

print("Loading the chunks...")
data_subsets = []
for file in tqdm(json_files):
file_path = os.path.join(directory_path, file)
with open(file_path, 'r') as f:
data = json.load(f)
df = pd.DataFrame(data).explode("topic_hits")
data_subsets.append(Dataset.from_pandas(df))

print("Merging the chunks...")
merged_data = concatenate_datasets(data_subsets).remove_columns(["__index_level_0__"])
print(merged_data)
print(f"Sanity check on the pages from topic 0: {merged_data[0]['topic_hits']}")
merged_data.push_to_hub(f"{args.target_datadet_prefix}_{id}", private=True)
print(f"Done! The data is available at '{args.target_datadet_prefix}_{id}' 🔥")


# wait until the server is up
while True:
try:
@@ -60,27 +35,40 @@ def load_and_merge(directory_path, id, args):

args = get_args()
data = load_dataset(args.input_dataset, split="train", cache_dir="/scratch/cosmo/.cache")
data = data.shuffle(0).select(range(args.n_topics))

data = data if args.n_topics < 0 else data.shuffle(args.shuffle_seed).select(range(args.n_topics))
print(f"Dataset with topics: {data}")

# path where the data chunks will be saved
id = f"{args.n_topics}t_{args.n_pages}p"
os.makedirs(f"intermediate_data/{id}", exist_ok=True)
data_path = f"{args.target_datadet_prefix}/{args.n_topics}t_{args.n_pages}p/data"
os.makedirs(data_path, exist_ok=True)

intermediate_data = []
total_length = len(data)
for index in range(len(data)):
sample = data[index]
sample["topic_hits"] = []
query = " / ".join([sample["top_category"].strip(), sample["subcategory"].strip(), sample["subtopic"].strip(),])
top_category = sample["top_category"].strip()
subcategory = sample["subcategory"].strip()
subtopic = sample["subtopic"].strip()
for c in ['!', '"', '$', "'", '(', ')', '/', '<', '@', '\\', '^', '|', '~']:
top_category = top_category.replace(c, ' ')
subcategory = subcategory.replace(c, ' ')
subtopic = subtopic.replace(c, ' ')
# boosting the IDF score of subtopic tokens
subtopic = " ".join([w + "^2" for w in subtopic.split()])
query = " ".join([top_category, subcategory, subtopic])
while True:
try:
max_pages = 10_000
max_pages = 3_000
print(f"n_pages requested: {args.n_pages}, max_pages: {max_pages}")
response = requests.post(
"http://127.0.0.1:9308/search",
data=json.dumps(
{
"index": "fineweb",
"size": args.n_pages,
"query": {"match": {"content": query}},
"query": {"query_string": query},
"max_matches": max_pages,
}
),
@@ -93,23 +81,25 @@ def load_and_merge(directory_path, id, args):
else:
hits = response.json()["hits"]["hits"]
sample["topic_hits"] = hits
print(f"Number pages retrieved: {len(hits)}")
print(f"Number pages retrieved: {len(hits)} for query {query}")
break
except requests.exceptions.ConnectionError as e:
print(e, file=sys.stderr)
time.sleep(5)
continue
intermediate_data.append(sample)

# Save data every save_interval topics and reinitialize intermediate dicts
save_interval = min(args.n_topics, args.save_interval)
if index > 0 and (index + 1) % save_interval == 0:
# Save data every save_interval topics and reinitialize intermediate dicts
save_path = f"intermediate_data/{id}/data_{index + 1 - save_interval}_{index + 1}.json"
with open(save_path, "w") as fp:
json.dump(intermediate_data, fp)
if (index > 0 and (index + 1) % save_interval == 0) or (index + 1 == total_length):
start_index = index + 1 - save_interval if (index + 1) % save_interval == 0 else max(0, index + 1 - (index + 1) % save_interval)
save_path = f"{data_path}/data_{start_index}_{index + 1}.parquet"
# we load in a dataframe first to explode topic hits and avoid OOM with `datasets`
print(f"Saving data {index}")
df = pd.DataFrame(intermediate_data).explode("topic_hits")
ds = Dataset.from_pandas(df)
ds.to_parquet(save_path)
print(f"💾 Saved intermediate data at {save_path}")
intermediate_data = []

# merge dataset and push to hub
directory_path = f"intermediate_data/{id}"
load_and_merge(directory_path, id, args)
print(f"Done! The data is available at '{save_path}' 🔥")
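Note on the new query construction: the characters stripped above appear to be operators in the search server's `query_string` syntax (the script talks to a JSON `/search` endpoint on port 9308), and each subtopic token gets a `^2` boost so subtopic words weigh more than the broader category words. A minimal standalone sketch of what that loop produces; the example topic strings are invented:

```python
# Standalone illustration of the query building in this diff; the topic strings are made up.
top_category = "Fiction"
subcategory = "Science Fiction / Fantasy"
subtopic = "Time Travel"

# strip characters that have special meaning in the query_string syntax
for c in ['!', '"', '$', "'", '(', ')', '/', '<', '@', '\\', '^', '|', '~']:
    top_category = top_category.replace(c, ' ')
    subcategory = subcategory.replace(c, ' ')
    subtopic = subtopic.replace(c, ' ')

# boost subtopic tokens so they dominate the match score
subtopic = " ".join(w + "^2" for w in subtopic.split())
query = " ".join([top_category, subcategory, subtopic])
print(query)  # -> "Fiction Science Fiction   Fantasy Time^2 Travel^2"
```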
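With `load_and_merge` removed, the script now only writes per-chunk parquet files under `data_path` (e.g. with `save_interval=50` and 120 topics: `data_0_50.parquet`, `data_50_100.parquet`, `data_100_120.parquet`), and the final print only names the last chunk. A rough sketch, not part of this PR, of how the chunks could still be merged and pushed to the Hub afterwards; the local path and repo id are placeholders:

```python
# Hypothetical follow-up step: merge the parquet chunks written by search.py
# and push the result to the Hub.
from datasets import load_dataset

data_path = "bisac_boosted_new_index/1000t_1000p/data"  # placeholder, same layout as in the script
merged = load_dataset("parquet", data_files=f"{data_path}/*.parquet", split="train")
print(merged)
merged.push_to_hub("your-org/your-merged-dataset", private=True)  # placeholder repo id
```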