Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,19 @@ In repository, run:
```bash
pip3 install --user -e .
```

## Tests

To run tests, you need `pyteest` and `pytest-mock-server`:

```bash
pip install pytest-mock-server
```

```bash
pytestpytest
```

# search
A worker to run graph queries is also included. A sample search config is inscluded in `etc/searc_jobs.ini`:

Expand Down
7 changes: 7 additions & 0 deletions act/workers/etc/act.ini
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ access-mode = Public

# user-agent = Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36

[unit42-playbooks]
origin-name = unit42-playbooks
origin-description = Palo Alto Unit42 Playbooks
access-mode = Public

# playbook-uri = https://github.com/pan-unit42/playbook_viewer/archive/master.zip

[veris]
origin-name = vcdb
origin-description = Veris Community Database
Expand Down
66 changes: 66 additions & 0 deletions act/workers/libs/stix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from typing import Set, Text, Tuple

import stix2
from stix2patterns.v20.pattern import Pattern


class SingleStixPatternEqualError(Exception):
pass


class NotEqualOperator(SingleStixPatternEqualError):
pass


class MultipleComparisons(SingleStixPatternEqualError):
pass


class TypesNotString(SingleStixPatternEqualError):
pass


def expand_techniques(attack_pattern: stix2.v20.AttackPattern) -> Set[Text]:
"""Expand Mitre Attack Techniques from Stix Attack Pattern"""
if not isinstance(attack_pattern, stix2.v20.AttackPattern):
raise TypeError(f"{attack_pattern} is not an AttackPattern")

return {
ref.external_id
for ref in attack_pattern.external_references
if ref.source_name == "mitre-attack"
}


def single_stix_equal_pattern(pattern: Text) -> Tuple[Text, Text]:

"""
Extract type/value from Stix Indicator patterns like this:

[domain-name:value = 'sampwn.anondns.net']
[url:value = 'https://teamtnt.red']

"""

pattern_data = Pattern(pattern).inspect()
comparisons = pattern_data.comparisons

if len(comparisons) > 1:
raise MultipleComparisons

main_type = list(comparisons.keys())[0]

if len(comparisons[main_type]) > 1:
raise MultipleComparisons

(sub_type, oper, value) = comparisons[main_type][0]

if oper != "=":
raise NotEqualOperator

if not all([isinstance(t, str) for t in sub_type]):
raise TypesNotString

pattern_type = f"{main_type}:{'.'.join(sub_type)}"

return pattern_type, value.strip("'")
34 changes: 30 additions & 4 deletions act/workers/libs/worker.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
"""Common worker library"""

import argparse
import io
import json
import smtplib
import socket
import sys
import urllib.parse
import zipfile
from email.mime.text import MIMEText
from logging import warning
from typing import Any, Optional
from typing import IO, Any, Iterator, Optional, Tuple

