[GG-181] Use intermediate host for swap moves #2248
bimboterminator1 wants to merge 1 commit into feature/ADBDEV-6608
Conversation
In the description:
'remove_hosts', 'remove_hosts_file', 'target_datadirs', 'target_datadirs_file',
'target_segment_count', 'mirror_mode', 'skip_rebalance', 'show_plan',
'skip_resource_estimation', 'analyze', 'replay_lag', 'hba_hostnames'
'skip_resource_estimation', 'analyze', 'replay_lag', 'hba_hostnames', 'inplace_swap_roles'
Please add checks for these new incompatible combinations into the tests.
batch_mirror_steps = []
batch_primary_steps = [[],[],[]]

def is_swap_phase3(move: LogicalMove) -> bool:
Why do we need this phase metadata? From my understanding, when we are performing a swap, it always transforms into 3 movements:
- mirror to an intermediate host;
- primary to its target host;
- mirror from the intermediate host to its target host.
So there will always be a role change between any of these 3 steps, and they will not get into the same batch.
Or am I missing something? Can you please advise a cluster configuration where the role_changed check alone is not enough?
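To illustrate my point, here is a minimal sketch of batching driven only by role changes (plain dicts stand in for the real move objects; this is not the planner code):

# Minimal sketch: split a move list into batches purely on role changes.
def split_into_batches(moves):
    batches, current = [], []
    for move in moves:
        if current and current[-1]["role"] != move["role"]:
            batches.append(current)  # role changed -> close the current batch
            current = []
        current.append(move)
    if current:
        batches.append(current)
    return batches

# A swap always alternates mirror/primary/mirror, so its phases never share a batch.
swap_phases = [
    {"phase": 1, "role": "mirror"},   # mirror -> intermediate host
    {"phase": 2, "role": "primary"},  # primary -> its target host
    {"phase": 3, "role": "mirror"},   # mirror: intermediate -> target host
]
print(len(split_into_batches(swap_phases)))  # prints 3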
| "Need intermediate host to avoid primary and mirror coexistence." ) | ||
|
|
||
| swap_move_ids = set() | ||
| for prim_move, mir_move in swap_pairs: |
Can we merge this loop with the one below:
...
for prim_move, mir_move in swap_pairs:
    content_id = prim_move.seg.getSegmentContentId()
    try:
        # Select intermediate host
        ...
?
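For reference, a sketch of the merged version, based on the code quoted in this thread (not a final implementation):

# Sketch: collect swap dbids while decomposing each swap, in one pass.
swap_move_ids = set()
for prim_move, mir_move in swap_pairs:
    swap_move_ids.add(prim_move.seg.getSegmentDbId())
    swap_move_ids.add(mir_move.seg.getSegmentDbId())
    content_id = prim_move.seg.getSegmentContentId()
    try:
        # Select intermediate host and decompose into 3 phases, as in the current code
        ...
    except Exception as e:
        raise PlanningError(f"Failed to plan swap for content {content_id}: {e}")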
    active_hosts_count = len([h for h in self.target_hosts
                              if h.status in [HostStatus.ACTIVE, HostStatus.NEW]])
    if active_hosts_count < 3:
        raise ValidationError(
There is also a check in select_intermediate_host():
if not candidates:
    raise PlanningError(
        f"No intermediate host available for swap of content {content_id}. "
        f"Need at least 3 hosts to perform swaps safely."
    )
which intersects with this check. The check in select_intermediate_host() seems to be more general, so maybe we can leave only it and remove the check here?
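To make the overlap concrete, here is a hypothetical sketch of the relevant part of select_intermediate_host(); attribute names such as source_host, target_host and hostname are illustrative, not the real API:

# Hypothetical sketch; only the candidate filtering and the error branch matter here.
def select_intermediate_host(self, prim_move, mir_move, used_intermediate_hosts,
                             resource_estimator):
    content_id = prim_move.seg.getSegmentContentId()
    # The two hosts participating in the swap cannot serve as the intermediate one.
    swap_hosts = {prim_move.source_host, prim_move.target_host}
    candidates = [h for h in self.target_hosts
                  if h.status in [HostStatus.ACTIVE, HostStatus.NEW]
                  and h.hostname not in swap_hosts]
    if not candidates:
        # With fewer than 3 active hosts this branch always fires, so the
        # separate active_hosts_count < 3 pre-check looks redundant.
        raise PlanningError(
            f"No intermediate host available for swap of content {content_id}. "
            f"Need at least 3 hosts to perform swaps safely.")
    # Prefer the least-used candidate with enough space (details omitted).
    return min(candidates, key=lambda h: used_intermediate_hosts.get(h.hostname, 0))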
            swap_phase3_moves.append(phase3)

        except Exception as e:
            raise PlanningError(f"Failed to plan swap for content {content_id}: {e}")
Maybe add a reminder here about the --inplace-swap-roles option?
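I.e. extend the except branch quoted above with a hint; the exact wording is only a suggestion:

# Suggested wording only: point the user at the --inplace-swap-roles alternative.
except Exception as e:
    raise PlanningError(
        f"Failed to plan swap for content {content_id}: {e}. "
        f"Consider the --inplace-swap-roles option as an alternative.")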
if swap_pairs:
    self.logger.info(f"Detected {len(swap_pairs)} primary-mirror pairs which just swap hosts")
    active_hosts_count = len([h for h in self.target_hosts
                              if h.status in [HostStatus.ACTIVE, HostStatus.NEW]])
    if active_hosts_count < 3:
        raise ValidationError(
            "Cannot safely perform primary-mirror swaps with less than 3 hosts. "
            "Need intermediate host to avoid primary and mirror coexistence." )

    swap_move_ids = set()
    for prim_move, mir_move in swap_pairs:
        swap_move_ids.add(prim_move.seg.getSegmentDbId())
        swap_move_ids.add(mir_move.seg.getSegmentDbId())

    # Track intermediate host usage for better distribution
    used_intermediate_hosts = {}

    # Decompose swaps with intermediate host selection
    swap_phase1_moves = []
    swap_phase2_moves = []
    swap_phase3_moves = []

    for prim_move, mir_move in swap_pairs:
        content_id = prim_move.seg.getSegmentContentId()
        try:
            # Select intermediate host
            intermediate_host = self.select_intermediate_host(
                prim_move,
                mir_move,
                used_intermediate_hosts,
                resource_estimator  # Pass the estimator with cached filesystem data
            )

            # Decompose into 3 phases
            phase1, phase2, phase3 = self.decompose_swap(
                prim_move,
                mir_move,
                intermediate_host,
                port_allocator
            )

            swap_phase1_moves.append(phase1)
            swap_phase2_moves.append(phase2)
            swap_phase3_moves.append(phase3)

        except Exception as e:
            raise PlanningError(f"Failed to plan swap for content {content_id}: {e}")

    # Collect non-swap moves
    non_swap_mirror_moves = []
    non_swap_primary_moves = []

    for move in moves:
        if move.seg.getSegmentDbId() not in swap_move_ids:
            if move.seg.isSegmentPrimary():
                non_swap_primary_moves.append(move)
            else:
                non_swap_mirror_moves.append(move)

    moves_with_swap = []
    # Batch 1: All initial mirror moves (regular + phase1)
    moves_with_swap.extend(non_swap_mirror_moves)
    moves_with_swap.extend(swap_phase1_moves)

    # Batch 2: All primary moves (regular + phase2)
    moves_with_swap.extend(non_swap_primary_moves)
    moves_with_swap.extend(swap_phase2_moves)

return moves
    # Batch 3: Phase 3 mirrors (must execute last)
    moves_with_swap.extend(swap_phase3_moves)

    final_moves = moves_with_swap
Maybe move this logic under the if swap_pairs: condition into a separate function in order to improve readability? It would be something like:
if swap_pairs:
    final_moves = self.handle_swaps(......)
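To sketch the shape of such a helper (the name handle_swaps and its parameter list come from this suggestion, not from the patch; the body would be the swap-handling code quoted above):

# Illustrative shape only.
def handle_swaps(self, moves, swap_pairs, resource_estimator, port_allocator):
    """Decompose swap pairs via intermediate hosts and return the reordered
    move list: mirrors + phase 1, primaries + phase 2, then phase 3 mirrors last."""
    ...

# Call site inside form_moves():
final_moves = moves
if swap_pairs:
    final_moves = self.handle_swaps(moves, swap_pairs,
                                    resource_estimator, port_allocator)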
            return self.filesystem_allocations[fs_key]['required_kb']
        return 0

    def get_available_space_for_directory(self, hostname: str, host_address: str, directory: str) -> Optional[int]:
I think it is easy to add it:
diff --git a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py
index 059f6e27c8f..663c971d012 100644
--- a/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py
+++ b/gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py
@@ -4546,6 +4546,13 @@ def impl(context, delay):
 def impl(context):
     os.environ['GPMGMT_FAULT_DELAY_MS'] = ""
 
+@given('store text in file "{filename}"')
+@then('store text in file "{filename}"')
+@when('store text in file "{filename}"')
+def impl(context, filename):
+    with open(filename, 'w') as fp:
+        fp.write(context.text + '\n')
+
 @given('stub')
 def impl(context):
     pass
Using a sample test definition based on this step, I was able to bring up a cluster with the required custom config. Therefore, I suggest adding behave tests in the scope of this ticket. It would also be nice for the tests to cover cluster configurations where several swaps are required.
Previously, a primary and its mirror could coexist on the same host during the execution of moves in which the segments simply swap their hosts. This violates the HA rule for the whole cluster.
When a suboptimal rebalance plan requires swapping the locations of a primary segment and its mirror, the planner now decomposes this into three safe phases using an intermediate host, preventing primary-mirror coexistence violations.
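For illustration, a minimal sketch of the three-phase idea (plain dicts and made-up hostnames stand in for the real move objects):

# Minimal sketch: a direct host swap becomes three moves through an intermediate host.
def decompose_swap_sketch(primary_host, mirror_host, intermediate_host):
    phase1 = {"role": "mirror",  "src": mirror_host,       "dst": intermediate_host}
    phase2 = {"role": "primary", "src": primary_host,      "dst": mirror_host}
    phase3 = {"role": "mirror",  "src": intermediate_host, "dst": primary_host}
    return phase1, phase2, phase3

# After phase 1: primary on hostA, mirror on hostC -> never on the same host.
# After phase 2: primary on hostB, mirror on hostC.
# After phase 3: primary on hostB, mirror on hostA -> the hosts are swapped.
print(decompose_swap_sketch("hostA", "hostB", "hostC"))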
planner.py now detects swap moves in form_moves() and chooses an appropriate third host for the mirror movement. The search is based on available space (taking other moves into account), host status, and the number of swaps already assigned to each host.
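Conceptually, the detection amounts to finding a primary and a mirror of the same content whose source and target hosts are mirrored; a simplified sketch follows (the source_host/target_host fields are illustrative stand-ins for the real move attributes):

# Simplified sketch of swap-pair detection; not the actual form_moves() code.
def find_swap_pairs(moves):
    by_content = {}
    for move in moves:
        by_content.setdefault(move.seg.getSegmentContentId(), []).append(move)

    swap_pairs = []
    for pair in by_content.values():
        if len(pair) != 2:
            continue
        prim = next((m for m in pair if m.seg.isSegmentPrimary()), None)
        mirr = next((m for m in pair if not m.seg.isSegmentPrimary()), None)
        if prim and mirr and prim.source_host == mirr.target_host \
                and prim.target_host == mirr.source_host:
            swap_pairs.append((prim, mirr))  # the pair just trades hosts
    return swap_pairs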
Thus, a plan that previously consisted of two direct moves (the primary and its mirror trading hosts) now expands into three moves: the mirror to an intermediate host, the primary to its target host, and the mirror from the intermediate host to its target host.
Additionally, unit tests were fixed, because we forgot to check them in previous patches.
Behave tests were not added, since we don't have the functionality to create a cluster with a custom configuration.
Example of a configuration with a single swap for manual testing: