Skip to content

Commit cdfda2e

Browse files
committed
refactor: remove code overhead and redundant logging
- Remove verbose 'Received /X request' logs from all 11 route handlers
- Remove redundant Arrow conversion logs from logic modules
- Remove all logging from arrow_utils.py (low-level serialization noise)
- Remove unused logger/logging imports from 3 extract logic modules
- Remove unnecessary comments from join.py, extract modules
- Remove dead _has_request_context() wrapper in service_utils.py
- Remove dead metadata field from StepResult dataclass
- Remove unreachable else branch and per-format logs in load.py
- Remove decorative banner comments from service_utils.py
- Fix deprecated datetime.utcnow() in xcom_file_utils.py
- Remove unused pyarrow import from columns.py
- Fix variable shadowing (l -> layer) in test_pipeline_compiler.py
- Add pyyaml to CI dependencies
- All 208 tests passing
1 parent 3231535 commit cdfda2e

30 files changed

Lines changed: 53 additions & 166 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
- name: Install test dependencies
3535
run: |
3636
pip install pytest pytest-cov
37-
pip install flask pyarrow pandas numpy prometheus_client xlsxwriter openpyxl requests
37+
pip install flask pyarrow pandas numpy prometheus_client xlsxwriter openpyxl requests pyyaml
3838
- name: Run unit tests
3939
run: pytest tests/unit/ -v --tb=short --cov=services --cov-report=xml
4040
- name: Upload coverage

ai_agent/pipeline_compiler.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ class StepResult:
3434
duration_sec: float = 0.0
3535
data_size_bytes: int = 0
3636
error_message: str = ""
37-
metadata: dict = field(default_factory=dict)
3837

3938

4039
@dataclass

airflow/dags/xcom_file_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import logging
1212
import os
1313
import uuid
14-
from datetime import datetime
14+
from datetime import datetime, timezone
1515

1616
logger = logging.getLogger("xcom_file_utils")
1717

@@ -33,7 +33,7 @@ def save_ipc_to_shared(ipc_data: bytes, dataset_name: str, step_name: str) -> st
3333
xcom_dir = os.path.join(SHARED_DATA_ROOT, dataset_name, "xcom")
3434
os.makedirs(xcom_dir, exist_ok=True)
3535

36-
timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
36+
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
3737
unique_id = uuid.uuid4().hex[:8]
3838
filename = f"{step_name}_{timestamp}_{unique_id}.arrow"
3939
file_path = os.path.join(xcom_dir, filename)

