diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..66af02d --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*.jsonl +*.log +*.ini +.* +!.gitignore \ No newline at end of file diff --git a/README.md b/README.md index f0f0437..20aec19 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ The `command` argument can be one of the following: - `description`: A one-sentence summary of *packages*, *classes*, and *methods*/*constructors*. - `roleStereotype`: A classification of *classes* into one of [Wirfs-Brock's role stereotypes](https://wirfs-brock.com/PDFs/Characterizing%20Classes.pdf). - `layer`: A classification of *packages*, *classes*, and *methods*/*constructors* into architectural layers. + - `secdfdTypes` (optional, enable via `[secdfd]` config): Multi-label SecDFD classification for v2 `Type`, `Operation`, and `Variable` nodes. Currently, this command adds all the properties above, i.e., there is no way to select only one or two properties to add to the graph. 
class JSONLWriter:
    """Process-wide, thread-safe writer that emits one JSON object per line.

    The class is a singleton: the first construction opens the file, and every
    later construction returns that same instance whatever arguments it is
    given. A later call naming a different path is reported with a warning
    rather than being ignored silently.
    """

    _instance = None  # the single shared writer, created on first construction
    _lock = Lock()  # guards instance creation and every file operation

    def __new__(cls, path: str, append: bool = False):
        """Return the shared writer, opening *path* on the first call only.

        Args:
            path: Destination file; honoured only by the first construction.
            append: Append instead of truncating; honoured only by the first
                construction.
        """
        with cls._lock:
            if cls._instance is None:
                inst = super().__new__(cls)
                mode = "a" if append else "w"
                # Line-buffered so each record is pushed out as soon as it is
                # written; explicit UTF-8 keeps the file independent of the
                # platform default and matches how load_checkpoint() reads it.
                inst._file = open(path, mode, buffering=1, encoding="utf-8")
                inst._path = path
                cls._instance = inst
            elif path != getattr(cls._instance, "_path", path):
                logging.getLogger(__name__).warning(
                    "JSONLWriter is already open on %s; ignoring request for %s.",
                    cls._instance._path,
                    path,
                )
            return cls._instance

    def write(self, data: dict):
        """Serialize *data* with CustomJSONEncoder and append it as one line."""
        with self._lock:
            self._file.write(json.dumps(data, cls=CustomJSONEncoder) + "\n")

    def flush(self):
        """Flush buffered output on a best-effort basis; never raises on I/O errors."""
        with self._lock:
            try:
                self._file.flush()
            except (OSError, ValueError) as exc:
                # ValueError means the file is already closed. Flushing is
                # best-effort, so leave a trace instead of failing the run.
                logging.getLogger(__name__).debug("Could not flush checkpoint file: %s", exc)
def configure_writer(path: str, append: bool = False):
    """Call once before the first writer() use to set path and open mode."""
    global _writer_path, _writer_append
    _writer_path = path
    _writer_append = append


def writer(path=None):
    """Return the shared JSONLWriter.

    Args:
        path: File to open; when falsy, the path set through
            configure_writer() (or the module default) is used.
    """
    effective = path or _writer_path
    return JSONLWriter(effective, _writer_append)


# ---------------------------------------------------------------------------
# Checkpoint loading (for resume)
# ---------------------------------------------------------------------------

def load_checkpoint(path: str, graph):
    """Load a JSONL checkpoint file into *graph*, applying node properties and edges.

    Handles both node entries {"data": {"id": ..., "labels": [...], "properties": {...}}}
    and edge entries {"data": {"id": ..., "source": ..., "target": ..., "label": ..., "properties": {...}}}.

    Blank lines, lines that are not valid JSON, lines whose JSON value is not an
    object, and entries without an ``id`` are skipped, so one damaged record
    cannot abort a resume. A missing file is logged and otherwise ignored.

    Args:
        path: Location of the checkpoint file (read as UTF-8).
        graph: Target graph; must expose ``nodes``, ``add_node`` and ``add_edge``.
    """
    loaded_nodes = 0
    loaded_edges = 0
    try:
        with open(path, encoding="utf-8") as f:
            for lineno, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError as e:
                    logger.warning("Skipping malformed checkpoint line %d: %s", lineno, e)
                    continue

                # Valid JSON is not necessarily an object ("[]", "null", ...);
                # calling .get() on those would raise AttributeError.
                entry = record.get('data', {}) if isinstance(record, dict) else None
                if not isinstance(entry, dict):
                    logger.warning("Skipping checkpoint line %d: not a JSON object with object 'data'.", lineno)
                    continue

                eid = entry.get('id')
                if not eid:
                    continue

                # An explicit null must behave like an absent key, otherwise
                # the ** / * unpacking below fails.
                props = entry.get('properties') or {}

                if 'source' in entry and 'target' in entry and 'label' in entry:
                    # Edge entry
                    graph.add_edge(
                        entry['source'],
                        entry['target'],
                        entry['label'],
                        **props,
                    )
                    loaded_edges += 1
                else:
                    # Node entry: merge into an existing node, otherwise create it.
                    if eid in graph.nodes:
                        graph.nodes[eid].properties.update(props)
                    else:
                        graph.add_node(eid, *(entry.get('labels') or []), **props)
                    loaded_nodes += 1

    except FileNotFoundError:
        logger.warning("Checkpoint file not found: %s", path)
        return

    logger.info("Loaded checkpoint %s: %d nodes, %d edges.", path, loaded_nodes, loaded_edges)
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``set`` and ``frozenset`` values as lists."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*.

        Sets and frozensets become lists. The elements are sorted when they are
        mutually comparable so that the same data always produces the same
        output; otherwise the set's own iteration order is used.

        Raises:
            TypeError: For any other unsupported type (raised by the base class).
        """
        if isinstance(obj, (set, frozenset)):
            try:
                return sorted(obj)
            except TypeError:
                # Mixed element types (e.g. int and str) cannot be ordered.
                return list(obj)
        # Call the default method for other types
        return super().default(obj)
- """ - return json.dumps(obj, indent='\t') - - -class CLISeeder(Seeder): - - def __init__(self, command) -> None: - """ - Initialize the seeder with a command. - - :param command: The command to be executed. - """ - self.command = command - - # sys.stderr.write(f"Command: {self.command}\n") - - def generate(self) -> Graph: - """ - Execute the command, parse the JSON output into a dict, and pass the dict to the Graph constructor. - - :return: The generated Graph object. - """ - # Execute the command - process = subprocess.run( - self.command, - capture_output=True, - text=True, - encoding="utf-8" - ) - - sys.stderr.write(process.stderr) - - # Parse the JSON output into a dict - if process.returncode == 0: - output_dict = json.loads(process.stdout) - - # Pass the dict to the Graph constructor and return the Graph object - return Graph(output_dict) - else: - raise "Command execution failed." - - -class MetricsFilter(Filter): - def process(self, data: Graph) -> Graph: - """ - Process the data to generate dependency profiles and categorize nodes. - - Args: - data (Graph): The input data. - - Returns: - Graph: The processed data with dependency profiles. 
- """ - parents = {e.source: e.target for e in invert(data.edges['contains'])} - dependency_profiles = {} - - calls = data.edges.get('calls', lift(data.edges['hasScript'], data.edges['invokes'], 'calls')) - - for edge in calls: - source_id, target_id = edge.source, edge.target - dependency_profiles.setdefault(source_id, []) - dependency_profiles.setdefault(target_id, []) - - if parents[source_id] != parents[target_id]: - dependency_profiles[source_id].append('out') - dependency_profiles[target_id].append('in') - - dependency_profiles = {id: Counter(profile) for id, profile in dependency_profiles.items()} - - def dependency_profile_category(inn: int, out: int) -> str: - if inn == 0 and out > 0: - return "outbound" - elif inn > 0 and out == 0: - return "inbound" - elif inn > 0 and out > 0: - return "transit" - return "hidden" - - for id, profile in dependency_profiles.items(): - data.nodes[id].properties['dependencyProfile'] = dependency_profile_category( - profile['in'], - profile['out'] - ) - - return data - - -def build_hierarchy(data: Graph) -> dict: - """Build a hierarchical structure of packages, classes, and methods.""" - methods = sorted(triplets(data.edges['contains'], data.edges['hasScript'])) - classes = sorted({(pkg, clz) for pkg, clz, _ in methods}) - packages = sorted({pkg for pkg, _ in classes}) - - hierarchy = { - pkg_id: { - cls_id: [met_id for _, c, met_id in methods if c == cls_id] - for p, cls_id in classes if p == pkg_id - } for pkg_id in packages - } - - return hierarchy - - -class LLMFilter(Filter): - def __init__(self, config: Dict[str, Dict[str, Any]]): - super().__init__(config) - self.project_name = None - self.project_desc = None - self.openai_client_args = None - - def process(self, data: Graph) -> Graph: - """ - Process the data using a language model to generate descriptions. - - Args: - data (Graph): The input data. - - Returns: - Graph: The processed data with generated descriptions. 
- """ - self.project_name, self.project_desc, self.openai_client_args, model, client = self.setup() - - hierarchy = build_hierarchy(data) - timestr = time.strftime("%Y%m%d-%H%M%S") - - with open(f'arcana-{timestr}.jsonl', 'a', encoding="utf-8") as file: - try: - self.process_hierarchy(data, hierarchy, client, model, file) - except StopIteration: - pass - - return data - - def setup(self): - """Setup necessary configuration and client.""" - project_name = self.config['project']['name'] - project_desc = self.config['project']['desc'] - - openai_client_args = { - 'api_key': self.config['llm'].get('apikey'), - 'base_url': self.config['llm'].get('apibase') - } - model = self.config['llm'].get('model', "gpt-4o-mini") - - client = OpenAI(**openai_client_args) - - return project_name, project_desc, openai_client_args, model, client - - def describe(self, node: dict) -> str: - """Generate a description for a given node.""" - keys = ['description', 'returns', 'reason', 'howToUse', 'howItWorks', 'assertions', 'roleStereotype', 'layer'] - return ' '.join(f"**{key}**: {str(node.properties[key])}. 
" for key in keys if key in node.properties) - - def process_hierarchy(self, data: Graph, hierarchy: dict, client: OpenAI, model: str, file): - """Process each package, class, and method in the hierarchy.""" - for pkg_id, pkg_data in tqdm(hierarchy.items(), desc="Processing packages"): - package = data.nodes[pkg_id] - - for cls_id, cls_data in tqdm(pkg_data.items(), desc="Processing classes", position=1, leave=False): - clasz = data.nodes[cls_id] - - class_name = clasz.properties['qualifiedName'] - class_kind = clasz.properties['kind'] - class_kind = 'enum' if class_kind == 'enumeration' else 'abstract class' if class_kind == 'abstract' else class_kind - - for met_id in tqdm(cls_data, desc='Processing methods', position=2, leave=False): - self.process_method(data, client, model, file, met_id, class_name, class_kind) - - if os.path.exists('stop'): - raise StopIteration - - self.process_class(data, client, model, file, cls_id, clasz, class_name, class_kind, cls_data) - - if os.path.exists('stop'): - raise StopIteration - - self.process_package(data, client, model, file, pkg_id, package, pkg_data) - - if os.path.exists('stop'): - raise StopIteration - - def process_method(self, data: Graph, client: OpenAI, model: str, file, met_id: str, class_name: str, - class_kind: str): - """Process a single method and generate its description.""" - method = data.nodes[met_id] - - if 'description' not in method.properties or not method.properties['description']: - method_name = method.properties['simpleName'] - method_src = remove_java_comments(method.properties['sourceText']) - - prompt = templates.script_analysis.format( - op_name=method_name, - struct_kind=class_kind, - struct_name=class_name, - op_src=method_src, - project_name=self.project_name, - project_desc=self.project_desc - ) - - description = self.generate_description(client, model, prompt) - self.update_method_properties(data, description, method) - - file.write(json.dumps({ - 'data': { - 'id': method.id, - 'labels': 
method.labels, - 'properties': description - } - })) - file.write('\n') - - def process_class(self, data: Graph, client: OpenAI, model: str, file, cls_id: str, clasz: dict, class_name: str, - class_kind: str, cls_data: list): - """Process a single class and generate its description.""" - ancestors, fields = self.get_class_relations(data, cls_id) - methods_descriptions = self.get_methods_descriptions(data, cls_data) - - prompt = templates.structure_analysis.format( - struct_type=class_kind, - struct_name=class_name, - ancestors="\n".join([f"- `{ancestor}`" for ancestor in ancestors]) if ancestors else "(none)", - fields="\n".join([f"- `{field}`" for field in fields]) if fields else "(none)", - methods="\n".join(methods_descriptions) if methods_descriptions else "(none)", - project_name=self.project_name, - project_desc=self.project_desc - ) - - description = self.generate_description(client, model, prompt) - self.update_class_properties(data, description, clasz) - - file.write(json.dumps({ - 'data': { - 'id': clasz.id, - 'labels': list(clasz.labels), - 'properties': description - } - })) - file.write('\n') - - def process_package(self, data: Graph, client: OpenAI, model: str, file, pkg_id: str, package: dict, - pkg_data: dict): - """Process a single package and generate its description.""" - classes_descriptions = self.get_classes_descriptions(data, pkg_data) - - prompt = templates.component_analysis.format( - pkg_name=package.properties['qualifiedName'], - classes="\n".join(classes_descriptions), - project_name=self.project_name, - project_desc=self.project_desc - ) - - description = self.generate_description(client, model, prompt) - self.update_package_properties(data, description, package) - - file.write(json.dumps({ - 'data': { - 'id': package.id, - 'labels': list(package.labels), - 'properties': description}})) - file.write('\n') - - def generate_description(self, client: OpenAI, model: str, prompt: str) -> dict: - """Generate a description using the OpenAI 
client.""" - try: - response = client.chat.completions.create( - model=model, - response_format={"type": "json_object"}, - messages=[{"role": "user", "content": prompt}], - max_tokens=1024, - temperature=0 - ) - description = response.choices[0].message.content - except: - description = '{}' - - try: - description = json.loads(description) - except: - description = dict() - - return description - - def update_method_properties(self, data: Graph, description: dict, method: dict): - """Update method properties with the generated description.""" - param_nodes = [ - data.nodes[edge.target] - for edge in data.edges['hasParameter'] - if edge.source == method.id - ] - - for key, value in description.items(): - if key.endswith('Reason'): - continue - key_lower = lower1(key) - if key_lower == 'parameters': - for param in value: - matching_params = [ - node - for node in param_nodes - if node.properties['simpleName'] == param['name'] - ] - if matching_params: - param_node_id = matching_params[0]['id'] - if param_node_id in data.nodes: - data.nodes[param_node_id].properties['description'] = param.get('description') - elif key_lower == 'returns': - method.properties['returns'] = value.get('description', None) if value else None - else: - method.properties[key_lower] = value - - def update_class_properties(self, data: Graph, description: dict, clasz: dict): - """Update class properties with the generated description.""" - for key in description: - if not key.endswith('Reason'): - clasz.properties[lower1(key)] = description[key] - - def update_package_properties(self, data: Graph, description: dict, package: dict): - """Update package properties with the generated description.""" - for key in description: - if not key.endswith('Reason'): - package.properties[lower1(key)] = description[key] - - def get_class_relations(self, data: Graph, cls_id: str) -> tuple: - """Retrieve class ancestors and fields.""" - ancestors = { - edge.target - for edge in data.edges['specializes'] - if 
edge.source == cls_id - } - fields = { - edge.target - for edge in data.edges['hasVariable'] - if edge.source == cls_id - } - fields = [ - ' '.join(remove_java_comments(data.nodes[field].properties['sourceText']).split()) - for field in fields - ] - return ancestors, fields - - def get_methods_descriptions(self, data: Graph, cls_data: list) -> list: - """Generate descriptions for methods.""" - return [ - f"- `{data.nodes[met_id].properties['simpleName']}`: {self.describe(data.nodes[met_id])}" - for met_id in cls_data - ] - - def get_classes_descriptions(self, data: Graph, pkg_data: dict) -> list: - """Generate descriptions for classes.""" - return [ - f"- {data.nodes[cls_id].properties['kind']} `{data.nodes[cls_id].properties['qualifiedName']}`: {self.describe(data.nodes[cls_id])}" - for cls_id, _ in pkg_data.items() - ] - - -def merge_node_properties(dict1: Dict[str, Node], dict2: Dict[str, Node], simplify_names=False): - for id2, obj2 in dict2.items(): - - matched_obj: Node = None - if id2 in dict1 and set(dict1[id2].labels) & set(obj2.labels): - matched_obj = dict1[id2] - - elif simplify_names: - - def simplify_name(name): - if '(' in name and name.endswith(')'): - prefix, params = name.split('(', 2) - params = [param.split('.')[-1].split('$')[-1] for param in params.split(')', 1)[0].split(',')] - return prefix + '(' + ','.join(params) + ')' - else: - return name - - dict1_name_remap = { - simplify_name(key): key - for key in dict1 - if {'Script', 'Operation', 'Constructor'} & set(dict1[key].labels) - } - - if id2 in dict1_name_remap and set(dict1[dict1_name_remap[id2]].labels) & set(obj2.labels): - matched_obj = dict1[dict1_name_remap[id2]] - - if matched_obj: - # sys.stderr.write(f"{id2}->{matched_obj['id']}\n") - # Merge properties from obj2 into matched_obj - matched_obj.properties.update(obj2.properties) - else: - # sys.stderr.write(f"{id2}->None\n") - pass - - -# Note: dict1 is updated in place, no need to return anything - -class MergeFilter(Filter): - 
from collections import OrderedDict
from typing import Any, Dict, List, Tuple
from typing import OrderedDict as OD


def layers_to_list(d: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Collect numbered layer settings into ``(name, description)`` pairs.

    Reads ``layer1name``/``layer1desc``, ``layer2name``/``layer2desc``, ... and
    stops at the first index for which either key is missing, so entries after
    a gap in the numbering are ignored.

    Args:
        d: Mapping holding the ``layer<i>name`` / ``layer<i>desc`` keys.

    Returns:
        The pairs in ascending index order; empty when ``layer1*`` is absent.
    """
    result = []
    i = 1
    while f"layer{i}name" in d and f"layer{i}desc" in d:
        result.append((d[f"layer{i}name"], d[f"layer{i}desc"]))
        i += 1
    return result


