From ffc19dadc4fd4b6d73c8c2f272f351a95a2344a0 Mon Sep 17 00:00:00 2001 From: Fredrik Borg Date: Thu, 6 Oct 2022 16:44:17 +0200 Subject: [PATCH 1/4] use attampck instead of pyattck --- act/workers/attack.py | 343 +++++++++++++++++++++++++++--------------- setup.py | 4 +- 2 files changed, 222 insertions(+), 125 deletions(-) diff --git a/act/workers/attack.py b/act/workers/attack.py index 0969aa2..03f8599 100755 --- a/act/workers/attack.py +++ b/act/workers/attack.py @@ -7,21 +7,29 @@ import sys import traceback from logging import error, info, warning -from typing import Dict, List, Optional, Text, Union +from typing import Any, Dict, List, Optional, Text, Union import act.api +import attampck.data from act.api.helpers import Act, handle_fact, handle_facts from act.api.libs import cli -from pyattck import Attck +from attampck import matrices, stixmap +from attampck.data import resolve_mitre_id +from stix2.datastore.memory import MemoryStore +from stix2.v21.sdo import AttackPattern, IntrusionSet, Malware, Tool from act.workers.libs import worker -# This is only for typing -# We can not import these matrices, because then they be loaded -# by pyattack and it fails if you need proxy -AttckMatrice = Union["Enterprise", "ICS", "MobileAttck"] +Software = Union[Tool, Malware] +Attack = Union[Software, AttackPattern, IntrusionSet, Malware, Tool] + + +MITRE_TYPES = [ + matrices.ENTERPRISE, + matrices.ICS, + matrices.MOBILE, +] -MITRE_TYPES = ["enterprise", "ics", "mobile"] DEFAULT_NOTIFY_CACHE = os.path.join(os.environ["HOME"], "act-mitre-attack-notify.cache") @@ -51,7 +59,7 @@ def parseargs() -> argparse.ArgumentParser: parser.add_argument( "--type", choices=list(MITRE_TYPES), - help="Specify a single type to download (enterprise, mobile or pre). Default is to fetch all", + help="Specify a single type to download. Default is to fetch all", ) parser.add_argument( "--notifycache", @@ -63,7 +71,20 @@ def parseargs() -> argparse.ArgumentParser: return parser -def deprecated_or_revoked(obj): +class MemoryStoreNotFound(Exception): + pass + + +def memorystore(attack: attampck.data.Attampck, matrice: matrices) -> MemoryStore: + mem = getattr(attack, f"{matrice}_memorystore") + + if not mem: + raise MemoryStoreNotFound(f"Memorystore not found: {matrice}_memorystore") + + return mem + + +def deprecated_or_revoked(obj: Any) -> Optional[bool]: """ Return true if object has a truthy "revoked" or "deprecated" attribute, otherwise False @@ -73,10 +94,11 @@ def deprecated_or_revoked(obj): def handle_techniques( client: Act, - technique: "AttckTechnique", - main_technique: Optional["AttckTechnique"], + technique: AttackPattern, + main_technique: Optional[AttackPattern], + tactic_id_map: Dict[Text, Text], output_format: Text = "json", -) -> List: +) -> List[AttackPattern]: """ Args: @@ -86,6 +108,8 @@ def handle_techniques( output_format (str): Fact output if sent to stdout (text | json) """ + technique_id = resolve_mitre_id(technique) + if deprecated_or_revoked(technique): # Object is revoked/deprecated, add to notification list but do not add to facts that should be added to the platform return [technique] @@ -93,79 +117,126 @@ def handle_techniques( if main_technique: handle_fact( client.fact("subTechniqueOf") - .source("technique", technique.id) - .destination("technique", main_technique.id), + .source("technique", technique_id) + .destination("technique", resolve_mitre_id(main_technique)), output_format=output_format, ) handle_fact( - client.fact("name", technique.name).source("technique", technique.id), + client.fact("name", technique.name).source("technique", technique_id), output_format=output_format, ) # Mitre ATT&CK Tactics are implemented in STIX as kill chain phases with kill_chain_name "mitre-attack" - for tactic in technique.tactics: + for phase in technique.kill_chain_phases: + + if phase.kill_chain_name != "mitre-attack": + continue + + tactic_id = tactic_id_map[phase.phase_name] + handle_fact( client.fact("accomplishes") - .source("technique", technique.id) - .destination("tactic", tactic.id), + .source("technique", technique_id) + .destination("tactic", tactic_id), output_format=output_format, ) + return [] + + +def add_tactics( + client: Act, + attack: attampck.data.Attampck, + matrice: matrices, + output_format: Text = "json", +) -> Dict[Text, Text]: + """ + extract objects/facts related to ATT&CK tactics and return map of shortname -> id for tactics + + Args: + attack (matrices): Attack matrice + output_format (Text): "json" or "str" output format + + """ + + shortname_id_map: Dict[Text, Text] = {} + + for tactic in attack.iterate(matrice, stixmap.TACTIC): + tactic_id = resolve_mitre_id(tactic) + + shortname_id_map[tactic.x_mitre_shortname] = tactic_id + handle_fact( - client.fact("name", tactic.name).source("tactic", tactic.id), + client.fact("name", tactic.name).source("tactic", tactic_id), output_format=output_format, ) - return [] + return shortname_id_map def add_techniques( - client: Act, matrice: AttckMatrice, output_format: Text = "json" -) -> List: + client: Act, + attack: attampck.data.Attampck, + matrice: matrices, + tactic_id_map: Dict[Text, Text], + output_format: Text = "json", +) -> List[AttackPattern]: """ extract objects/facts related to ATT&CK techniques Args: - attack (AttckMatrice): Attack matrice - output_format (Text): "json" or "str" output format + attack (matrices): Attack matrice + output_format (Text): "json" or "str" output format """ notify = [] - for technique in matrice.techniques: - notify += handle_techniques(client, technique, None, output_format) + for technique in attack.iterate(matrice, stixmap.TECHNIQUE): - for subtechnique in getattr(technique, "subtechniques", []): - # Pre Attack does not have sub techniques - notify += handle_techniques(client, subtechnique, technique, output_format) + # Subtechniques are added below + if getattr(technique, "x_mitre_is_subtechnique", False): + continue + + notify += handle_techniques( + client, technique, None, tactic_id_map, output_format + ) + + for subtechnique in attack.get_subtechniques(technique.id): + notify += handle_techniques( + client, subtechnique, technique, tactic_id_map, output_format + ) return notify def add_groups( - client: Act, matrice: AttckMatrice, output_format: Text = "json" -) -> List: + client: Act, + attack: attampck.data.Attampck, + matrice: matrices, + output_format: Text = "json", +) -> List[IntrusionSet]: """ extract objects/facts related to ATT&CK Threat Actors Args: - attack (AttckMatrice): Attack matrice - output_format (Text): "json" or "str" output format + attack (matrices): Attack matrice + output_format (Text): "json" or "str" output format """ - notify: List = [] + notify: List[IntrusionSet] = [] # ICS does not have actors - for actor in getattr(matrice, "actors", []): + for actor in attack.iterate(matrice, stixmap.GROUP): + if deprecated_or_revoked(actor): # Object is revoked, add to notification list but do not add to facts that should be added to the platform notify.append(actor) continue - for alias in actor.alias: + for alias in actor.aliases: if actor.name != alias: handle_fact( client.fact("alias").bidirectional( @@ -177,101 +248,118 @@ def add_groups( output_format=output_format, ) - for tool in actor.known_tools: - - if not tool.strip(): - # Skip empty tools found in ATT&CK - continue - - handle_facts( - act.api.fact.fact_chain( - client.fact("classifiedAs") - .source("content", "*") - .destination("tool", tool), - client.fact("observedIn") - .source("content", "*") - .destination("incident", "*"), - client.fact("attributedTo") - .source("incident", "*") - .destination("threatActor", actor.name), - ), - output_format=output_format, - ) + for uses in memorystore(attack, matrice).related_to( + actor, relationship_type="uses" + ): + + if uses.type in ("malware", "tool"): + tool_name = uses.name.strip() + + if not tool_name: + # Skip empty tools found in ATT&CK + continue + + handle_facts( + act.api.fact.fact_chain( + client.fact("classifiedAs") + .source("content", "*") + .destination("tool", tool_name), + client.fact("observedIn") + .source("content", "*") + .destination("incident", "*"), + client.fact("attributedTo") + .source("incident", "*") + .destination("threatActor", actor.name), + ), + output_format=output_format, + ) - for technique in actor.techniques: - handle_facts( - act.api.fact.fact_chain( - client.fact("observedIn") - .source("technique", technique.id) - .destination("incident", "*"), - client.fact("attributedTo") - .source("incident", "*") - .destination("threatActor", actor.name), - ), - output_format=output_format, - ) + elif uses.type == "attack-pattern": + handle_facts( + act.api.fact.fact_chain( + client.fact("observedIn") + .source("technique", resolve_mitre_id(uses)) + .destination("incident", "*"), + client.fact("attributedTo") + .source("incident", "*") + .destination("threatActor", actor.name), + ), + output_format=output_format, + ) return notify def add_software( - client: Act, matrice: AttckMatrice, output_format: Text = "json" -) -> List: + client: Act, + attack: attampck.data.Attampck, + matrice: matrices, + output_format: Text = "json", +) -> List[Software]: """ extract objects/facts related to ATT&CK Software Insert to ACT if client.baseurl is set, if not, print to stdout Args: - attack (AttckMatrice): Attack matrice - output_format (Text): "json" or "str" output format + attack (matrices): Attack matrice + output_format (Text): "json" or "str" output format """ - notify: List = [] + notify: List[Software] = [] - # Enterprise matrice has malwares and tools, but preattack has none of them - for software in getattr(matrice, "malwares", []) + getattr(matrice, "tools", []): - if deprecated_or_revoked(software): - # Object is revoked/deprecated, add to notification list but do not add to facts that should be added to the platform - notify.append(software) - continue + for stix_type in (stixmap.TOOL, stixmap.MALWARE): + # Enterprise matrice has malwares and tools, but preattack has none of them + for software in attack.iterate(matrice, stix_type): - tool_name = software.name + if deprecated_or_revoked(software): + # Object is revoked/deprecated, add to notification list but do not add to facts that should be added to the platform + notify.append(software) + continue - # Tool category - handle_fact( - client.fact("category", software.type).source("tool", tool_name), - output_format=output_format, - ) + tool_name = software.name + + # Tool category + handle_fact( + client.fact("category", software.type).source("tool", tool_name), + output_format=output_format, + ) - for alias in software.alias: - alias_name = alias + for alias in getattr(software, "x_mitre_aliases", []): + alias_name = alias + + if tool_name != alias_name: + # Tool category (alias) + handle_fact( + client.fact("category", software.type).source( + "tool", alias_name + ), + output_format=output_format, + ) + handle_fact( + client.fact("alias").bidirectional( + "tool", tool_name, "tool", alias_name + ), + output_format=output_format, + ) + + for technique in memorystore(attack, matrice).related_to( + software, relationship_type="uses" + ): + if technique.type != "attack-pattern": + continue - if tool_name != alias_name: - # Tool category (alias) handle_fact( - client.fact("category", software.type).source("tool", alias_name), + client.fact("implements") + .source("tool", software.name) + .destination("technique", resolve_mitre_id(technique)), output_format=output_format, ) - handle_fact( - client.fact("alias").bidirectional( - "tool", tool_name, "tool", alias_name - ), - output_format=output_format, - ) - - for technique in software.techniques: - handle_fact( - client.fact("implements") - .source("tool", software.name) - .destination("technique", technique.id), - output_format=output_format, - ) return notify -def notify_cache(filename: str) -> Dict: +def notify_cache(filename: str) -> Dict[Text, bool]: """ Read notify cache from filename Args: @@ -309,11 +397,11 @@ def add_to_cache(filename: str, entry: str) -> None: def send_notification( - notify: List, + notify: List[Attack], smtphost: Text, sender: Text, recipient: Text, - model: Text, + matrice: Text, ) -> List[Text]: """ Process revoked objects @@ -339,15 +427,17 @@ def send_notification( ) return [] - body = model + "\n\n" - warning("[{}]".format(model)) + body = matrice + "\n\n" + warning("[{}]".format(matrice)) for obj in notify: if getattr(obj, "revoked", None): - text = "revoked: {}:{}".format(obj.id, obj.name) + text = "revoked: {}:{}:{}".format(obj.id, resolve_mitre_id(obj), obj.name) elif getattr(obj, "deprecated", None): - text = "deprecated: {}:{}".format(obj.id, obj.name) + text = "deprecated: {}:{}:{}".format( + obj.id, resolve_mitre_id(obj), obj.name + ) else: raise NotificationError( @@ -386,29 +476,32 @@ def main() -> None: else None ) - attack = Attck(proxies=proxies) + attack = attampck.data.Attampck(proxies=proxies) - types = [args.type] if args.type else MITRE_TYPES + types = ( + [getattr(attampck.matrices, args.type.upper())] if args.type else MITRE_TYPES + ) - for mitre_type in types: - if mitre_type not in MITRE_TYPES: + for matrice in types: + if not matrice: error( "Unknown mitre type: {}. Valid types: {}".format( - mitre_type, ",".join(MITRE_TYPES) + matrice, ",".join(attampck.matrices) ) ) sys.exit(2) cache = notify_cache(args.notifycache) - model = getattr(attack, mitre_type) - - techniques_notify = add_techniques(actapi, model, args.output_format) - groups_notify = add_groups(actapi, model, args.output_format) - software_notify = add_software(actapi, model, args.output_format) + tactic_id_map = add_tactics(actapi, attack, matrice, args.output_format) + techniques_notify = add_techniques( + actapi, attack, matrice, tactic_id_map, args.output_format + ) + groups_notify = add_groups(actapi, attack, matrice, args.output_format) + software_notify = add_software(actapi, attack, matrice, args.output_format) # filter revoked objects from those allready notified - notify = [ + notify: Attack = [ notify for notify in techniques_notify + groups_notify + software_notify if notify.id not in cache @@ -416,7 +509,11 @@ def main() -> None: if notify: notified = send_notification( - notify, args.smtphost, args.sender, args.recipient, mitre_type + notify, + args.smtphost, + args.sender, + args.recipient, + matrice, ) for object_id in notified: diff --git a/setup.py b/setup.py index 3533be0..7c6b6eb 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ setup( name="act-workers", - version="2.1.3", + version="2.1.4", author="mnemonic AS", zip_safe=True, author_email="opensource@mnemonic.no", @@ -64,7 +64,7 @@ "RashlyOutlaid>=0.19", "virustotal-api", "dateparser", - "pyattck>=5.2.0", + "attampck", ], python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4", classifiers=[ From 1fcb64eafa107e74231b75ab021ec4a2c954370f Mon Sep 17 00:00:00 2001 From: Fredrik Borg Date: Fri, 7 Oct 2022 12:25:08 +0200 Subject: [PATCH 2/4] matrice -> matrix, format() -> f-string --- act/workers/attack.py | 61 ++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/act/workers/attack.py b/act/workers/attack.py index 03f8599..f47aace 100755 --- a/act/workers/attack.py +++ b/act/workers/attack.py @@ -75,11 +75,11 @@ class MemoryStoreNotFound(Exception): pass -def memorystore(attack: attampck.data.Attampck, matrice: matrices) -> MemoryStore: - mem = getattr(attack, f"{matrice}_memorystore") +def memorystore(attack: attampck.data.Attampck, matrix: matrices) -> MemoryStore: + mem = getattr(attack, f"{matrix}_memorystore") if not mem: - raise MemoryStoreNotFound(f"Memorystore not found: {matrice}_memorystore") + raise MemoryStoreNotFound(f"Memorystore not found: {matrix}_memorystore") return mem @@ -148,7 +148,7 @@ def handle_techniques( def add_tactics( client: Act, attack: attampck.data.Attampck, - matrice: matrices, + matrix: matrices, output_format: Text = "json", ) -> Dict[Text, Text]: """ @@ -162,7 +162,7 @@ def add_tactics( shortname_id_map: Dict[Text, Text] = {} - for tactic in attack.iterate(matrice, stixmap.TACTIC): + for tactic in attack.iterate(matrix, stixmap.TACTIC): tactic_id = resolve_mitre_id(tactic) shortname_id_map[tactic.x_mitre_shortname] = tactic_id @@ -178,7 +178,7 @@ def add_tactics( def add_techniques( client: Act, attack: attampck.data.Attampck, - matrice: matrices, + matrix: matrices, tactic_id_map: Dict[Text, Text], output_format: Text = "json", ) -> List[AttackPattern]: @@ -193,7 +193,7 @@ def add_techniques( notify = [] - for technique in attack.iterate(matrice, stixmap.TECHNIQUE): + for technique in attack.iterate(matrix, stixmap.TECHNIQUE): # Subtechniques are added below if getattr(technique, "x_mitre_is_subtechnique", False): @@ -214,7 +214,7 @@ def add_techniques( def add_groups( client: Act, attack: attampck.data.Attampck, - matrice: matrices, + matrix: matrices, output_format: Text = "json", ) -> List[IntrusionSet]: """ @@ -229,7 +229,7 @@ def add_groups( notify: List[IntrusionSet] = [] # ICS does not have actors - for actor in attack.iterate(matrice, stixmap.GROUP): + for actor in attack.iterate(matrix, stixmap.GROUP): if deprecated_or_revoked(actor): # Object is revoked, add to notification list but do not add to facts that should be added to the platform @@ -248,7 +248,7 @@ def add_groups( output_format=output_format, ) - for uses in memorystore(attack, matrice).related_to( + for uses in memorystore(attack, matrix).related_to( actor, relationship_type="uses" ): @@ -293,7 +293,7 @@ def add_groups( def add_software( client: Act, attack: attampck.data.Attampck, - matrice: matrices, + matrix: matrices, output_format: Text = "json", ) -> List[Software]: """ @@ -309,8 +309,8 @@ def add_software( notify: List[Software] = [] for stix_type in (stixmap.TOOL, stixmap.MALWARE): - # Enterprise matrice has malwares and tools, but preattack has none of them - for software in attack.iterate(matrice, stix_type): + # Enterprise matrix has malwares and tools, but preattack has none of them + for software in attack.iterate(matrix, stix_type): if deprecated_or_revoked(software): # Object is revoked/deprecated, add to notification list but do not add to facts that should be added to the platform @@ -343,7 +343,7 @@ def add_software( output_format=output_format, ) - for technique in memorystore(attack, matrice).related_to( + for technique in memorystore(attack, matrix).related_to( software, relationship_type="uses" ): if technique.type != "attack-pattern": @@ -375,9 +375,7 @@ def notify_cache(filename: str) -> Dict[Text, bool]: if line: cache[line.strip()] = True except FileNotFoundError: - warning( - "Cache file {} not found, will be created if necessary".format(filename) - ) + warning("Cache file %s not found, will be created if necessary", filename) return cache @@ -401,7 +399,7 @@ def send_notification( smtphost: Text, sender: Text, recipient: Text, - matrice: Text, + matrix: Text, ) -> List[Text]: """ Process revoked objects @@ -427,17 +425,16 @@ def send_notification( ) return [] - body = matrice + "\n\n" - warning("[{}]".format(matrice)) + body = matrix + "\n\n" + warning("[%s]", matrix) for obj in notify: + obj_description = f"{obj.id}:{resolve_mitre_id(obj)}:{obj.name}" if getattr(obj, "revoked", None): - text = "revoked: {}:{}:{}".format(obj.id, resolve_mitre_id(obj), obj.name) + text = f"revoked: {obj_description}" elif getattr(obj, "deprecated", None): - text = "deprecated: {}:{}:{}".format( - obj.id, resolve_mitre_id(obj), obj.name - ) + text = f"deprecated: {obj_description}" else: raise NotificationError( @@ -482,23 +479,23 @@ def main() -> None: [getattr(attampck.matrices, args.type.upper())] if args.type else MITRE_TYPES ) - for matrice in types: - if not matrice: + for matrix in types: + if not matrix: error( "Unknown mitre type: {}. Valid types: {}".format( - matrice, ",".join(attampck.matrices) + matrix, ",".join(attampck.matrices) ) ) sys.exit(2) cache = notify_cache(args.notifycache) - tactic_id_map = add_tactics(actapi, attack, matrice, args.output_format) + tactic_id_map = add_tactics(actapi, attack, matrix, args.output_format) techniques_notify = add_techniques( - actapi, attack, matrice, tactic_id_map, args.output_format + actapi, attack, matrix, tactic_id_map, args.output_format ) - groups_notify = add_groups(actapi, attack, matrice, args.output_format) - software_notify = add_software(actapi, attack, matrice, args.output_format) + groups_notify = add_groups(actapi, attack, matrix, args.output_format) + software_notify = add_software(actapi, attack, matrix, args.output_format) # filter revoked objects from those allready notified notify: Attack = [ @@ -513,7 +510,7 @@ def main() -> None: args.smtphost, args.sender, args.recipient, - matrice, + matrix, ) for object_id in notified: From 55f7432ef076a78315e2f821df8be67c7be5db11 Mon Sep 17 00:00:00 2001 From: Fredrik Borg Date: Fri, 7 Oct 2022 13:41:11 +0200 Subject: [PATCH 3/4] use %s in logging and switch to attampck get_stix2_memorystore --- act/workers/attack.py | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/act/workers/attack.py b/act/workers/attack.py index f47aace..639c6e8 100755 --- a/act/workers/attack.py +++ b/act/workers/attack.py @@ -15,7 +15,6 @@ from act.api.libs import cli from attampck import matrices, stixmap from attampck.data import resolve_mitre_id -from stix2.datastore.memory import MemoryStore from stix2.v21.sdo import AttackPattern, IntrusionSet, Malware, Tool from act.workers.libs import worker @@ -71,19 +70,6 @@ def parseargs() -> argparse.ArgumentParser: return parser -class MemoryStoreNotFound(Exception): - pass - - -def memorystore(attack: attampck.data.Attampck, matrix: matrices) -> MemoryStore: - mem = getattr(attack, f"{matrix}_memorystore") - - if not mem: - raise MemoryStoreNotFound(f"Memorystore not found: {matrix}_memorystore") - - return mem - - def deprecated_or_revoked(obj: Any) -> Optional[bool]: """ Return true if object has a truthy "revoked" or "deprecated" attribute, @@ -248,7 +234,7 @@ def add_groups( output_format=output_format, ) - for uses in memorystore(attack, matrix).related_to( + for uses in attack.get_stix2_memorystore(matrix).related_to( actor, relationship_type="uses" ): @@ -343,7 +329,7 @@ def add_software( output_format=output_format, ) - for technique in memorystore(attack, matrix).related_to( + for technique in attack.get_stix2_memorystore(matrix).related_to( software, relationship_type="uses" ): if technique.type != "attack-pattern": @@ -438,7 +424,7 @@ def send_notification( else: raise NotificationError( - "object is not deprecated or revoked: {}:{}".format(obj.id, obj.name) + f"object is not deprecated or revoked: {obj.id}:{obj.name}" ) notified.append(obj.id) @@ -453,7 +439,7 @@ def send_notification( "Revoked/deprecated objects from MITRE/ATT&CK", body, ) - info("Email sent to {}".format(recipient)) + info("Email sent to %s", recipient) return notified @@ -482,9 +468,9 @@ def main() -> None: for matrix in types: if not matrix: error( - "Unknown mitre type: {}. Valid types: {}".format( - matrix, ",".join(attampck.matrices) - ) + "Unknown mitre type: %s. Valid types: %s", + matrix, + ",".join(attampck.matrices), ) sys.exit(2) @@ -523,7 +509,7 @@ def main_log_error() -> None: try: main() except Exception: - error("Unhandled exception: {}".format(traceback.format_exc())) + error("Unhandled exception: %s", traceback.format_exc()) raise From d9e481e36d1b6440327c12b17a16d369e6b0ed0f Mon Sep 17 00:00:00 2001 From: Fredrik Borg Date: Mon, 10 Oct 2022 10:40:47 +0200 Subject: [PATCH 4/4] cache relationships --- act/workers/attack.py | 44 ++++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/act/workers/attack.py b/act/workers/attack.py index 639c6e8..0803dd6 100755 --- a/act/workers/attack.py +++ b/act/workers/attack.py @@ -6,6 +6,7 @@ import os import sys import traceback +from collections import defaultdict from logging import error, info, warning from typing import Any, Dict, List, Optional, Text, Union @@ -15,7 +16,8 @@ from act.api.libs import cli from attampck import matrices, stixmap from attampck.data import resolve_mitre_id -from stix2.v21.sdo import AttackPattern, IntrusionSet, Malware, Tool +from stix2 import (AttackPattern, Filter, IntrusionSet, Malware, Relationship, + Tool) from act.workers.libs import worker @@ -201,6 +203,7 @@ def add_groups( client: Act, attack: attampck.data.Attampck, matrix: matrices, + uses: Dict[Text, List[Relationship]], output_format: Text = "json", ) -> List[IntrusionSet]: """ @@ -234,12 +237,10 @@ def add_groups( output_format=output_format, ) - for uses in attack.get_stix2_memorystore(matrix).related_to( - actor, relationship_type="uses" - ): + for target in uses.get(actor.id, []): - if uses.type in ("malware", "tool"): - tool_name = uses.name.strip() + if target.type in ("malware", "tool"): + tool_name = target.name.strip() if not tool_name: # Skip empty tools found in ATT&CK @@ -260,11 +261,11 @@ def add_groups( output_format=output_format, ) - elif uses.type == "attack-pattern": + elif target.type == "attack-pattern": handle_facts( act.api.fact.fact_chain( client.fact("observedIn") - .source("technique", resolve_mitre_id(uses)) + .source("technique", resolve_mitre_id(target)) .destination("incident", "*"), client.fact("attributedTo") .source("incident", "*") @@ -280,14 +281,14 @@ def add_software( client: Act, attack: attampck.data.Attampck, matrix: matrices, + uses: Dict[Text, List[Relationship]], output_format: Text = "json", ) -> List[Software]: """ extract objects/facts related to ATT&CK Software Insert to ACT if client.baseurl is set, if not, print to stdout - Args: - attack (matrices): Attack matrice + Args: attack (matrices): Attack matrice output_format (Text): "json" or "str" output format """ @@ -329,16 +330,14 @@ def add_software( output_format=output_format, ) - for technique in attack.get_stix2_memorystore(matrix).related_to( - software, relationship_type="uses" - ): - if technique.type != "attack-pattern": + for target in uses.get(software.id, []): + if target.type != "attack-pattern": continue handle_fact( client.fact("implements") .source("tool", software.name) - .destination("technique", resolve_mitre_id(technique)), + .destination("technique", resolve_mitre_id(target)), output_format=output_format, ) @@ -476,12 +475,23 @@ def main() -> None: cache = notify_cache(args.notifycache) + uses = defaultdict(list) + + for rel in attack.get_stix2_memorystore(matrix).query( + [ + Filter("type", "=", "relationship"), + Filter("relationship_type", "=", "uses"), + ] + ): + uses[rel.source_ref].append(attack.get(rel.target_ref)) + tactic_id_map = add_tactics(actapi, attack, matrix, args.output_format) techniques_notify = add_techniques( actapi, attack, matrix, tactic_id_map, args.output_format ) - groups_notify = add_groups(actapi, attack, matrix, args.output_format) - software_notify = add_software(actapi, attack, matrix, args.output_format) + + groups_notify = add_groups(actapi, attack, matrix, uses, args.output_format) + software_notify = add_software(actapi, attack, matrix, uses, args.output_format) # filter revoked objects from those allready notified notify: Attack = [