Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.44.1] - 2026-05-21
### Fixed
- `trigger_github_checks()` 404 error caused by duplicated `projects/locations/instances` path segments in the `runAnalysis` API request URL
- The full parser resource name returned by `list_parsers()` is now correctly parsed to extract only the parser ID, which is used to build a relative `endpoint_path`

## [0.44.0] - 2026-04-29
### Added
- Automatic `x-goog-api-client` header on all API requests for client telemetry and tracing
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "secops"
version = "0.44.0"
version = "0.44.1"
description = "Python SDK for wrapping the Google SecOps API for common use cases"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
24 changes: 21 additions & 3 deletions src/secops/chronicle/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,21 +530,39 @@ def trigger_github_checks(
f"Failed to fetch parsers for log type {log_type}: {e}"
) from e

parser_id = "-" # Default to '-' for the fallback case
if not parsers:
logging.info(
"No parsers found for log type %s. Using fallback parser ID.",
log_type,
)
parser_name = f"logTypes/{log_type}/parsers/-"
# parser_id remains "-"
else:
if len(parsers) > 1:
logging.warning(
"Multiple parsers found for log type %s. Using the first one.",
log_type,
)
parser_name = parsers[0]["name"]
full_parser_resource_name = parsers[0]["name"]
# Expected format:
# projects/P/locations/L/instances/I/logTypes/LT/parsers/PA
parts = full_parser_resource_name.split("/")
if len(parts) == 10 and parts[8] == "parsers":
parser_id = parts[9]
logging.info(
"Using parser ID '%s' for log type %s.", parser_id, log_type
)
else:
logging.warning(
"Could not parse parser ID from resource name: %s. "
"Falling back to '-'.",
full_parser_resource_name,
)
# parser_id remains "-"

# Construct the endpoint_path using the derived parser_id
endpoint_path = f"logTypes/{log_type}/parsers/{parser_id}:runAnalysis"