def layers_to_ordereddict(d: Dict[str, Any]) -> OD[str, str]:
    """Return the numbered layer settings as an ordered name -> description map.

    Uses the same scan as layers_to_list(). When a layer name occurs more than
    once, it keeps its first position and takes the last description.
    """
    return OrderedDict(layers_to_list(d))


class StopProcessing(Exception):
    """Raised when a stop signal is detected."""


def check_stop() -> None:
    """Raise StopProcessing if a path named ``stop`` exists in the working directory."""
    if os.path.exists('stop'):
        raise StopProcessing("Stop file detected, halting processing.")
def group_paths_by_endpoints(paths: List[List[Edge]]) -> Dict[Tuple[str, str], List[List[Edge]]]:
    """Bucket paths by where they start and where they end.

    Args:
        paths (List[List[Edge]]): Paths to group; empty paths are ignored.

    Returns:
        Dict[Tuple[str, str], List[List[Edge]]]: A ``defaultdict(list)`` keyed by
        (source of the first edge, target of the last edge); each value lists the
        paths sharing those endpoints, in input order.
    """
    buckets = defaultdict(list)
    for edges in paths:
        if not edges:
            continue
        endpoints = (edges[0].source, edges[-1].target)
        buckets[endpoints].append(edges)
    return buckets


def format_layers(layers: OrderedDict):
    """Render the layers as a Markdown bullet list, one ``- **name**: desc`` line each."""
    bullet_lines = [f"- **{name}**: {desc}" for name, desc in layers.items()]
    return "\n".join(bullet_lines)


def describe_path(graph, path):
    """Describe, in one sentence, which method invokes which along *path*.

    The second edge supplies the calling structure (source) and method (target);
    the second-to-last edge supplies the called method (source) and its structure
    (target).
    NOTE(review): this presumes a particular edge layout of the path and at least
    two edges - confirm against the code that builds the paths.
    """
    nodes = graph.nodes
    caller_owner = nodes[path[1].source]
    caller = nodes[path[1].target]
    callee = nodes[path[-2].source]
    callee_owner = nodes[path[-2].target]
    return (
        f"{caller.properties['kind'].capitalize()} `{caller.properties['simpleName']}` "
        f"({caller.properties['description']}) "
        f"of {caller_owner.properties['kind']} `{caller_owner.properties['qualifiedName']}` "
        f"invokes {callee.properties['kind']} `{callee.properties['simpleName']}` "
        f"({callee.properties['description']}) "
        f"of {callee_owner.properties['kind']} `{callee_owner.properties['qualifiedName']}`."
    )
def default_layers():
    """Return the built-in architectural layers as an ordered name -> description map."""
    layers = OrderedDict()
    layers['Presentation Layer'] = "Manages the user interface, defines UI elements and behavior, displays information, responds to user input, and updates views."
    layers['Service Layer'] = "Controls the application flow, orchestrates domain operations, connects UI events with domain logic, and synchronizes domain changes with the UI."
    layers['Domain Layer'] = "Handles business logic, represents domain data and behavior, and performs necessary computations for domain operations."
    layers['Data Source Layer'] = "Interacts with databases, filesystems, hardware, messaging systems, or other data sources, performs CRUD operations, handles data conversion, and ensures data integrity."
    return layers


def default_role_stereotypes():
    """Return the built-in role stereotypes as an ordered name -> description map."""
    roles = OrderedDict()
    roles["Information Holder"] = "Knows facts and provides information (POJOs, beans, enums)."
    roles["Service Provider"] = "Handles requests, performs services; implements a specific interface with a small number of methods (strategies, handlers)."
    roles["Structurer"] = "Manages relationships among things (collections, maps)."
    roles["Controller"] = "Makes decisions, directs flow of the program."
    roles["Coordinator"] = "Delegates work across workers."
    roles["User Interfacer"] = "Handles user input/output."
    roles["External Interfacer"] = "Loads/stores from external services."
    roles["Internal Interfacer"] = "Bridges subsystems (adapters, bridges, facades, proxies)."
    return roles


@dataclass
class ClassificationScheme:
    """Describes one classification dimension and the categories it offers."""

    name: str
    dimension_id: str
    dimension_name: str
    dimension_kind: str
    category_prefix: str
    category_kind: str
    prompt_label: str
    response_key: str
    response_reason_key: str
    options: OrderedDict  # category name -> description
    undetermined_description: str  # description used for the 'Undetermined' category
    ordered: bool = False
    applies_to: tuple = ()
    allow_multi_label: bool = False

    def options_with_undetermined(self) -> OrderedDict:
        """Return a copy of the options with an 'Undetermined' entry set to its description."""
        combined = OrderedDict(self.options) if self.options else OrderedDict()
        combined['Undetermined'] = self.undetermined_description
        return combined

    def ordered_options(self) -> OrderedDict:
        """Return the options with 'Undetermined' moved to the front."""
        reordered = self.options_with_undetermined()
        reordered.move_to_end('Undetermined', last=False)
        return reordered

    def category_id(self, category_name: str) -> str:
        """Return ``<category_prefix>:<category_name>``."""
        return "{}:{}".format(self.category_prefix, category_name)


def ordered_dict_from_mapping(mapping) -> OrderedDict:
    """Coerce *mapping* to an OrderedDict.

    Falsy input and non-iterable values give an empty OrderedDict; an existing
    OrderedDict is returned as-is (not copied); any other dict or iterable is
    passed to the OrderedDict constructor.
    """
    if not mapping:
        return OrderedDict()
    if isinstance(mapping, OrderedDict):
        return mapping
    if isinstance(mapping, (dict, Iterable)):
        return OrderedDict(mapping)
    return OrderedDict()


def default_secdfd_types():
    """Return the built-in SecDFD types as an ordered name -> description map."""
    secdfd = OrderedDict()
    secdfd["External Entity"] = "Represents an external actor or system that interacts with the software."
    secdfd["DataStore"] = "Represents persisted storage or a data access boundary."
    secdfd["Process"] = "Represents non-trivial computation or orchestration logic."
    secdfd["Asset"] = "Represents data objects with business or security value."
    secdfd["Flow"] = "Represents data transfer across operations or boundaries."
    return secdfd
("non-void-command", "Like command but also returns a value."), + # Creational + ("constructor", "Creates (or initialises) a new object instance."), + ("copy-constructor", "Creates a new object by copying an existing one."), + ("destructor", "Destroys or cleans up an object."), + ("factory", "Creates and returns an instance of another class."), + # Collaborational + ("collaborator", "Works primarily with objects belonging to classes other than itself (passed as parameter, stored as local/data member, or returned)."), + ("controller", "Changes only the state of an external object, not 'this'."), + ("wrapper", "Does not change the object's state but delegates to at least one free function call."), + # Degenerate + ("incidental", "Does not read or change the object's state and makes no calls to other class methods or free functions."), + ("stateless", "Does not read or change the object's state but has at least one call to other class methods or free functions."), + ("empty", "Has no statements at all."), + ]) + + +def stereocode_class_stereotypes(): + return OrderedDict([ + ("entity", "Encapsulates both data and behaviour; keeper of the data model and/or business logic."), + ("minimal-entity", "Special case of entity consisting only of get, set, and command methods."), + ("data-provider", "Encapsulates data and consists mainly of accessors (get/property/predicate)."), + ("commander", "Encapsulates behaviour and consists mainly of mutators (set/command)."), + ("boundary", "Communicator with a large percentage of collaborational methods and a low percentage of controller methods; few factory methods."), + ("factory", "Creator of objects; has mostly factory methods."), + ("controller", "Provides functionality to control external objects; consists mostly of controller and factory methods."), + ("pure-controller", "Special case of controller consisting only of controller and factory methods."), + ("large-class", "Contains a large number of methods combining multiple roles 
def default_classification_schemes(layers_cfg=None, role_stereotypes_cfg=None, secdfd_enabled=False, stereocode_enabled=False):
    """Build the classification schemes, keyed by scheme name.

    The "layer" and "roleStereotype" schemes are always present, in that order.
    "secdfd" is appended when *secdfd_enabled* is true, followed by
    "stereocodeMethod" and "stereocodeClass" when *stereocode_enabled* is true.

    Args:
        layers_cfg: Optional layer name -> description mapping; the default
            layers are used when it is empty or missing.
        role_stereotypes_cfg: Optional role stereotype mapping; the default
            role stereotypes are used when it is empty or missing.
        secdfd_enabled: Include the multi-label SecDFD scheme.
        stereocode_enabled: Include the Stereocode method and class schemes.

    Returns:
        An OrderedDict mapping scheme name to ClassificationScheme.
    """
    layers = ordered_dict_from_mapping(layers_cfg)
    if not layers:
        layers = default_layers()
    role_stereotypes = ordered_dict_from_mapping(role_stereotypes_cfg)
    if not role_stereotypes:
        role_stereotypes = default_role_stereotypes()

    schemes = OrderedDict()

    layer_dimension = "Architectural Layer"
    schemes["layer"] = ClassificationScheme(
        name="layer",
        dimension_id=layer_dimension,
        dimension_name=layer_dimension,
        dimension_kind="categorical-ordered",
        category_prefix="layer",
        category_kind="architectural layer",
        prompt_label="Possible Architectural Layers",
        response_key="layer",
        response_reason_key="layerReason",
        options=layers,
        undetermined_description="Architectural layer cannot be determined for this element.",
        ordered=True,
        applies_to=("operation", "type", "scope"),
    )

    role_dimension = "Role Stereotype"
    schemes["roleStereotype"] = ClassificationScheme(
        name="roleStereotype",
        dimension_id=role_dimension,
        dimension_name=role_dimension,
        dimension_kind="categorical-nominal",
        category_prefix="rs",
        category_kind="role stereotype",
        prompt_label="Possible Role Stereotypes",
        response_key="roleStereotype",
        response_reason_key="roleStereotypeReason",
        options=role_stereotypes,
        undetermined_description="Role stereotype cannot be determined for this element.",
        ordered=False,
        applies_to=("type",),
    )

    if secdfd_enabled:
        secdfd_dimension = "SecDFD Type"
        schemes["secdfd"] = ClassificationScheme(
            name="secdfd",
            dimension_id=secdfd_dimension,
            dimension_name=secdfd_dimension,
            dimension_kind="categorical-nominal",
            category_prefix="secdfd",
            category_kind="secdfd type",
            prompt_label="Possible SecDFD Types",
            response_key="secdfdTypes",
            response_reason_key="secdfdEvidence",
            options=default_secdfd_types(),
            undetermined_description="SecDFD type cannot be determined for this element.",
            ordered=False,
            applies_to=("operation", "type", "variable"),
            allow_multi_label=True,
        )

    if not stereocode_enabled:
        return schemes

    method_dimension = "Stereocode Method Stereotype"
    schemes["stereocodeMethod"] = ClassificationScheme(
        name="stereocodeMethod",
        dimension_id=method_dimension,
        dimension_name=method_dimension,
        dimension_kind="categorical-nominal",
        category_prefix="scm",
        category_kind="stereocode method stereotype",
        prompt_label="Possible Stereocode Method Stereotypes",
        response_key="stereocodeStereotype",
        response_reason_key="stereocodeStereotypeReason",
        options=stereocode_method_stereotypes(),
        undetermined_description="Stereocode method stereotype cannot be determined.",
        ordered=False,
        applies_to=("operation",),
        allow_multi_label=True,
    )

    class_dimension = "Stereocode Class Stereotype"
    schemes["stereocodeClass"] = ClassificationScheme(
        name="stereocodeClass",
        dimension_id=class_dimension,
        dimension_name=class_dimension,
        dimension_kind="categorical-nominal",
        category_prefix="scc",
        category_kind="stereocode class stereotype",
        prompt_label="Possible Stereocode Class Stereotypes",
        response_key="stereocodeClassStereotype",
        response_reason_key="stereocodeClassStereotypeReason",
        options=stereocode_class_stereotypes(),
        undetermined_description="Stereocode class stereotype cannot be determined.",
        ordered=False,
        applies_to=("type",),
        allow_multi_label=False,
    )

    return schemes
