Skip to content

Commit 998b744

Browse files
committed
add logs:import
1 parent 9f21767 commit 998b744

1 file changed

Lines changed: 143 additions & 0 deletions

File tree

ingestion/v1alpha/logs_import.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
#!/usr/bin/env python3
2+
3+
# Copyright 2025 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
"""API Sample to import logs."""
18+
import argparse
19+
import base64
20+
import datetime
21+
import json
22+
import logging
23+
from typing import Mapping, Any
24+
25+
from common import chronicle_auth
26+
from common import project_instance
27+
from common import project_id
28+
from common import regions
29+
from google.auth.transport import requests
30+
31+
SCOPES = [
32+
"https://www.googleapis.com/auth/cloud-platform",
33+
]
34+
35+
36+
def logs_import(
37+
http_session: requests.AuthorizedSession,
38+
logs_file,
39+
proj_id: str,
40+
region: str,
41+
project_instance: str,
42+
forwarder_id: str) -> Mapping[str, Any]:
43+
"""Imports logs to Chronicle using the GCP CLOUDAUDIT log type.
44+
45+
Args:
46+
http_session: Authorized session for HTTP requests.
47+
logs_file: File-like object containing the logs to import.
48+
proj_id: Google Cloud project ID.
49+
region: Chronicle region.
50+
project_instance: Chronicle instance.
51+
forwarder_id: UUID4 of the forwarder.
52+
53+
Returns:
54+
dict: JSON response from the API.
55+
56+
Raises:
57+
requests.HTTPError: If the request fails.
58+
"""
59+
log_type = "GCP_CLOUDAUDIT"
60+
parent = (f"projects/{proj_id}/"
61+
f"locations/{region}/"
62+
f"instances/{project_instance}/"
63+
f"logTypes/{log_type}")
64+
url = (f"https://{region}-chronicle.googleapis.com/"
65+
f"v1alpha/{parent}/logs:import")
66+
logs = logs_file.read()
67+
# Reset file pointer to beginning in case it needs to be read again
68+
logs_file.seek(0)
69+
logs = base64.b64encode(logs.encode("utf-8")).decode("utf-8")
70+
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
71+
body = {
72+
"inline_source": {
73+
"logs": [
74+
{
75+
"data": logs,
76+
"log_entry_time": now,
77+
"collection_time": now,
78+
}
79+
],
80+
"forwarder": (f"projects/{proj_id}/"
81+
f"locations/{region}/"
82+
f"instances/{project_instance}/"
83+
f"forwarders/{forwarder_id}")
84+
}
85+
}
86+
response = http_session.request("POST", url, json=body)
87+
if response.status_code >= 400:
88+
logging.error("Error response: %s", response.text)
89+
response.raise_for_status()
90+
logging.info("Request successful with status code: %d", response.status_code)
91+
return response.json()
92+
93+
94+
def main():
95+
"""Main entry point for the logs import script."""
96+
# Configure logging
97+
logging.basicConfig(
98+
level=logging.INFO,
99+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
100+
)
101+
logger = logging.getLogger(__name__)
102+
103+
parser = argparse.ArgumentParser(description="Import logs to Chronicle.")
104+
# common
105+
chronicle_auth.add_argument_credentials_file(parser)
106+
project_instance.add_argument_project_instance(parser)
107+
project_id.add_argument_project_id(parser)
108+
regions.add_argument_region(parser)
109+
# local
110+
parser.add_argument(
111+
"--forwarder_id",
112+
type=str,
113+
required=True,
114+
help="UUID4 of the forwarder")
115+
parser.add_argument(
116+
"--logs_file",
117+
type=argparse.FileType("r"),
118+
required=True,
119+
help="path to a log file (or \"-\" for STDIN)")
120+
args = parser.parse_args()
121+
auth_session = chronicle_auth.initialize_http_session(
122+
args.credentials_file,
123+
SCOPES,
124+
)
125+
try:
126+
result = logs_import(
127+
auth_session,
128+
args.logs_file,
129+
args.project_id,
130+
args.region,
131+
args.project_instance,
132+
args.forwarder_id
133+
)
134+
logging.info("Import operation completed successfully")
135+
print(json.dumps(result, indent=2))
136+
except Exception as e: # pylint: disable=broad-except
137+
logging.error("Import operation failed: %s", str(e))
138+
return 1
139+
return 0
140+
141+
142+
if __name__ == "__main__":
143+
main()

0 commit comments

Comments
 (0)