endpoint_path = f"{parser_name}:runAnalysis"
payload = {
"report_type": "GITHUB_PARSER_VALIDATION",
"pull_request": associated_pr,
Expand Down
105 changes: 91 additions & 14 deletions tests/chronicle/test_log_type_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
They interact with real Chronicle API endpoints.
"""

import os
import pytest

from secops import SecOpsClient
Expand All @@ -28,24 +29,30 @@
@pytest.mark.integration
def test_log_type_lifecycle_integration():
"""Test the complete log-type lifecycle."""
client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON)
# Use service account if private key is provided, otherwise fallback to ADC
sa_info = SERVICE_ACCOUNT_JSON if SERVICE_ACCOUNT_JSON.get("private_key") else None
client = SecOpsClient(service_account_info=sa_info)

# 5. Testing parser validation workflow
dummy_project_id = "140410331797"
dummy_customer_id = "ebdc4bb9-878b-11e7-8455-10604b7cb5c1"
project_id = CHRONICLE_CONFIG.get("project_id") or "140410331797"
customer_id = CHRONICLE_CONFIG.get("customer_id") or "1234-53411-24552"
log_type = os.getenv("CHRONICLE_TEST_LOG_TYPE", "DUMMY_LOGTYPE")
associated_pr = os.getenv("CHRONICLE_TEST_PR", "chronicle/content-hub/pull/701")

dummy_chronicle = client.chronicle(
project_id=dummy_project_id,
customer_id=dummy_customer_id,
chronicle = client.chronicle(
project_id=project_id,
customer_id=customer_id,
region=CHRONICLE_CONFIG.get("region", "us"),
)
print(f"\nDEBUG: project_id={project_id!r}")
print(f"DEBUG: customer_id={customer_id!r}")

print("\nTesting trigger_github_checks with dummy data")
print(f"\nTesting trigger_github_checks with log type {log_type}")
try:
# Trigger checks for a dummy PR and log type
result = dummy_chronicle.trigger_github_checks(
associated_pr="google/secops-wrapper/pull/617",
log_type="DUMMY_LOGTYPE",
# Trigger checks for the configured PR and log type
result = chronicle.trigger_github_checks(
associated_pr=associated_pr,
log_type=log_type,
)
assert isinstance(result, dict)
print("Successfully triggered checks (or received valid JSON response)")
Expand All @@ -62,11 +69,11 @@ def test_log_type_lifecycle_integration():
f"Server gracefully handled the dummy trigger data: {error_msg.strip()}"
)

print("\nTesting get_analysis_report with dummy data")
print(f"\nTesting get_analysis_report with log type {log_type}")
try:
# Request a report for dummy resource names
report = dummy_chronicle.get_analysis_report(
log_type="DUMMY_LOGTYPE", parser_id="xyz", report_id="123"
report = chronicle.get_analysis_report(
log_type=log_type, parser_id="22223231", report_id="5c3f03cf-b90c-4ad3-91b6-4894c50e640d"
)
assert isinstance(report, dict)
print("Successfully retrieved report")
Expand All @@ -83,5 +90,75 @@ def test_log_type_lifecycle_integration():
)


@pytest.mark.integration
def test_log_type_parser_found_integration():
"""Integration test for trigger_github_checks when at least one parser exists.

It dynamically finds an existing parser in the instance, and uses it
to trigger checks, verifying that the path is constructed correctly
and doesn't result in a 404 due to duplication.
"""
sa_info = SERVICE_ACCOUNT_JSON if SERVICE_ACCOUNT_JSON.get("private_key") else None
client = SecOpsClient(service_account_info=sa_info)

# Use the configured project/customer IDs if available, otherwise fallback to dummy
project_id = CHRONICLE_CONFIG.get("project_id") or "140410331797"
customer_id = CHRONICLE_CONFIG.get("customer_id") or "1234-53411-24552"

chronicle = client.chronicle(
project_id=project_id,
customer_id=customer_id,
region=CHRONICLE_CONFIG.get("region", "us"),
)

print("\nListing all parsers to find a candidate for integration test...")
try:
# List all parsers across all log types
parsers = chronicle.list_parsers(log_type="-")
except APIError as e:
pytest.skip(f"Skipping test: Failed to list parsers (check config/auth): {e}")
return

if not parsers:
pytest.skip("Skipping test: No parsers found in the Chronicle instance to test the 'parser found' path.")
return

# Find a parser that has a valid name format we can parse
target_parser = None
target_log_type = None
target_parser_id = None

for p in parsers:
name = p.get("name", "")
parts = name.split("/")
if len(parts) == 10 and parts[6] == "logTypes" and parts[8] == "parsers":
target_parser = p
target_log_type = parts[7]
target_parser_id = parts[9]
break

if not target_parser:
pytest.skip("Skipping test: Found parsers but none matched the expected resource name format.")
return

print(f"Found candidate parser: {target_parser_id} for log type {target_log_type}")

print(f"Triggering checks for {target_log_type} (should use parser {target_parser_id})")
try:
result = chronicle.trigger_github_checks(
associated_pr="chronicle/content-hub/pull/701",
log_type=target_log_type,
)
assert isinstance(result, dict)
print("Successfully triggered checks (or received valid response from server)")
except APIError as e:
# If it fails, it should NOT be because of a 404 Not Found with duplicated path.
# It might fail with 400 Bad Request if the PR is invalid or other reasons,
# but a 404 with duplicated path indicates a regression.
error_msg = str(e)
assert "projects/" not in error_msg or error_msg.count("instances/") < 2
print(f"Server responded (graceful failure or expected error): {error_msg.strip()}")


if __name__ == "__main__":
pytest.main(["-v", __file__, "-m", "integration"])
153 changes: 152 additions & 1 deletion tests/chronicle/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
get_parser,
list_parsers,
run_parser,
trigger_github_checks,
)
from secops.exceptions import APIError
from secops.exceptions import APIError, SecOpsError


@pytest.fixture
Expand Down Expand Up @@ -1312,3 +1313,153 @@ def test_run_parser_statedump_parsing_multiple_results(
assert statedump_results[0]["statedumpResult"]["state"]["log"] == "1"
assert isinstance(statedump_results[1]["statedumpResult"], dict)
assert statedump_results[1]["statedumpResult"]["state"]["log"] == "2"


# --- trigger_github_checks Tests ---

def test_trigger_github_checks_no_parsers(chronicle_client):
"""Test trigger_github_checks when no parsers are found (fallback to '-')."""
log_type = "MY_LOGTYPE"
associated_pr = "owner/repo/pull/123"

with patch("secops.chronicle.parser.list_parsers", return_value=[]) as mock_list:
with patch("secops.chronicle.parser.chronicle_request", return_value={"status": "success"}) as mock_req:
result = trigger_github_checks(chronicle_client, associated_pr, log_type)

mock_list.assert_called_once_with(chronicle_client, log_type=log_type)
mock_req.assert_called_once_with(
client=chronicle_client,
method="POST",
api_version="v1alpha",
endpoint_path=f"logTypes/{log_type}/parsers/-:runAnalysis",
json={"report_type": "GITHUB_PARSER_VALIDATION", "pull_request": associated_pr},
timeout=60,
)
assert result == {"status": "success"}


def test_trigger_github_checks_one_parser(chronicle_client):
"""Test trigger_github_checks when one parser is found."""
log_type = "MY_LOGTYPE"
associated_pr = "owner/repo/pull/123"
parser_name = "projects/P/locations/L/instances/I/logTypes/MY_LOGTYPE/parsers/pa_123"
parsers = [{"name": parser_name}]

with patch("secops.chronicle.parser.list_parsers", return_value=parsers) as mock_list:
with patch("secops.chronicle.parser.chronicle_request", return_value={"status": "success"}) as mock_req:
result = trigger_github_checks(chronicle_client, associated_pr, log_type)

mock_list.assert_called_once_with(chronicle_client, log_type=log_type)
mock_req.assert_called_once_with(
client=chronicle_client,
method="POST",
api_version="v1alpha",
endpoint_path=f"logTypes/{log_type}/parsers/pa_123:runAnalysis",
json={"report_type": "GITHUB_PARSER_VALIDATION", "pull_request": associated_pr},
timeout=60,
)
assert result == {"status": "success"}


def test_trigger_github_checks_multiple_parsers(chronicle_client, caplog):
"""Test trigger_github_checks when multiple parsers are found."""
log_type = "MY_LOGTYPE"
associated_pr = "owner/repo/pull/123"
parsers = [
{"name": "projects/P/locations/L/instances/I/logTypes/MY_LOGTYPE/parsers/pa_123"},
{"name": "projects/P/locations/L/instances/I/logTypes/MY_LOGTYPE/parsers/pa_456"},
]

with patch("secops.chronicle.parser.list_parsers", return_value=parsers):
with patch("secops.chronicle.parser.chronicle_request", return_value={"status": "success"}) as mock_req:
import logging
with caplog.at_level(logging.WARNING):
result = trigger_github_checks(chronicle_client, associated_pr, log_type)

assert "Multiple parsers found" in caplog.text
mock_req.assert_called_once_with(
client=chronicle_client,
method="POST",
api_version="v1alpha",
endpoint_path=f"logTypes/{log_type}/parsers/pa_123:runAnalysis",
json={"report_type": "GITHUB_PARSER_VALIDATION", "pull_request": associated_pr},
timeout=60,
)
assert result == {"status": "success"}


def test_trigger_github_checks_malformed_parser_name(chronicle_client, caplog):
"""Test trigger_github_checks when parser name is malformed (fallback to '-')."""
log_type = "MY_LOGTYPE"
associated_pr = "owner/repo/pull/123"
parsers = [{"name": "invalid_name_format"}]

with patch("secops.chronicle.parser.list_parsers", return_value=parsers):
with patch("secops.chronicle.parser.chronicle_request", return_value={"status": "success"}) as mock_req:
import logging
with caplog.at_level(logging.WARNING):
result = trigger_github_checks(chronicle_client, associated_pr, log_type)

assert "Could not parse parser ID from resource name" in caplog.text
mock_req.assert_called_once_with(
client=chronicle_client,
method="POST",
api_version="v1alpha",
endpoint_path=f"logTypes/{log_type}/parsers/-:runAnalysis",
json={"report_type": "GITHUB_PARSER_VALIDATION", "pull_request": associated_pr},
timeout=60,
)
assert result == {"status": "success"}


def test_trigger_github_checks_invalid_input(chronicle_client):
"""Test trigger_github_checks raises SecOpsError for invalid input."""
with pytest.raises(SecOpsError) as exc_info:
trigger_github_checks(chronicle_client, "invalid_pr", "MY_LOGTYPE")
assert "associated_pr must be in the format" in str(exc_info.value)

with pytest.raises(SecOpsError) as exc_info:
trigger_github_checks(chronicle_client, "owner/repo/pull/123", "")
assert "log_type must be a valid string" in str(exc_info.value)


def test_trigger_github_checks_regression_404(chronicle_client):
"""Regression test for 404 error due to duplicated path segments.

Verifies that when list_parsers returns a parser with a full resource name,
only the parser ID is extracted and used in a relative endpoint path,
preventing the duplication of projects/locations/instances in the URL.
"""
log_type = "DUMMY_LOGTYPE"
associated_pr = "owner/repo/pull/123"

# Exact values from the reported error
project_id = "478140460956"
instance_id = "ebdc4bb9-878b-11e7-8455-10604b7cb5c1"
parser_id = "3895599384024317953"

full_parser_name = (
f"projects/{project_id}/locations/us/instances/{instance_id}"
f"/logTypes/{log_type}/parsers/{parser_id}"
)
parsers = [{"name": full_parser_name}]

with patch("secops.chronicle.parser.list_parsers", return_value=parsers) as mock_list:
with patch("secops.chronicle.parser.chronicle_request", return_value={"status": "success"}) as mock_req:
result = trigger_github_checks(chronicle_client, associated_pr, log_type)

mock_list.assert_called_once_with(chronicle_client, log_type=log_type)

# The key assertion is that the endpoint_path is relative and only contains the parser_id,
# NOT the full resource name which would cause duplication in chronicle_request.
expected_endpoint_path = f"logTypes/{log_type}/parsers/{parser_id}:runAnalysis"

mock_req.assert_called_once_with(
client=chronicle_client,
method="POST",
api_version="v1alpha",
endpoint_path=expected_endpoint_path,
json={"report_type": "GITHUB_PARSER_VALIDATION", "pull_request": associated_pr},
timeout=60,
)
assert result == {"status": "success"}
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def cli_env():
"""Set up environment for CLI tests."""
env = os.environ.copy()
# Add any environment variables needed for testing
bindir = os.path.dirname(sys.executable)
if "PATH" in env:
env["PATH"] = f"{bindir}{os.pathsep}{env['PATH']}"
else:
env["PATH"] = bindir
return env


Expand Down