b/arcana/llm_filter/client.py new file mode 100644 index 0000000..e64d2c0 --- /dev/null +++ b/arcana/llm_filter/client.py @@ -0,0 +1,137 @@ +import json +import sys +import time +import logging + +from pydantic import BaseModel + +from arcana import templates +from arcana.utils import find_first_valid_json + +logger = logging.getLogger(__name__) + +_RETRY_DELAYS = (2, 4, 8) + + +class LLMClient: + def __init__(self, llm_cfg, project_cfg): + from openai import OpenAI + self.client = OpenAI(api_key=llm_cfg['apikey'], base_url=llm_cfg.get('apibase')) + self.model = llm_cfg.get('model', 'gpt-4o-mini') + self.timeout = float(llm_cfg.get('timeout', 300)) + self.use_structured_output = str(llm_cfg.get('use_structured_output', 'true')).strip().lower() in {'1', 'true', 'yes', 'on'} + + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + + def generate_json(self, prompt: str, tool: str) -> dict: + """Generate a structured description. + + `tool` is the tool name string (e.g. "AnalyzeScript"). The method tries + the modern structured-output path first (if enabled) and falls back to + the legacy tool-calling path on failure or when disabled. 
+ """ + model_class = templates.TOOL_MODELS.get(tool) + + if self.use_structured_output and model_class: + result = self._generate_structured(prompt, tool, model_class) + else: + result = self._generate_tool_call(prompt) + + if 'description' not in result: + result['description'] = "(no description)" + return result + + def generate_text(self, prompt: str) -> str: + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + max_tokens=4096, temperature=0, seed=42, + timeout=self.timeout, + ) + return response.choices[0].message.content + except Exception as e: + sys.stderr.write(f"Generate text error: {e}\n") + return "(no description)" + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + def _generate_structured(self, prompt: str, tool_name: str, model_class: type[BaseModel]) -> dict: + """Use OpenAI structured outputs (json_schema strict mode).""" + schema = model_class.model_json_schema() + # Remove $schema key if present — not accepted by the API + schema.pop("$schema", None) + + for attempt, delay in enumerate((*_RETRY_DELAYS, None)): + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": "You are a tool for analyzing software architecture of code implementations."}, + {"role": "user", "content": prompt}, + ], + response_format={ + "type": "json_schema", + "json_schema": { + "name": tool_name, + "schema": schema, + "strict": True, + }, + }, + temperature=0, seed=42, + timeout=self.timeout, + ) + content = response.choices[0].message.content + instance = model_class.model_validate_json(content) + return instance.model_dump() + except Exception as e: + if delay is None: + logger.warning("Structured output failed after retries (%s); falling back to tool-calling.", e) + return self._generate_tool_call(prompt) + 
logger.warning("Structured output attempt %d failed (%s); retrying in %ds.", attempt + 1, e, delay) + time.sleep(delay) + + return {} # unreachable + + def _generate_tool_call(self, prompt: str) -> dict: + """Legacy path: OpenAI tool-calling to constrain output format.""" + for attempt, delay in enumerate((*_RETRY_DELAYS, None)): + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": "You are a tool for analyzing software architecture of code implementations."}, + {"role": "user", "content": prompt}, + ], + tools=[ + templates.analyze_script_tool, + templates.analyze_structure_tool, + templates.analyze_component_tool, + ], + tool_choice="required", + temperature=0, seed=42, + timeout=self.timeout, + ) + + tool_calls = response.choices[0].message.tool_calls + if tool_calls: + return json.loads(tool_calls[0].function.arguments) + + content = response.choices[0].message.content + json_content = find_first_valid_json(content) + if json_content: + return json.loads(json_content) + return {} + + except Exception as e: + if delay is None: + sys.stderr.write(f"Generate JSON (tool-call) error: {e}\n") + return {} + logger.warning("Tool-call attempt %d failed (%s); retrying in %ds.", attempt + 1, e, delay) + time.sleep(delay) + + return {} # unreachable diff --git a/arcana/llm_filter/filter.py b/arcana/llm_filter/filter.py new file mode 100644 index 0000000..727c864 --- /dev/null +++ b/arcana/llm_filter/filter.py @@ -0,0 +1,195 @@ +import os +from collections import OrderedDict +from itertools import combinations +from typing import Any, Dict, List, TextIO +import logging + +from tqdm.auto import tqdm + +from arcana import templates +from arcana.checkpoint import configure_writer, load_checkpoint, writer +from arcana.filters import check_stop, layers_to_ordereddict +from arcana.graph_utils import (build_hierarchy, build_triplets, describe_path, + group_paths_by_endpoints) +from 
arcana.llm_filter.classification import default_classification_schemes +from arcana.llm_filter.client import LLMClient +from arcana.llm_filter.processors import (ComponentProcessor, InteractionProcessor, + ScriptProcessor, StructureProcessor, VariableProcessor) +from arcana.llm_filter.prompt import PromptBuilder +from arcana.utils import lower_first, remove_java_comments, write_jsonl +from arcanalib.graph import Edge, Graph, Node +from arcanalib.pipefilter import Filter + +logger = logging.getLogger(__name__) + + +class LLMFilter(Filter): + def __init__(self, config: Dict[str, Dict[str, Any]]): + super().__init__(config) + self.client = LLMClient(config['llm'], config['project']) + + llm_cfg = config.get('llm', {}) + self.max_workers = int(llm_cfg.get('workers', 8)) + + # Checkpoint / resume config + self.checkpoint_file = llm_cfg.get('checkpoint_file', 'checkpoints.jsonl') + self.resume = str(llm_cfg.get('resume', 'false')).strip().lower() in {'1', 'true', 'yes', 'on'} + configure_writer(self.checkpoint_file, append=self.resume) + + layer_cfg = config.get('layers') + self.layers = layers_to_ordereddict(layer_cfg) if layer_cfg else OrderedDict() + + stereo_cfg = config.get('stereotypes') + self.role_stereotypes = OrderedDict(stereo_cfg) if stereo_cfg else OrderedDict() + + self.secdfd_cfg = config.get('secdfd', {}) + self.secdfd_enabled = str(self.secdfd_cfg.get("enabled", "false")).strip().lower() in {"1", "true", "yes", "on"} + + stereocode_cfg = config.get('stereocode', {}) + self.stereocode_enabled = str(stereocode_cfg.get("enabled", "false")).strip().lower() in {"1", "true", "yes", "on"} + + classifications = default_classification_schemes( + self.layers, + self.role_stereotypes, + secdfd_enabled=self.secdfd_enabled, + stereocode_enabled=self.stereocode_enabled, + ) + self.prompt_builder = PromptBuilder(config['project'], classifications) + self.script_processor = ScriptProcessor(self.client, self.prompt_builder, self.max_workers) + self.structure_processor = 
StructureProcessor(self.client, self.prompt_builder, self.max_workers) + self.variable_processor = VariableProcessor(self.client, self.prompt_builder, self.secdfd_cfg, self.max_workers) + self.component_processor = ComponentProcessor(self.client, self.prompt_builder, self.max_workers) + self.interaction_processor = InteractionProcessor(self.client, self.prompt_builder, self.max_workers) + + def process(self, graph: Graph) -> Graph: + # 0. Resume: pre-load previous checkpoint so processors skip done nodes. + if self.resume and os.path.exists(self.checkpoint_file): + logger.info("Resuming from checkpoint: %s", self.checkpoint_file) + load_checkpoint(self.checkpoint_file, graph) + + # 1. Initialize classification dimension/category nodes. + self.prompt_builder.initialize_layers(graph) + + # 2. Build ancestor map for ScriptProcessor (same-name family / override context). + # Maps type_id → [all ancestor type_ids] via specializes edges. + type_ancestor_ids = self._build_type_ancestor_map(graph) + self.script_processor.type_ancestor_ids = type_ancestor_ids + + # 3. Process methods. + self.script_processor.process_all(graph) + + # 4. Process classes. + self.structure_processor.process_all(graph) + + # 5. Process variables (SecDFD). + if self.secdfd_enabled: + self.variable_processor.process_all(graph) + + # 6. Process packages. + self.component_processor.process_all(graph) + + # 7. Process interactions (stub). + self.interaction_processor.process_all(graph) + + return graph + + @staticmethod + def _build_type_ancestor_map(graph: Graph) -> dict[str, list[str]]: + """Return type_id → ordered list of ancestor type ids (parents first) via specializes.""" + specializes_edges = graph.find_edges(label='specializes') + # Build direct parent map: child_id → [parent_id, ...] + parents: dict[str, list[str]] = {} + for edge in specializes_edges: + parents.setdefault(edge.source, []).append(edge.target) + + # BFS from each type to collect all ancestors in order. 
+ result: dict[str, list[str]] = {} + for t in graph.find_nodes('Type'): + visited: list[str] = [] + seen: set[str] = set() + queue = list(parents.get(t.id, [])) + while queue: + pid = queue.pop(0) + if pid in seen: + continue + seen.add(pid) + visited.append(pid) + queue.extend(parents.get(pid, [])) + result[t.id] = visited + return result + + # ------------------------------------------------------------------ + # Legacy interaction processing (kept from original, not currently + # wired into process() but available for future use) + # ------------------------------------------------------------------ + + def process_hierarchy(self, graph: Graph, jsonl_file, log_file): + st_contains_st = graph.find_edges(label='contains', source_label='Structure', target_label='Structure') + ct_contains_st = graph.find_edges(label='contains', target_label='Structure', where_source=lambda + node: 'Container' in node.labels and 'Structure' not in node.labels) + new_ct_sources = {edge.target: graph.find_source(graph.find_edges(label='contains'), graph.nodes[edge.target], + lambda node: 'Structure' not in node.labels, + graph.nodes[edge.source]).id for edge in st_contains_st} + ct_contains_st.extend( + [Edge(source=source, target=target, label='contains') for target, source in new_ct_sources.items()]) + + trips = build_triplets(ct_contains_st, graph.find_edges(label='hasScript')) + hierarchy = build_hierarchy(trips) + sorted_pkg_ids, pkg_deps = graph.toposorted_nodes( + graph.find_edges(label='contains', where_source=lambda node: 'Structure' not in node.labels, + where_target=lambda node: 'Structure' not in node.labels)) + + paths = graph.find_paths("contains", "hasScript", "invokes", "-hasScript", "-contains") + path_groups = group_paths_by_endpoints(paths) + + pkg_pairs = list(combinations(sorted_pkg_ids, 2)) + for pkg2_id, pkg1_id in tqdm(pkg_pairs, desc='Processing package interactions', position=0, leave=False): + check_stop() + pkg1 = graph.nodes[pkg1_id] + pkg2 = 
graph.nodes[pkg2_id] + if ('Structure' not in pkg1.labels) and ('Structure' not in pkg2.labels): + if path_groups[(pkg1_id, pkg2_id)]: + self.process_interactions(graph, pkg1, pkg2, path_groups[(pkg1_id, pkg2_id)], hierarchy, jsonl_file, log_file) + if path_groups[(pkg2_id, pkg1_id)]: + self.process_interactions(graph, pkg2, pkg1, path_groups[(pkg2_id, pkg1_id)], hierarchy, jsonl_file, log_file) + + def process_interactions(self, graph: Graph, c1: Node, c2: Node, path_groups: List[List[Edge]], hierarchy, + jsonl_file: TextIO, log_file: TextIO): + c1_name = c1.properties["qualifiedName"] + c2_name = c2.properties["qualifiedName"] + c1_desc = c1.properties.get("description", "") + c2_desc = c2.properties.get("description", "") + + c1_contents = hierarchy.get(c1.id, dict()) + c2_contents = hierarchy.get(c2.id, dict()) + + c1_structure_info = "\n".join( + f" - `{graph.nodes[c_id].properties['simpleName']}`: {graph.nodes[c_id].properties.get('description', '')}" + for c_id, _ in c1_contents.items()) + c2_structure_info = "\n".join( + f" - `{graph.nodes[c_id].properties['simpleName']}`: {graph.nodes[c_id].properties.get('description', '')}" + for c_id, _ in c2_contents.items()) + + dep_info = f" - Dependencies from `{c1_name}` to `{c2_name}`:\n" + "\n".join( + f" - {describe_path(graph, path)}" for path in path_groups) if path_groups else "" + + prompt = templates.interaction_analysis.format( + project_name=self.prompt_builder.project_name, + project_desc=self.prompt_builder.project_desc, + pkg1_name=c1_name, pkg2_name=c2_name, + pkg1_desc=c1_desc, pkg2_desc=c2_desc, + cls1_info=c1_structure_info, cls2_info=c2_structure_info, + dep_info=dep_info, + ) + + log_file.write(prompt) + log_file.write('\n\n======\n\n') + + description = self.client.generate_text(prompt) + pkg1_edge = Edge(source=c1.id, target=c2.id, label="dependsOn", description=description) if dep_info else None + + if pkg1_edge: + if "dependsOn" not in graph.edges: + graph.edges["dependsOn"] = [] + 
graph.edges["dependsOn"].append(pkg1_edge) + write_jsonl(jsonl_file, pkg1_edge.to_dict()) diff --git a/arcana/llm_filter/processors.py b/arcana/llm_filter/processors.py new file mode 100644 index 0000000..fbf407a --- /dev/null +++ b/arcana/llm_filter/processors.py @@ -0,0 +1,665 @@ +from abc import ABC, abstractmethod +from collections import OrderedDict +from collections.abc import Iterable +from concurrent.futures import Future, ThreadPoolExecutor, wait as futures_wait +import logging +import re +import threading + +from tqdm.auto import tqdm + +from arcana.checkpoint import writer +from arcana.filters import check_stop +from arcana.llm_filter.classification import ClassificationScheme +from arcana.llm_filter.client import LLMClient +from arcana.llm_filter.prompt import PromptBuilder, describe +from arcana.utils import lower_first, remove_java_comments +from arcanalib.graph import Graph, Node + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Futures-based DAG executor +# --------------------------------------------------------------------------- + +class NodeExecutor: + """Executes graph nodes concurrently while honouring dependency ordering. + + Each node is submitted once. Before running its own processing function a + worker thread waits for the futures of all direct dependencies to finish. + This is equivalent to a topological sort but without a global pre-pass and + with finer-grained parallelism: a node starts as soon as *its own* deps are + ready rather than waiting for an entire topo-level to drain. + + Starvation note: if max_workers < longest dependency chain length every + thread could block and make no progress. Default of 8 is safe for typical + Java inheritance depths. Users can raise it in config if needed. 
+ """ + + def __init__(self, max_workers: int = 8): + self._pool = ThreadPoolExecutor(max_workers=max_workers) + self._futures: dict[str, Future] = {} + self._lock = threading.Lock() + self._completed = 0 + self._completed_lock = threading.Lock() + + def submit(self, node_id: str, dep_ids: list[str], fn) -> Future: + with self._lock: + if node_id in self._futures: + return self._futures[node_id] + # Snapshot dep futures while holding the lock so they cannot + # disappear between the check and the submit. + dep_futures = [self._futures[d] for d in dep_ids if d in self._futures] + f = self._pool.submit(self._run, dep_futures, fn, node_id) + self._futures[node_id] = f + return f + + def wait_all(self): + with self._lock: + all_futures = list(self._futures.values()) + futures_wait(all_futures) + + def shutdown(self): + self._pool.shutdown(wait=True) + + def _run(self, dep_futures: list[Future], fn, node_id: str): + for df in dep_futures: + df.result() # block until dependency is processed + fn(node_id) + with self._completed_lock: + self._completed += 1 + + @property + def completed(self) -> int: + with self._completed_lock: + return self._completed + + +# --------------------------------------------------------------------------- +# Base processor +# --------------------------------------------------------------------------- + +class Processor(ABC): + def __init__(self, client: LLMClient, prompt_builder: PromptBuilder, max_workers: int = 8): + self.client = client + self.prompt = prompt_builder + self.max_workers = max_workers + + @abstractmethod + def process_all(self, graph: Graph): + raise NotImplementedError + + def add_classification_options(self, parameters: OrderedDict, element_kind: str): + for scheme in self.prompt.classification_schemes(element_kind): + parameters[scheme.prompt_label] = scheme.options_with_undetermined() + + def apply_classifications(self, graph: Graph, element: Node, description: dict, element_kind: str): + for scheme in 
self.prompt.classification_schemes(element_kind): + self._apply_classification(graph, element, description, scheme) + + @staticmethod + def _apply_classification(graph: Graph, element: Node, description: dict, scheme: ClassificationScheme): + if scheme.allow_multi_label: + classifications = description.get(scheme.response_key, None) + else: + classifications = description.pop(scheme.response_key, None) + if not classifications: + return + + if isinstance(classifications, str): + classifications = [classifications] + elif isinstance(classifications, (list, tuple, set)): + classifications = list(classifications) + else: + classifications = [] + + if not scheme.allow_multi_label and classifications: + classifications = classifications[:1] + elif scheme.allow_multi_label and classifications and "primarySecdfdType" not in description: + description["primarySecdfdType"] = classifications[0] + + seen = set() + for classification in classifications: + if not classification or classification in seen: + continue + seen.add(classification) + target = graph.find_node(label="Category", where=lambda n: n.id == scheme.category_id(classification)) + if target: + impl_edge = graph.add_edge( + element.id, + target.id, + "implements", + weight=1, + reason=description.get(scheme.response_reason_key), + ) + if impl_edge: + writer().write(impl_edge.to_dict()) + + +# --------------------------------------------------------------------------- +# Helper: same-name method family (overloads in same class + overrides in ancestors) +# --------------------------------------------------------------------------- + +def collect_same_name_family( + graph: Graph, + method: Node, + enclosing_type: Node, + type_ancestor_ids: list[str], +) -> dict[str, str]: + """Return qualifiedName → describe() for related methods sharing the same simpleName. + + Covers: + - Overloads: same class, same simpleName, different node id. + - Overrides/hides: any ancestor class method with the same simpleName. 
+ + Only includes methods that already have a description so the context is + always meaningful. + """ + name = method.properties.get('simpleName', '') + family: dict[str, str] = {} + + for op in enclosing_type.targets('encapsulates'): + if (op.has_label('Operation') + and op.id != method.id + and op.properties.get('simpleName') == name + and 'description' in op.properties): + family[op.properties['qualifiedName']] = describe(op, 'description', 'returns', 'howItWorks') + + for ancestor_id in type_ancestor_ids: + ancestor = graph.nodes.get(ancestor_id) + if not ancestor: + continue + for op in ancestor.targets('encapsulates'): + if (op.has_label('Operation') + and op.properties.get('simpleName') == name + and 'description' in op.properties): + family[op.properties['qualifiedName']] = describe(op, 'description', 'returns', 'howItWorks') + + return family + + +# --------------------------------------------------------------------------- +# ScriptProcessor +# --------------------------------------------------------------------------- + +class ScriptProcessor(Processor): + + def __init__(self, client, prompt_builder, max_workers=8): + super().__init__(client, prompt_builder, max_workers) + # Populated by LLMFilter before process_all is called. + self.type_ancestor_ids: dict[str, list[str]] = {} + + def process_all(self, graph: Graph): + methods = graph.find_nodes('Operation') + invokes_edges = graph.find_edges(label='invokes') + + # Build a dep map: method_id → [method_id, ...] of methods it invokes. 
+ invokes_map: dict[str, list[str]] = {m.id: [] for m in methods} + for edge in invokes_edges: + if edge.source in invokes_map: + invokes_map[edge.source].append(edge.target) + + executor = NodeExecutor(max_workers=self.max_workers) + total = len(methods) + + def process_fn(met_id: str): + method = graph.nodes[met_id] + enclosers = [n for n in method.sources('encapsulates') if n.has_label('Type')] + if not enclosers: + return + clasz = enclosers[0] + self.process_one(graph, method, clasz) + check_stop() + + for method in methods: + executor.submit(method.id, invokes_map.get(method.id, []), process_fn) + + with tqdm(total=total, desc='Processing methods') as pbar: + last = 0 + while True: + done = executor.completed + if done > last: + pbar.update(done - last) + last = done + if done >= total: + break + import time; time.sleep(0.1) + + executor.wait_all() + executor.shutdown() + writer().flush() + + def process_one(self, graph: Graph, operation: Node, type: Node): + if (operation.properties.get('description') + and operation.properties['description'] != "(no description)"): + return + + op_name = operation.properties['simpleName'] + op_src = remove_java_comments(operation.properties.get('sourceText', '')) + op_kind = operation.properties.get('kind', 'function') + + typ_name = type.properties['qualifiedName'] + typ_kind = type.properties.get('kind', 'class') + typ_kind = 'enum' if typ_kind == 'enumeration' else 'abstract class' if typ_kind == 'abstract' else typ_kind + + ancestor_ids = self.type_ancestor_ids.get(type.id, []) + family = collect_same_name_family(graph, operation, type, ancestor_ids) + + base_instruction = f"Describe the following {op_kind} by using the AnalyzeScript tool.\n\n" + if family: + base_instruction = ( + "Related methods sharing the same name are listed below under " + "\"Related Methods\". If this overrides or hides a parent-class method, " + "clearly differentiate what this implementation changes or adds. 
" + "If this overloads another method in the same class, ensure descriptions " + "are consistent but distinguish the parameter/use-case differences.\n\n" + + base_instruction + ) + + op_parameters = OrderedDict() + op_parameters["Project Name"] = self.prompt.project_name + op_parameters["Project Description"] = self.prompt.project_desc + op_parameters[f"{op_kind.title()} to Analyze"] = f"`{op_name}` from the {typ_kind} `{typ_name}`." + op_parameters[f"{op_kind.title()} Source Code"] = op_src + + invoked_ids = [e.target for e in graph.find_edges(label='invokes') if e.source == operation.id] + op_parameters["Outgoing Dependencies (Invokes)"] = { + graph.nodes[nid].properties['qualifiedName']: describe(graph.nodes[nid], 'description', 'returns', 'howToUse', 'docComment') + for nid in invoked_ids if nid in graph.nodes + } + op_parameters["Incoming Dependencies (Invoked By)"] = [ + m.properties['qualifiedName'] for m in operation.sources('invokes') + ] + if family: + op_parameters["Related Methods (same name — overloads / overrides)"] = family + + self.add_classification_options(op_parameters, "operation") + + prompt = self.prompt.compose(base_instruction, **op_parameters) + logger.debug(prompt) + + description = self.client.generate_json(prompt, "AnalyzeScript") + self.apply_classifications(graph, operation, description, "operation") + self.update_method_properties(graph, description, operation) + writer().write({'data': {'id': operation.id, 'labels': list(operation.labels), 'properties': description}}) + + @staticmethod + def update_method_properties(data: Graph, description: dict, method: Node): + for key, value in description.items(): + if key.endswith('Reason'): + continue + key_lower = lower_first(key) + if key_lower == 'parameters' and isinstance(value, Iterable): + param_nodes = [data.nodes[edge.source] for edge in data.find_edges(label='parameterizes') + if edge.target == method.id] + for param in value: + if isinstance(param, dict): + matching = [n for n in 
param_nodes if n.properties['simpleName'] == param.get('name')] + if matching and matching[0].id in data.nodes: + data.nodes[matching[0].id].properties['description'] = param.get('description') + else: + data.nodes[method.id].properties[key_lower] = value + + +# --------------------------------------------------------------------------- +# StructureProcessor +# --------------------------------------------------------------------------- + +class StructureProcessor(Processor): + + def process_all(self, graph: Graph): + types = graph.find_nodes('Type') + specializes_edges = graph.find_edges(label='specializes') + + # Build dep map: class_id → [parent_id, ...] (parents = classes it specializes) + spec_map: dict[str, list[str]] = {t.id: [] for t in types} + for edge in specializes_edges: + if edge.source in spec_map: + spec_map[edge.source].append(edge.target) + + executor = NodeExecutor(max_workers=self.max_workers) + total = len(types) + + def process_fn(cls_id: str): + clasz = graph.nodes[cls_id] + enclosers = [n for n in clasz.sources('encloses') if n.has_label('Scope')] + package = enclosers[0] if enclosers else None + self.process_one(graph, clasz, package, spec_map) + check_stop() + + for t in types: + executor.submit(t.id, spec_map.get(t.id, []), process_fn) + + with tqdm(total=total, desc='Processing classes') as pbar: + last = 0 + while True: + done = executor.completed + if done > last: + pbar.update(done - last) + last = done + if done >= total: + break + import time; time.sleep(0.1) + + executor.wait_all() + executor.shutdown() + writer().flush() + + def process_one(self, graph: Graph, type: Node, scope: Node, spec_map: dict[str, list[str]]): + vars_list = StructureProcessor.get_type_relations(graph, type.id) + op_descriptions = { + method.properties['qualifiedName']: describe(method) + for method in type.targets('encapsulates') if method.has_label('Operation') + } + + typ_name = type.properties['qualifiedName'] + typ_kind = type.properties.get('kind', 
'type') + typ_kind = 'enum' if typ_kind == 'enumeration' else 'abstract class' if typ_kind == 'abstract' else typ_kind + + parent_ids = spec_map.get(type.id, []) + siblings = self._already_processed_siblings(graph, type, parent_ids) + + base_instruction = f"Describe the following {typ_kind} using the AnalyzeStructure tool.\n\n" + if siblings or parent_ids: + base_instruction = ( + "The parent class description is provided below under \"Inherits From\". " + "Build on the parent's context and maintain consistent layer/stereotype " + "choices unless there is a clear reason to differ. " + + ( + "Sibling classes (sharing the same parent, already analysed) are also " + "provided — your description should be consistent in terminology with " + "them but must clearly differentiate this class's specific responsibilities. " + if siblings else "" + ) + + "\n\n" + base_instruction + ) + + typ_parameters = OrderedDict() + typ_parameters["Project Name"] = self.prompt.project_name + typ_parameters["Project Description"] = self.prompt.project_desc + + if scope: + scope_name = scope.properties['qualifiedName'] + scope_kind = scope.properties.get('kind', 'scope') + typ_parameters[f"{typ_kind.title()} to Analyze"] = f"`{typ_kind} {typ_name}` from the {scope_kind} `{scope_name}`." + else: + typ_parameters[f"{typ_kind.title()} to Analyze"] = f"`{typ_kind} {typ_name}`." 
+ + typ_parameters[f"{typ_kind.title()} Inherits From"] = { + graph.nodes[pid].properties['qualifiedName']: describe(graph.nodes[pid], 'description', 'docComment') + for pid in parent_ids if pid in graph.nodes + } + typ_parameters["Inherited By"] = [ + f"{t.properties['kind']} {t.properties['qualifiedName']}" + for t in type.sources('specializes') + ] + if siblings: + typ_parameters["Sibling Classes (already analysed, same parent)"] = siblings + typ_parameters["Enclosed Variables/Fields"] = vars_list + typ_parameters["Enclosed Functions/Methods"] = op_descriptions + self.add_classification_options(typ_parameters, "type") + + prompt = self.prompt.compose(base_instruction, **typ_parameters) + logger.debug(prompt) + + description = self.client.generate_json(prompt, "AnalyzeStructure") + self.apply_classifications(graph, type, description, "type") + + for k, v in description.items(): + if not k.endswith('Reason'): + graph.nodes[type.id].properties[lower_first(k)] = v + + writer().write({'data': {'id': type.id, 'labels': list(type.labels), 'properties': description}}) + + @staticmethod + def _already_processed_siblings(graph: Graph, type_node: Node, parent_ids: list[str]) -> dict[str, str]: + result = {} + for parent_id in parent_ids: + parent = graph.nodes.get(parent_id) + if not parent: + continue + for sibling in parent.sources('specializes'): + if sibling.id != type_node.id and 'description' in sibling.properties: + result[sibling.properties['qualifiedName']] = describe(sibling, 'description', 'roleStereotype') + return result + + @staticmethod + def get_type_relations(data: Graph, cls_id: str) -> list: + fields = {data.nodes[edge.target] for edge in data.find_edges(label='encapsulates') if edge.source == cls_id} + return [' '.join(remove_java_comments(f.properties['sourceText']).split()) + for f in fields if f.has_label('Variable')] + + +# --------------------------------------------------------------------------- +# ComponentProcessor +# 
--------------------------------------------------------------------------- + +class ComponentProcessor(Processor): + + def process_all(self, graph: Graph): + scopes = graph.find_nodes('Scope', where=lambda n: not n.has_label('Type')) + encloses_edges = graph.find_edges( + label='encloses', + where_source=lambda n: n.has_label('Scope') and not n.has_label('Type'), + where_target=lambda n: n.has_label('Scope') and not n.has_label('Type'), + ) + + enc_map: dict[str, list[str]] = {s.id: [] for s in scopes} + for edge in encloses_edges: + if edge.source in enc_map: + enc_map[edge.source].append(edge.target) + + # A package depends on its sub-packages: reverse — sub-pkg must be done first. + dep_map: dict[str, list[str]] = {s.id: [] for s in scopes} + for parent_id, child_ids in enc_map.items(): + for child_id in child_ids: + if child_id in dep_map: + dep_map[child_id] # ensure key exists + # parent depends on children being done + dep_map[parent_id] = list(child_ids) + + executor = NodeExecutor(max_workers=self.max_workers) + total = len(scopes) + + def process_fn(pkg_id: str): + scope = graph.nodes[pkg_id] + self.process_one(graph, scope, enc_map) + check_stop() + + for s in scopes: + executor.submit(s.id, dep_map.get(s.id, []), process_fn) + + with tqdm(total=total, desc='Processing packages') as pbar: + last = 0 + while True: + done = executor.completed + if done > last: + pbar.update(done - last) + last = done + if done >= total: + break + import time; time.sleep(0.1) + + executor.wait_all() + executor.shutdown() + writer().flush() + + def process_one(self, graph: Graph, scope: Node, enc_map: dict[str, list[str]]): + typ_descriptions = { + f"{t.properties['kind']} {t.properties['qualifiedName']}": describe(t) + for t in scope.targets('encloses') if t.has_label('Type') + } + sub_ids = enc_map.get(scope.id, []) + subscp_descriptions = { + graph.nodes[nid].properties['qualifiedName']: describe(graph.nodes[nid], 'description', 'returns', 'howToUse', 'docComment') + for 
class VariableProcessor(Processor):
    """Assign SecDFD element types to ``Variable`` nodes with heuristics.

    No LLM call is made: every variable is scored against five candidate
    labels (External Entity, DataStore, Process, Asset, Flow) using keyword
    hits in its name and its ``parameterizes`` / ``encapsulates`` neighbours.
    Labels whose score reaches ``label_score_threshold`` are kept (at most
    three); otherwise the variable is marked ``Undetermined``.
    """

    # Operation-name tokens that hint at persistence (CRUD) behaviour.
    _CRUD_VERBS = frozenset({"save", "find", "delete", "create", "read", "update"})
    # Name prefixes that make a variable name look like a verb.
    _VERB_PREFIXES = ("get", "set", "create", "save", "find", "load", "send", "fetch", "verify")
    # Number of processed variables between two checkpoint flushes.
    _FLUSH_INTERVAL = 50

    def __init__(self, client, prompt_builder, secdfd_cfg=None, max_workers=8):
        """Read heuristic thresholds from ``secdfd_cfg`` (all keys optional)."""
        super().__init__(client, prompt_builder, max_workers)
        cfg = secdfd_cfg or {}
        self.label_score_threshold = float(cfg.get("label_score_threshold", 0.60))
        self.process_min_out_invokes = int(cfg.get("process_min_out_invokes", 2))
        self.process_min_in_invokes = int(cfg.get("process_min_in_invokes", 2))
        self.asset_sensitive_term_hit_min = int(cfg.get("asset_sensitive_term_hit_min", 1))
        self.datastore_crud_hit_min = int(cfg.get("datastore_crud_hit_min", 1))
        self.external_entity_max_participation = int(cfg.get("external_entity_max_participation", 2))
        self.external_keywords = set("client rest entity user customer bank".split())
        self.datastore_keywords = set("db database dao repository storage cache data record table".split())
        self.asset_keywords = set("password secret policy user document card money balance account pin token key".split())
        self.flow_keywords = set("request response payload dto input output transfer amount source target".split())

    def process_all(self, graph: Graph):
        """Classify every ``Variable`` node, flushing the checkpoint periodically.

        The writer is flushed every ``_FLUSH_INTERVAL`` variables and once more
        at the end, so the final partial batch is not left unflushed (also
        when ``check_stop`` aborts the run).
        """
        signature_counts = self.build_signature_counts(graph)
        variables = tqdm(graph.find_nodes("Variable"), desc="Processing variables")
        try:
            for index, var in enumerate(variables, start=1):
                self.process_one(graph, var, signature_counts)
                check_stop()
                if index % self._FLUSH_INTERVAL == 0:
                    writer().flush()
        finally:
            writer().flush()

    def process_one(self, graph: Graph, var: Node, signature_counts: dict):
        """Infer, apply and checkpoint the SecDFD classification of one variable."""
        description = self.infer_secdfd(graph, var, signature_counts)
        if not description:
            return
        self.apply_classifications(graph, var, description, "variable")
        for k, v in description.items():
            # "...Reason" entries are explanations, not node properties.
            if not k.endswith("Reason"):
                graph.nodes[var.id].properties[lower_first(k)] = v
        writer().write({"data": {"id": var.id, "labels": list(var.labels), "properties": description}})

    def infer_secdfd(self, graph: Graph, var: Node, signature_counts: dict) -> dict:
        """Score ``var`` against each SecDFD label and return the result.

        Args:
            graph: Unused here; kept for interface compatibility.
            var: The variable node to classify.
            signature_counts: Result of :meth:`build_signature_counts`.

        Returns:
            A dict with ``secdfdTypes`` (up to three labels, best first, or
            ``["Undetermined"]``), ``primarySecdfdType``, ``secdfdConfidence``
            (non-zero scores rounded to three decimals) and ``secdfdEvidence``.
        """
        var_name = str(var.properties.get("simpleName", "")).strip()
        name_tokens = self.tokenize(var_name)
        signature = self.variable_signature(var)
        parameterized = var.targets("parameterizes")
        encapsulators = var.sources("encapsulates")
        participation = len(parameterized) + len(encapsulators)
        owners = [n for n in encapsulators if n.has_label("Type")]
        ops = [n for n in parameterized if n.has_label("Operation")]
        scores = {"External Entity": 0.0, "DataStore": 0.0, "Process": 0.0, "Asset": 0.0, "Flow": 0.0}
        evidence = []

        external_hits = self.keyword_hits(name_tokens, self.external_keywords)
        if external_hits:
            scores["External Entity"] += 0.7
            evidence.append(f"external_keywords={','.join(sorted(external_hits))}")
        if participation <= self.external_entity_max_participation and (ops or owners):
            scores["External Entity"] += 0.2
            evidence.append("low_participation")

        datastore_hits = self.keyword_hits(name_tokens, self.datastore_keywords)
        if len(datastore_hits) >= self.datastore_crud_hit_min:
            scores["DataStore"] += 0.7
            evidence.append(f"datastore_keywords={','.join(sorted(datastore_hits))}")
        # Tokenize each operation name once and intersect with the CRUD verbs.
        if any(self._CRUD_VERBS & self.tokenize(op.properties.get("simpleName", "")) for op in ops):
            scores["DataStore"] += 0.2
            evidence.append("crud_related_operation")

        asset_hits = self.keyword_hits(name_tokens, self.asset_keywords)
        if len(asset_hits) >= self.asset_sensitive_term_hit_min:
            scores["Asset"] += 0.8
            evidence.append(f"asset_keywords={','.join(sorted(asset_hits))}")
        if owners and not ops:
            scores["Asset"] += 0.1
            evidence.append("field_like_variable")

        flow_hits = self.keyword_hits(name_tokens, self.flow_keywords)
        if flow_hits:
            scores["Flow"] += 0.4
            evidence.append(f"flow_keywords={','.join(sorted(flow_hits))}")
        if ops:
            scores["Flow"] += 0.2
            evidence.append("parameterizes_operation")
        if signature_counts.get(signature, 0) > 1:
            scores["Flow"] += 0.3
            evidence.append("shared_signature")

        # NOTE(review): "Process" can reach at most 0.2 + 0.3 = 0.5, which is
        # below the default label_score_threshold of 0.60, so it is never
        # selected unless the threshold is lowered -- confirm this is intended.
        if self.looks_like_verb(var_name):
            scores["Process"] += 0.2
            evidence.append("verb_like_name")
        if any(
            len(op.targets("invokes")) >= self.process_min_out_invokes
            or len(op.sources("invokes")) >= self.process_min_in_invokes
            for op in ops
        ):
            scores["Process"] += 0.3
            evidence.append("connected_to_high_interaction_operation")

        # sorted() is stable, so ties keep the declaration order of ``scores``.
        ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
        selected = [label for label, score in ranked if score >= self.label_score_threshold][:3]
        if selected:
            primary = selected[0]
        else:
            selected = ["Undetermined"]
            primary = "Undetermined"

        return {
            "secdfdTypes": selected,
            "primarySecdfdType": primary,
            "secdfdConfidence": {k: round(v, 3) for k, v in scores.items() if v > 0},
            "secdfdEvidence": "; ".join(evidence) if evidence else "No strong SecDFD evidence found.",
        }

    @staticmethod
    def keyword_hits(tokens: set, keywords: set) -> set:
        """Return the tokens that are also keywords."""
        return {t for t in tokens if t in keywords}

    @staticmethod
    def tokenize(text: str) -> set:
        """Split an identifier into lower-case tokens.

        camelCase boundaries (lower-case letter or digit followed by an
        upper-case letter), ``_`` and ``-`` act as separators; ``None`` or an
        empty string yields an empty set.
        """
        text = re.sub(r"([a-z0-9])([A-Z])", r"\1 \2", text or "")
        text = text.replace("_", " ").replace("-", " ").lower()
        return set(re.findall(r"[a-z0-9]+", text))

    @staticmethod
    def looks_like_verb(name: str) -> bool:
        """Return True when the lower-cased name starts with a known verb prefix."""
        lowered = (name or "").lower()
        return lowered.startswith(VariableProcessor._VERB_PREFIXES)

    @staticmethod
    def variable_signature(var: Node):
        """Return ``(lower-cased simple name, sorted tuple of type-node ids)``."""
        name = str(var.properties.get("simpleName", "")).strip().lower()
        types = tuple(sorted(t.id for t in var.targets("type")))
        return name, types

    def build_signature_counts(self, graph: Graph):
        """Count how many ``Variable`` nodes share each signature."""
        counts = {}
        for var in graph.find_nodes("Variable"):
            sig = self.variable_signature(var)
            counts[sig] = counts.get(sig, 0) + 1
        return counts
def initialize_classifications(self, graph: Graph):
    """Create the Dimension/Category nodes of every classification scheme.

    For each scheme a ``Dimension`` node is added, followed by one
    ``Category`` node per option linked to it with a ``composes`` edge.
    Ordered schemes additionally store an ``order`` property (position minus
    one) and chain their categories -- from the second option on -- with
    ``succeeds`` edges.  Every created node and edge is checkpointed.
    """
    for scheme in self.classification_schemes():
        dimension = graph.add_node(
            scheme.dimension_id,
            "Dimension",
            kind=scheme.dimension_kind,
            simpleName=scheme.dimension_name,
            qualifiedName=scheme.dimension_name,
        )
        writer().write(dimension.to_dict())

        options = scheme.ordered_options()
        option_names = list(options.keys())

        # start=-1 gives the first option order -1, the second 0, and so on.
        for order, (option, text) in enumerate(options.items(), start=-1):
            properties = {
                "kind": scheme.category_kind,
                "simpleName": option,
                "qualifiedName": option,
                "description": text,
            }
            if scheme.ordered:
                properties["order"] = order
            category = graph.add_node(scheme.category_id(option), "Category", **properties)
            writer().write(category.to_dict())
            membership = graph.add_edge(category.id, dimension.id, "composes", weight=1)
            writer().write(membership.to_dict())

        if not scheme.ordered:
            continue

        # Consecutive pairs, skipping the very first option.
        for earlier, later in zip(option_names[1:], option_names[2:]):
            link = graph.add_edge(
                scheme.category_id(earlier), scheme.category_id(later), "succeeds", weight=1
            )
            writer().write(link.to_dict())
def compose(self, base_prompt, **parameters):
    """Append one markdown section per non-empty parameter to ``base_prompt``.

    A dict value becomes a bullet list of ``* key: value`` items, a list value
    a bullet list of ``* item`` entries (falsy items are dropped in both
    cases), and any other truthy value is printed below its heading.  Falsy
    parameters are omitted.  The result is stripped of surrounding whitespace.
    """
    sections = [base_prompt]
    for heading, value in parameters.items():
        if not value:
            # Covers None, "", 0 and empty dicts/lists alike.
            continue
        if isinstance(value, dict):
            bullets = [f"* {label}: {str(item)}\n" for label, item in value.items() if item]
        elif isinstance(value, list):
            bullets = [f"* {str(item)}\n" for item in value if item]
        else:
            sections.append(f"## {heading}\n\n{str(value)}\n\n")
            continue
        sections.append(f"## {heading}\n\n" + "".join(bullets) + "\n\n")
    return "".join(sections).strip()
class MetricsFilter(Filter):
    """Attach a "Dependency Profile" classification to every ``Type`` node."""

    def process(self, data: Graph) -> Graph:
        """
        Process the data to generate dependency profiles and categorize nodes.

        A call between two types counts as outbound for the caller and inbound
        for the callee when the two types have different enclosing parents.

        Args:
            data (Graph): The input data.

        Returns:
            Graph: The processed data with dependency profiles.
        """
        # 1. Create a Dependency Profile dimension with its 4 categories
        dim = data.add_node(
            "Dependency Profile", "Dimension",
            kind="categorical",
            simpleName="Dependency Profile",
            qualifiedName="Dependency Profile"
        )
        categories = OrderedDict([
            ("outbound", "Calls leaving the module"),
            ("inbound", "Calls entering the module"),
            ("transit", "Both inbound and outbound"),
            ("hidden", "Neither inbound nor outbound"),
        ])
        cat_ids = {}
        for idx, (key, desc) in enumerate(categories.items()):
            cat = data.add_node(
                f"dp:{key}", "Category",
                kind="dependency profile",
                simpleName=key,
                qualifiedName=key,
                description=desc,
                order=idx
            )
            cat_ids[key] = cat.id
            data.add_edge(cat.id, dim.id, "composes", weight=1)

        # 2. Compute raw in/out counts
        # enclosed node id -> id of the node that encloses it
        parents = {e.target: e.source for e in data.find_edges(label='encloses')}
        dependency_profiles = {node.id: Counter() for node in data.find_nodes('Type')}

        # Only derive type-level calls when the graph does not provide them;
        # lifting is wasted work otherwise.
        calls = data.edges.get('calls')
        if calls is None:
            calls = lift(data.find_edges(label='encapsulates'), data.find_edges(label='invokes'), 'calls')

        for edge in calls:
            source_id, target_id = edge.source, edge.target
            if parents.get(source_id) == parents.get(target_id):
                continue
            # Endpoints that are not Type nodes have no profile; skip them
            # instead of failing with a KeyError.
            if source_id in dependency_profiles:
                dependency_profiles[source_id]['out'] += 1
            else:
                logger.debug("call source %s is not a Type node; ignored", source_id)
            if target_id in dependency_profiles:
                dependency_profiles[target_id]['in'] += 1
            else:
                logger.debug("call target %s is not a Type node; ignored", target_id)

        # 3. Attach classification edges instead of setting a string property
        for node_id, profile in dependency_profiles.items():
            cat_key = dependency_profile_category(profile['in'], profile['out'])
            target_id = cat_ids.get(cat_key)
            if target_id:
                data.add_edge(node_id, target_id, "implements", weight=1)

        return data
+ """ + process = subprocess.run(self.command, capture_output=True, text=True, encoding="utf-8", check=True) + if process.stderr: + sys.stderr.write(process.stderr) + output_dict = json.loads(process.stdout) + return Graph(output_dict) \ No newline at end of file diff --git a/arcana/templates.py b/arcana/templates.py index c280737..d3c04b7 100644 --- a/arcana/templates.py +++ b/arcana/templates.py @@ -1,112 +1,129 @@ -script_analysis = '''Consider a project {project_name}, {project_desc}. This is method `{op_name}` of {struct_kind} `{struct_name}`: - -```java -{op_src} -``` - -Explain the above method on the following aspects: - -{{ description: "Describe the functionality of the method in one sentence.", - parameters: [ {{ name:..., type:..., description:... }}, ... ], // empty list if there is no parameter - returns: {{ type:..., description: ... }}, // In case of a constructor, consider the constructed class as the return type. - reason: "Explain, in one sentence, the reason why the method is provided or the design rationale of the method.", - howToUse: "Describe the usage or the expected set-up of using the method in less than 3 sentences.", - howItWorks: "Describe the implementation details of the method in less than 5 sentences.", - assertions: {{ preConditions: ["pre-conditions of the method", ...], postConditions: ["pre-conditions of the method", ...] }}, - layer:..., - layerReason:... -}} - -For the `layer`, fill the value with one of the following architectural layer which functionality is exhibited by the method source code: - -- **Presentation Layer**: Manages the user interface, defines UI elements and behavior, displays information, responds to user input, and updates views. - -- **Service Layer**: Controls the application flow, orchestrates domain operations, connects UI events with domain logic, and synchronizes domain changes with the UI. 
- -- **Domain Layer**: Handles business logic, represents domain data and behavior, and performs necessary computations for domain operations. - -- **Data Source Layer**: Interacts with databases, filesystems, hardware, messaging systems, or other data sources, performs CRUD operations, handles data conversion, and ensures data integrity. - -In `layerReason`, explain why this method fits your layer of choice but not the other layers. - -Respond with a well-formatted JSON object. Do not use any quote marks ("'`) within the JSON values.''' - -structure_analysis = '''Consider a project {project_name}, {project_desc}. A {struct_type} `{struct_name}` specializes the following class(es) or interface(s): - -{ancestors} - -This {struct_type} contains the following field(s) and method(s): - -Fields: - -{fields} - -Methods: - -{methods} - -Explain the above {struct_type} on the following aspects: - -{{ description: "Describe the responsibility of the {struct_type} in one sentence.", - roleStereotype:..., - roleStereotypeReason:..., - layer:..., - layerReason:... }} - -For the `roleStereotype`, fill the value with one of the following role stereotypes which responsibility is exhibited by the {struct_type}: - -- **Information Holder** is responsible for knowing facts and providing information to other objects. POJOs and Java Beans are usually information holders. - -- **Service Provider** is responsible for handling requests and performing specific services. It usually implements a specific interface with a small number of methods. Concrete strategies are service providers. - -- **Structurer** is responsible for managing relationships and constraints among related things. It is usually a collection or mapping of some sort. - -- **Controller** is responsible for making decisions, directing the work of others, and handling important events. It directs the flow of the application or business process. 
- -- **Coordinator** is responsible for managing the actions of a group of workers and facilitating communication and work of other objects. It delegates requests to other objects. Very abstract classes and interfaces might be coordinators as they delegate the work to subclasses. - -- **User Interfacer** is responsible for transmitting user requests for action or display/render information that can be updated. It handles interactions with users. - -- **External Interfacer** is responsible for loading and storing information from/to external services, including database systems, web services, filesystems, hardware, etc. - -- **Internal Interfacer** is responsible for interfacing between two subsystems. It may bundle together information of requests from a group of objects to be sent to another object. Abstract adapters, bridges, facades, and proxies are internal interfacers. - -In `roleStereotypeReason`, explain why this {struct_type} fits your stereotype of choice but not the other stereotypes. - -For the `layer`, consider the functionalities of architectural layers below: - -- **Presentation Layer**: Manages the user interface, defines UI elements and behavior, displays information, responds to user input, and updates views. Typically (but not only) contains User Interfacers. - -- **Service Layer**: Controls the application flow, orchestrates domain operations, connects UI events with domain logic, and synchronizes domain changes with the UI. Typically (but not only) contains Coordinators and (Application) Controllers. - -- **Domain Layer**: Handles business logic, represents domain data and behavior, and performs necessary computations for domain operations. Typically (but not only) contains Information Holders, Service Providers, Structurers, Coordinators, and (Domain) Controllers. 
- -- **Data Source Layer**: Interacts with databases, filesystems, hardware, messaging systems, or other data sources, performs CRUD operations, handles data conversion, and ensures data integrity. Typically (but not only) contains External Interfacers. - -In `layerReason`, explain why this {struct_type} fits your layer of choice but not the other layers. - -Respond with a well-formatted JSON object. Do not use any quote marks ("'`) within the JSON values. In the `description`, do not mention the name of the role stereotype or layer.''' - -component_analysis = '''Consider a project {project_name}, {project_desc}. Given a package `{pkg_name}` containing the following classes: - -{classes} - -Explain the above package on the following aspects: - -{{ description: "Describe the purpose of the package in one sentence.", - layer:..., - layerReason:... }} - -For the `layer`, consider the functionalities of architectural layers below: - -- **Presentation Layer**: Manages the user interface, defines UI elements and behavior, displays information, responds to user input, and updates views. Typically (but not only) contains User Interfacers. - -- **Service Layer**: Controls the application flow, orchestrates domain operations, connects UI events with domain logic, and synchronizes domain changes with the UI. Typically (but not only) contains Coordinators and (Application) Controllers. - -- **Domain Layer**: Handles business logic, represents domain data and behavior, and performs necessary computations for domain operations. Typically (but not only) contains Information Holders, Service Providers, Structurers, Coordinators, and (Domain) Controllers. - -- **Data Source Layer**: Interacts with databases, filesystems, hardware, messaging systems, or other data sources, performs CRUD operations, handles data conversion, and ensures data integrity. Typically (but not only) contains External Interfacers. 
- -In `layerReason`, explain why this package fits your layer of choice but not the other layers. - -Respond with a well-formatted JSON object. Do not use any quote marks ("'`) within the JSON values. In the `description`, do not mention the name of the layer.''' +from typing import Literal +from pydantic import BaseModel, Field + + +# --------------------------------------------------------------------------- +# Output models +# --------------------------------------------------------------------------- + +class Parameter(BaseModel): + name: str + type: str = "" + description: str + + +class ScriptDescription(BaseModel): + description: str = Field(description="One-sentence description of the method/constructor/function functionality, in imperative mood.") + parameters: list[Parameter] = Field(default_factory=list, description="List of parameters. Empty if none.") + returns: str = Field(description="One-sentence description of the returned value. For constructors, describe the created instance.") + howToUse: str = Field(description="Usage instructions in less than three sentences.") + howItWorks: str = Field(description="Implementation details in less than five sentences.") + preConditions: list[str] = Field(default_factory=list, description="Pre-conditions for the script.") + postConditions: list[str] = Field(default_factory=list, description="Post-conditions for the script.") + stereotype: Literal["Accessor", "Mutator", "Creational", "Collaborational", "Other"] = Field(description="Design stereotype.") + stereotypeReason: str = Field(description="One-sentence explanation for the chosen stereotype.") + layer: str = Field(description="Architectural layer selected from the provided options.") + layerReason: str = Field(description="Explanation why this fits the chosen layer but not others.") + secdfdTypes: list[str] = Field(default_factory=list, description="One or more SecDFD classifications from the provided options.") + secdfdEvidence: str = Field(default="", 
description="Short evidence summary for the SecDFD classifications.") + stereocodeStereotype: list[str] = Field(default_factory=list, description="One or more Stereocode method stereotypes from the provided options. Methods may combine stereotypes, e.g. 'get collaborator'.") + stereocodeStereotypeReason: str = Field(default="", description="One-sentence explanation for the chosen Stereocode method stereotype(s).") + + +class StructureDescription(BaseModel): + description: str = Field(description="Up to three sentences describing the key responsibilities of the class/struct/type.") + keywords: list[str] = Field(default_factory=list, description="Important keywords related to key responsibilities.") + roleStereotype: str = Field(description="Role stereotype; options are supplied at runtime.") + roleStereotypeReason: str = Field(description="One-sentence explanation for the chosen role stereotype.") + layer: str = Field(description="Architectural layer selected from the provided options.") + layerReason: str = Field(description="Explanation why this fits the chosen layer but not others.") + secdfdTypes: list[str] = Field(default_factory=list, description="One or more SecDFD classifications from the provided options.") + secdfdEvidence: str = Field(default="", description="Short evidence summary for the SecDFD classifications.") + stereocodeClassStereotype: str = Field(default="", description="Stereocode class stereotype selected from the provided options.") + stereocodeClassStereotypeReason: str = Field(default="", description="One-sentence explanation for the chosen Stereocode class stereotype.") + + +class ComponentDescription(BaseModel): + description: str = Field(description="Describe the functionality of the component/package in up to five sentences.") + title: str = Field(description="A noun phrase describing the component/package.") + keywords: list[str] = Field(default_factory=list, description="Important keywords related to the core functionalities.") + 
def _tool(name: str, description: str, model: type[BaseModel]) -> dict:
    """Wrap a Pydantic model's JSON schema as an OpenAI function-calling tool.

    Args:
        name: Tool name exposed to the model.
        description: Human-readable purpose of the tool.
        model: Pydantic model whose JSON schema describes the tool arguments.

    Returns:
        A dict of the form ``{"type": "function", "function": {...}}``.
    """
    # Pydantic v2 may emit $defs for nested models; OpenAI function-calling
    # accepts these inline definitions without issue.
    function_spec = {
        "name": name,
        "description": description,
        "parameters": model.model_json_schema(),
    }
    return {"type": "function", "function": function_spec}
