This repository was archived by the owner on Feb 26, 2026. It is now read-only.
Add cloudwatch reporting script to tools #40
Open

dylanlee wants to merge 2 commits into main from reporting
@@ -0,0 +1,281 @@
#!/usr/bin/env python3

import argparse
import datetime
import json
import re
import sys
import time
from pathlib import Path
from typing import Dict, List, Set

import boto3
from botocore.exceptions import ClientError


def setup_arguments():
    """Sets up and parses command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Analyzes pipeline logs to categorize AOIs, providing detailed intermediate outputs for verification.",
        epilog="""Examples:
# Use default 7 days back
%(prog)s run_list.txt batch_name output_dir

# Use specific date-hour range
# For >10k results, use the AWS console to find message timestamps, then query smaller ranges.
%(prog)s run_list.txt batch_name output_dir 2025-08-06-12 2025-08-09-18
""",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "run_list",
        type=Path,
        help="Path to file containing list of AOIs to process",
    )
    parser.add_argument(
        "batch_name",
        help="Batch name for filtering logs (e.g., fim100_huc12_10m_2025-08-06-12)",
    )
    parser.add_argument(
        "output_dir",
        type=Path,
        help="Directory where ALL output files will be written",
    )
    parser.add_argument(
        "start_datetime",
        nargs="?",
        help="Start date-time in YYYY-MM-DD-HH format (optional)",
    )
    parser.add_argument(
        "end_datetime",
        nargs="?",
        help="End date-time in YYYY-MM-DD-HH format (optional)",
    )
    return parser.parse_args()


def run_query(logs_client, log_group: str, start_time: int, end_time: int, query: str) -> List[Dict]:
    """Runs a CloudWatch Logs Insights query and returns the results."""
    try:
        start_query_response = logs_client.start_query(
            logGroupName=log_group,
            startTime=start_time,
            endTime=end_time,
            queryString=query,
        )
        query_id = start_query_response["queryId"]
        status = "Running"
        while status in ["Running", "Scheduled"]:
            print(f" Query status: {status}. Waiting...")
            time.sleep(5)
            results_response = logs_client.get_query_results(queryId=query_id)
            status = results_response["status"]

        if status != "Complete":
            print(f"Error: Query failed with status '{status}'", file=sys.stderr)
            return []

        results = results_response.get("results", [])
        if len(results) == 10000:
            print(" WARNING: Query returned 10,000 results (CloudWatch limit). Results may be incomplete.")

        return results
    except ClientError as e:
        print(f"An AWS error occurred: {e}", file=sys.stderr)
        return []


def extract_aois_from_results(results: List[Dict]) -> Set[str]:
    """Extracts unique AOI names from structured query results."""
    aois = set()
    aoi_pattern = re.compile(r"aoi_name=([^,\]]+)")
    for row in results:
        for field in row:
            if field["field"] == "@logStream":
                match = aoi_pattern.search(field["value"])
                if match:
                    aois.add(match.group(1))
    return aois


def write_data_to_json(file_path: Path, data: List[Dict]):
    """Writes raw query results to a pretty-printed JSON file."""
    print(f" Writing {len(data)} raw results to {file_path}")
    with file_path.open("w") as f:
        json.dump(data, f, indent=2)


def write_aois_to_file(file_path: Path, aois: Set[str]):
    """Writes a sorted set of AOIs to a text file."""
    print(f" Writing {len(aois)} AOIs to {file_path}")
    with file_path.open("w") as f:
        for aoi in sorted(aois):
            f.write(f"{aoi}\n")


def main():
    """Main function to orchestrate the pipeline analysis."""
    args = setup_arguments()

    if not args.run_list.is_file():
        print(f"Error: Run list file not found: {args.run_list}", file=sys.stderr)
        sys.exit(1)

    # Determine time range
    if args.start_datetime and args.end_datetime:
        try:
            start_dt = datetime.datetime.strptime(args.start_datetime, "%Y-%m-%d-%H")
            end_dt = datetime.datetime.strptime(args.end_datetime, "%Y-%m-%d-%H").replace(minute=59, second=59)
            time_range_desc = f"{args.start_datetime} to {args.end_datetime}"
        except ValueError:
            print(
                "Error: Invalid datetime format. Use YYYY-MM-DD-HH.",
                file=sys.stderr,
            )
            sys.exit(1)
    else:
        end_dt = datetime.datetime.now(datetime.timezone.utc)
        start_dt = end_dt - datetime.timedelta(days=7)
        time_range_desc = "last 7 days"

    start_time_ts = int(start_dt.timestamp())
    end_time_ts = int(end_dt.timestamp())

    args.output_dir.mkdir(parents=True, exist_ok=True)
    print(f"Starting analysis for batch: {args.batch_name}\nTime range: {time_range_desc}")

    log_group = "/aws/ec2/nomad-client-linux-test"
    queries = {
        "success": """
            fields @logStream
            | filter @logStream like /pipeline\\/dispatch-\\[batch_name={0}/
            | filter strcontains(@message, "Pipeline SUCCESS")
            | dedup @logStream
        """.strip(),
        "early_exit": """
            fields @logStream
            | filter @logStream like /pipeline\\/dispatch-\\[batch_name={0}/
            | filter strcontains(@message, "Pipeline exiting early")
            | dedup @logStream
        """.strip(),
        "failed": """
            fields @logStream
            | filter @logStream like /pipeline\\/dispatch-\\[batch_name={0}/
            | filter strcontains(@message, "Pipeline FAILED")
            | dedup @logStream
        """.strip(),
        "job_errors": """
            fields @logStream, @message
            | filter @logStream like /pipeline\\/dispatch-\\[batch_name={0}/
            | filter @message like /OOM|HTTPConnectionPool|BaseNomadException|JobStatus\\.(LOST|STOPPED|CANCELLED)|botocore\\.exceptions\\.ClientError/
            | dedup @logStream
        """.strip(),
        "agr_mos_errors": """
            fields @logStream, @message
            | filter @logStream like /(agreement_maker|fim_mosaicker)\\/dispatch-\\[batch_name={0}/
            | filter (@message like /[Ee][Rr][Rr][Oo][Rr]/ or @message like /[Ww][Aa][Rr][Nn]/)
                and @message not like /Agreement map contains no valid data/
                and @message not like /distributed\\.shuffle/
                and @message not like /No features found/
                and @message not like /Worker is at/
                and @message not like /distributed\\.worker\\.memory - WARNING - Unmanaged memory use is high/
                and @message not like /gc\\.collect/
                and @message not like /Warning 1: Request for \\d+-\\d+ failed with response_code=\\d+/
                and @message not like /Failed to communicate with scheduler during heartbeat/
                and @message not like /NotGeoreferencedWarning/
                and @message not like /distributed\\.comm\\.core\\.CommClosedError/
                and @message not like /UserWarning: Sending large graph/
                and @message not like /warnings\\.warn/
                and @message not like /has GPKG application_id/
                and @message not like /distributed\\.nanny - WARNING - Worker process still alive/
            | dedup @logStream
        """.strip(),
        "inundate_errors": """
            fields @logStream, @message
            | filter @logStream like /hand_inundator\\/dispatch-\\[batch_name={0}/
            | filter (@message like /[Ee][Rr][Rr][Oo][Rr]/ or @message like /[Ww][Aa][Rr][Nn]/)
                and @message not like /No matching forecast data/
                and @message not like /'NoneType' is not iterable/
                and @message not like /No catchments with negative LakeID/
            | dedup @logStream
        """.strip(),
        "ignorable_errors": """
            fields @logStream, @message
            | filter @logStream like /(agreement_maker|fim_mosaicker)\\/dispatch-\\[batch_name={0}/
            | filter @message like /Agreement map contains no valid data/
            | dedup @logStream
        """.strip(),
    }

    # --- Execute all queries, write intermediate files, and gather AOIs ---
    logs_client = boto3.client("logs", region_name="us-east-1")
    aoi_sets = {}
    for name, query_template in queries.items():
        print(f"\nRunning query for '{name}' pipelines...")
        query = query_template.format(args.batch_name)
        results = run_query(logs_client, log_group, start_time_ts, end_time_ts, query)

        # Write intermediate files for this query
        write_data_to_json(args.output_dir / f"{name}_results.json", results)
        aois = extract_aois_from_results(results)
        # Don't write ignorable-error AOIs here: the truly ignorable set is written
        # later, after excluding AOIs that also have real (non-ignorable) errors.
        # Success and failed AOIs are skipped too, since the final lists are written at the end.
        if name not in ["ignorable_errors", "success", "failed"]:
            write_aois_to_file(args.output_dir / f"{name}_aois.txt", aois)

        aoi_sets[name] = aois

    # --- Categorize AOIs using set logic, writing key intermediate sets ---
    print("\nCategorizing all AOIs...")
    initial_aois = {line.strip() for line in args.run_list.open() if line.strip()}
    write_aois_to_file(args.output_dir / "sorted-run-list.txt", initial_aois)

    successful_aois = aoi_sets["success"]
    successful_aois.update(aoi_sets["early_exit"])

    failed_aois = aoi_sets["failed"]
    all_found_aois = successful_aois.union(failed_aois)
    silent_failures = initial_aois - all_found_aois
    # Write the silent failures list, a key intermediate output
    write_aois_to_file(args.output_dir / "silent_failures.txt", silent_failures)

    failed_aois.update(silent_failures)
    failed_aois.update(aoi_sets["job_errors"])
    failed_aois.update(aoi_sets["agr_mos_errors"])
    failed_aois.update(aoi_sets["inundate_errors"])

    successful_aois -= failed_aois

    # AOIs with ignorable errors but no real errors (agr_mos or inundate) should be considered successful
    real_error_aois = aoi_sets["agr_mos_errors"].union(aoi_sets["inundate_errors"])
    truly_ignorable = aoi_sets["ignorable_errors"] - real_error_aois
    write_aois_to_file(args.output_dir / "ignorable_errors_aois.txt", truly_ignorable)
    successful_aois.update(truly_ignorable)
    failed_aois -= truly_ignorable

    # --- Final Summary and Output ---
    print("\n" + "=" * 10 + " SUMMARY " + "=" * 10)
    print(f"Time range queried: {time_range_desc}")
    print(f"Input AOIs: {len(initial_aois)}")
    print(f"Successful AOIs: {len(successful_aois)}")
    print(f"Failed AOIs: {len(failed_aois)}")
    total_output = len(successful_aois) + len(failed_aois)
    print(f"Total processed: {total_output}\n")

    # Write final output files
    write_aois_to_file(args.output_dir / "unique_success_aoi_names.txt", successful_aois)
    write_aois_to_file(args.output_dir / "unique_fail_aoi_names.txt", failed_aois)

    if len(initial_aois) != total_output:
        missing_aois = initial_aois - successful_aois - failed_aois
        print(f"WARNING: Input count ({len(initial_aois)}) does not match output count ({total_output})!")
        print(f"Missing AOIs: {len(missing_aois)}")
        write_aois_to_file(args.output_dir / "missing_aois.txt", missing_aois)
    else:
        print("All AOIs accounted for!")

    print("\nAnalysis complete!")


if __name__ == "__main__":
    main()
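For reference, here is how the AOI-extraction step behaves on a single log stream. This is a small, self-contained sketch using the same aoi_name regex as extract_aois_from_results; the log-stream value below is made up to match the dispatch-[...] naming scheme the queries filter on:

import re

# Same pattern the script compiles in extract_aois_from_results.
aoi_pattern = re.compile(r"aoi_name=([^,\]]+)")

# Hypothetical @logStream value; the exact fields are illustrative only.
stream = "pipeline/dispatch-[batch_name=fim100_huc12_10m_2025-08-06-12,aoi_name=huc12_0101000101]"

match = aoi_pattern.search(stream)
print(match.group(1) if match else "no match")  # -> huc12_0101000101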
Can we add a type hint for logs_client as well?
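A minimal sketch of one way to do that, assuming the optional boto3-stubs packages are installed (the CloudWatchLogsClient name comes from the mypy-boto3-logs stubs package, not from this PR); guarding the import with TYPE_CHECKING keeps the script runnable when the stubs are absent:

from typing import TYPE_CHECKING, Dict, List

if TYPE_CHECKING:
    # Hypothetical dependency: type stubs generated by boto3-stubs / mypy-boto3-logs.
    from mypy_boto3_logs.client import CloudWatchLogsClient


def run_query(
    logs_client: "CloudWatchLogsClient",
    log_group: str,
    start_time: int,
    end_time: int,
    query: str,
) -> List[Dict]:
    ...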