Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ jobs:

- name: Lint with flake8
run: |
flake8 autosinapi tests --count --select=E9,F63,F7,F82 --show-source --statistics --ignore=E203,W503
flake8 autosinapi tests --count --select=E9,F63,F7,F82 --show-source --statistics --ignore=E203,W503
flake8 autosinapi tests --count --max-complexity=10 --max-line-length=88 --statistics --ignore=E203,W503
156 changes: 156 additions & 0 deletions autosinapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,160 @@
"DownloadError",
"ProcessingError",
"DatabaseError",
"run_etl"
]

import os
import logging
import uuid # Added for run_id generation
from contextlib import contextmanager
from typing import Dict, Any

from .etl_pipeline import PipelineETL, setup_logging


# Configure a logger for this module
logger = logging.getLogger(__name__)

@contextmanager
def set_env_vars(env_vars: Dict[str, str]):
"""Temporarily sets environment variables."""
original_env = {key: os.getenv(key) for key in env_vars}
for key, value in env_vars.items():
os.environ[key] = str(value) # Ensure value is string for env vars
try:
yield
finally:
for key, value in original_env.items():
if value is None:
del os.environ[key]
else:
os.environ[key] = value

def run_etl(db_config: Dict[str, Any] = None, sinapi_config: Dict[str, Any] = None, mode: str = 'local', log_level: str = 'INFO'):
# Generate a unique run_id for this execution
run_id = str(uuid.uuid4())[:8]

# Read skip_download from environment variable
skip_download_env = os.getenv('AUTOSINAPI_SKIP_DOWNLOAD', 'False').lower()
skip_download = (skip_download_env == 'true' or skip_download_env == '1')

# If configs are not provided, try to load from environment variables
if db_config is None:
try:
db_config = {
'host': os.getenv('POSTGRES_HOST', 'db'),
'port': int(os.getenv('POSTGRES_PORT', 5432)),
'database': os.getenv('POSTGRES_DB'),
'user': os.getenv('POSTGRES_USER'),
'password': os.getenv('POSTGRES_PASSWORD')
}
# Basic validation for required DB vars
if not all(db_config.get(k) for k in ['database', 'user', 'password']):
raise ValueError("Variáveis de ambiente do banco de dados incompletas.")
except (ValueError, TypeError) as e:
logger.error(f"Erro ao carregar db_config de variáveis de ambiente: {e}", exc_info=True)
return {
"status": "failed",
"message": f"Erro de configuração do banco de dados: {e}. Verifique as variáveis de ambiente POSTGRES_.",
"tables_updated": [],
"records_inserted": 0
}

if sinapi_config is None:
try:
sinapi_config = {
'year': int(os.getenv('AUTOSINAPI_YEAR')),
'month': int(os.getenv('AUTOSINAPI_MONTH')),
'type': os.getenv('AUTOSINAPI_TYPE', 'REFERENCIA'),
'duplicate_policy': os.getenv('AUTOSINAPI_POLICY', 'substituir')
}
# Basic validation for required SINAPI vars
if not all(sinapi_config.get(k) for k in ['year', 'month']):
raise ValueError("Variáveis de ambiente SINAPI incompletas.")
except (ValueError, TypeError) as e:
logger.error(f"Erro ao carregar sinapi_config de variáveis de ambiente: {e}", exc_info=True)
return {
"status": "failed",
"message": f"Erro de configuração SINAPI: {e}. Verifique as variáveis de ambiente AUTOSINAPI_.",
"tables_updated": [],
"records_inserted": 0
}

# Validate inputs (after potentially loading from env vars)
if not isinstance(db_config, dict) or not db_config:
return {
"status": "failed",
"message": "Erro de validação: db_config inválido ou vazio.",
"tables_updated": [],
"records_inserted": 0
}
if not isinstance(sinapi_config, dict) or not sinapi_config:
return {
"status": "failed",
"message": "Erro de validação: sinapi_config inválido ou vazio.",
"tables_updated": [],
"records_inserted": 0
}
if mode not in ['local', 'server']:
return {
"status": "failed",
"message": "Erro de validação: mode deve ser 'local' ou 'server'.",
"tables_updated": [],
"records_inserted": 0
}
if log_level.upper() not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
return {
"status": "failed",
"message": f"Erro de validação: log_level inválido: {log_level}. Use 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'.",
"tables_updated": [],
"records_inserted": 0
}

