Notes
1. depends_on_past && wait_for_downstream
depends_on_past: (boolean) when set to True, a task instance will not be triggered until the same task in the previous scheduled run has succeeded or been skipped. It applies only to that single task.
Example: if you set depends_on_past=True on Task C, then Task C in Dag run2 will only run once Task C in Dag run1 has succeeded or been skipped.
Dag run1: A-> B-> C
Dag run2: A-> B-> C
wait_for_downstream: (boolean) when set to True, it also sets depends_on_past=True. It prevents a task in the current dagrun from running until the same task and its direct downstream tasks in the previous dagrun have succeeded (or been skipped).
Example: if you set wait_for_downstream=True on Task A, then Task A in Dag run2 will not run until Task A and its downstream Task B in Dag run1 have succeeded or been skipped. See the sketch below.
Dag run1: A-> B-> C
Dag run2: A-> B-> C
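A minimal sketch of both parameters, assuming Airflow 2.x (the dag_id, task ids, and bash commands are hypothetical):
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("depends_on_past_example", start_date=datetime(2021, 1, 1), schedule_interval="@daily") as dag:
    # wait_for_downstream=True also implies depends_on_past=True for this task
    a = BashOperator(task_id="A", bash_command="echo A", wait_for_downstream=True)
    b = BashOperator(task_id="B", bash_command="echo B")
    # C in a given run starts only if C in the previous run succeeded or was skipped
    c = BashOperator(task_id="C", bash_command="echo C", depends_on_past=True)
    a >> b >> c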
2. Pause & unpause DAGs from inside the Docker webserver container (the same as flipping the toggle in the UI)
command: airflow dags pause <dag_id>
command: airflow dags unpause <dag_id>
3. Trigger your DAG in the Docker container
command: airflow dags trigger -e <execution_date> <dag_id>
4. List your DAGs
command: airflow dags list
5. List the tasks in a DAG
command: airflow tasks list example_dag
6. Test your tasks
After adding a new task to a DAG, you can test it for a given execution date with the command below (it runs the task without checking dependencies or recording state):
command: airflow tasks test example_dag bash_print_date1 2021-01-01
7. Rerun your past dag runs (backfill)
command: airflow dags backfill -s 2021-01-01 -e 2021-01-05 --reset-dagruns example_dag
8. Pool
definition: a pool lets you limit concurrency for a set of tasks. By default, Airflow can run up to 32 tasks at the same time.
use case: some specific tasks consume a lot of resources or run for a very long time; you might want to limit their concurrency to 1 so they execute sequentially, one after the other.
One running task = 1 worker slot taken.
All tasks go to default_pool (128 slots); you can find it under Admin -> Pools.
How to use it:
If you want to run some big tasks one by one, create a new pool and set its slots to 1, so the tasks assigned to it share a single slot and run one at a time. Then add the pool parameter to the operator, such as pool="<pool name>". See the sketch below.
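A minimal sketch, assuming a pool named big_tasks with 1 slot already exists (create it under Admin -> Pools, or with: airflow pools set big_tasks 1 "heavy tasks"); the task id and command are hypothetical:
from airflow.operators.bash import BashOperator

heavy_task = BashOperator(
    task_id="heavy_task",
    bash_command="sleep 60",
    pool="big_tasks",   # this task only runs while a slot in big_tasks is free
)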
9. Use cross_downstream to set the dependencies between two lists of tasks
Airflow doesn't allow a list-to-list dependency such as [A,B] >> [C,D,E] >> F.
So you need to import cross_downstream (a runnable sketch follows the snippet):
from airflow.utils.helpers import cross_downstream
cross_downstream([A,B], [C,D,E])
[C,D,E] >> F
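A runnable sketch of the same pattern, assuming Airflow 2.x (the dag_id and task ids are hypothetical):
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.helpers import cross_downstream

with DAG("cross_downstream_example", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    a, b, c, d, e, f = [DummyOperator(task_id=t) for t in "ABCDEF"]
    # every task in [a, b] becomes upstream of every task in [c, d, e]
    cross_downstream([a, b], [c, d, e])
    # then [c, d, e] all feed into f
    [c, d, e] >> f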
10. Task priority:
If you want one task to be scheduled first, add the priority_weight parameter to its operator. Make sure all the tasks are in the same pool. The default value of priority_weight is 1.
Example:
process_a = BashOperator(
    task_id="process_a",
    bash_command="echo '{{ ti.try_number }}' && sleep 20",
    priority_weight=2,
    pool="process_tasks",
)
process_b = BashOperator(
    task_id="process_b",
    bash_command="echo '{{ ti.try_number }}' && sleep 20",
    pool="process_tasks",
)
process_c = BashOperator(
    task_id="process_c",
    bash_command="echo '{{ ti.try_number }}' && sleep 20",
    priority_weight=3,
    pool="process_tasks",
)
You might pick different numbers, but the point is that the priority_weights should be ordered as follows:
process_c > process_a > process_b
Notice that by default process_b has a priority_weight equal to 1, so there is no need to specify it here.
11. Trigger_rules (a sketch of how to set one follows this list)
- all_success (the default)
Example: [t1, t2] >> t3
Explain: t3 is triggered only when both t1 and t2 are successful. If t2 is skipped, then t3 will also be skipped.
- all_failed
Explain: triggered only when all upstream tasks have failed, otherwise skipped.
- all_done
Explain: does not care about the upstream tasks' states; the task runs as soon as all upstream tasks are done.
- one_failed: triggers your task as soon as one parent fails; it does not wait for all parents to complete.
- one_success: triggers your task as soon as one parent succeeds; it does not wait for all parents to complete.
- none_failed
Example: [t1, t2] >> t3
Explain: triggered when no upstream task has failed, i.e. all of them succeeded or were skipped. If t1 and t2 are both skipped, t3 still triggers.
- none_failed_or_skipped
Example: [t1, t2] >> t3
Explain: no upstream task may have failed and at least one must have succeeded. If t1 and t2 are both skipped, t3 is skipped.
- none_skipped
Explain: triggered only when no upstream task is in a skipped state.
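A minimal sketch of setting a trigger rule on the downstream task, assuming t1 and t2 are defined elsewhere (the task id and command are hypothetical):
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

t3 = BashOperator(
    task_id="t3",
    bash_command="echo done",
    trigger_rule=TriggerRule.NONE_FAILED,   # or simply the string "none_failed"
)
[t1, t2] >> t3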
12. Timeout
# Add this parameter to the operator (requires: from datetime import timedelta). If the task runs longer than the given duration, it is marked as failed. See the sketch below.
execution_timeout=timedelta(seconds=12)
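A minimal sketch (the task id and command are hypothetical):
from datetime import timedelta
from airflow.operators.bash import BashOperator

slow_task = BashOperator(
    task_id="slow_task",
    bash_command="sleep 30",
    execution_timeout=timedelta(seconds=12),   # fails if the task instance runs longer than 12 seconds
)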
13. Providers
From Airflow 2.0 onward, you can go to registry.astronomer.io to search for the provider you want. For example, if you search for S3KeySensor, you will be told how to install the Amazon provider dependencies.
14. Check installed providers in Airflow
Copy the scheduler container id, then run: docker exec <scheduler container id> airflow info
15. How to install a new provider
1. Search for the provider name and version on registry.astronomer.io
2. Create a requirements.txt file and put the provider name and version in it, such as airflow-provider-great-expectations==0.0.6
3. Build your own Docker image. In the same directory as docker-compose.yml, create a Dockerfile:
FROM apache/airflow:2.1.0
COPY requirements.txt .
RUN pip install -r requirements.txt
4. Go back to docker-compose.yml and, instead of image: apache/airflow:2.1.0, change it to build from your Dockerfile:
version: '3'
x-airflow-common:
  &airflow-common
  build:
    context: .