Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions app/objects/c_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,10 @@
link = [link for link in self.chain if link.id == link_id][0]
if link.can_ignore():
self.add_ignored_link(link.id)
member = [member for member in self.agents if member.paw == link.paw][0]
members = [member for member in self.agents if member.paw == link.paw]
Comment on lines 258 to +261
if not members:
continue
Comment on lines 258 to +263
member = members[0]
while not (link.finish or link.can_ignore()):
await asyncio.sleep(5)
if not member.trusted:
Expand Down Expand Up @@ -293,6 +296,8 @@
abilities_by_agent = await self._get_all_possible_abilities_by_agent(data_svc)
skipped_abilities = []
for agent in self.agents:
if agent.paw not in abilities_by_agent:
continue
agent_skipped = defaultdict(dict)
agent_executors = agent.executors
agent_ran = set([link.ability.ability_id for link in self.chain if link.paw == agent.paw and link.finish])
Expand All @@ -311,12 +316,17 @@

async def report(self, file_svc, data_svc, output=False):
try:
report = dict(name=self.name, host_group=[a.display for a in self.agents],
agent_paw_map = {a.paw: a for a in self.agents}
chain_paws = {link.paw for link in self.chain}
all_paws = set(agent_paw_map.keys()) | chain_paws
host_group = [agent_paw_map[p].display if p in agent_paw_map else dict(paw=p)

Check warning on line 322 in app/objects/c_operation.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this constructor call with a literal.

See more on https://sonarcloud.io/project/issues?id=mitre_caldera&issues=AZz3A88IBUkVLY-odxEj&open=AZz3A88IBUkVLY-odxEj&pullRequest=3341
for p in all_paws]
Comment on lines +319 to +323
report = dict(name=self.name, host_group=host_group,

Check warning on line 324 in app/objects/c_operation.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this constructor call with a literal.

See more on https://sonarcloud.io/project/issues?id=mitre_caldera&issues=AZz3A88IBUkVLY-odxEk&open=AZz3A88IBUkVLY-odxEk&pullRequest=3341
start=self.start.strftime(self.TIME_FORMAT),
steps=[], finish=self.finish, planner=self.planner.name, adversary=self.adversary.display,
jitter=self.jitter, objectives=self.objective.display,
facts=[f.display for f in await self.all_facts()])
agents_steps = {a.paw: {'steps': []} for a in self.agents}
agents_steps = {paw: {'steps': []} for paw in all_paws}
for step in self.chain:
step_report = dict(link_id=step.id,
ability_id=step.ability.ability_id,
Expand Down
75 changes: 75 additions & 0 deletions tests/objects/test_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,81 @@ def test_resolve_fact_no_match_returns_original(self, adversary):
result = op._resolve_fact(stub, fact_list)
assert result is stub

async def test_report_with_empty_agents_and_chain_links(
self, operation_adversary, executor, ability, operation_link,
encoded_command, parse_datestring, file_svc, data_svc, knowledge_svc, fire_event_mock):
"""report() should succeed when self.agents is empty but self.chain has links (deleted agents)."""
from app.objects.c_planner import Planner
from app.objects.c_objective import Objective

op = Operation(name='deleted-agent-test', agents=[], adversary=operation_adversary)
op.set_start_details()
op.planner = Planner(planner_id='tp', name='test_planner', module='test', params=None)
op.objective = Objective(id='obj1', name='test objective')

exe = executor(name='psh', platform='windows', command='whoami')
ab = ability(ability_id='del123', tactic='discovery', technique_id='T0000',
technique_name='test technique', name='test ability',
description='test desc', executors=[exe])

link1 = operation_link(
command=encoded_command('whoami'), plaintext_command=encoded_command('whoami'),
paw='deleted-paw-1', ability=ab, executor=exe, status=0, host='HOST1', pid=1,
decide=parse_datestring(LINK1_DECIDE_TIME),
)
op.chain = [link1]

report = await op.report(file_svc, data_svc, output=False)
assert report is not None
assert 'deleted-paw-1' in report['steps']
assert len(report['steps']['deleted-paw-1']['steps']) == 1

async def test_report_with_partial_deleted_agents(
self, operation_agent, operation_adversary, executor, ability, operation_link,
encoded_command, parse_datestring, file_svc, data_svc, knowledge_svc, fire_event_mock):
"""report() should include steps from both present agents and deleted agents."""
from app.objects.c_planner import Planner
from app.objects.c_objective import Objective

op = Operation(name='partial-delete-test', agents=[operation_agent], adversary=operation_adversary)
op.set_start_details()
op.planner = Planner(planner_id='tp', name='test_planner', module='test', params=None)
op.objective = Objective(id='obj2', name='test objective')

exe = executor(name='psh', platform='windows', command='whoami')
ab = ability(ability_id='pd123', tactic='discovery', technique_id='T0000',
technique_name='test technique', name='test ability',
description='test desc', executors=[exe])

known_link = operation_link(
command=encoded_command('whoami'), plaintext_command=encoded_command('whoami'),
paw=operation_agent.paw, ability=ab, executor=exe, status=0,
host=operation_agent.host, pid=1,
decide=parse_datestring(LINK1_DECIDE_TIME),
)
deleted_paw = 'deleted-agent-paw'
deleted_link = operation_link(
command=encoded_command('hostname'), plaintext_command=encoded_command('hostname'),
paw=deleted_paw, ability=ab, executor=exe, status=0, host='GONE', pid=2,
decide=parse_datestring(LINK2_DECIDE_TIME),
)
op.chain = [known_link, deleted_link]

report = await op.report(file_svc, data_svc, output=False)
assert report is not None
assert operation_agent.paw in report['steps']
assert deleted_paw in report['steps']
assert len(report['steps'][operation_agent.paw]['steps']) == 1
assert len(report['steps'][deleted_paw]['steps']) == 1
# host_group should contain entries for both present and deleted agents
paws_in_host_group = []
for entry in report['host_group']:
if isinstance(entry, dict) and 'paw' in entry:
paws_in_host_group.append(entry['paw'])
elif hasattr(entry, 'get') and entry.get('paw'):
paws_in_host_group.append(entry['paw'])
assert deleted_paw in paws_in_host_group

async def test_init_source_seeds_relationship_with_resolved_facts(self, knowledge_svc, fire_event_mock, adversary):
"""Relationships in a fact source that use trait-only fact references should be seeded
with the resolved (non-null) fact values from the source's fact list. Regression test
Expand Down
Loading