Skip to content

Commit 5dec1cf

Browse files
committed
ENH: better docking for repetitive elements
1 parent de01d6f commit 5dec1cf

2 files changed

Lines changed: 144 additions & 115 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "versalign"
3-
version = "2.1.0"
3+
version = "2.2.0"
44
description = "Naive alignment for lists of arbitrary objects"
55
readme = "README.md"
66
requires-python = ">=3.10"

src/versalign/docking.py

Lines changed: 143 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@
1010
from versalign.config import DEFAULT_GAP_REPR
1111
from versalign.helpers import seq_to_arr, arr_to_seq
1212
from versalign.blocking import get_symbol_score_lookup
13-
from versalign.pairwise import pairwise_alignment, pairwise_alignment_score
14-
15-
16-
DockStrategy = Literal["nonoverlap"]
13+
from versalign.pairwise import pairwise_alignment
1714

1815

1916
@dataclass(frozen=True)
@@ -44,10 +41,10 @@ class DockingResult:
4441
"""
4542
Represents the result of docking multiple blocks against a target sequence.
4643
47-
:var placements: chosen placements, reordered by center coordinate
48-
:var unused_blocks: per-block indices that were not used
49-
:var docked_row: center-anchored row for visualization
50-
:var total_score: total score of all placements
44+
:param placements: chosen placements, reordered by center coordinate
45+
:param unused_blocks: per-block indices that were not used
46+
:param docked_row: center-anchored row for visualization
47+
:param total_score: total score of all placements
5148
"""
5249