# Prepare environment variables
env_vars_to_set = {
'DOCKER_ENV': 'true', # Assuming API runs in a docker-like environment
'POSTGRES_HOST': db_config.get('host'),
'POSTGRES_PORT': db_config.get('port'),
'POSTGRES_DB': db_config.get('database'),
'POSTGRES_USER': db_config.get('user'),
'POSTGRES_PASSWORD': db_config.get('password'),
'AUTOSINAPI_YEAR': sinapi_config.get('year'),
'AUTOSINAPI_MONTH': sinapi_config.get('month'),
'AUTOSINAPI_TYPE': sinapi_config.get('type', 'REFERENCIA'),
'AUTOSINAPI_POLICY': sinapi_config.get('duplicate_policy', 'substituir'),
'AUTOSINAPI_MODE': mode # Pass the mode
}

# Filter out None values
env_vars_to_set = {k: v for k, v in env_vars_to_set.items() if v is not None}

# Set up logging for the pipeline run
# The setup_logging function in autosinapi_pipeline.py takes debug_mode.
# We need to map log_level to debug_mode.
debug_mode = (log_level.upper() == 'DEBUG')
setup_logging(run_id=run_id, debug_mode=debug_mode)

try:
with set_env_vars(env_vars_to_set):
logger.info(f"Iniciando execução do pipeline com modo: {mode}"
f"e nível de log: {log_level}")
pipeline = PipelineETL(debug_mode=debug_mode, run_id=run_id) # Pass run_id to PipelineETL
result = pipeline.run()
logger.info("Pipeline executado com sucesso.")
return result
except Exception as e:
logger.error(f"Erro ao executar o pipeline: {e}", exc_info=True)
# Re-raise the exception to indicate task failure, or return a structured error
# based on the user's request for run_etl to return a dictionary on failure.
# Since pipeline.run() already returns a dictionary on failure,
# this outer exception block should only catch errors *before* pipeline.run() is called
# or unexpected errors not caught by pipeline.run().
# For consistency, we'll return a structured error here too.
return {
"status": "failed",
"message": f"Erro inesperado antes ou durante a inicialização do pipeline: {e}",
"tables_updated": [],
"records_inserted": 0
}

138 changes: 118 additions & 20 deletions autosinapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@

Este módulo define a classe `Config`, responsável por centralizar, validar e gerenciar
todas as configurações necessárias para a execução do pipeline de ETL.

