|
| 1 | +import json |
| 2 | +import os |
| 3 | +import re |
| 4 | +from typing import Any, Dict, List, Optional, Set, Tuple |
| 5 | + |
| 6 | +import yaml |
| 7 | + |
| 8 | +from structkit.commands import Command |
| 9 | +from structkit.completers import structures_completer |
| 10 | + |
| 11 | + |
| 12 | +class GraphCommand(Command): |
| 13 | + """Visualize dependency relationships between StructKit structures.""" |
| 14 | + |
| 15 | + def __init__(self, parser): |
| 16 | + super().__init__(parser) |
| 17 | + parser.description = "Visualize structure dependencies from folders[].struct references" |
| 18 | + structure_arg = parser.add_argument('structure_definition', nargs='?', type=str, help='Structure name or local YAML file to graph') |
| 19 | + structure_arg.completer = structures_completer |
| 20 | + parser.add_argument( |
| 21 | + '-s', |
| 22 | + '--structures-path', |
| 23 | + type=str, |
| 24 | + help='Path to structure definitions (env: STRUCTKIT_STRUCTURES_PATH)', |
| 25 | + default=os.getenv('STRUCTKIT_STRUCTURES_PATH', None) |
| 26 | + ) |
| 27 | + parser.add_argument('--all', action='store_true', help='Graph all available built-in and custom structures') |
| 28 | + parser.add_argument( |
| 29 | + '--format', |
| 30 | + choices=['text', 'json', 'mermaid'], |
| 31 | + default='text', |
| 32 | + help='Output format (default: text)' |
| 33 | + ) |
| 34 | + parser.set_defaults(func=self.execute) |
| 35 | + |
| 36 | + def execute(self, args): |
| 37 | + if not args.all and not args.structure_definition: |
| 38 | + self.parser.error('provide a structure name, a YAML file, or --all') |
| 39 | + if args.all and args.structure_definition: |
| 40 | + self.parser.error('provide either a structure name/YAML file or --all, not both') |
| 41 | + |
| 42 | + graph = self.build_graph( |
| 43 | + structure_definition=args.structure_definition, |
| 44 | + structures_path=args.structures_path, |
| 45 | + all_structures=args.all, |
| 46 | + ) |
| 47 | + print(self.format_graph(graph, args.format)) |
| 48 | + |
| 49 | + def build_graph( |
| 50 | + self, |
| 51 | + structure_definition: Optional[str] = None, |
| 52 | + structures_path: Optional[str] = None, |
| 53 | + all_structures: bool = False, |
| 54 | + ) -> Dict[str, Any]: |
| 55 | + roots = self._list_available_structures(structures_path) if all_structures else [structure_definition] |
| 56 | + graph = { |
| 57 | + 'roots': roots, |
| 58 | + 'nodes': [], |
| 59 | + 'edges': [], |
| 60 | + 'missing': [], |
| 61 | + 'cycles': [], |
| 62 | + } |
| 63 | + state = { |
| 64 | + 'node_names': set(), |
| 65 | + 'edge_keys': set(), |
| 66 | + 'missing_keys': set(), |
| 67 | + 'cycle_keys': set(), |
| 68 | + } |
| 69 | + |
| 70 | + for root in roots: |
| 71 | + if root: |
| 72 | + self._walk(root, structures_path, graph, state, []) |
| 73 | + |
| 74 | + return graph |
| 75 | + |
| 76 | + def format_graph(self, graph: Dict[str, Any], output_format: str = 'text') -> str: |
| 77 | + if output_format == 'json': |
| 78 | + return json.dumps(graph, indent=2) |
| 79 | + if output_format == 'mermaid': |
| 80 | + return self._format_mermaid(graph) |
| 81 | + return self._format_text(graph) |
| 82 | + |
| 83 | + def _walk( |
| 84 | + self, |
| 85 | + structure_definition: str, |
| 86 | + structures_path: Optional[str], |
| 87 | + graph: Dict[str, Any], |
| 88 | + state: Dict[str, Set[Any]], |
| 89 | + stack: List[str], |
| 90 | + ): |
| 91 | + node_name = self._display_name(structure_definition) |
| 92 | + config, source = self._load_yaml_config(structure_definition, structures_path) |
| 93 | + |
| 94 | + if node_name not in state['node_names']: |
| 95 | + graph['nodes'].append({ |
| 96 | + 'name': node_name, |
| 97 | + 'source': source, |
| 98 | + 'missing': config is None, |
| 99 | + }) |
| 100 | + state['node_names'].add(node_name) |
| 101 | + |
| 102 | + if config is None: |
| 103 | + return |
| 104 | + if not isinstance(config, dict): |
| 105 | + return |
| 106 | + |
| 107 | + if node_name in stack: |
| 108 | + self._add_cycle(stack + [node_name], graph, state) |
| 109 | + return |
| 110 | + |
| 111 | + next_stack = stack + [node_name] |
| 112 | + for dependency, folder in self._iter_dependencies(config): |
| 113 | + dep_name = self._display_name(dependency) |
| 114 | + edge_key = (node_name, dep_name, folder) |
| 115 | + if edge_key not in state['edge_keys']: |
| 116 | + graph['edges'].append({'from': node_name, 'to': dep_name, 'folder': folder}) |
| 117 | + state['edge_keys'].add(edge_key) |
| 118 | + |
| 119 | + if dep_name in next_stack: |
| 120 | + self._add_cycle(next_stack + [dep_name], graph, state) |
| 121 | + continue |
| 122 | + |
| 123 | + dep_config, dep_source = self._load_yaml_config(dependency, structures_path) |
| 124 | + if dep_config is None: |
| 125 | + if dep_name not in state['node_names']: |
| 126 | + graph['nodes'].append({'name': dep_name, 'source': dep_source, 'missing': True}) |
| 127 | + state['node_names'].add(dep_name) |
| 128 | + missing_key = (node_name, dep_name) |
| 129 | + if missing_key not in state['missing_keys']: |
| 130 | + graph['missing'].append({'from': node_name, 'to': dep_name, 'folder': folder, 'source': dep_source}) |
| 131 | + state['missing_keys'].add(missing_key) |
| 132 | + continue |
| 133 | + |
| 134 | + self._walk(dependency, structures_path, graph, state, next_stack) |
| 135 | + |
| 136 | + def _add_cycle(self, cycle: List[str], graph: Dict[str, Any], state: Dict[str, Set[Any]]): |
| 137 | + key = tuple(cycle) |
| 138 | + if key not in state['cycle_keys']: |
| 139 | + graph['cycles'].append(cycle) |
| 140 | + state['cycle_keys'].add(key) |
| 141 | + |
| 142 | + def _iter_dependencies(self, config: Dict[str, Any]) -> List[Tuple[str, str]]: |
| 143 | + dependencies = [] |
| 144 | + for item in config.get('folders', []) or []: |
| 145 | + if not isinstance(item, dict): |
| 146 | + continue |
| 147 | + for folder, content in item.items(): |
| 148 | + if not isinstance(content, dict): |
| 149 | + continue |
| 150 | + structs = content.get('struct') or content.get('structkit') |
| 151 | + if isinstance(structs, str): |
| 152 | + structs = [structs] |
| 153 | + if not isinstance(structs, list): |
| 154 | + continue |
| 155 | + for nested_struct in structs: |
| 156 | + if isinstance(nested_struct, str) and nested_struct: |
| 157 | + dependencies.append((nested_struct, str(folder))) |
| 158 | + return dependencies |
| 159 | + |
| 160 | + def _load_yaml_config(self, structure_definition: str, structures_path: Optional[str]): |
| 161 | + file_path = self._resolve_structure_path(structure_definition, structures_path) |
| 162 | + if not file_path or not os.path.exists(file_path): |
| 163 | + return None, file_path |
| 164 | + try: |
| 165 | + with open(file_path, 'r') as f: |
| 166 | + return yaml.safe_load(f) or {}, file_path |
| 167 | + except (OSError, yaml.YAMLError) as exc: |
| 168 | + self.logger.error(f"❗ Failed to load {file_path}: {exc}") |
| 169 | + return None, file_path |
| 170 | + |
| 171 | + def _resolve_structure_path(self, structure_definition: str, structures_path: Optional[str]) -> Optional[str]: |
| 172 | + if structure_definition.startswith('file://'): |
| 173 | + return structure_definition[7:] |
| 174 | + if structure_definition.endswith(('.yaml', '.yml')): |
| 175 | + return structure_definition |
| 176 | + |
| 177 | + this_file = os.path.dirname(os.path.realpath(__file__)) |
| 178 | + contribs_path = os.path.join(this_file, '..', 'contribs') |
| 179 | + candidates = [] |
| 180 | + if structures_path: |
| 181 | + candidates.append(os.path.join(structures_path, f'{structure_definition}.yaml')) |
| 182 | + candidates.append(os.path.join(structures_path, f'{structure_definition}.yml')) |
| 183 | + candidates.append(os.path.join(contribs_path, f'{structure_definition}.yaml')) |
| 184 | + candidates.append(os.path.join(contribs_path, f'{structure_definition}.yml')) |
| 185 | + for candidate in candidates: |
| 186 | + if os.path.exists(candidate): |
| 187 | + return candidate |
| 188 | + return candidates[0] if candidates else None |
| 189 | + |
| 190 | + def _list_available_structures(self, structures_path: Optional[str]) -> List[str]: |
| 191 | + this_file = os.path.dirname(os.path.realpath(__file__)) |
| 192 | + contribs_path = os.path.join(this_file, '..', 'contribs') |
| 193 | + paths = [] |
| 194 | + if structures_path: |
| 195 | + paths.append(structures_path) |
| 196 | + paths.append(contribs_path) |
| 197 | + |
| 198 | + names = set() |
| 199 | + for path in paths: |
| 200 | + if not path or not os.path.exists(path): |
| 201 | + continue |
| 202 | + for root, _, files in os.walk(path): |
| 203 | + for file_name in files: |
| 204 | + if not file_name.endswith(('.yaml', '.yml')): |
| 205 | + continue |
| 206 | + rel = os.path.relpath(os.path.join(root, file_name), path) |
| 207 | + rel = os.path.splitext(rel)[0] |
| 208 | + names.add(rel) |
| 209 | + return sorted(names) |
| 210 | + |
| 211 | + def _display_name(self, structure_definition: str) -> str: |
| 212 | + if structure_definition.startswith('file://'): |
| 213 | + return structure_definition[7:] |
| 214 | + return structure_definition |
| 215 | + |
| 216 | + def _format_text(self, graph: Dict[str, Any]) -> str: |
| 217 | + children: Dict[str, List[Dict[str, str]]] = {} |
| 218 | + for edge in graph['edges']: |
| 219 | + children.setdefault(edge['from'], []).append(edge) |
| 220 | + |
| 221 | + lines = ['Dependency graph:'] |
| 222 | + for root in graph['roots']: |
| 223 | + if not root: |
| 224 | + continue |
| 225 | + root_name = self._display_name(root) |
| 226 | + lines.extend(self._format_text_node(root_name, children, set(), '')) |
| 227 | + |
| 228 | + if graph['missing']: |
| 229 | + lines.append('') |
| 230 | + lines.append('Missing references:') |
| 231 | + for missing in graph['missing']: |
| 232 | + lines.append(f" - {missing['from']} -> {missing['to']} (folder: {missing['folder']})") |
| 233 | + |
| 234 | + if graph['cycles']: |
| 235 | + lines.append('') |
| 236 | + lines.append('Cycles:') |
| 237 | + for cycle in graph['cycles']: |
| 238 | + lines.append(f" - {' -> '.join(cycle)}") |
| 239 | + |
| 240 | + if not graph['missing'] and not graph['cycles']: |
| 241 | + lines.append('') |
| 242 | + lines.append('No missing references or cycles detected.') |
| 243 | + |
| 244 | + return '\n'.join(lines) |
| 245 | + |
| 246 | + def _format_text_node(self, node: str, children: Dict[str, List[Dict[str, str]]], stack: Set[str], prefix: str) -> List[str]: |
| 247 | + lines = [node] if not prefix else [f'{prefix}{node}'] |
| 248 | + if node in stack: |
| 249 | + lines[-1] += ' (cycle)' |
| 250 | + return lines |
| 251 | + |
| 252 | + next_stack = set(stack) |
| 253 | + next_stack.add(node) |
| 254 | + edges = children.get(node, []) |
| 255 | + for index, edge in enumerate(edges): |
| 256 | + is_last = index == len(edges) - 1 |
| 257 | + lines.extend(self._format_text_child(edge['to'], children, next_stack, '', is_last)) |
| 258 | + return lines |
| 259 | + |
| 260 | + def _format_text_child( |
| 261 | + self, |
| 262 | + node: str, |
| 263 | + children: Dict[str, List[Dict[str, str]]], |
| 264 | + stack: Set[str], |
| 265 | + prefix: str, |
| 266 | + is_last: bool, |
| 267 | + ) -> List[str]: |
| 268 | + connector = '└── ' if is_last else '├── ' |
| 269 | + line = f'{prefix}{connector}{node}' |
| 270 | + if node in stack: |
| 271 | + return [line + ' (cycle)'] |
| 272 | + |
| 273 | + next_stack = set(stack) |
| 274 | + next_stack.add(node) |
| 275 | + edges = children.get(node, []) |
| 276 | + lines = [line] |
| 277 | + child_prefix = prefix + (' ' if is_last else '│ ') |
| 278 | + for index, edge in enumerate(edges): |
| 279 | + lines.extend(self._format_text_child(edge['to'], children, next_stack, child_prefix, index == len(edges) - 1)) |
| 280 | + return lines |
| 281 | + |
| 282 | + def _format_mermaid(self, graph: Dict[str, Any]) -> str: |
| 283 | + node_ids = {node['name']: f'n{index}' for index, node in enumerate(graph['nodes'])} |
| 284 | + lines = ['graph TD'] |
| 285 | + if not graph['nodes']: |
| 286 | + return '\n'.join(lines) |
| 287 | + |
| 288 | + for node in graph['nodes']: |
| 289 | + label = self._escape_mermaid_label(node['name']) |
| 290 | + lines.append(f' {node_ids[node["name"]]}["{label}"]') |
| 291 | + |
| 292 | + for edge in graph['edges']: |
| 293 | + lines.append(f' {node_ids[edge["from"]]} --> {node_ids[edge["to"]]}') |
| 294 | + |
| 295 | + missing_nodes = [node_ids[node['name']] for node in graph['nodes'] if node.get('missing')] |
| 296 | + if missing_nodes: |
| 297 | + lines.append(' classDef missing fill:#ffe6e6,stroke:#cc0000,color:#660000') |
| 298 | + lines.append(f' class {",".join(missing_nodes)} missing') |
| 299 | + return '\n'.join(lines) |
| 300 | + |
| 301 | + def _escape_mermaid_label(self, value: str) -> str: |
| 302 | + return re.sub(r'"', r'\\"', value) |
0 commit comments