import act.api
import requests
Expand Down Expand Up @@ -108,7 +110,11 @@ def init_act(


def fetch(
url: str, proxy_string: Optional[str], timeout: int = 60, verify_https: bool = False
url: str,
proxy_string: Optional[str],
timeout: int = 60,
verify_https: bool = False,
bytes: bool = False,
) -> Any:
"""Fetch remote URL and return content
url (string): File or URL to fetch
Expand All @@ -129,7 +135,10 @@ def fetch(

# No scheme - assume this is a file
if not parsed.scheme:
return open(url).read()
if bytes:
return open(url, "rb").read()
else:
return open(url).read()

if not parsed.scheme.lower() in ("http", "https"):
raise UnsupportedScheme(f"Unsupported scheme in {url}")
Expand All @@ -152,7 +161,10 @@ def fetch(
errmsg = "status_code: {0.status_code}: {0.content}"
raise FetchError(errmsg.format(req))

return req.text
if bytes:
return req.content
else:
return req.text


def fetch_json(
Expand All @@ -170,6 +182,20 @@ def fetch_json(
raise FetchError(f"Cannot parse as json {e}, {content} {url}")


def zip_files(
url: str, proxy_string: Optional[str], timeout: int = 60, verify_https: bool = False
) -> Iterator[Tuple[str, IO[bytes]]]:
"""
Open ZIP file (from disk or URL) and extract its contents in memory
yields (filename, file-like object) pairs
"""
content = fetch(url, proxy_string, timeout, verify_https, bytes=True)
with zipfile.ZipFile(io.BytesIO(content)) as zip:
for zipinfo in zip.infolist():
with zip.open(zipinfo) as zf:
yield zipinfo.filename, zf


def sendmail(
smtphost: str, sender: str, recipient: str, subject: str, body: str
) -> None:
Expand Down
204 changes: 204 additions & 0 deletions act/workers/unit42_playbooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#!/usr/bin/env python3

"""url unshortener worker for the ACT platform

Copyright 2018 the ACT project <opensource@mnemonic.no>

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.
"""


import argparse
import re
import traceback
from logging import debug, error, warning
from typing import List, Set, Text, Tuple, Union

import act.api
import stix2
import urllib3
from act.api.libs import cli
from stix2 import parse
from stix2.v20.sro import Relationship

import act
from act.workers.libs import worker
from act.workers.libs.stix import (SingleStixPatternEqualError,
expand_techniques,
single_stix_equal_pattern)

StixObject = Union[
stix2.v20.sdo.AttackPattern,
stix2.v20.sdo.Campaign,
stix2.v20.sdo.CourseOfAction,
stix2.v20.sdo.Identity,
stix2.v20.sdo.Indicator,
stix2.v20.sdo.IntrusionSet,
stix2.v20.sdo.Malware,
stix2.v20.sdo.ObservedData,
stix2.v20.sdo.Report,
stix2.v20.sdo.ThreatActor,
stix2.v20.sdo.Tool,
stix2.v20.sdo.Vulnerability,
]


def parseargs() -> argparse.Namespace:
parser = worker.parseargs("Unit42 Playbook worker")

parser.add_argument(
"--playbook-uri",
default="https://github.com/pan-unit42/playbook_viewer/archive/master.zip",
help="URI to zip dump of playbook repository. Supports URL and local files.",
)

return cli.handle_args(parser)


def resolve_triplets(
playbook: stix2.v20.bundle.Bundle,
) -> List[Tuple[StixObject, Relationship, StixObject]]:
objects = {
obj.id: obj for obj in playbook.objects if not obj.type == "relationship"
}

triplets = []

for obj in playbook.objects:
if obj.type != "relationship":
continue

triplets.append((objects[obj.source_ref], obj, objects[obj.target_ref]))

return triplets


def handle_indicator(
api: act.api.Act,
source: StixObject,
relationship: Relationship,
target: StixObject,
output_format: Text,
) -> None:
try:
pattern_type, pattern_value = single_stix_equal_pattern(source.pattern)
except (StingleStixPatternEqualError):
debug("Unsupported indicator pattern: %s", source.pattern)
return

types = (pattern_type, relationship.relationship_type, target.type)

if types == ("url:value", "indicates", "campaign"):
act.api.helpers.handle_uri(api, pattern_value, output_format=output_format)
act.api.helpers.handle_fact(
api.fact("observedIn")
.source("uri", pattern_value)
.destination("incident", target.id),
output_format=output_format,
)

else:
debug("Unsupported indicator types: %s", types)


def handle_campaign(
api: act.api.Act, campaign: stix2.v20.sdo.Campaign, output_format: Text
) -> None:
act.api.helpers.handle_fact(
api.fact("name", campaign.name).source("incident", campaign.id),
output_format=output_format,
)


def handle_campaign_techniques(
api: act.api.Act,
campaign: stix2.v20.sdo.Campaign,
techniques: Set[Text],
output_format: Text,
) -> None:

handle_campaign(api, campaign, output_format)

for technique in techniques:
act.api.helpers.handle_fact(
api.fact("observedIn")
.source("technique", technique)
.destination("incident", campaign.id),
output_format=output_format,
)


def process(
api: act.api.Act,
playbook_uri: Text,
proxy_string: Text,
output_format: Text = "json",
) -> None:
"""Retrieve playbooks and handle objects"""

for filename, file in worker.zip_files(playbook_uri, proxy_string):

if re.search(r"/playbook_json/.*.json$", filename):
playbook = parse(file, allow_custom=True)

triplets = resolve_triplets(playbook)

for source, rel, target in triplets:

# Get source type, relationship type and target type
types = (source.type, rel.relationship_type, target.type)

# print(types)

if types == ("course-of-action", "mitigates", "attack-pattern"):
pass
elif types == ("campaign", "uses", "attack-pattern"):
handle_campaign_techniques(
api, source, expand_techniques(target), output_format
)
elif types == ("campaign", "targets", "identity"):
pass
elif types == ("report", "attributed-to", "intrusion-set"):
pass
elif types == ("campaign", "attributed-to", "intrusion-set"):
pass

elif source.type == "indicator":
handle_indicator(api, source, rel, target, output_format)
else:
warning("Unsupported types: %s", types)


def main() -> None:
"""Main function"""
# Look for default ini file in "/etc/act.ini" and
# ~/config/act/act.ini (or replace .config with
# $XDG_CONFIG_DIR if set)
args = parseargs()

actapi = worker.init_act(args)

process(actapi, args.playbook_uri, args.proxy_string, args.output_format)


def main_log_error() -> None:
"Main function wrapper. Log all exceptions to error"
try:
main()
except Exception:
error("Unhandled exception: {}".format(traceback.format_exc()))
raise


if __name__ == "__main__":
main_log_error()
Loading