A classe garante que todas as chaves obrigatórias para a conexão com o banco de dados
e para os parâmetros do SINAPI sejam fornecidas, levantando um erro claro em caso de
configurações ausentes.
"""

from typing import Any, Dict
Expand All @@ -17,54 +13,156 @@
class Config:
"""Gerenciador de configurações do AutoSINAPI."""

# --- Seção de Constantes Padrão ---
# Usado como fallback se não for fornecida uma configuração customizada.
# Permite que o comportamento do pipeline seja extensivamente personalizado.
DEFAULT_CONSTANTS = {
# --- Constantes do Downloader ---
"BASE_URL": "https://www.caixa.gov.br/Downloads/sinapi-a-vista-composicoes",
"VALID_TYPES": ["REFERENCIA", "DESONERADO"],
"TIMEOUT": 30,
"ALLOWED_LOCAL_FILE_EXTENSIONS": [".xlsx", ".xls"],
"DOWNLOAD_FILENAME_TEMPLATE": "SINAPI_{type}_{month}_{year}",
"DOWNLOAD_FILE_EXTENSION": ".zip",

# --- Constantes do ETL Pipeline ---
"REFERENCE_FILE_KEYWORD": "Referência",
"MAINTENANCE_FILE_KEYWORD": "Manuten",
"MAINTENANCE_DEACTIVATION_KEYWORD": "%DESATIVAÇÃO%",

"TEMP_CSV_DIR": "csv_temp",
"ZIP_FILENAME_TEMPLATE": "SINAPI-{year}-{month}-formato-xlsx.zip",
"DB_POLICY_APPEND": "append",
"DB_POLICY_UPSERT": "upsert",
"DEFAULT_PLACEHOLDER_UNIT": "UN",
"PLACEHOLDER_INSUMO_DESC_TEMPLATE": "INSUMO_DESCONHECIDO_{code}",
"PLACEHOLDER_COMPOSICAO_DESC_TEMPLATE": "COMPOSICAO_DESCONHECIDA_{code}",
"STATUS_SUCCESS": "SUCESSO",
"STATUS_SUCCESS_NO_DATA": "SUCESSO (SEM DADOS)",
"STATUS_FAILURE": "FALHA",

# --- Constantes do Pre-Processor ---
"SHEETS_TO_CONVERT": ['CSD', 'CCD', 'CSE'],
"PREPROCESSOR_CSV_SEPARATOR": ";",

# --- Constantes do Processor ---
"COMPOSICAO_ITENS_SHEET_KEYWORD": "Analítico",
"COMPOSICAO_ITENS_SHEET_EXCLUDE_KEYWORD": "Custo",
"MANUTENCOES_HEADER_KEYWORDS": ["REFERENCIA", "TIPO", "CODIGO", "DESCRICAO", "MANUTENCAO"],
"CUSTOS_HEADER_KEYWORDS": ["Código da Composição", "Descrição", "Unidade"],
"SHEET_MAP": {
"ISD": ("precos", "NAO_DESONERADO"), "ICD": ("precos", "DESONERADO"),
"ISE": ("precos", "SEM_ENCARGOS"), "CSD": ("custos", "NAO_DESONERADO"),
"CCD": ("custos", "DESONERADO"), "CSE": ("custos", "SEM_ENCARGOS"),
},
"ID_COL_STANDARDIZE_MAP": {
"CODIGO_DO_INSUMO": "CODIGO", "DESCRICAO_DO_INSUMO": "DESCRICAO",
"CODIGO_DA_COMPOSICAO": "CODIGO", "DESCRICAO_DA_COMPOSICAO": "DESCRICAO",
},
"MANUTENCOES_COL_MAP": {
"REFERENCIA": "data_referencia", "TIPO": "tipo_item", "CODIGO": "item_codigo",
"DESCRICAO": "descricao_item", "MANUTENCAO": "tipo_manutencao",
},
"ORIGINAL_COLS": {
"TIPO_ITEM": "TIPO_ITEM", "CODIGO_COMPOSICAO": "CODIGO_DA_COMPOSICAO",
"CODIGO_ITEM": "CODIGO_DO_ITEM", "COEFICIENTE": "COEFICIENTE",
"DESCRICAO_ITEM": "DESCRICAO", "UNIDADE_ITEM": "UNIDADE",
},

"HEADER_SEARCH_LIMIT": 20,
"MANUTENCOES_SHEET_INDEX": 0,
"MANUTENCOES_DATE_FORMAT": "%m/%Y",
"COMPOSICAO_ITENS_HEADER_ROW": 9,
"PRECOS_HEADER_ROW": 9,
"CUSTOS_CODIGO_REGEX": r",(\d+)\)$",
"UNPIVOT_VALUE_PRECO": "preco_mediano",
"UNPIVOT_VALUE_CUSTO": "custo_total",
"FINAL_CATALOG_COLUMNS": {
"CODIGO": "codigo", "DESCRICAO": "descricao", "UNIDADE": "unidade"
},

# --- Constantes do Database ---
"DB_TABLE_INSUMOS": "insumos",
"DB_TABLE_COMPOSICOES": "composicoes",
"DB_TABLE_MANUTENCOES": "manutencoes_historico",
"DB_TABLE_COMPOSICAO_INSUMOS": "composicao_insumos",
"DB_TABLE_COMPOSICAO_SUBCOMPOSICOES": "composicao_subcomposicoes",
"DB_TABLE_PRECOS_INSUMOS": "precos_insumos_mensal",
"DB_TABLE_CUSTOS_COMPOSICOES": "custos_composicoes_mensal",
"ITEM_TYPE_INSUMO": "INSUMO",
"ITEM_TYPE_COMPOSICAO": "COMPOSICAO",
"DB_DIALECT": "postgresql",
"DB_TEMP_TABLE_PREFIX": "temp_",
"DB_DEFAULT_ITEM_STATUS": "ATIVO",
"DB_POLICY_REPLACE": "substituir",
}

REQUIRED_DB_KEYS = {"host", "port", "database", "user", "password"}
REQUIRED_SINAPI_KEYS = {"state", "month", "year", "type"}
OPTIONAL_SINAPI_KEYS = {"input_file"} # Arquivo XLSX local opcional

def __init__(
self, db_config: Dict[str, Any], sinapi_config: Dict[str, Any], mode: str
self, db_config: Dict[str, Any], sinapi_config: Dict[str, Any], mode: str, custom_constants: Dict[str, Any] = None
):
"""
Inicializa as configurações do AutoSINAPI.
Inicializa e valida todas as configurações do AutoSINAPI.