5350
placements: list[DockPlacement]
@@ -60,8 +57,8 @@ def _alignment_ctx(aligner: Aligner, gap_repr: str) -> tuple[PairwiseAligner, li
6057
"""
6158
Prepare alignment context for docking.
6259
63-
:var aligner: Aligner object
64-
:var gap_repr: representation of gaps in sequences
60+
:param aligner: Aligner object
61+
:param gap_repr: representation of gaps in sequences
6562
:return: tuple containing the underlying PairwiseAligner, alphabet list, gap index,
6663
"""
6764
aligner_obj: PairwiseAligner = aligner.aligner
@@ -75,9 +72,9 @@ def _extract_center_interval(center_aln: list[str], block_aln: list[str], gap_re
7572
"""
7673
Extract the start and end indices of the block alignment within the center alignment.
7774
78-
:var center_aln: aligned center sequence segment
79-
:var block_aln: aligned block sequence segment
80-
:var gap_repr: representation of gaps in sequences
75+
:param center_aln: aligned center sequence segment
76+
:param block_aln: aligned block sequence segment
77+
:param gap_repr: representation of gaps in sequences
8178
:return: tuple of (start, end) indices in the center sequence, or None if no coverage
8279
"""
8380
# Map candidate columns to target positions
@@ -106,9 +103,9 @@ def _make_center_anchored_row(
106103
"""
107104
Create a center-anchored row representing the docked blocks.
108105
109-
:var center: the target sequence
110-
:var placements: list of DockPlacement objects
111-
:var gap_repr: representation of gaps in sequences
106+
:param center: the target sequence
107+
:param placements: list of DockPlacement objects
108+
:param gap_repr: representation of gaps in sequences
112109
:return: list of symbols representing the docked row
113110
"""
114111
# Simple mapping view: one symbol per center position; no insertion
@@ -173,103 +170,23 @@ def score_docked_region(
173170
return total
174171

175172

176-
def dock_against_target(
177-
aligner: Aligner,
178-
target: Sequence[str],
179-
candidates: Sequence[Sequence[str]],
180-
gap_repr: str = DEFAULT_GAP_REPR,
181-
allow_block_reverse: bool = False,
182-
strategy: DockStrategy = "nonoverlap",
183-
) -> DockingResult:
184-
"""
185-
Dock multiple blocks against a target sequence.
186-
187-
:var aligner: Aligner object
188-
:var target: target sequence as a list of symbols
189-
:var candidates: list of block sequences to dock
190-
:var gap_repr: representation of gaps in sequences
191-
:var allow_block_reverse: whether to consider reversed blocks
192-
:var strategy: docking strategy to use (currently only "nonoverlap" supported)
193-
:return: DockingResult object containing placements and summary information
173+
def _select_nonoverlapping(placements: list[DockPlacement]) -> list[DockPlacement]:
194174
"""
195-
if not target:
196-
raise ValueError("target sequence must not be empty")
197-
if not candidates:
198-
return DockingResult(
199-
placements=[],
200-
unused_blocks=[],
201-
docked_row=[gap_repr] * len(target),
202-
total_score=0.0,
203-
)
175+
Weighted interval scheduling on DockPlacement.start/end with weight=score.
204176
205-
aligner_obj, alphabet, gap_idx, label_fn = _alignment_ctx(aligner, gap_repr)
206-
idx, mat = get_symbol_score_lookup(aligner)
207-
208-
target_int = seq_to_arr(list(target), alphabet, label_fn).astype(np.int32)
209-
210-
# Generate placements for each block (fwd and optionally rev)
211-
placements: list[DockPlacement] = []
212-
for bi, blk in enumerate(candidates):
213-
blk_fwd = seq_to_arr(list(blk), alphabet, label_fn).astype(np.int32)
214-
orientations = [(False, blk_fwd)]
215-
if allow_block_reverse:
216-
blk_rev = seq_to_arr(list(reversed(blk)), alphabet, label_fn).astype(np.int32)
217-
orientations.append((True, blk_rev))
218-
219-
best_for_block: DockPlacement | None = None
220-
221-
for is_rev, blk_int in orientations:
222-
_, tgt_aln_int, blk_aln_int = pairwise_alignment(aligner_obj, target_int, blk_int, gap_repr=gap_idx)
223-
tgt_aln = arr_to_seq(tgt_aln_int, alphabet)
224-
blk_aln = arr_to_seq(blk_aln_int, alphabet)
225-
226-
interval = _extract_center_interval(tgt_aln, blk_aln, gap_repr)
227-
if interval is None:
228-
continue
229-
start, end = interval
230-
231-
region_score = score_docked_region(
232-
center_aln=tgt_aln,
233-
block_aln=blk_aln,
234-
start=start,
235-
end=end,
236-
gap_repr=gap_repr,
237-
idx=idx,
238-
mat=mat,
239-
)
240-
241-
cand = DockPlacement(
242-
block_idx=bi,
243-
reversed=is_rev,
244-
start=start,
245-
end=end,
246-
score=float(region_score),
247-
center_aln=tgt_aln,
248-
block_aln=blk_aln,
249-
)
250-
if best_for_block is None or cand.score > best_for_block.score:
251-
best_for_block = cand
252-
253-
if best_for_block is not None and best_for_block.score > 0.0:
254-
placements.append(best_for_block)
255-
177+
:param placements: list of DockPlacement objects
178+
:return: list of chosen non-overlapping DockPlacement objects with maximum total score
179+
"""
256180
if not placements:
257-
return DockingResult(
258-
placements=[],
259-
unused_blocks=list(range(len(candidates))),
260-
docked_row=[gap_repr] * len(target),
261-
total_score=0.0,
262-
)
263-
264-
# Weighted interval scheduling (non-overlapping)
265-
placements.sort(key=lambda p: (p.end, p.start))
266-
ends = [p.end for p in placements]
181+
return []
182+
183+
placements = sorted(placements, key=lambda p: (p.end, p.start))
267184

268185
def prev_nonoverlap(i: int) -> int:
269186
"""
270187
Find the index of the last placement that does not overlap with placement i.
271188
272-
:var i: index of the current placement
189+
:param i: index of the current placement
273190
:return: index of the last non-overlapping placement, or -1 if none exists
274191
"""
275192
lo, hi = 0, i - 1
@@ -283,7 +200,6 @@ def prev_nonoverlap(i: int) -> int:
283200
hi = mid - 1
284201
return ans
285202

286-
# Dynamic programming to find optimal set of non-overlapping placements
287203
n = len(placements)
288204
pidx = [prev_nonoverlap(i) for i in range(n)]
289205
dp = [0.0] * n
@@ -298,7 +214,7 @@ def prev_nonoverlap(i: int) -> int:
298214
take[i] = True
299215
else:
300216
dp[i] = excl
301-
217+
302218
chosen: list[DockPlacement] = []
303219
i = n - 1
304220
while i >= 0:
@@ -308,19 +224,132 @@ def prev_nonoverlap(i: int) -> int:
308224
else:
309225
i -= 1
310226
chosen.reverse()
227+
return chosen
311228

312-
used_blocks = {p.block_idx for p in chosen}
313-
unused_blocks = [i for i in range(len(candidates)) if i not in used_blocks]
314229

315-
docker_row = _make_center_anchored_row(list(target), chosen, gap_repr)
316-
total_score = float(sum(p.score for p in chosen))
230+
def dock_against_target(
231+
aligner: Aligner,
232+
target: Sequence[str],
233+
candidates: Sequence[Sequence[str]],
234+
gap_repr: str,
235+
mask_repr: str,
236+
allow_block_reverse: bool = False,
237+
max_passes: int = 3,
238+
) -> DockingResult:
239+
"""
240+
Dock multiple blocks against a target sequence.
241+
242+
:param aligner: Aligner object
243+
:param target: target sequence as a list of symbols
244+
:param candidates: list of block sequences to dock
245+
:param gap_repr: representation of gaps in sequences
246+
:param allow_block_reverse: whether to consider reversed blocks
247+
:param strategy: docking strategy to use (currently only "nonoverlap" supported)
248+
:param max_passes: maximum number of passes for iterative docking
249+
:return: DockingResult object containing placements and summary information
250+
"""
251+
if not target:
252+
raise ValueError("target sequence must not be empty")
253+
if not candidates:
254+
return DockingResult(
255+
placements=[],
256+
unused_blocks=[],
257+
docked_row=[gap_repr] * len(target),
258+
total_score=0.0,
259+
)
260+
261+
aligner_obj, alphabet, gap_idx, label_fn = _alignment_ctx(aligner, gap_repr)
262+
idx, mat = get_symbol_score_lookup(aligner)
263+
264+
target_int = seq_to_arr(list(target), alphabet, label_fn).astype(np.int32)
265+
mask_idx = alphabet.index(mask_repr)
266+
267+
chosen_all: list[DockPlacement] = []
268+
occupied = np.zeros(len(target), dtype=bool)
269+
270+
remaining = list(range(len(candidates)))
271+
272+
for _pass in range(max_passes):
273+
if not remaining:
274+
break
275+
276+
# Build masked target for this pass
277+
masked_target_int = target_int.copy()
278+
masked_target_int[occupied] = mask_idx
279+
280+
# Propose onse best placement per remaining block
281+
proposed: list[DockPlacement] = []
282+
283+
for bi in remaining:
284+
blk = candidates[bi]
285+
blk_fwd = seq_to_arr(list(blk), alphabet, label_fn).astype(np.int32)
286+
orientations = [(False, blk_fwd)]
287+
if allow_block_reverse:
288+
blk_rev = seq_to_arr(list(reversed(blk)), alphabet, label_fn).astype(np.int32)
289+
orientations.append((True, blk_rev))
290+
291+
best_for_block: DockPlacement | None = None
292+
293+
for is_rev, blk_int in orientations:
294+
_, tgt_aln_int, blk_aln_int = pairwise_alignment(aligner_obj, masked_target_int, blk_int, gap_repr=gap_idx)
295+
tgt_aln = arr_to_seq(tgt_aln_int, alphabet)
296+
blk_aln = arr_to_seq(blk_aln_int, alphabet)
297+
298+
interval = _extract_center_interval(tgt_aln, blk_aln, gap_repr)
299+
if interval is None:
300+
continue
301+
start, end = interval
302+
303+
region_score = score_docked_region(
304+
center_aln=tgt_aln,
305+
block_aln=blk_aln,
306+
start=start,
307+
end=end,
308+
gap_repr=gap_repr,
309+
idx=idx,
310+
mat=mat,
311+
)
312+
313+
cand = DockPlacement(
314+
block_idx=bi,
315+
reversed=is_rev,
316+
start=start,
317+
end=end,
318+
score=float(region_score),
319+
center_aln=tgt_aln,
320+
block_aln=blk_aln,
321+
)
322+
if best_for_block is None or cand.score > best_for_block.score:
323+
best_for_block = cand
324+
325+
if best_for_block is not None and best_for_block.score > 0.0:
326+
proposed.append(best_for_block)
327+
328+
# Choose a non-overlappin subset among the newly proposed placements
329+
chosen_this_pass = _select_nonoverlapping(proposed)
330+
331+
if not chosen_this_pass:
332+
break # no progress => stop
333+
334+
# Accept them and update occupied mask
335+
chosen_all.extend(chosen_this_pass)
336+
for p in chosen_this_pass:
337+
occupied[p.start : p.end + 1] = True
338+
339+
used_now = {p.block_idx for p in chosen_all}
340+
remaining = [i for i in range(len(candidates)) if i not in used_now]
341+
342+
# Finalize
343+
chosen_sorted = sorted(chosen_all, key=lambda p: (p.start, p.end))
344+
used_blocks = {p.block_idx for p in chosen_sorted}
345+
unused_blocks = [i for i in range(len(candidates)) if i not in used_blocks]
317346

318-
# Reorder by target coordinate for readability
319-
chosen_sorted = sorted(chosen, key=lambda p: (p.start, p.end))
347+
docked_row = _make_center_anchored_row(list(target), chosen_sorted, gap_repr)
348+
total_score = float(sum(p.score for p in chosen_sorted))
320349

321350
return DockingResult(
322351
placements=chosen_sorted,
323352
unused_blocks=unused_blocks,
324-
docked_row=docker_row,
353+
docked_row=docked_row,
325354
total_score=total_score,
326355
)

0 commit comments

Comments
 (0)