Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ This release includes Ghidra PyGhidra support, performance improvements, depende

- ghidra: support PyGhidra @mike-hunhoff #2788
- vmray: extract number features from whitelisted void_ptr parameters (hKey, hKeyRoot) @adeboyedn #2835
- ghidra: support analyzing existing Ghidra projects via .gpr input files @saniyafatima07 #3087

### Breaking Changes

Expand Down
4 changes: 4 additions & 0 deletions capa/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ class NonExistantFunctionError(ValueError):

class NonExistantProcessError(ValueError):
    """
    ValueError subclass carrying no extra state or behavior.

    NOTE(review): presumably raised when a referenced process cannot be found;
    the raising sites are not visible here - confirm against callers.
    """


class LockedProjectDatabaseError(RuntimeError):
    """
    Raised when a Ghidra project database cannot be opened because it is locked.

    Adds nothing on top of RuntimeError; the distinct type lets callers
    report this condition separately from other runtime failures.
    """
2 changes: 2 additions & 0 deletions capa/features/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ def evaluate(self, features: "capa.engine.FeatureSet", short_circuit=True):
FORMAT_FREEZE = "freeze"
FORMAT_RESULT = "result"
FORMAT_BINJA_DB = "binja_database"
FORMAT_GHIDRA_PROJECT = "ghidra_project"
STATIC_FORMATS = {
FORMAT_SC32,
FORMAT_SC64,
Expand All @@ -512,6 +513,7 @@ def evaluate(self, features: "capa.engine.FeatureSet", short_circuit=True):
FORMAT_RESULT,
FORMAT_BINEXPORT2,
FORMAT_BINJA_DB,
FORMAT_GHIDRA_PROJECT,
}
DYNAMIC_FORMATS = {
FORMAT_CAPE,
Expand Down
12 changes: 12 additions & 0 deletions capa/ghidra/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ To use the Ghidra backend, specify it with the `-b` or `--backend` flag:
$ capa -b ghidra /path/to/sample
```

capa can also analyze programs directly from Ghidra projects by specifying the project file path (`.gpr`):

```bash
$ capa /path/to/project.gpr
```

If the project contains multiple programs, set the `CAPA_GHIDRA_PROGRAM_PATH` environment variable to the path of the program within the project (for example, `/myprogram`) to specify which one to analyze:

```bash
$ CAPA_GHIDRA_PROGRAM_PATH=/myprogram capa /path/to/project.gpr
```

capa will:
1. Initialize a headless Ghidra instance.
2. Create a temporary project.
Expand Down
50 changes: 49 additions & 1 deletion capa/ghidra/helpers.py

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work splitting up the code into helper functions to keep things concise.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import logging
import datetime
import contextlib
Expand All @@ -28,7 +29,11 @@
logger = logging.getLogger("capa")

# file type as returned by Ghidra
SUPPORTED_FILE_TYPES = ("Executable and Linking Format (ELF)", "Portable Executable (PE)", "Raw Binary")
SUPPORTED_FILE_TYPES = (
"Executable and Linking Format (ELF)",
"Portable Executable (PE)",
"Raw Binary",
)


def get_current_program():
Expand All @@ -43,6 +48,49 @@ def get_monitor():
return ghidra_context.get_context().monitor


def iter_program_files(folder):
    """
    Lazily walk a project folder tree and yield every file it contains.

    The files directly inside `folder` come first, followed by the files of
    each sub-folder (and its descendants) in the order `getFolders()` returns them.

    args:
      folder: object exposing `getFiles()` and `getFolders()`.

    yields:
      each file found in `folder` and, recursively, in its sub-folders.
    """
    for project_file in folder.getFiles():
        yield project_file

    for subfolder in folder.getFolders():
        for nested_file in iter_program_files(subfolder):
            yield nested_file


# Programs within a Ghidra project
def list_project_files(project):
    """
    Collect every file stored in a Ghidra project into a list.

    args:
      project: object exposing `getProjectData()`, whose result exposes `getRootFolder()`.

    returns:
      list of the files found by walking the project's root folder with `iter_program_files`.
    """
    # NOTE(review): no filtering by file kind happens here, so any non-program
    # files stored in the project are returned too - confirm this is intended.
    return list(iter_program_files(project.getProjectData().getRootFolder()))


def select_project_file(project):
    """
    Pick the single file to analyze from an opened Ghidra project.

    When the project holds exactly one file, it is returned directly and the
    environment is not consulted. Otherwise the CAPA_GHIDRA_PROGRAM_PATH
    environment variable must name the file's path within the project,
    as reported by `getPathname()` (for example, `/myprogram`).
    A selection without the leading "/" is accepted as well.

    args:
      project: opened Ghidra project, as accepted by `list_project_files`.

    returns:
      the selected project file.

    raises:
      ValueError: if the project contains no files, if it contains several and
        CAPA_GHIDRA_PROGRAM_PATH is unset or empty, or if the requested path
        matches none of them.
    """
    programs = list_project_files(project)

    if not programs:
        raise ValueError("no programs found in Ghidra project")

    if len(programs) == 1:
        return programs[0]

    # resolve each pathname once; reused for matching and for both error listings.
    pathnames = [program.getPathname() for program in programs]
    available = "\n".join(f"- {pathname}" for pathname in pathnames)

    requested_path = os.environ.get("CAPA_GHIDRA_PROGRAM_PATH")
    if not requested_path:
        raise ValueError(
            "multiple programs found in the Ghidra project.\n"
            + f"available programs:\n{available}\n"
            + "set CAPA_GHIDRA_PROGRAM_PATH to select one"
        )

    # an exact match always wins; project pathnames are rooted at "/",
    # so a selection written without the leading slash is retried with it.
    candidates = [requested_path]
    if not requested_path.startswith("/"):
        candidates.append("/" + requested_path)

    for candidate in candidates:
        for program, pathname in zip(programs, pathnames):
            if pathname == candidate:
                return program

    raise ValueError(
        "CAPA_GHIDRA_PROGRAM_PATH did not match any program in the Ghidra project.\n"
        + f"available programs:\n{available}"
    )
Comment thread
mike-hunhoff marked this conversation as resolved.


class GHIDRAIO:
"""
An object that acts as a file-like object,
Expand Down
4 changes: 4 additions & 0 deletions capa/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
FORMAT_UNKNOWN,
FORMAT_BINJA_DB,
FORMAT_BINEXPORT2,
FORMAT_GHIDRA_PROJECT,
Format,
)

Expand All @@ -69,6 +70,7 @@
EXTENSIONS_ELF = ".elf_"
EXTENSIONS_FREEZE = ".frz"
EXTENSIONS_BINJA_DB = ".bndb"
EXTENSIONS_GHIDRA = ".gpr"

logger = logging.getLogger("capa")

Expand Down Expand Up @@ -236,6 +238,8 @@ def get_format_from_extension(sample: Path) -> str:
format_ = FORMAT_BINEXPORT2
elif sample.name.endswith(EXTENSIONS_BINJA_DB):
format_ = FORMAT_BINJA_DB
elif sample.name.endswith(EXTENSIONS_GHIDRA):
format_ = FORMAT_GHIDRA_PROJECT
return format_


Expand Down
60 changes: 50 additions & 10 deletions capa/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
from capa.engine import MatchResults
from capa.helpers import assert_never
from capa.exceptions import (
InvalidArgument,
UnsupportedOSError,
UnsupportedArchError,
UnsupportedFormatError,
LockedProjectDatabaseError,
)
from capa.features.common import (
OS_AUTO,
Expand Down Expand Up @@ -434,24 +436,52 @@ def get_extractor(
if not capa.ghidra.helpers.is_supported_ghidra_version():
raise RuntimeError("unsupported Ghidra version")

import tempfile
project_path = input_path
tmpdir = None
if input_path.suffix.lower() == ".gpr":
try:
project_cm = pyghidra.open_project(
str(project_path.parent.resolve()), project_path.stem, create=False
)
except Exception as e:
err = str(e)
if "LockException" in err or "Database is locked" in err:
msg = (
f"Ghidra project database is locked. Ensure all programs accessing "
f"{project_path.name} are closed before proceeding."
)
raise LockedProjectDatabaseError(msg) from e
raise
else:
import tempfile

tmpdir = tempfile.TemporaryDirectory()
tmpdir = tempfile.TemporaryDirectory()
project_cm = pyghidra.open_project(tmpdir.name, "CapaProject", create=True)

project_cm = pyghidra.open_project(tmpdir.name, "CapaProject", create=True)
project = project_cm.__enter__()
program, consumer = None, None
try:
Comment thread
mike-hunhoff marked this conversation as resolved.
from ghidra.util.task import TaskMonitor

monitor = TaskMonitor.DUMMY

# Import file
loader = pyghidra.program_loader().project(project).source(str(input_path)).name(input_path.name)
with loader.load() as load_results:
load_results.save(monitor)
if input_path.suffix.lower() == ".gpr":
try:
selected_program = capa.ghidra.helpers.select_project_file(project)
except ValueError as e:
raise InvalidArgument(str(e)) from e
program_path = selected_program.getPathname()
logger.debug("ghidra: selected program path: %s", program_path)
else:
# Import file
loader = pyghidra.program_loader().project(project).source(str(input_path)).name(input_path.name)
with loader.load() as load_results:
load_results.save(monitor)

program_path = "/" + input_path.name

# Open program
program, consumer = pyghidra.consume_program(project, "/" + input_path.name)
program, consumer = pyghidra.consume_program(project, program_path)

# Analyze
pyghidra.analyze(program, monitor)
Expand All @@ -478,8 +508,18 @@ def __exit__(self, exc_type, exc_val, exc_tb):
cm = GhidraContextWrapper(project_cm, program, consumer)

except Exception:
project_cm.__exit__(None, None, None)
tmpdir.cleanup()
if program is not None:
try:
program.release(consumer)
except Exception:
logger.warning("failed to release program handle", exc_info=True)
try:
project_cm.__exit__(None, None, None)
except Exception:
logger.warning("failed to close Ghidra project", exc_info=True)
if tmpdir:
with contextlib.suppress(Exception):
tmpdir.cleanup()
raise
Comment thread
mike-hunhoff marked this conversation as resolved.
Comment thread
mike-hunhoff marked this conversation as resolved.

import capa.features.extractors.ghidra.extractor
Expand Down
27 changes: 22 additions & 5 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
UnsupportedOSError,
UnsupportedArchError,
UnsupportedFormatError,
LockedProjectDatabaseError,
)
from capa.features.common import (
OS_AUTO,
Expand All @@ -93,6 +94,7 @@
DYNAMIC_FORMATS,
FORMAT_BINJA_DB,
FORMAT_BINEXPORT2,
FORMAT_GHIDRA_PROJECT,
)
from capa.capabilities.common import (
Capabilities,
Expand Down Expand Up @@ -130,6 +132,7 @@
E_UNSUPPORTED_GHIDRA_EXECUTION_MODE = 24
E_INVALID_INPUT_FORMAT = 25
E_INVALID_FEATURE_EXTRACTOR = 26
E_GHIDRA_DB_LOCKED = 27

logger = logging.getLogger("capa")

Expand Down Expand Up @@ -279,6 +282,7 @@ def install_common_args(parser, wanted=None):
(FORMAT_FREEZE, "features previously frozen by capa"),
(FORMAT_BINEXPORT2, "BinExport2"),
(FORMAT_BINJA_DB, "Binary Ninja Database"),
(FORMAT_GHIDRA_PROJECT, "Ghidra project"),
]
format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats])

Expand Down Expand Up @@ -580,6 +584,9 @@ def get_backend_from_cli(args, input_format: str) -> str:
if args.backend != BACKEND_AUTO:
return args.backend

if input_format == FORMAT_GHIDRA_PROJECT:
return BACKEND_GHIDRA

if input_format == FORMAT_CAPE:
return BACKEND_CAPE

Expand All @@ -602,7 +609,7 @@ def get_backend_from_cli(args, input_format: str) -> str:
return BACKEND_VIV


def get_sample_path_from_cli(args, backend: str) -> Optional[Path]:
def get_sample_path_from_cli(args, input_format, backend) -> Optional[Path]:
"""
Determine the path to the underlying sample, if it exists.

Expand All @@ -611,13 +618,16 @@ def get_sample_path_from_cli(args, backend: str) -> Optional[Path]:

args:
args: The parsed command line arguments from `install_common_args`.
input_format: The file format of the input file.
backend: The backend that will handle the input file.

raises:
ShouldExitError: if the program is invoked incorrectly and should exit.
"""
if backend in (BACKEND_CAPE, BACKEND_DRAKVUF, BACKEND_VMRAY):
return None
elif input_format == FORMAT_GHIDRA_PROJECT:
return None
elif backend == BACKEND_BINEXPORT2:
import capa.features.extractors.binexport2

Expand All @@ -629,14 +639,15 @@ def get_sample_path_from_cli(args, backend: str) -> Optional[Path]:
return args.input_file


def get_os_from_cli(args, backend) -> str:
def get_os_from_cli(args, input_format, backend) -> str:
"""
Determine the OS for the given sample.
Respects an override provided by the user, otherwise, use heuristics and
algorithms to detect the OS.

args:
args: The parsed command line arguments from `install_common_args`.
input_format: The file format of the input file.
backend: The backend that will handle the input file.

raises:
Expand All @@ -645,7 +656,7 @@ def get_os_from_cli(args, backend) -> str:
if args.os:
return args.os

sample_path = get_sample_path_from_cli(args, backend)
sample_path = get_sample_path_from_cli(args, input_format, backend)
if sample_path is None:
return "unknown"
return capa.loader.get_os(sample_path)
Expand Down Expand Up @@ -867,8 +878,8 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
None,
)

os_ = get_os_from_cli(args, backend)
sample_path = get_sample_path_from_cli(args, backend)
os_ = get_os_from_cli(args, input_format, backend)
sample_path = get_sample_path_from_cli(args, input_format, backend)
extractor_filters = get_extractor_filters_from_cli(args, input_format)

logger.debug("format: %s", input_format)
Expand All @@ -886,6 +897,9 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
sample_path=sample_path,
)
return apply_extractor_filters(extractor, extractor_filters)
except InvalidArgument as e:
logger.error("%s", str(e))
raise ShouldExitError(E_INVALID_INPUT_FORMAT) from e
except UnsupportedFormatError as e:
if input_format == FORMAT_CAPE:
log_unsupported_cape_report_error(str(e))
Expand All @@ -905,6 +919,9 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
except capa.loader.CorruptFile as e:
logger.error("Input file '%s' is not a valid file: %s", args.input_file, str(e))
raise ShouldExitError(E_CORRUPT_FILE) from e
except LockedProjectDatabaseError as e:
logger.error("%s", str(e))
raise ShouldExitError(E_GHIDRA_DB_LOCKED) from e


def get_extractor_filters_from_cli(args, input_format) -> FilterConfig:
Expand Down
2 changes: 1 addition & 1 deletion tests/data

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove these for now. We can add in a more robust test for this specific feature if it becomes a problem in the future.

Loading