def remove_author(s: str) -> str:
    """Drop every line containing ``@author`` and strip the remaining lines."""
    return "\n".join(line.strip() for line in s.splitlines() if '@author' not in line)


# String/char literals are matched first so that comment markers inside them
# (e.g. "http://host") are not mistaken for comments.
_JAVA_COMMENT_RE = re.compile(
    r'''(?P<literal>"(?:\\.|[^"\\\n])*"|'(?:\\.|[^'\\\n])*')'''
    r'''|(?P<comment>//[^\n]*|/\*.*?\*/)''',
    flags=re.DOTALL,
)


def remove_java_comments(java_source: str) -> str:
    """Remove ``//`` line comments and ``/* ... */`` block comments.

    Text inside string and character literals is left untouched.

    Args:
        java_source: Java source code.

    Returns:
        The source without comments, stripped of surrounding whitespace.
    """
    # Keep literals verbatim; replace comments with nothing.
    return _JAVA_COMMENT_RE.sub(lambda m: m.group("literal") or "", java_source).strip()


def sentence(s: str) -> str:
    """
    Capitalize the first letter of a string and ensure it ends with a period.

    Text that already ends with one of ``.?!…~–—`` gets no extra period;
    empty or whitespace-only input yields an empty string.

    Args:
        s (str): The input string.

    Returns:
        str: The formatted string.
    """
    t = s.strip() if s else ""
    if not t:
        return ""
    capitalized = f'{t[0].upper()}{t[1:]}'
    return capitalized if t[-1] in '.?!…~–—' else f'{capitalized}.'


