-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpm_costcalcsetup.py
More file actions
executable file
·305 lines (254 loc) · 9.61 KB
/
pm_costcalcsetup.py
File metadata and controls
executable file
·305 lines (254 loc) · 9.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
"""
Ysaac Keith
Parking Meter Cost Calculation Set Up
UPDATED: 31 January 2017
ZTM file format:
[Column Content] - ["Required Label"]
- Parking Zone Number - "PZN"
- Tariff Number - "TFN"
"""
from datetime import timedelta
import numpy as np
import pandas as pd
from yklib import (
basics_filemanagement as bfm,
pm_calculator as cr,
time_range_mgmt as trm)
def collect_header_timeranges(df):
"""
Accept a dataframe and check if the header has time ranges
This is primarily for the full map and returns an array with
the time ranges as datetime object tuples along with their index
"""
time_ranges = trm.prevent_timerange_overlap(
[trm.convert_timerangestr(df.columns[i])
for i in range(len(df.columns))
if trm.is_time_range(df.columns[i])])
return [(k, time_ranges[j])
for j,k in enumerate(
[i for i in range(len(df.columns))
if trm.is_time_range(df.columns[i])])]
def convert_none_to_nan(df):
# Accept dataframe, convert instances of string "none" to NaN value
return df.replace({'none' : np.nan})
def create_full_map(zone_tariff_map, pricing_schema):
"""
Merge zone tariff map and pricing schema into single dataframe
for easier access when performing analysis
"""
return pd.merge(zone_tariff_map.loc[:, ['pzn','tfn']],
pricing_schema,
how='inner',
left_on='tfn',
right_on='tariff')
def cycle_through_report(full_map, map_ranges, report):
"""
Cycle through the report line by line, collect individual
series into an array, concatenate all arrays into a
dataframe, then return it
"""
series_lst = []
for i in range(len(report)):
print(len(series_lst))
series_lst.append(get_each_series(
full_map, map_ranges, report.loc[str(i),:]))
return pd.concat(series_lst, axis=1).T
def format_output_not_in_map(report_series):
# if parking zone not found, format with this method
dur = trm.format_dur_only_mins(
report_series.loc['parked to'].replace(second = 0) -
report_series.loc['parked from'].replace(second = 0))
return pd.Series([report_series.loc['plate'],
report_series.loc['zone'],np.nan,np.nan,
report_series.loc['parked from'].strftime(
'%m/%d/%Y %H:%M:%S'),
report_series.loc['parked to'].strftime(
'%m/%d/%Y %H:%M:%S'),
report_series.loc['parked from'].strftime('%A'), dur,
np.nan, np.nan, np.nan, np.nan, np.nan],index=['Plate',
'Zone Name', 'Tariff Number', 'Pricing Area',
'Parked From', 'Parked To', 'Day', 'Full Parking Duration',
'Max Time', 'Minutes Over Max Time',
'Full Time Over Max Time',
'Minutes Over Grace Period (10m:59s)', 'Cost'])
def format_output_sunday(map_series, report_series):
# if parking zone found, but report says it's sunday
dur = trm.format_dur_only_mins(
report_series.loc['parked to'].replace(second = 0) -
report_series.loc['parked from'].replace(second = 0))
return pd.Series(
[report_series.loc['plate'],
report_series.loc['zone'],
map_series.loc['tfn'],
map_series.loc['price_area'],
report_series.loc['parked from'].strftime(
'%m/%d/%Y %H:%M:%S'),
report_series.loc['parked to'].strftime(
'%m/%d/%Y %H:%M:%S'),
report_series.loc['parked from'].strftime('%A'), dur,
np.nan, np.nan, np.nan, np.nan, np.nan],index=['Plate',
'Zone Name', 'Tariff Number', 'Pricing Area',
'Parked From', 'Parked To', 'Day', 'Full Parking Duration',
'Max Time', 'Minutes Over Max Time',
'Full Time Over Max Time',
'Minutes Over Grace Period (10m:59s)', 'Cost'])
def format_output_with_cost(map_series, report_series, calc_series):
dur = trm.format_dur_only_mins(
report_series.loc['parked to'].replace(second = 0) -
report_series.loc['parked from'].replace(second = 0))
return pd.Series(
[report_series.loc['plate'],
report_series.loc['zone'],
map_series.loc['tfn'],
map_series.loc['price_area'],
report_series.loc['parked from'].strftime(
'%m/%d/%Y %H:%M:%S'),
report_series.loc['parked to'].strftime(
'%m/%d/%Y %H:%M:%S'),
report_series.loc['parked from'].strftime('%A'), dur,
(map_series.loc['sat max time']
if is_saturday(report_series)
else map_series.loc['max time']),
calc_series.loc['overtime minutes'],
calc_series.loc['full overtime'],
calc_series.loc['mins over grace period'],
calc_series.loc['cost']],
index=['Plate','Zone Name', 'Tariff Number',
'Pricing Area', 'Parked From', 'Parked To', 'Day',
'Full Parking Duration', 'Max Time',
'Minutes Over Max Time', 'Full Time Over Max Time',
'Minutes Over Grace Period (10m:59s)', 'Cost'])
def get_calc_series_sat(map_series, report_series):
# get duration during active time
covered_duration = trm.compare_time_tuples(
(report_series.loc['parked from'],
report_series.loc['parked to']),
map_series.loc['sat hours'])
c_d_only_mins = trm.compare_time_tuples(
(report_series.loc['parked from'].replace(second = 0),
report_series.loc['parked to'].replace(second = 0)),
map_series.loc['sat hours'])
grace_period = timedelta(minutes = 11)
s1 = cr.get_mins_over_max_time_sat(
map_series, c_d_only_mins)
s2 = cr.get_full_over_max_time_sat(
map_series, covered_duration)
s3 = cr.get_cost_sat(
map_series, c_d_only_mins)
s4 = cr.get_mins_over_grace_per_sat(
map_series, c_d_only_mins, grace_period)
return pd.Series({'overtime minutes':s1,
'full overtime': s2, 'cost':s3,
'mins over grace period':s4})
def get_calc_series_weekday(map_series, map_ranges, report_series):
# get time range from report series
report_range = (report_series.loc['parked from'],
report_series.loc['parked to'])
r_r_only_mins = (
report_series.loc['parked from'].replace(second = 0),
report_series.loc['parked to'].replace(second = 0))
grace_period = timedelta(minutes = 11)
s1 = cr.get_mins_over_max_time_weekday(
map_series, map_ranges, r_r_only_mins)
s2 = cr.get_full_over_max_time_weekday(
map_series, map_ranges, report_range)
s3 = cr.get_cost_weekday(
map_series, map_ranges, r_r_only_mins)
s4 = cr.get_mins_over_grace_per_weekday(
map_series, map_ranges, r_r_only_mins, grace_period)
return pd.Series({'overtime minutes':s1,
'full overtime':s2, 'cost':s3,
'mins over grace period':s4})
def get_each_series(full_map, map_ranges, report_series):
# Check whether report line is in full map, return series
if is_zone_in_full_map(full_map, report_series):
# if zone found in full map, collect map series
map_series = full_map[full_map['pzn'].isin(
[report_series.loc['zone']])].iloc[0]
if is_sunday(report_series):
return format_output_sunday(map_series, report_series)
else:
if is_saturday(report_series):
return format_output_with_cost(
map_series,
report_series,
get_calc_series_sat(map_series, report_series))
else:
return format_output_with_cost(
map_series,
report_series,
get_calc_series_weekday(
map_series, map_ranges, report_series))
else:
return format_output_not_in_map(report_series)
def go(zone_tariff_map, pricing_schema, report):
# Starting point for top-level code to launch calculator
# create full map and collect time ranges
full_map = create_full_map(zone_tariff_map, pricing_schema)
full_map_ranges = collect_header_timeranges(full_map)
return cycle_through_report(full_map, full_map_ranges, report)
def is_saturday(report_series):
return trm.determine_day(report_series.loc['parked to']) == 5
def is_sunday(report_series):
return trm.determine_day(report_series.loc['parked to']) == 6
def is_zone_in_full_map(full_map, report_series):
# Accept zone name, check if in full map
return report_series.loc['zone'] in full_map['pzn'].values
def lower_dfstrings(df):
"""
Accept dataframe, find columns with
strings, lower all the strings in those columns
"""
for i in df.columns:
if df[i].dtype == 'O':
for j in range(len(df[i])):
if type(df.loc[str(j), i]) == str:
df.loc[str(j), i] = df.loc[str(j), i].lower()
return df
def lower_headercols(df):
# Accept a dataframe, ensure column labels are lowercase
for i in df.columns:
df = df.rename(index = str, columns = {i : i.lower()})
return df
def prepare_report(report_path):
"""
Read report csv/excel document into a dataframe
Clean dataframe content by lowering header and convert
all timestamp strings to datetime objects
"""
report_df = bfm.open_csv_excel_to_df(report_path)
return trm.convert_dftimestamps(
remove_1s(
lower_headercols(report_df)))
def prepare_schema(schema_path):
"""
Read schema csv/excel document into dataframe
Prepare dataframe for use by converting header to lowercase,
change "Blocked" to lowercase, change "none" to NaN
Convert all time range strings to datetime object tuples
"""
schema_df = bfm.open_csv_excel_to_df(schema_path)
schema_df = convert_none_to_nan(
lower_dfstrings(
lower_headercols(schema_df)))
return trm.convert_dftimeranges(schema_df)
def prepare_zone_tariff_map(map_path):
"""
Read zone tariff map csv/excel document into dataframe
Clean dataframe content by lowering header and all other strings
Ensure that the pzn column is string-based
"""
map_df = bfm.open_csv_excel_to_df(map_path)
map_df = lower_dfstrings(lower_headercols(map_df))
map_df['pzn'] = map_df['pzn'].astype(str)
return map_df
def remove_1s(df):
# Accept dataframe and remove '_1' so labels are clean
new_labels = []
for i in df.columns:
if '_1' in i:
new_labels.append(i.replace('_1', ''))
else:
new_labels.append(i)
df.columns = new_labels
return df