diff --git a/act/workers/attack.py b/act/workers/attack.py index 0969aa2..0803dd6 100755 --- a/act/workers/attack.py +++ b/act/workers/attack.py @@ -6,22 +6,31 @@ import os import sys import traceback +from collections import defaultdict from logging import error, info, warning -from typing import Dict, List, Optional, Text, Union +from typing import Any, Dict, List, Optional, Text, Union import act.api +import attampck.data from act.api.helpers import Act, handle_fact, handle_facts from act.api.libs import cli -from pyattck import Attck +from attampck import matrices, stixmap +from attampck.data import resolve_mitre_id +from stix2 import (AttackPattern, Filter, IntrusionSet, Malware, Relationship, + Tool) from act.workers.libs import worker -# This is only for typing -# We can not import these matrices, because then they be loaded -# by pyattack and it fails if you need proxy -AttckMatrice = Union["Enterprise", "ICS", "MobileAttck"] +Software = Union[Tool, Malware] +Attack = Union[Software, AttackPattern, IntrusionSet, Malware, Tool] + + +MITRE_TYPES = [ + matrices.ENTERPRISE, + matrices.ICS, + matrices.MOBILE, +] -MITRE_TYPES = ["enterprise", "ics", "mobile"] DEFAULT_NOTIFY_CACHE = os.path.join(os.environ["HOME"], "act-mitre-attack-notify.cache") @@ -51,7 +60,7 @@ def parseargs() -> argparse.ArgumentParser: parser.add_argument( "--type", choices=list(MITRE_TYPES), - help="Specify a single type to download (enterprise, mobile or pre). Default is to fetch all", + help="Specify a single type to download. Default is to fetch all", ) parser.add_argument( "--notifycache", @@ -63,7 +72,7 @@ def parseargs() -> argparse.ArgumentParser: return parser -def deprecated_or_revoked(obj): +def deprecated_or_revoked(obj: Any) -> Optional[bool]: """ Return true if object has a truthy "revoked" or "deprecated" attribute, otherwise False @@ -73,10 +82,11 @@ def deprecated_or_revoked(obj): def handle_techniques( client: Act, - technique: "AttckTechnique", - main_technique: Optional["AttckTechnique"], + technique: AttackPattern, + main_technique: Optional[AttackPattern], + tactic_id_map: Dict[Text, Text], output_format: Text = "json", -) -> List: +) -> List[AttackPattern]: """ Args: @@ -86,6 +96,8 @@ def handle_techniques( output_format (str): Fact output if sent to stdout (text | json) """ + technique_id = resolve_mitre_id(technique) + if deprecated_or_revoked(technique): # Object is revoked/deprecated, add to notification list but do not add to facts that should be added to the platform return [technique] @@ -93,79 +105,127 @@ def handle_techniques( if main_technique: handle_fact( client.fact("subTechniqueOf") - .source("technique", technique.id) - .destination("technique", main_technique.id), + .source("technique", technique_id) + .destination("technique", resolve_mitre_id(main_technique)), output_format=output_format, ) handle_fact( - client.fact("name", technique.name).source("technique", technique.id), + client.fact("name", technique.name).source("technique", technique_id), output_format=output_format, ) # Mitre ATT&CK Tactics are implemented in STIX as kill chain phases with kill_chain_name "mitre-attack" - for tactic in technique.tactics: + for phase in technique.kill_chain_phases: + + if phase.kill_chain_name != "mitre-attack": + continue + + tactic_id = tactic_id_map[phase.phase_name] + handle_fact( client.fact("accomplishes") - .source("technique", technique.id) - .destination("tactic", tactic.id), + .source("technique", technique_id) + .destination("tactic", tactic_id), output_format=output_format, ) + return [] + + +def add_tactics( + client: Act, + attack: attampck.data.Attampck, + matrix: matrices, + output_format: Text = "json", +) -> Dict[Text, Text]: + """ + extract objects/facts related to ATT&CK tactics and return map of shortname -> id for tactics + + Args: + attack (matrices): Attack matrice + output_format (Text): "json" or "str" output format + + """ + + shortname_id_map: Dict[Text, Text] = {} + + for tactic in attack.iterate(matrix, stixmap.TACTIC): + tactic_id = resolve_mitre_id(tactic) + + shortname_id_map[tactic.x_mitre_shortname] = tactic_id + handle_fact( - client.fact("name", tactic.name).source("tactic", tactic.id), + client.fact("name", tactic.name).source("tactic", tactic_id), output_format=output_format, ) - return [] + return shortname_id_map def add_techniques( - client: Act, matrice: AttckMatrice, output_format: Text = "json" -) -> List: + client: Act, + attack: attampck.data.Attampck, + matrix: matrices, + tactic_id_map: Dict[Text, Text], + output_format: Text = "json", +) -> List[AttackPattern]: """ extract objects/facts related to ATT&CK techniques Args: - attack (AttckMatrice): Attack matrice - output_format (Text): "json" or "str" output format + attack (matrices): Attack matrice + output_format (Text): "json" or "str" output format """ notify = [] - for technique in matrice.techniques: - notify += handle_techniques(client, technique, None, output_format) + for technique in attack.iterate(matrix, stixmap.TECHNIQUE): + + # Subtechniques are added below + if getattr(technique, "x_mitre_is_subtechnique", False): + continue + + notify += handle_techniques( + client, technique, None, tactic_id_map, output_format + ) - for subtechnique in getattr(technique, "subtechniques", []): - # Pre Attack does not have sub techniques - notify += handle_techniques(client, subtechnique, technique, output_format) + for subtechnique in attack.get_subtechniques(technique.id): + notify += handle_techniques( + client, subtechnique, technique, tactic_id_map, output_format + ) return notify def add_groups( - client: Act, matrice: AttckMatrice, output_format: Text = "json" -) -> List: + client: Act, + attack: attampck.data.Attampck, + matrix: matrices, + uses: Dict[Text, List[Relationship]], + output_format: Text = "json", +) -> List[IntrusionSet]: """ extract objects/facts related to ATT&CK Threat Actors Args: - attack (AttckMatrice): Attack matrice - output_format (Text): "json" or "str" output format + attack (matrices): Attack matrice + output_format (Text): "json" or "str" output format """ - notify: List = [] + notify: List[IntrusionSet] = [] # ICS does not have actors - for actor in getattr(matrice, "actors", []): + for actor in attack.iterate(matrix, stixmap.GROUP): + if deprecated_or_revoked(actor): # Object is revoked, add to notification list but do not add to facts that should be added to the platform notify.append(actor) continue - for alias in actor.alias: + for alias in actor.aliases: if actor.name != alias: handle_fact( client.fact("alias").bidirectional( @@ -177,101 +237,114 @@ def add_groups( output_format=output_format, ) - for tool in actor.known_tools: - - if not tool.strip(): - # Skip empty tools found in ATT&CK - continue - - handle_facts( - act.api.fact.fact_chain( - client.fact("classifiedAs") - .source("content", "*") - .destination("tool", tool), - client.fact("observedIn") - .source("content", "*") - .destination("incident", "*"), - client.fact("attributedTo") - .source("incident", "*") - .destination("threatActor", actor.name), - ), - output_format=output_format, - ) + for target in uses.get(actor.id, []): + + if target.type in ("malware", "tool"): + tool_name = target.name.strip() + + if not tool_name: + # Skip empty tools found in ATT&CK + continue + + handle_facts( + act.api.fact.fact_chain( + client.fact("classifiedAs") + .source("content", "*") + .destination("tool", tool_name), + client.fact("observedIn") + .source("content", "*") + .destination("incident", "*"), + client.fact("attributedTo") + .source("incident", "*") + .destination("threatActor", actor.name), + ), + output_format=output_format, + ) - for technique in actor.techniques: - handle_facts( - act.api.fact.fact_chain( - client.fact("observedIn") - .source("technique", technique.id) - .destination("incident", "*"), - client.fact("attributedTo") - .source("incident", "*") - .destination("threatActor", actor.name), - ), - output_format=output_format, - ) + elif target.type == "attack-pattern": + handle_facts( + act.api.fact.fact_chain( + client.fact("observedIn") + .source("technique", resolve_mitre_id(target)) + .destination("incident", "*"), + client.fact("attributedTo") + .source("incident", "*") + .destination("threatActor", actor.name), + ), + output_format=output_format, + ) return notify def add_software( - client: Act, matrice: AttckMatrice, output_format: Text = "json" -) -> List: + client: Act, + attack: attampck.data.Attampck, + matrix: matrices, + uses: Dict[Text, List[Relationship]], + output_format: Text = "json", +) -> List[Software]: """ extract objects/facts related to ATT&CK Software Insert to ACT if client.baseurl is set, if not, print to stdout - Args: - attack (AttckMatrice): Attack matrice - output_format (Text): "json" or "str" output format + Args: attack (matrices): Attack matrice + output_format (Text): "json" or "str" output format """ - notify: List = [] + notify: List[Software] = [] - # Enterprise matrice has malwares and tools, but preattack has none of them - for software in getattr(matrice, "malwares", []) + getattr(matrice, "tools", []): - if deprecated_or_revoked(software): - # Object is revoked/deprecated, add to notification list but do not add to facts that should be added to the platform - notify.append(software) - continue + for stix_type in (stixmap.TOOL, stixmap.MALWARE): + # Enterprise matrix has malwares and tools, but preattack has none of them + for software in attack.iterate(matrix, stix_type): - tool_name = software.name + if deprecated_or_revoked(software): + # Object is revoked/deprecated, add to notification list but do not add to facts that should be added to the platform + notify.append(software) + continue - # Tool category - handle_fact( - client.fact("category", software.type).source("tool", tool_name), - output_format=output_format, - ) + tool_name = software.name + + # Tool category + handle_fact( + client.fact("category", software.type).source("tool", tool_name), + output_format=output_format, + ) - for alias in software.alias: - alias_name = alias + for alias in getattr(software, "x_mitre_aliases", []): + alias_name = alias + + if tool_name != alias_name: + # Tool category (alias) + handle_fact( + client.fact("category", software.type).source( + "tool", alias_name + ), + output_format=output_format, + ) + handle_fact( + client.fact("alias").bidirectional( + "tool", tool_name, "tool", alias_name + ), + output_format=output_format, + ) + + for target in uses.get(software.id, []): + if target.type != "attack-pattern": + continue - if tool_name != alias_name: - # Tool category (alias) handle_fact( - client.fact("category", software.type).source("tool", alias_name), + client.fact("implements") + .source("tool", software.name) + .destination("technique", resolve_mitre_id(target)), output_format=output_format, ) - handle_fact( - client.fact("alias").bidirectional( - "tool", tool_name, "tool", alias_name - ), - output_format=output_format, - ) - - for technique in software.techniques: - handle_fact( - client.fact("implements") - .source("tool", software.name) - .destination("technique", technique.id), - output_format=output_format, - ) return notify -def notify_cache(filename: str) -> Dict: +def notify_cache(filename: str) -> Dict[Text, bool]: """ Read notify cache from filename Args: @@ -287,9 +360,7 @@ def notify_cache(filename: str) -> Dict: if line: cache[line.strip()] = True except FileNotFoundError: - warning( - "Cache file {} not found, will be created if necessary".format(filename) - ) + warning("Cache file %s not found, will be created if necessary", filename) return cache @@ -309,11 +380,11 @@ def add_to_cache(filename: str, entry: str) -> None: def send_notification( - notify: List, + notify: List[Attack], smtphost: Text, sender: Text, recipient: Text, - model: Text, + matrix: Text, ) -> List[Text]: """ Process revoked objects @@ -339,19 +410,20 @@ def send_notification( ) return [] - body = model + "\n\n" - warning("[{}]".format(model)) + body = matrix + "\n\n" + warning("[%s]", matrix) for obj in notify: + obj_description = f"{obj.id}:{resolve_mitre_id(obj)}:{obj.name}" if getattr(obj, "revoked", None): - text = "revoked: {}:{}".format(obj.id, obj.name) + text = f"revoked: {obj_description}" elif getattr(obj, "deprecated", None): - text = "deprecated: {}:{}".format(obj.id, obj.name) + text = f"deprecated: {obj_description}" else: raise NotificationError( - "object is not deprecated or revoked: {}:{}".format(obj.id, obj.name) + f"object is not deprecated or revoked: {obj.id}:{obj.name}" ) notified.append(obj.id) @@ -366,7 +438,7 @@ def send_notification( "Revoked/deprecated objects from MITRE/ATT&CK", body, ) - info("Email sent to {}".format(recipient)) + info("Email sent to %s", recipient) return notified @@ -386,29 +458,43 @@ def main() -> None: else None ) - attack = Attck(proxies=proxies) + attack = attampck.data.Attampck(proxies=proxies) - types = [args.type] if args.type else MITRE_TYPES + types = ( + [getattr(attampck.matrices, args.type.upper())] if args.type else MITRE_TYPES + ) - for mitre_type in types: - if mitre_type not in MITRE_TYPES: + for matrix in types: + if not matrix: error( - "Unknown mitre type: {}. Valid types: {}".format( - mitre_type, ",".join(MITRE_TYPES) - ) + "Unknown mitre type: %s. Valid types: %s", + matrix, + ",".join(attampck.matrices), ) sys.exit(2) cache = notify_cache(args.notifycache) - model = getattr(attack, mitre_type) + uses = defaultdict(list) + + for rel in attack.get_stix2_memorystore(matrix).query( + [ + Filter("type", "=", "relationship"), + Filter("relationship_type", "=", "uses"), + ] + ): + uses[rel.source_ref].append(attack.get(rel.target_ref)) + + tactic_id_map = add_tactics(actapi, attack, matrix, args.output_format) + techniques_notify = add_techniques( + actapi, attack, matrix, tactic_id_map, args.output_format + ) - techniques_notify = add_techniques(actapi, model, args.output_format) - groups_notify = add_groups(actapi, model, args.output_format) - software_notify = add_software(actapi, model, args.output_format) + groups_notify = add_groups(actapi, attack, matrix, uses, args.output_format) + software_notify = add_software(actapi, attack, matrix, uses, args.output_format) # filter revoked objects from those allready notified - notify = [ + notify: Attack = [ notify for notify in techniques_notify + groups_notify + software_notify if notify.id not in cache @@ -416,7 +502,11 @@ def main() -> None: if notify: notified = send_notification( - notify, args.smtphost, args.sender, args.recipient, mitre_type + notify, + args.smtphost, + args.sender, + args.recipient, + matrix, ) for object_id in notified: @@ -429,7 +519,7 @@ def main_log_error() -> None: try: main() except Exception: - error("Unhandled exception: {}".format(traceback.format_exc())) + error("Unhandled exception: %s", traceback.format_exc()) raise diff --git a/setup.py b/setup.py index 3533be0..7c6b6eb 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ setup( name="act-workers", - version="2.1.3", + version="2.1.4", author="mnemonic AS", zip_safe=True, author_email="opensource@mnemonic.no", @@ -64,7 +64,7 @@ "RashlyOutlaid>=0.19", "virustotal-api", "dateparser", - "pyattck>=5.2.0", + "attampck", ], python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4", classifiers=[