def lower_first(s: str) -> str:
    """
    Lowercase the first character of a string.

    Args:
        s (str): The input string.

    Returns:
        str: The string with the first character lowercased.
    """
    return s[0].lower() + s[1:] if s else s
def write_jsonl(file: TextIO, obj: Any) -> None:
    """Write ``obj`` to ``file`` as one JSON line using ``CustomJSONEncoder``."""
    file.write(json.dumps(obj, cls=CustomJSONEncoder) + '\n')


def find_first_valid_json(text: str) -> str:
    """
    Find the first valid JSON object embedded in the given text.

    Every ``{`` is tried, left to right, as the start of a JSON document; the
    JSON decoder itself determines where the object ends, so braces inside
    string values do not confuse the search.

    Args:
        text (str): The input string that may contain a JSON object.

    Returns:
        str: The first valid JSON substring found, or an empty string if none is found.
    """
    decoder = json.JSONDecoder()
    start = text.find('{')
    while start != -1:
        try:
            _, end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # Not a JSON object here; retry from the next opening brace.
            start = text.find('{', start + 1)
        else:
            return text[start:end]
    return ""


def simplify_name(name):
    """Reduce each parameter type of a signature-like name to its simple name.

    ``pkg.C.m(java.lang.String,a.Outer$Inner)`` becomes ``pkg.C.m(String,Inner)``.
    Names without a parameter list are returned unchanged.
    """
    if '(' in name and name.endswith(')'):
        # Split on the first '(' only, so a second '(' cannot break unpacking.
        prefix, params = name.split('(', 1)
        params = [param.split('.')[-1].split('$')[-1] for param in params.split(')', 1)[0].split(',')]
        return prefix + '(' + ','.join(params) + ')'
    return name


