-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPython_Operator.py
More file actions
22 lines (18 loc) · 940 Bytes
/
Python_Operator.py
File metadata and controls
22 lines (18 loc) · 940 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from airflow.operators.python_operator import PythonOperator
def download_rates():
    """Fetch the latest rates for each base currency and append them as JSON lines.

    Reads ``/usr/local/airflow/dags/files/forex_currencies.csv`` (``;``-delimited,
    with ``base`` and ``with_pairs`` columns). For every row it requests the
    latest rates for ``base`` from exchangeratesapi.io, keeps only the
    currencies named in ``with_pairs`` (whitespace-separated), and appends one
    JSON object per row to ``/usr/local/airflow/dags/files/forex_rates.json``
    in the form ``{"base": ..., "rates": {...}, "last_update": ...}``.

    Raises:
        requests.RequestException: on a network failure, a timeout, or a
            non-success HTTP status.
        KeyError: if a CSV row lacks ``base``/``with_pairs``, or the response
            lacks ``date``, ``rates`` or one of the requested pairs.
    """
    # Imported here so the callable is self-contained: no module-level import
    # of these names is visible in this file.
    import csv
    import json

    import requests

    # newline='' is what the csv module expects for files it reads.
    with open('/usr/local/airflow/dags/files/forex_currencies.csv', newline='') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for row in reader:
            base = row['base']
            # split() without an argument ignores repeated, leading and
            # trailing whitespace, so no empty pair names are produced.
            with_pairs = row['with_pairs'].split()

            # A timeout keeps the task from hanging forever on a stalled
            # connection; raise_for_status() turns an HTTP error into an
            # explicit failure instead of a KeyError further down.
            response = requests.get(
                'https://api.exchangeratesapi.io/latest',
                params={'base': base},
                timeout=30,
            )
            response.raise_for_status()
            indata = response.json()

            outdata = {
                'base': base,
                'rates': {pair: indata['rates'][pair] for pair in with_pairs},
                'last_update': indata['date'],
            }

            # Append per row so every successfully fetched base is persisted
            # even if a later row fails.
            with open('/usr/local/airflow/dags/files/forex_rates.json', 'a') as outfile:
                json.dump(outdata, outfile)
                outfile.write('\n')
# Airflow task that runs download_rates() as a Python callable.
# NOTE(review): no `dag=` argument is passed here; presumably this statement is
# meant to live inside a `with DAG(...)` block — confirm against the DAG file.
downloading_rates = PythonOperator(python_callable=download_rates, task_id="downloading_rates")