services/clean-nan-service/app/clean.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ def apply_transformations(arrow_table, strategy="drop", fill_value=None, columns
3333

3434
try:
3535
df = arrow_table.to_pandas()
36-
logger.info(f"Converted to Pandas with shape {df.shape}, strategy='{strategy}'")
3736

3837
total_cells = df.size
3938
total_null_before = int(df.isna().sum().sum())
@@ -74,11 +73,6 @@ def apply_transformations(arrow_table, strategy="drop", fill_value=None, columns
7473
total_null_after = int(df_cleaned.isna().sum().sum())
7574
nulls_handled = total_null_before - total_null_after
7675

77-
logger.info(
78-
f"Strategy '{strategy}': handled {nulls_handled} nulls. "
79-
f"Rows {arrow_table.num_rows} -> {df_cleaned.shape[0]}"
80-
)
81-
8276
cleaned_arrow_table = pa.Table.from_pandas(df_cleaned)
8377
return (cleaned_arrow_table, nulls_handled, total_null_before, total_cells)
8478
except Exception as e:

services/clean-nan-service/app/routes.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ def clean_nan():
2828
correlation_id = get_correlation_id()
2929
try:
3030
REQUEST_COUNTER.inc()
31-
logger.info("Received /clean-nan request.", extra={"correlation_id": correlation_id})
3231

3332
params = parse_x_params()
3433
dataset_name = params.get('dataset_name')

services/common/arrow_utils.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,16 @@
1-
import logging
2-
31
import pyarrow as pa
42
import pyarrow.ipc as pa_ipc
53

6-
logger = logging.getLogger(__name__)
74

85
def ipc_to_table(ipc_data: bytes) -> pa.Table:
96
"""
107
Deserialize bytes of Arrow IPC in Arrow Table.
118
"""
129
try:
1310
reader = pa_ipc.open_stream(pa.BufferReader(ipc_data))
14-
table = reader.read_all()
15-
logger.info(f"Deserialized Arrow Table with {table.num_rows} rows and {table.num_columns} columns.")
16-
return table
11+
return reader.read_all()
1712
except Exception as e:
18-
logger.error(f"Failed to parse Arrow IPC data: {e}")
19-
raise
13+
raise ValueError(f"Failed to parse Arrow IPC data: {e}") from e
2014

2115
def table_to_ipc(table: pa.Table) -> bytes:
2216
"""
@@ -26,9 +20,6 @@ def table_to_ipc(table: pa.Table) -> bytes:
2620
sink = pa.BufferOutputStream()
2721
with pa_ipc.new_stream(sink, table.schema) as writer:
2822
writer.write_table(table)
29-
ipc_bytes = sink.getvalue().to_pybytes()
30-
logger.info(f"Serialized Arrow Table to IPC format, {len(ipc_bytes)} bytes.")
31-
return ipc_bytes
23+
return sink.getvalue().to_pybytes()
3224
except Exception as e:
33-
logger.error(f"Failed to serialize Arrow Table to IPC format: {e}")
34-
raise
25+
raise ValueError(f"Failed to serialize Arrow Table to IPC: {e}") from e

services/common/service_utils.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from flask import Response, g, has_request_context, jsonify, request
2424
from prometheus_client import Counter, generate_latest
2525

26-
# ── Prometheus helpers ──────────────────────────────────────────────
2726

2827
def create_service_counters(service_slug):
2928
"""
@@ -42,7 +41,6 @@ def create_service_counters(service_slug):
4241
)
4342

4443

45-
# ── Standard endpoints ─────────────────────────────────────────────
4644

4745
def register_standard_endpoints(bp, service_name):
4846
"""
@@ -62,7 +60,6 @@ def metrics():
6260
return Response(generate_latest(), mimetype="text/plain")
6361

6462

65-
# ── Request helpers ─────────────────────────────────────────────────
6663

6764
def get_correlation_id():
6865
"""
@@ -92,7 +89,6 @@ def parse_x_params():
9289
raise ValueError(f"Malformed JSON in X-Params header: {exc}") from exc
9390

9491

95-
# ── Metadata helper ────────────────────────────────────────────────
9692

9793
def save_metadata(service_name, dataset_name, extra_fields=None, start_time=None):
9894
"""
@@ -118,7 +114,7 @@ def save_metadata(service_name, dataset_name, extra_fields=None, start_time=None
118114
"service_name": service_name,
119115
"dataset_name": dataset_name,
120116
"timestamp": timestamp,
121-
"correlation_id": get_correlation_id() if _has_request_context() else None,
117+
"correlation_id": get_correlation_id() if has_request_context() else None,
122118
}
123119
if start_time is not None:
124120
metadata["duration_sec"] = round(time.time() - start_time, 3)
@@ -130,7 +126,3 @@ def save_metadata(service_name, dataset_name, extra_fields=None, start_time=None
130126

131127
return metadata_path
132128

133-
134-
def _has_request_context():
135-
"""Check whether we're inside a Flask request context."""
136-
return has_request_context()

services/data-quality-service/app/routes.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ def data_quality():
5252
correlation_id = get_correlation_id()
5353
try:
5454
REQUEST_COUNTER.inc()
55-
logger.info("Received /data-quality request.", extra={"correlation_id": correlation_id})
5655

5756
header_data = parse_x_params()
5857
dataset_name = header_data.get('dataset_name')

services/delete-columns-service/app/columns.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import logging
22

3-
import pyarrow as pa
4-
53
logger = logging.getLogger('delete-columns-service')
64

75
def drop_columns_arrow(arrow_table, columns_to_delete):

services/delete-columns-service/app/routes.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ def delete_columns():
2828
correlation_id = get_correlation_id()
2929
try:
3030
REQUEST_COUNTER.inc()
31-
logger.info("Received /delete-columns request.", extra={"correlation_id": correlation_id})
3231

3332
header_data = parse_x_params()
3433
columns_raw = header_data.get('columns', '')

0 commit comments

Comments (0)