-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathETL.py
More file actions
89 lines (71 loc) · 2.67 KB
/
ETL.py
File metadata and controls
89 lines (71 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import glob #this module helps in selecting files
import pandas as pd #this module helps in processing CSV files
import xml.etree.ElementTree as ET
from datetime import datetime
from zipfile import ZipFile #module helps in extracting files
#extract in current directory
with ZipFile("datasource.zip","r") as f:
f.extractall()
#Set Paths
tmpfile = "temp.tmp" # file used to store all extracted data
logfile = "logfile.txt" # all event logs will be stored in this file
targetfile = "transformed_data.csv" # file where transformed data is stored
# Extract
#CSV Extract function
def extract_from_csv(file_to_extract):
data = pd.read_csv(file_to_extract)
return data
#Json Extract Function
def extract_from_json(file_to_extract):
data = pd.read_json(file_to_extract,line = True)
return data
#XML Extract Function
def extract_from_xml(file_to_extract):
data = pd.DataFrame(columns=['car_model', 'year_of_manufacture','price', 'fuel'])
tree = ET.parse(file_to_extract)
root = tree.getroot()
for x in root:
model = x.find('car_model').text
year = int(x.find('year_of_manufacture').text)
price = float(x.find('price').text)
fule = x.find('fuel').text
data = data.append({'car_model':model, 'year_of_manufacture':year,'price':price, 'fuel':fuel})
return data
#Extract Function
def extract():
data = pd.DataFrame(columns=['car_model','year_of_manufacture','price', 'fuel']) # create an empty data frame to hold extracted data
#process all csv files
for csvfile in glob.glob("dealership_data/*.csv"):
data = data.append(extract_from_csv(csvfile), ignore_index=True)
#process all json files
for jsonfile in glob.glob("dealership_data/*.json"):
data = data.append(extract_from_json(jsonfile), ignore_index=True)
#process all xml files
for xmlfile in glob.glob("dealership_data/*.xml"):
data = data.append(extract_from_xml(xmlfile), ignore_index=True)
return data
#Transform
def transform(data):
data['price']=round(data.price,2)
return(data)
#Loading
def load(targetfile,file_to_load):
file_to_load.to_csv(targetfile)
#Logging
def log(message):
timestamp_format = "%Y-%h-%d-%H:%M:%S"
now = datetime.now()
timestamp = now.strftime(timestamp_format)
with open("logfile.txt","a") as f:
f.write(timestamp + ',' + message + '\n')
log("ETL Job Started")
log("Extract phase Started")
data = extract()
log("Extract phase Ended")
log("Transform phase Started")
transformed_data = transform(data)
log("Transform phase Ended")
log("Load phase Started")
load(targetfile,transformed_data)
log("Load phase Ended")
log("ETL Job Ended")