-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPostgresOperator.py
More file actions
56 lines (43 loc) · 1.41 KB
/
PostgresOperator.py
File metadata and controls
56 lines (43 loc) · 1.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# How to import parameters into the PostgreSQL
from airflow.models import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import date, datetime, timedelta
from airflow.operators.python import PythonOperator
class CustomPostgresOperator(PostgresOperator):
    """PostgresOperator variant that Jinja-templates ``parameters`` too.

    The stock operator only templates ``sql``; adding ``parameters`` to
    ``template_fields`` lets the ``store`` task pull an XCom value via a
    ``{{ ... }}`` expression inside its parameters dict.
    """

    template_fields = ('sql', 'parameters')
# Variables
dt = (datetime.now() + timedelta(days=0)).strftime("%Y-%m-%d")
def _task():
return dt
# Task-level defaults shared by every operator in the DAG below.
default_args = dict(
    owner='Ximeng',
    depends_on_past=False,
    start_date=datetime(2021, 8, 9),
    email=['xxx@gmail.com'],
    email_on_failure=True,   # alert on failure only,
    email_on_retry=False,    # not on every retry
    retries=1,
    retry_delay=timedelta(minutes=0.5),  # 30-second back-off
)
# Daily DAG: create the table, compute today's date, then insert using that
# date as a templated SQL parameter. (Indentation restored — the pasted
# source had lost the `with` block structure.)
with DAG('my_postgres_dag', default_args=default_args, schedule_interval='@daily', catchup=False) as dag:
    # DDL step; the .sql path is resolved relative to the DAG folder.
    create_table = PostgresOperator(
        task_id="create_table",
        postgres_conn_id="postgres",
        sql="sql/create_my_table.sql"
    )

    # Pushes today's date string to XCom (return value of `_task`).
    my_task = PythonOperator(
        task_id="my_task",
        python_callable=_task
    )

    # Custom operator so `parameters` is Jinja-rendered: the xcom_pull
    # expression resolves to the date string pushed by `my_task`.
    # NOTE(review): `task_ids=[...]` makes xcom_pull return a list, hence
    # the trailing `[0]`.
    store = CustomPostgresOperator(
        task_id="store",
        postgres_conn_id="postgres",
        sql=[
            "sql/insert_into_my_table.sql",
            "SELECT * FROM my_table"
        ],
        parameters={
            'filename': '{{ti.xcom_pull(task_ids=["my_task"])[0]}}'
        }
    )

    create_table >> my_task >> store