Args:
db_config: Configurações do banco de dados
sinapi_config: Configurações do SINAPI
mode: Modo de operação ('server' ou 'local')

Raises:
ConfigurationError: Se as configurações forem inválidas
db_config: Dicionário com as configurações do banco de dados.
sinapi_config: Dicionário com os parâmetros da extração SINAPI.
mode: Modo de operação ('server' ou 'local').
custom_constants: Dicionário opcional para sobrescrever as constantes padrão.
"""
# Valida e armazena configurações brutas
self._validate_db_config(db_config)
self._validate_sinapi_config(sinapi_config)
self.db_config = db_config
self.sinapi_config = sinapi_config

# Valida e define o modo de operação
self.mode = self._validate_mode(mode)
self.db_config = self._validate_db_config(db_config)
self.sinapi_config = self._validate_sinapi_config(sinapi_config)

# --- Expõe as configurações como atributos de alto nível ---
self.DOWNLOAD_DIR = "./downloads"
self.YEAR = sinapi_config["year"]
self.MONTH = sinapi_config["month"]
self.STATE = sinapi_config["state"]
self.TYPE = sinapi_config["type"]
self.DB_HOST = db_config["host"]
self.DB_PORT = db_config["port"]
self.DB_NAME = db_config["database"]
self.DB_USER = db_config["user"]
self.DB_PASSWORD = db_config["password"]

# --- Carrega as constantes (customizadas ou padrão) ---
# Isso permite que o usuário personalize nomes de tabelas, arquivos, etc.
constants = self.DEFAULT_CONSTANTS.copy()
if custom_constants:
constants.update(custom_constants)

for key, value in constants.items():
setattr(self, key, value)

def _validate_mode(self, mode: str) -> str:
"""Valida o modo de operação."""
if mode not in ("server", "local"):
raise ConfigurationError(f"Modo inválido: {mode}. Use 'server' ou 'local'")
return mode

def _validate_db_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Valida as configurações do banco de dados."""
missing = self.REQUIRED_DB_KEYS - set(config.keys())
if missing:
raise ConfigurationError(f"Configurações de banco ausentes: {missing}")
return config

def _validate_sinapi_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Valida as configurações do SINAPI."""
missing = self.REQUIRED_SINAPI_KEYS - set(config.keys())
if missing:
raise ConfigurationError(f"Configurações do SINAPI ausentes: {missing}")
return config

@property
def is_server_mode(self) -> bool:
"""Retorna True se o modo for 'server'."""
return self.mode == "server"

@property
def is_local_mode(self) -> bool:
"""Retorna True se o modo for 'local'."""
return self.mode == "local"
Loading
Loading