-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsetup.py
More file actions
90 lines (69 loc) · 2.39 KB
/
setup.py
File metadata and controls
90 lines (69 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# python libs
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# api lib
from api.comercio.models import Comercio
from api.processamento.models import Processamento
from api.exportacao.models import Exportacao
from api.importacao.models import Importacao
from api.producao.models import Producao
from api.config import settings
from api.database import Base
# data lib
from data import (
comercio_pipeline,
processamento_pipeline,
exportacao_pipeline,
importacao_pipeline,
producao_pipeline
)
# const
from const import (
comercio_url,
processamento_urls,
exportacao_urls,
importacao_urls,
producao_url
)
async def create_tables(engine):
    """Create the tables registered on ``Base.metadata`` through *engine*.

    Opens a connection block with ``engine.begin()`` and hands the
    synchronous ``Base.metadata.create_all`` to ``run_sync`` on it.
    """
    create_all = Base.metadata.create_all
    async with engine.begin() as connection:
        await connection.run_sync(create_all)
def run_pipe(pipeline, url_path):
    """Call *pipeline* with *url_path* and return whatever it produces."""
    result = pipeline(url_path)
    return result
async def insert_data(session, data, table):
    """Batch-insert *data* into the table behind the mapped class *table*.

    Args:
        session: Async session used to run the statement; ``session.begin()``
            is used as an async context manager around the insert.
        data: Sequence of row dictionaries (``main`` passes the result of
            ``to_dict(orient='records')``).
        table: Mapped class exposing ``__table__``; one table per call.

    Returns:
        None. When *data* is empty the function returns without touching
        the session.
    """
    if not data:
        # Nothing to insert: skip the transaction rather than emit an
        # INSERT that carries no row values.
        return

    async with session.begin():
        # Pass the rows as execution parameters (executemany) instead of
        # inlining them with ``.values(data)``: a single multi-row VALUES
        # clause binds rows x columns parameters in one statement, which a
        # large batch can push past driver/database parameter limits.
        await session.execute(table.__table__.insert(), data)
async def main():
    """Create the schema and load every pipeline's output into its table.

    Builds an async engine from ``settings.DATABASE_URL``, creates the
    tables registered on ``Base``, runs the five data pipelines, and inserts
    each resulting record list into the matching model's table. The engine
    is disposed on every exit path, including when a step raises.
    """
    engine = create_async_engine(
        settings.DATABASE_URL,
        pool_pre_ping=True,
        pool_recycle=3600,
    )
    try:
        SessionLocal = sessionmaker(
            bind=engine,
            expire_on_commit=False,
            future=True,
            class_=AsyncSession,
        )

        # create tables on db
        await create_tables(engine)

        # (pipeline, source URL(s), destination model), in load order.
        jobs = (
            (comercio_pipeline, comercio_url, Comercio),
            (processamento_pipeline, processamento_urls, Processamento),
            (exportacao_pipeline, exportacao_urls, Exportacao),
            (importacao_pipeline, importacao_urls, Importacao),
            (producao_pipeline, producao_url, Producao),
        )

        # Run every pipeline before the first insert, so a failing pipeline
        # aborts the run before any rows are written.
        batches = [
            (pipeline(source).to_dict(orient='records'), model)
            for pipeline, source, model in jobs
        ]

        async with SessionLocal() as session:
            for records, model in batches:
                await insert_data(session, records, model)
    finally:
        # Release the engine's pooled connections even when a step above
        # raised; previously dispose() was skipped on any failure.
        await engine.dispose()
if __name__ == '__main__':
    # Script entry point: hand the main() coroutine to asyncio.run, which
    # blocks until it has finished.
    entrypoint = main()
    asyncio.run(entrypoint)