def merge_node_properties(dict1: Dict[str, "Node"], dict2: Dict[str, "Node"], simplify_names=False):
    """Merge the properties of nodes in ``dict2`` into matching nodes of ``dict1``.

    A node matches when its id exists in ``dict1`` and both nodes share at
    least one label.  With ``simplify_names`` enabled, an id that does not
    match directly is also looked up among the simplified ids of the
    Script/Operation/Constructor nodes in ``dict1``.  Unmatched nodes are
    ignored.

    Args:
        dict1: Target nodes by id; their ``properties`` are updated in place.
        dict2: Source nodes by id.
        simplify_names: Whether to fall back to simplified-name matching.
    """
    script_labels = {'Script', 'Operation', 'Constructor'}
    # Built lazily and only once: dict1's keys and labels do not change here.
    name_remap = None

    for id2, obj2 in dict2.items():
        labels2 = set(obj2.labels)
        matched_obj = None

        if id2 in dict1 and set(dict1[id2].labels) & labels2:
            matched_obj = dict1[id2]
        elif simplify_names:
            if name_remap is None:
                name_remap = {
                    simplify_name(key): key
                    for key, node in dict1.items()
                    if script_labels & set(node.labels)
                }
            original_key = name_remap.get(id2)
            if original_key is not None and set(dict1[original_key].labels) & labels2:
                matched_obj = dict1[original_key]

        if matched_obj is not None:
            # Merge properties from obj2 into matched_obj
            matched_obj.properties.update(obj2.properties)
