-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhook_dag.py
More file actions
35 lines (29 loc) · 1.42 KB
/
hook_dag.py
File metadata and controls
35 lines (29 loc) · 1.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from datetime import datetime, timedelta
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depend_on_past': False,
'start_date': datetime(2018, 11, 5, 10, 00, 00),
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
def get_activated_sources():
request = "SELECT * FROM course.source"
pg_hook = PostgresHook(postgres_conn_id="postgre_sql", schema="airflow_mdb") # This connection must be set from the Connection view in Airflow UI
connection = pg_hook.get_conn() # Gets the connection from PostgreHook
cursor = connection.cursor() # Gets a cursor to the postgreSQL database. Look at https://www.postgresql.org/docs/9.2/plpgsql-cursors.html for more information
cursor.execute(request) # Executes the request
sources = cursor.fetchall() # Fetchs all the data from the executed request
for source in sources: # Does a simple print of each source to show that the hook works well. More on the next lesson
print("Source: {0} - activated: {1}".format(source[0], source[1]))
return sources
with DAG('hook_dag',
default_args=default_args,
schedule_interval='@once',
catchup=False) as dag:
start_task = DummyOperator(task_id='start_task')
hook_task = PythonOperator(task_id='hook_task', python_callable=get_activated_sources)
start_task >> hook_task