Skip to content

Commit 83161e0

Browse files
authored
Split callset into chunks for processing and add additional cli args (#72)
Add multiprocessing over the samples in the callset, and add CLI options to specify the chunk size (--chunk-size), to bypass the intersecting-intervals phase (-b/--bypass-intersecting-intervals-phase), and to perform a dry run (-d/--dryrun) that shows what would be processed without actually executing the query.
1 parent 496865e commit 83161e0

2 files changed

Lines changed: 101 additions & 4 deletions

File tree

examples/genomicsdb_query

Lines changed: 95 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,26 @@ def parse_callset_json_for_row_ranges(callset_file, samples=None):
9696
return row_tuples
9797

9898

99+
def parse_callset_json_for_split_row_ranges(callset_file, chunk_size):
    """Split the rows of a callset into contiguous, inclusive row ranges.

    The callset JSON is read from ``callset_file`` and the indices
    ``0 .. len(callsets) - 1`` are partitioned into chunks of ``chunk_size``
    rows. A trailing chunk smaller than half of ``chunk_size`` is folded into
    the chunk before it, so the last range may hold up to
    ``1.5 * chunk_size - 1`` rows.

    Args:
        callset_file: Path of the callset JSON; it must have a "callsets" entry.
        chunk_size: Positive number of rows per chunk.

    Returns:
        A list of at least two inclusive ``(first_row, last_row)`` tuples, or
        ``None`` when the callset does not split into more than one chunk
        (including an empty callset).

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    callset = json.loads(genomicsdb.read_entire_file(callset_file))
    # NOTE(review): assumes callset rows are indexed contiguously from 0 to
    # len(callsets) - 1 -- confirm against the callset mapping in use.
    num_rows = len(callset["callsets"])

    # Integer arithmetic keeps the chunk count exact for any callset size.
    chunks, remainder = divmod(num_rows, chunk_size)
    # A remainder of at least half a chunk gets its own chunk; a smaller one
    # is collapsed into the last but one chunk.
    if 2 * remainder >= chunk_size:
        chunks += 1

    # Zero or one chunk means there is nothing to parallelize; always report
    # that as None rather than an empty list.
    if chunks <= 1:
        return None

    split_row_ranges = [(chunk_size * i, chunk_size * (i + 1) - 1) for i in range(chunks - 1)]
    # The final range absorbs any collapsed remainder, so it always ends at the last row.
    split_row_ranges.append((chunk_size * (chunks - 1), num_rows - 1))
    return split_row_ranges
117+
118+
99119
def parse_vidmap_json_for_attributes(vidmap_file, attributes=None):
100120
if attributes is None:
101121
return ["GT"]
@@ -229,6 +249,11 @@ def setup():
229249
default=8,
230250
help="Optional - number of processing units for multiprocessing(default: %(default)s). Run nproc from command line to print the number of processing units available to a process for the user", # noqa
231251
)
252+
parser.add_argument(
253+
"--chunk-size",
254+
default=10240,
255+
help="Optional - hint to split number of samples for multiprocessing used in conjunction with -n/--nproc and when -s/-S/--sample/--sample-list is not specified (default: %(default)s)", # noqa
256+
)
232257
parser.add_argument(
233258
"-t",
234259
"--output-type",
@@ -253,7 +278,19 @@ def setup():
253278
"-o",
254279
"--output",
255280
default="query_output",
256-
help="a prefix filename to csv outputs from the tool. The filenames will be suffixed with the interval and .csv/.json (default: %(default)s)", # noqa
281+
help="a prefix filename to outputs from the tool. The filenames will be suffixed with the interval and .csv/.json/... (default: %(default)s)", # noqa
282+
)
283+
parser.add_argument(
284+
"-d",
285+
"--dryrun",
286+
action="store_true",
287+
help="displays the query that will be run without actually executing the query (default: %(default)s)", # noqa
288+
)
289+
parser.add_argument(
290+
"-b",
291+
"--bypass-intersecting-intervals-phase",
292+
action="store_true",
293+
help="iterate only once bypassing the intersecting intervals phase (default: %(default)s)", # noqa
257294
)
258295

259296
args = parser.parse_args()
@@ -369,6 +406,15 @@ class GenomicsDBExportConfig(NamedTuple):
369406
callset_file: str
370407
attributes: str
371408
filter: str
409+
bypass_intersecting_intervals_phase: bool
410+
411+
def __str__(self):
412+
if self.filter:
413+
filter_str = f" filter={self.filter}"
414+
else:
415+
filter_str = ""
416+
bypass_str = f" bypass_intersecting_intervals_phase={self.bypass_intersecting_intervals_phase}"
417+
return f"workspace={self.workspace} attributes={self.attributes}{filter_str}{bypass_str}"
372418

373419

374420
class GenomicsDBQueryConfig(NamedTuple):
@@ -379,6 +425,13 @@ class GenomicsDBQueryConfig(NamedTuple):
379425
array_name: str
380426
row_tuples: List[tuple]
381427

428+
def __str__(self):
429+
if self.row_tuples:
430+
row_tuples_str = f"{self.row_tuples}"
431+
else:
432+
row_tuples_str = "ALL"
433+
return f"\tinterval={self.interval} array={self.array_name} callset rows={row_tuples_str}"
434+
382435

383436
class OutputConfig(NamedTuple):
384437
filename: str
@@ -398,7 +451,7 @@ def configure_export(config: GenomicsDBExportConfig):
398451
export_config.workspace = config.workspace
399452
export_config.vid_mapping_file = config.vidmap_file
400453
export_config.callset_mapping_file = config.callset_file
401-
export_config.bypass_intersecting_intervals_phase = False
454+
export_config.bypass_intersecting_intervals_phase = config.bypass_intersecting_intervals_phase
402455
export_config.enable_shared_posixfs_optimizations = True
403456
if config.attributes:
404457
export_config.attributes.extend(config.attributes)
@@ -505,15 +558,17 @@ def main():
505558
max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size)
506559
print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")
507560

508-
export_config = GenomicsDBExportConfig(workspace, vidmap_file, callset_file, attributes, args.filter)
561+
export_config = GenomicsDBExportConfig(
562+
workspace, vidmap_file, callset_file, attributes, args.filter, args.bypass_intersecting_intervals_phase
563+
)
509564
configs = []
510565
for interval in intervals:
511566
print(f"Processing interval({interval})...")
512567

513568
contig, start, end, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
514569
if len(arrays) == 0:
515570
logging.error(f"No arrays in the workspace matched input interval({interval})")
516-
continue
571+
# continue
517572

518573
print(f"\tArrays:{arrays} under consideration for interval({interval})")
519574
for idx, array in enumerate(arrays):
@@ -529,6 +584,42 @@ def main():
529584
if len(configs) == 0:
530585
print("Nothing to process!!. Check output for possible errors")
531586
sys.exit(1)
587+
588+
# Check if there is room for row_tuples to be parallelized
589+
chunk_size = int(args.chunk_size)
590+
if row_tuples is None and len(configs) < args.nproc and chunk_size > 1:
591+
row_tuples = parse_callset_json_for_split_row_ranges(callset_file, chunk_size)
592+
if row_tuples:
593+
new_configs = []
594+
for idx_row, row_tuple in enumerate(row_tuples):
595+
for idx, config in enumerate(configs):
596+
query_config = config.query_config
597+
split_query_config = GenomicsDBQueryConfig(
598+
query_config.interval,
599+
query_config.contig,
600+
query_config.start,
601+
query_config.end,
602+
query_config.array_name,
603+
[row_tuple],
604+
)
605+
output_config = config.output_config
606+
split_output_config = OutputConfig(
607+
generate_output_filename(
608+
output, output_type, query_config.interval, len(configs) * idx + idx_row
609+
),
610+
output_type,
611+
json_type,
612+
max_arrow_bytes,
613+
)
614+
new_configs.append(Config(export_config, split_query_config, split_output_config))
615+
configs = new_configs
616+
617+
if args.dryrun:
618+
print(f"Query configurations for {export_config}:")
619+
for config in configs:
620+
print(config.query_config)
621+
sys.exit(0)
622+
532623
if min(len(configs), args.nproc) == 1:
533624
results = list(map(process, configs))
534625
else:

examples/test.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,14 @@ if [[ $PARTITION != "t0_1_2" ]]; then
187187
exit 1
188188
fi
189189
run_command "genomicsdb_query -w $WORKSPACE -s HG00097 -s HG00100 -s HG00096 -o $OUTPUT"
190+
run_command "genomicsdb_query -w $WORKSPACE -s HG00097 -s HG00100 -s HG00096 -o $OUTPUT -d"
190191
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -o $OUTPUT"
191192
run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -a GT -o $OUTPUT"
193+
run_command "genomicsdb_query -w $WORKSPACE -i 1 -o $OUTPUT -d"
194+
run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -o $OUTPUT"
195+
run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -b -o $OUTPUT -d"
196+
run_command "genomicsdb_query -w $WORKSPACE -i 1 --chunk-size=2 -b -o $OUTPUT"
197+
run_command "genomicsdb_query -w $WORKSPACE -i 4 --chunk-size=4 -b -o $OUTPUT -d"
192198

193199
OLDSTYLE_JSONS="-l $OLDSTYLE_DIR/loader.json -c $OLDSTYLE_DIR/callset_t0_1_2.json -v $OLDSTYLE_DIR/vid.json"
194200
run_command "genomicsdb_cache -w $WORKSPACE $OLDSTYLE_JSONS $INTERVAL_ARGS"

0 commit comments

Comments
 (0)