self._targets_cache = {} # edge_label -> List[Node] + + def set_graph(self, graph): + self._graph = graph + self._invalidate_cache() + + def _invalidate_cache(self): + self._sources_cache.clear() + self._targets_cache.clear() + + def has_label(self, label: str) -> bool: + return label in self.labels + + def add_label(self, label: str): + self.labels.add(label) + return self + + def remove_label(self, label: str): + self.labels.discard(label) + return self + + def replace_label(self, old_label: str, new_label: str): + if old_label in self.labels: + self.labels.remove(old_label) + self.labels.add(new_label) + return self + + def has_property(self, key: str) -> bool: + return key in self.properties + + def property(self, key: str, value=None): + if value is None and value is not False: + return self.properties.get(key) + elif value is None: # allow explicit removal if value is None + self.properties.pop(key, None) + else: + self.properties[key] = value + return self + + def sources(self, edge_label: str): + if edge_label not in self._sources_cache: + if not self._graph: + return [] + es = self._graph.edges.get(edge_label, []) + self._sources_cache[edge_label] = [self._graph.nodes[e.source] for e in es if e.target == self.id] + return self._sources_cache[edge_label] + + def targets(self, edge_label: str): + if edge_label not in self._targets_cache: + if not self._graph: + return [] + es = self._graph.edges.get(edge_label, []) + self._targets_cache[edge_label] = [self._graph.nodes[e.target] for e in es if e.source == self.id] + return self._targets_cache[edge_label] + def to_dict(self): - return { - 'data': { - 'id': self.id, - 'labels': list(self.labels), - 'properties': self.properties - } - } - - def __str__(self): + return {'data': {'id': self.id, 'labels': list(self.labels), 'properties': self.properties}} + + def __repr__(self): return json.dumps(self.to_dict()) + class Edge: def __init__(self, source, target, label, **properties): self.id = 
f'{source}-{label}-{target}' self.source = source self.target = target - self.label = label + self.label_val = label self.properties = properties - + + # Meta/cache references + self._graph = None + self._cached_source_node = None + self._cached_target_node = None + + def set_graph(self, graph): + self._graph = graph + self._cached_source_node = None + self._cached_target_node = None + + def label(self, new_label=None): + if new_label is None: + return self.label_val + else: + self.label_val = new_label + self.id = f'{self.source}-{self.label_val}-{self.target}' + return self + + def property(self, key: str, value=None): + if value is None and value is not False: + return self.properties.get(key) + elif value is None: # remove + self.properties.pop(key, None) + else: + self.properties[key] = value + return self + + def source_node(self): + if self._cached_source_node is None and self._graph is not None: + self._cached_source_node = self._graph.nodes.get(self.source, None) + return self._cached_source_node + + def target_node(self): + if self._cached_target_node is None and self._graph is not None: + self._cached_target_node = self._graph.nodes.get(self.target, None) + return self._cached_target_node + def to_dict(self): - return { - 'data': { - 'id': self.id, - 'source': self.source, - 'target': self.target, - 'label': self.label, - 'properties': self.properties - } - } - - def __str__(self): + return {'data': {'id': self.id, 'source': self.source, 'target': self.target, 'label': self.label_val, + 'properties': self.properties}} + + def __repr__(self): return json.dumps(self.to_dict()) + def invert(edge_list: List[Edge], new_label: Optional[str] = None) -> List[Edge]: - """ - Inverts the direction of edges in the given edge list. - - Args: - edge_list (list): A list of edges to invert. - new_label (str, optional): A new label for the inverted edges. Defaults to None. - - Returns: - list: A list of inverted edges with updated labels. 
- """ - return [ - Edge( - source=edge.target, - target=edge.source, - label=new_label if new_label else f"inv_{edge.label}", - **edge.properties) - for edge in edge_list - ] + aggregated = [] + for edge in edge_list: + lbl = new_label if new_label else f"inv_{edge.label_val}" + e = Edge(source=edge.target, target=edge.source, label=lbl, **edge.properties) + aggregated.append(e) + return aggregated + def compose(edges1: List[Edge], edges2: List[Edge], new_label: Optional[str] = None) -> List[Edge]: - """ - Composes two lists of edges. - - Args: - edges1 (list): The first list of edges. - edges2 (list): The second list of edges. - new_label (str, optional): A new label for the composed edges. Defaults to None. - - Returns: - list: A list of composed edges. - """ - mapping = { - edge.source: { - 'target': edge.target, - 'label': edge.label, - 'weight': edge.properties.get('weight', 1) - } - for edge in edges2 - } - composed_edges = [] + mapping = defaultdict(list) + for edge in edges2: + w = edge.properties.get('weight', 1) + mapping[edge.source].append({'target': edge.target, 'label': edge.label_val, 'weight': w}) + + aggregated = {} for edge in edges1: + w1 = edge.properties.get('weight', 1) if edge.target in mapping: - new_weight = mapping[edge.target]['weight'] * edge.properties.get('weight', 1) - composed_edge = Edge( - source=edge.source, - target=mapping[edge.target]['target'], - label=new_label if new_label else f"{edge.label},{mapping[edge.target]['label']}", - weight=new_weight - ) - composed_edges.append(composed_edge) - return composed_edges + for m in mapping[edge.target]: + new_w = w1 * m['weight'] + key = f"{edge.source}-{m['target']}" + if key not in aggregated: + lbl = new_label if new_label else f"{edge.label_val}-{m['label']}" + e = Edge(source=edge.source, target=m['target'], label=lbl, weight=new_w) + aggregated[key] = e + else: + aggregated[key].properties['weight'] += new_w + return list(aggregated.values()) def lift(edges1: List[Edge], edges2: 
List[Edge], new_label: Optional[str] = None) -> List[Edge]: - """ - Lifts relations by composing two lists of edges and their inverses. - - Args: - edges1 (list): The first list of edges. - edges2 (list): The second list of edges. - new_label (str, optional): A new label for the lifted edges. Defaults to None. - - Returns: - list: A list of lifted edges. - """ return compose(compose(edges1, edges2), invert(edges1), new_label) def triplets(edge_list1: List[Edge], edge_list2: List[Edge]) -> Set[Tuple[str, str, str]]: - source_mapping = {edge.target: edge.source for edge in edge_list1} + source_mapping = defaultdict(list) + for edge in edge_list1: + source_mapping[edge.target].append(edge.source) paths = set() for edge in edge_list2: if edge.source in source_mapping: - source1 = source_mapping[edge.source] - triplet = (source1, edge.source, edge.target) - paths.add(triplet) - + sources = source_mapping[edge.source] + for source1 in sources: + paths.add((source1, edge.source, edge.target)) return paths class Graph: - """ - A class to represent a graph with nodes and edges. - - Attributes: - nodes (dict): A dictionary of nodes. - edges (dict): A dictionary of edges categorized by labels. - """ - - def __init__(self, graph_data: dict) -> None: - """ - Initializes the Graph with nodes and edges from the provided data. - - Args: - graph_data (dict): A dictionary containing graph data with nodes and edges. 
- """ - self.nodes: Dict[str, Node] = { - node['data']['id']: Node(node['data']['id'], *node['data']['labels'], **node['data']['properties']) - for node in graph_data['elements']['nodes'] - } + def __init__(self, graph_data: dict = None) -> None: + if not graph_data: + self.nodes: Dict[str, Node] = {} + self.edges: Dict[str, List[Edge]] = {} + return + + self.nodes: Dict[str, Node] = {} + for node_data in graph_data['elements']['nodes']: + n = Node(node_data['data']['id'], *node_data['data']['labels'], **node_data['data']['properties']) + self.nodes[n.id] = n + self.edges: Dict[str, List[Edge]] = {} - for edge in graph_data['elements']['edges']: - edge_data = edge['data'] - edge_obj = Edge(edge_data['source'], edge_data['target'], edge_data['label'], **edge_data['properties']) - if edge_obj.label not in self.edges: - self.edges[edge_obj.label] = [] - self.edges[edge_obj.label].append(edge_obj) + for edge_data in graph_data['elements']['edges']: + d = edge_data['data'] + e = Edge(d['source'], d['target'], d['label'], **d['properties']) + if e.label_val not in self.edges: + self.edges[e.label_val] = [] + self.edges[e.label_val].append(e) + + self._set_graph_refs() + + def _set_graph_refs(self): + for node in self.nodes.values(): + node.set_graph(self) + for elist in self.edges.values(): + for edge in elist: + edge.set_graph(self) + + def add_node(self, _id: str, *labels, **properties) -> Optional[Node]: + if _id in self.nodes: + return None + n = Node(_id, *(labels or []), **(properties or {})) + self.nodes[_id] = n + n.set_graph(self) + return n + + def add_edge(self, source_id: str, target_id: str, edge_label: str, **properties) -> Optional[Edge]: + if source_id not in self.nodes or target_id not in self.nodes: + return None + if self.find_edges(label=edge_label, where_source=lambda n: n.id == source_id, + where_target=lambda n: n.id == target_id): + return None + + e = Edge(source_id, target_id, edge_label, **(properties or {})) + if edge_label not in self.edges: 
+ self.edges[edge_label] = [] + self.edges[edge_label].append(e) + e.set_graph(self) + + # Invalidate caching for the involved nodes + self.nodes[source_id]._invalidate_cache() + self.nodes[target_id]._invalidate_cache() + + return e def invert_edges(self, edge_label: str, new_label: Optional[str] = None) -> None: - """ - Inverts the edges with the specified label and saves them under a new label. - - Args: - edge_label (str): The label of the edges to invert. - new_label (str, optional): The label for the inverted edges. Defaults to None. - """ if edge_label in self.edges: - inverted = invert(self.edges[edge_label], new_label) - new_label = new_label or f"inv_{edge_label}" - self.edges[new_label] = inverted + inverted = invert(self.edges.get(edge_label, []), new_label) + nlabel = new_label or f"inv_{edge_label}" + self.edges[nlabel] = inverted + self._set_graph_refs() def compose_edges(self, edge_label1: str, edge_label2: str, new_label: Optional[str] = None) -> None: - """ - Composes edges with the specified labels and saves them under a new label. - - Args: - edge_label1 (str): The label of the first list of edges. - edge_label2 (str): The label of the second list of edges. - new_label (str, optional): The label for the composed edges. Defaults to None. - """ if (edge_label1 in self.edges) and (edge_label2 in self.edges): - new_label = new_label or f"{edge_label1}_{edge_label2}" - composed = compose(self.edges[edge_label1], self.edges[edge_label2], new_label) - self.edges[new_label] = composed + nlabel = new_label or f"{edge_label1}_{edge_label2}" + composed_list = compose(self.edges.get(edge_label1, []), self.edges.get(edge_label2, []), nlabel) + self.edges[nlabel] = composed_list + self._set_graph_refs() def lift_edges(self, edge_label1: str, edge_label2: str, new_label: Optional[str] = None) -> None: - """ - Lifts relations by composing edges with the specified labels and their inverses, then saves them under a new label. 
- - Args: - edge_label1 (str): The label of the first list of edges. - edge_label2 (str): The label of the second list of edges. - new_label (str, optional): The label for the lifted edges. Defaults to None. - """ if (edge_label1 in self.edges) and (edge_label2 in self.edges): - lifted = lift(self.edges[edge_label1], self.edges[edge_label2], new_label) - new_label = new_label or f"lifted_{edge_label1}_{edge_label2}" - self.edges[new_label] = lifted + lifted_list = lift(self.edges.get(edge_label1, []), self.edges.get(edge_label2, []), new_label) + nlabel = new_label or f"lifted_{edge_label1}_{edge_label2}" + self.edges[nlabel] = lifted_list + self._set_graph_refs() def filter_nodes_by_labels(self, labels: Union[List[str], Set[str]]) -> Dict[str, Node]: - """ - Filters nodes by the specified labels. - - Args: - labels (list or set): A list of labels to filter nodes by. - - Returns: - dict: A dictionary of filtered nodes. - """ - return { - key: node - for key, node in self.nodes.items() - if any(label in labels for label in node.labels) - } + return {k: v for k, v in self.nodes.items() if any(label in v.labels for label in labels)} def get_all_node_labels(self) -> Set[str]: - """ - Retrieves all unique node labels present in the graph. - - Returns: - set: A set of all node labels. - """ - return { - label - for node in self.nodes.values() - for label in node.labels - } + return {label for node in self.nodes.values() for label in node.labels} def get_all_edge_labels(self) -> Set[str]: - """ - Retrieves all unique edge labels present in the graph. - - Returns: - set: A set of all edge labels. - """ return set(self.edges.keys()) def get_edges_with_node_labels(self, edge_label: str, node_label: str) -> List[Edge]: - """ - Retrieves edges whose source and target nodes have the specified labels. - - Args: - edge_label (str): The label of the edges to retrieve. - node_label (str): The label of the nodes to filter by. 
- - Returns: - list: A list of edges that match the criteria. - """ if edge_label in self.edges: - return [ - edge - for edge in self.edges[edge_label] - if (node_label in self.nodes[edge.source].labels) - and (node_label in self.nodes[edge.target].labels) - ] + return [edge for edge in self.edges.get(edge_label, []) if + node_label in self.nodes.get(edge.source, Node(None)).labels and node_label in self.nodes.get( + edge.target, Node(None)).labels] return [] def get_edge_node_labels(self, edge: Edge) -> List[Tuple[str, str]]: - """ - Retrieves the labels of the source and target nodes for a given edge. - - Args: - edge (Edge): The edge to retrieve node labels for. - - Returns: - list: A list of tuples containing source and target node labels. - """ source_labels = self.nodes.get(edge.source, Node(None)).labels target_labels = self.nodes.get(edge.target, Node(None)).labels - return [ - (source_label, target_label) - for source_label in source_labels - for target_label in target_labels - ] - - def get_source_and_target_labels(self, edge_label: str) -> Set[str]: - """ - Retrieves the set of source and target labels for a given list of edges. - - Args: - edge_label (str): The label of the edges to retrieve labels for. - - Returns: - set: A set of source and target labels. - """ - edge_node_labels: Set[str] = { - label - for edge in self.edges[edge_label] - for label in self.get_edge_node_labels(edge) - } - return edge_node_labels - - def generate_ontology(self) -> Dict[str, Set[str]]: - """ - Generates an ontology from the graph's edges and nodes. - - Returns: - dict: A dictionary representing the ontology. 
- """ - return { - label: self.get_source_and_target_labels(label) - for label in self.edges - } + return [(sl, tl) for sl in source_labels for tl in target_labels] + + def get_source_and_target_labels(self, edge_label: str) -> Set[Tuple[str, str]]: + if edge_label not in self.edges: + return set() + return {(sl, tl) for e in self.edges.get(edge_label, []) for (sl, tl) in self.get_edge_node_labels(e)} + + def generate_ontology(self) -> 'Graph': + ontology_map = {label: self.get_source_and_target_labels(label) for label in self.edges} + onto_graph = Graph() + onto_graph.edges = {lbl: [Edge(src, tgt, lbl) for (src, tgt) in ontology_map[lbl]] for lbl in ontology_map} + sources = {src for lbl in ontology_map for (src, _) in ontology_map[lbl]} + targets = {tgt for lbl in ontology_map for (_, tgt) in ontology_map[lbl]} + all_ids = sources.union(targets) + onto_graph.nodes = {i: Node(i, i) for i in all_ids} + onto_graph._set_graph_refs() + return onto_graph def find_nodes(self, label=None, where=None) -> List[Node]: - return [node for node in self.nodes.values() if (not label or label in node.labels) and (not where or where(node))] - - def find_edges(self, label=None, source_label=None, target_label=None, where_edge=None, where_source=None, where_target=None): + return [node for node in self.nodes.values() if + (not label or label in node.labels) and (not where or where(node))] + + def find_node(self, label=None, where=None) -> Optional[Node]: + nodes = self.find_nodes(label, where) + if nodes: + return nodes[0] + return None + + def find_edge(self, label=None, source_label=None, target_label=None, where_edge=None, where_source=None, + where_target=None): + edges = self.find_edges(label, source_label, target_label, where_edge, where_source, where_target) + if edges: + return edges[0] + return None + + def find_edges(self, label=None, source_label=None, target_label=None, where_edge=None, where_source=None, + where_target=None): if label: edge_list = self.edges.get(label, 
[]) else: - edge_list = [edge for edges in self.edges.values() for edge in edges] - - return [ - edge for edge in edge_list - if (not source_label or source_label in self.nodes[edge.source].labels) - and (not target_label or target_label in self.nodes[edge.target].labels) - and (not where_edge or where_edge(edge)) - and (not where_source or where_source(self.nodes[edge.source])) - and (not where_target or where_target(self.nodes[edge.target])) - ] + edge_list = [e for edges in self.edges.values() for e in edges] + + return [e for e in edge_list if (not source_label or source_label in self.nodes[e.source].labels) and ( + not target_label or target_label in self.nodes[e.target].labels) and ( + not where_edge or where_edge(e)) and ( + not where_source or where_source(self.nodes[e.source])) and ( + not where_target or where_target(self.nodes[e.target]))] + + def find_source(self, edge_list: List[Edge], start_node: Node, predicate, default: Node = None): + predecessors = defaultdict(list) + for e in edge_list: + predecessors[e.target].append(e.source) + visited = set() + stack = [start_node.id] + + while stack: + current = stack.pop() + if current in visited: + continue + visited.add(current) + if predicate(self.nodes[current]): + return self.nodes[current] + stack.extend(predecessors[current]) + return default + + @staticmethod + def _adj_list(edge_list: List[Edge]): + adj_list = {} + outdegree = {} + for e in edge_list: + s, t = e.source, e.target + if s not in adj_list: + adj_list[s] = [] + if s not in outdegree: + outdegree[s] = 0 + if t not in adj_list: + adj_list[t] = [] + if t not in outdegree: + outdegree[t] = 0 + adj_list[s].append(t) + outdegree[s] += 1 + return adj_list, outdegree + + @staticmethod + def _outdeg_leaf_nodes(outdegree): + return [n for n, c in outdegree.items() if c == 0] + + def process_nodes(self, edges: List[Edge], node_processor): + adj_list, outdegree = Graph._adj_list(edges) + results = {} + queue = Graph._outdeg_leaf_nodes(outdegree) + 
while queue: + next_queue = [] + for n_id in queue: + dependencies = adj_list.get(n_id, []) + resolved = {dep: results[dep] for dep in dependencies if dep in results} + results[n_id] = node_processor(self.nodes[n_id], resolved) + for caller, targets in adj_list.items(): + if n_id in targets: + outdegree[caller] -= 1 + if outdegree[caller] == 0: + next_queue.append(caller) + queue = next_queue + for n_id, deg in outdegree.items(): + if deg > 0 and n_id not in results: + dependencies = adj_list.get(n_id, []) + resolved = {dep: results[dep] for dep in dependencies if dep in results} + results[n_id] = node_processor(self.nodes[n_id], resolved) + return results + + @staticmethod + def toposorted_nodes(edges: List[Edge], nodes: List[Node] = None): + adj_list, outdegree = Graph._adj_list(edges) + sorted_nodes = [] + node_deps = {} + queue = Graph._outdeg_leaf_nodes(outdegree) + + while queue: + next_queue = [] + for n_id in queue: + dependencies = adj_list.get(n_id, []) + sorted_nodes.append(n_id) + node_deps[n_id] = dependencies + for caller, targets in adj_list.items(): + if n_id in targets: + outdegree[caller] -= 1 + if outdegree[caller] == 0: + next_queue.append(caller) + queue = next_queue + + for n_id, deg in outdegree.items(): + if deg > 0 and n_id not in sorted_nodes: + dependencies = adj_list.get(n_id, []) + sorted_nodes.append(n_id) + node_deps[n_id] = dependencies + + nodes = nodes or list() + for node in nodes: + if node.id not in sorted_nodes: + sorted_nodes.insert(0, node.id) + node_deps[node.id] = [] + + return sorted_nodes, node_deps def clean_up(self): for edge_type in list(self.edges.keys()): - self.edges[edge_type] = [ - edge for edge in self.edges[edge_type] - if edge.source in self.nodes and edge.target in self.nodes - ] - - def __str__(self): + self.edges[edge_type] = [e for e in self.edges.get(edge_type, []) if + e.source in self.nodes and e.target in self.nodes] + + def find_paths(self, *edge_sequence: str) -> List[List[Edge]]: + def 
get_edges(label: str) -> List[Edge]: + if label.startswith('-'): + base_label = label[1:] + if base_label in self.edges: + return invert(self.edges.get(base_label, [])) + return [] + return self.edges.get(label, []) + + def find_next(current_paths: List[List[Edge]], label: str) -> List[List[Edge]]: + result = [] + for path in current_paths: + last_node = path[-1].target if path else None + for candidate in get_edges(label): + if not path or candidate.source == last_node: + result.append(path + [candidate]) + return result + + paths = [[]] + for lbl in edge_sequence: + paths = find_next(paths, lbl) + return paths + + def __repr__(self): return json.dumps(self.to_dict()) def to_dict(self, *args: str, node_labels: Optional[Union[str, Iterable[str]]] = None) -> dict: - """ - Converts the graph into a dictionary format with specified edge and node labels. - - Args: - *args: Variable length argument list of edge labels to include. - node_labels (str or iterable, optional): Labels of nodes to include. Defaults to None. - - Returns: - dict: A dictionary representation of the graph with specified elements. 
- """ included_edge_labels = list(args) if args else list(self.edges.keys()) - if node_labels == 'all': included_node_labels = self.get_all_node_labels() else: - included_node_labels: Set[str] = { - node_label - for edge_label in included_edge_labels - for node_label_pair in self.get_source_and_target_labels(edge_label) - for node_label in node_label_pair - } + included_node_labels: Set[str] = {nlbl for elbl in included_edge_labels for nlbl_pair in + self.get_source_and_target_labels(elbl) for nlbl in nlbl_pair} if isinstance(node_labels, str): included_node_labels.add(node_labels) elif isinstance(node_labels, Iterable): included_node_labels.update(node_labels) - included_nodes: Dict[str, Node] = self.filter_nodes_by_labels(included_node_labels) - - included_edges: Dict[str, List[Edge]] = { - label: edge_list - for label, edge_list in self.edges.items() - if label in included_edge_labels - } - - return { - "elements": { - "nodes": [{"data": node.to_dict()['data']} for node in included_nodes.values()], - "edges": [{"data": edge.to_dict()['data']} for edge in sum(included_edges.values(), [])] - } - } + included_nodes = {k: v for k, v in self.filter_nodes_by_labels(included_node_labels).items()} + included_edges = {lbl: eds for lbl, eds in self.edges.items() if lbl in included_edge_labels} + return {"elements": {"nodes": [{"data": n.to_dict()['data']} for n in included_nodes.values()], + "edges": [{"data": e.to_dict()['data']} for e in sum(included_edges.values(), [])]}} diff --git a/arcanalib/pipefilter.py b/arcanalib/pipefilter.py index 46a0869..4e5fcf2 100644 --- a/arcanalib/pipefilter.py +++ b/arcanalib/pipefilter.py @@ -1,9 +1,10 @@ +from abc import ABC, abstractmethod from typing import Any, Dict, List, Union from arcanalib.graph import Graph -class Filter: +class Filter(ABC): def __init__(self, config: Dict[str, Dict[str, Any]]) -> None: """ Initialize the filter with a configuration. 
@@ -12,6 +13,7 @@ def __init__(self, config: Dict[str, Dict[str, Any]]) -> None: """ self.config = config + @abstractmethod def process(self, data: Graph) -> Any: """ Process the data. This method should be implemented by subclasses. @@ -19,36 +21,35 @@ def process(self, data: Graph) -> Any: :param data: The input data to be processed. :return: The processed data. """ - raise NotImplementedError("Subclasses must implement this method") + raise NotImplementedError class EndFilter(Filter): - """ - A special type of filter that marks the end of the pipeline processing. - """ + """A special filter that marks the end of pipeline processing.""" pass -class Seeder: +class Seeder(ABC): """ A class that generates graph data. """ + @abstractmethod def generate(self) -> Graph: """ Generate graph data. This method should be implemented by subclasses. """ - raise NotImplementedError("Subclasses must implement this method") + raise NotImplementedError class Pipeline: - def __init__(self, *args: Filter) -> None: - self.filters: List[Filter] = list(args) + def __init__(self, *filters: Filter) -> None: + self.filters: List[Filter] = list(filters) - def add_filter(self, filter: Filter) -> None: - self.filters.append(filter) + def add_filter(self, filt: Filter) -> None: + self.filters.append(filt) - def process(self, data: Union[Graph, Seeder]) -> Any: + def process(self, data: Union[Graph,Seeder]) -> Any: """ Process the data through the sequence of filters in the pipeline. If a seeder is provided instead of graph data, use the seeder to generate the graph data. @@ -56,13 +57,10 @@ def process(self, data: Union[Graph, Seeder]) -> Any: :param data: The input data to be processed or a seeder to generate the data. :return: The processed data. 
""" - # If a seeder is provided, use it to generate the graph data - if isinstance(data, Seeder): - data = data.generate() - - # sys.stderr.write(f"Graph stats: {len(data.nodes)} nodes, {len(data.edges)} edge types.") - for filter in self.filters: - data = filter.process(data) - if isinstance(filter, EndFilter): + d = data.generate() if isinstance(data, Seeder) else data + + for filt in self.filters: + d = filt.process(d) + if isinstance(filt, EndFilter): break - return data + return d diff --git a/config.ini.example b/config.ini.example index f6b02cb..5d3adfa 100644 --- a/config.ini.example +++ b/config.ini.example @@ -1,15 +1,67 @@ [project] name=zxing-3.5.3 desc=a Java application +# The input directory to be parsed, or the path to an existing graph JSON file if using just the LLM filter. +# If 'input' is missing or 'stdin', it reads from standard input. input=D:/Code/zxing-zxing-3.5.3/core/src/main/java/ +# The final JSON graph output path. If missing or 'stdout', prints to standard output. output=zxing-3.5.3-output.json +[seeder] +# Command string to invoke the extractor. Placeholders like {input}, {name}, and other keys will be formatted. +command={javaexe} -jar {jarfile} -i {input} -a -n {name} -f json +javaexe=./javapers/jdk-17.0.11+9-jre/bin/java.exe +jarfile=./javapers/javapers-1.1.2-jar-with-dependencies.jar + +[merge] +# Specifies the input graph for the merge_filter to process +input=another-graph-to-merge-with.json + [llm] apikey=example-apikey +# Optional custom base URL for local LLMs (e.g. Ollama, LMStudio) apibase=http://localhost:8000/v1 +# Name of the model. Defaults to 'gpt-4o-mini' if not specified. model=llama3 +# HTTP request timeout in seconds. Defaults to 300 (5 minutes). +timeout=120.0 +# Whether to use OpenAI structured outputs (requires a compatible model). Defaults to true. +use_structured_output=true +# Maximum number of parallel worker threads for LLM API calls. Defaults to 8. 
+workers=8 +# Path to the file where checkpoints are saved. Defaults to 'checkpoints.jsonl'. +checkpoint_file=checkpoints.jsonl +# Set to true/1/yes/on to resume processing from the checkpoint_file. Defaults to false. +resume=true -[seeder] -command={javaexe} -jar {jarfile} -i {input} -a -n {name} -f json -javaexe=./javapers/jdk-17.0.11+9-jre/bin/java.exe -jarfile=./javapers/javapers-1.1.2-jar-with-dependencies.jar +[layers] +# Defines architectural layers used during categorization +layer1name=UI +layer1desc=Handles user interface, such as instantiating, setting properties of, or laying out widget objects and capturing user interactions. +layer2name=Logic +layer2desc=Handles application and domain logic, i.e., neither UI nor data access. +layer3name=Data +layer3desc=Handles loading and storing data from/to external services, including database systems, web services, filesystems, hardware, etc. + +[stereotypes] +# Similar to [layers], defines architectural role stereotypes for components/types +stereo1name=Controller +stereo1desc=A component that orchestrates interactions and controls the flow of data. +stereo2name=Repository +stereo2desc=A component that abstracts data access from the underlying storage. + +[secdfd] +# Enables or disables the Security Data Flow Diagram processing step. Defaults to false. +enabled=true +# Float threshold for determining component categories based on semantic similarity/LLM label. Defaults to 0.60. +label_score_threshold=0.60 +# Heuristic rule thresholds for DFD type classification +process_min_out_invokes=2 +process_min_in_invokes=2 +asset_sensitive_term_hit_min=1 +datastore_crud_hit_min=1 +external_entity_max_participation=2 + +[stereocode] +# Enables or disables stereo-code generation behavior/classification. Defaults to false. 
+enabled=true diff --git a/legacy/arvisaninator.ipynb b/legacy/arvisaninator.ipynb index f5c1b66..12af60d 100644 --- a/legacy/arvisaninator.ipynb +++ b/legacy/arvisaninator.ipynb @@ -75,10 +75,10 @@ "metadata": {}, "outputs": [], "source": [ - "config = read_ini_file('config.ini')\n", + "config = read_ini_file('../config.ini')\n", "project_name = config['project']['name']\n", "project_desc = config['project']['desc']\n", - "ifile = config['project']['input']\n", + "ifile = config['project']['output']\n", "(project_name,project_desc,ifile)" ] }, @@ -371,15 +371,15 @@ { "cell_type": "code", "execution_count": null, + "metadata": { + "collapsed": false + }, "outputs": [], "source": [ "from arcanalib import lift\n", "\n", "edges_calls = edges['calls'] if 'calls' in edges else lift(edges['hasScript'], edges['invokes'])" - ], - "metadata": { - "collapsed": false - } + ] }, { "cell_type": "code", @@ -553,7 +553,7 @@ "outputs": [], "source": [ "# (\"id:ID\",\":LABEL\",\"fullName\",\"simpleName\",\"color\",\"dependencyProfileCategory\",\"cohesion\")\n", - "modules = [(id, 'Module', id, node['properties']['simpleName'], roleStereotypeColors[node['properties'].get('roleStereotype', 'Unknown')], dependencyProfiles.get(id, None), None)\n", + "modules = [(id, 'Module', id, node['properties']['simpleName'], roleStereotypeColors[node['properties'].get('roleStereotype', 'Unknown')], node['properties'].get('dependencyProfile', None), None)\n", " for id,node in nodes.items() if 'Structure' in node['labels'] and id != 'java.lang.String']\n", "\n", "modules" diff --git a/requirements.txt b/requirements.txt index 40a1599..be810f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ -openai>=1.32 -tqdm>=4.66 \ No newline at end of file +openai>=1.50 +pydantic>=2.0 +tqdm>=4.66 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..6218ab3 --- /dev/null +++ b/setup.py @@ -0,0 +1,12 @@ +from setuptools import setup, find_packages + +def read_requirements(): + 
with open('requirements.txt') as req: + return [line.strip() for line in req if line.strip() and not line.startswith('#')] + +setup( + name="arcana", + version="0.1.0", + packages=find_packages(), + install_requires=read_requirements(), +) diff --git a/update_json.py b/update_json.py new file mode 100644 index 0000000..3b6821a --- /dev/null +++ b/update_json.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +import sys +import json + +if len(sys.argv) != 3: + sys.exit(f"Usage: {sys.argv[0]} ") + +json_file = sys.argv[1] +jsonl_file = sys.argv[2] + +with open(json_file, "r") as f: + data = json.load(f) + +elements = data.get("elements", {}) +nodes = elements.get("nodes", []) +edges = elements.get("edges", []) + +node_map = {node["data"]["id"]: node for node in nodes if "data" in node and "id" in node["data"]} +edge_map = {} +for edge in edges: + edata = edge.get("data", {}) + if all(k in edata for k in ("source", "target", "label")): + key = (edata["source"], edata["target"], edata["label"]) + edge_map[key] = edge + +with open(jsonl_file, "r") as f: + for line in f: + line = line.strip() + if not line: + continue + entry = json.loads(line) + edata = entry.get("data", {}) + if all(k in edata for k in ("source", "target", "label")): + key = (edata["source"], edata["target"], edata["label"]) + new_props = edata.get("properties", {}) + if key in edge_map: + curr_props = edge_map[key]["data"].get("properties", {}) + curr_props.update(new_props) + edge_map[key]["data"]["properties"] = curr_props + else: + edges.append(entry) + edge_map[key] = entry + elif "id" in edata: + node_id = edata["id"] + new_props = edata.get("properties", {}) + if node_id in node_map: + curr_props = node_map[node_id]["data"].get("properties", {}) + curr_props.update(new_props) + node_map[node_id]["data"]["properties"] = curr_props + else: + nodes.append(entry) + node_map[node_id] = entry + +print(json.dumps(data, indent=4))