diff --git a/meatools/meatools.egg-info/PKG-INFO b/meatools/meatools.egg-info/PKG-INFO new file mode 100644 index 0000000..916d23a --- /dev/null +++ b/meatools/meatools.egg-info/PKG-INFO @@ -0,0 +1,9 @@ +Metadata-Version: 2.1 +Name: meatools +Version: 0.1.0 +Author: Jiangyuan John Feng, Shicheng Xu +Requires: python +Requires: numpy +Requires: cantera +Requires: matplotlib +Requires: scipy diff --git a/meatools/meatools.egg-info/SOURCES.txt b/meatools/meatools.egg-info/SOURCES.txt new file mode 100644 index 0000000..daba9f8 --- /dev/null +++ b/meatools/meatools.egg-info/SOURCES.txt @@ -0,0 +1,20 @@ +setup.py +meatools/__init__.py +meatools/cli.py +meatools/ecsa_dry.py +meatools/ecsa_normal.py +meatools/impedence_calc.py +meatools/lsv.py +meatools/test_squence.py +meatools.egg-info/PKG-INFO +meatools.egg-info/SOURCES.txt +meatools.egg-info/dependency_links.txt +meatools.egg-info/entry_points.txt +meatools.egg-info/top_level.txt +meatools/subcomands/__init__.py +meatools/subcomands/ecsa_dry.py +meatools/subcomands/ecsa_normal.py +meatools/subcomands/impedence_calc.py +meatools/subcomands/lsv.py +meatools/subcomands/mea_proccess.py +meatools/subcomands/test_squence.py \ No newline at end of file diff --git a/meatools/meatools.egg-info/dependency_links.txt b/meatools/meatools.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/meatools/meatools.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/meatools/meatools.egg-info/entry_points.txt b/meatools/meatools.egg-info/entry_points.txt new file mode 100644 index 0000000..796a7b9 --- /dev/null +++ b/meatools/meatools.egg-info/entry_points.txt @@ -0,0 +1,2 @@ +[console_scripts] +mea = meatools.cli:main diff --git a/meatools/meatools.egg-info/top_level.txt b/meatools/meatools.egg-info/top_level.txt new file mode 100644 index 0000000..4cef0b2 --- /dev/null +++ b/meatools/meatools.egg-info/top_level.txt @@ -0,0 +1 @@ +meatools diff --git a/meatools/meatools/__init__.py 
import argparse
from .subcomands import mea_proccess


def main():
    """Entry point for the ``mea`` CLI: register every analysis subcommand and
    dispatch to its handler in ``mea_proccess``.

    With no subcommand given, the top-level help text is printed instead.
    """
    parser = argparse.ArgumentParser(prog="mea", description="MEATOOLs CLI Toolkit")
    subparsers = parser.add_subparsers(dest="command")

    # One (name, help text, handler) triple per subcommand, registered in the
    # same order they should appear in the help listing.
    commands = (
        ("ttseq", 'Run test sequence analysis', mea_proccess.run_test_sequence),
        ("otr", 'Run OTR/Impedance analysis', mea_proccess.run_otr),
        ("ecsa", 'Run ECSA analysis', mea_proccess.run_ecsa),
        ("ecsadry", 'Run ECSA-dry analysis', mea_proccess.run_ecsa_dry),
        ("lsv", 'Run LSV analysis', mea_proccess.run_lsv),
        ("conclude", 'Extract key value from results directory', mea_proccess.run_conclude),
        ("all", 'Run all analysis', mea_proccess.run_all),
        ("eis", 'Run EIS analysis', mea_proccess.run_eis),
    )
    for name, help_text, handler in commands:
        sub = subparsers.add_parser(name, help=help_text)
        sub.set_defaults(func=handler)

    args = parser.parse_args()
    if hasattr(args, 'func'):
        args.func(args)
    else:
        parser.print_help()
else "- (root directory)\n") + return csv_folders + +def plot_COtripping(file_info): + data_dump={} + for i, (mtime, filepath) in enumerate(file_info, 1): + dump={} + readable_time = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S') + dump["time_stamp"]=readable_time + dump["file"]=filepath + + with open(filepath, 'r') as f: + for _ in range(65): + next(f) + content_from_line65 = f.read() + + u = re.split('CURVE', content_from_line65) + oldUpper = x_CO*0 + + for j in range(len(u)): + with open('temp', 'w') as fileID: + fileID.write(u[j]) + A = np.loadtxt('temp', skiprows=2, usecols=range(8)) + plt.subplot(2,3,j+1) + plt.plot(A[:,2],A[:,3]) + if j>0 and j<(len(u)-1): + scanRate=np.median(np.abs(np.diff(A[:,2])/np.diff(A[:,1]))) + # print('rate='+str(scanRate)+' V/s') + # print('Vmin='+str(np.min(A[:,2]))+' V' + + CVall = A[:, [2, 3]] + startV = CVall[0, 0] + updData = CVall[:, :2] + mask1 = (updData[:, 0] > 0.3) & (updData[:, 0] < 0.6) & (np.concatenate(([0], np.diff(updData[:, 0]))) > 0) + double1 = updData[mask1, :] + mask2 = (updData[:, 0] > 0.3) & (updData[:, 0] < 0.6) & (np.concatenate(([0], np.diff(updData[:, 0]))) < 0) + double2 = updData[mask2, :] + + double1 = double1[np.argsort(double1[:, 0])] + double2 = double2[np.argsort(double2[:, 0])] + double1 = double1[np.concatenate(([True], np.diff(double1[:, 0]) != 0)), :] + double2 = double2[np.concatenate(([True], np.diff(double2[:, 0]) != 0)), :] + + # CO stripping area + maskCO = (updData[:, 0] > 0.5) & (np.concatenate(([0], np.diff(updData[:, 0]))) > 0) + topLimits = updData[maskCO, :] + fCO = interpolate.interp1d(topLimits[:, 0], topLimits[:, 1], bounds_error=False) + newUpper = fCO(x_CO) + plt.subplot(2,3,1) + plt.plot(x_CO,newUpper) + oldUpper =np.maximum(oldUpper,newUpper) + + # Interpolation + x_new = np.arange(0.35, 0.451, 0.001) + f1 = interpolate.interp1d(double1[:, 0], double1[:, 1], bounds_error=False) + double1_interp = f1(x_new) + f2 = interpolate.interp1d(double2[:, 0], double2[:, 1], 
if __name__ == "__main__":
    os.makedirs('logs', exist_ok=True)
    os.makedirs('results/ecsa_dry/', exist_ok=True)
    # BUGFIX: this script previously opened 'logs/ecsa_normal.log' (copy-paste
    # from ecsa_normal.py), clobbering that script's log in 'w' mode. It now
    # writes its own log file.
    log = open('logs/ecsa_dry.log', 'w')
    index = 1
    results = {}

    # Module-level settings read by the helper functions above (searchKey by
    # find_cv_subfolders, x_CO/ECAcutoff by plot_COtripping).
    searchKey = 'Cathode CO*/*cv*.DTA'
    ECAcutoff = 0.08
    x_CO = np.arange(0.5, 0.95, 0.001)
    u_subfolders = find_cv_subfolders('.')

    for i, folderName in enumerate(u_subfolders, start=1):
        results[f"dir_{str(i)}"] = {}

        # Figure 1 per folder: raw CV sweeps + CO-stripping upper envelopes.
        plt.figure(index)
        index += 1
        file_info = find_and_sort_load_dta_files(folderName)
        oldUpper, COdesorb, data = plot_COtripping(file_info)
        plt.savefig(f'results/ecsa_dry/ECSA_Dry_{i}-1.png')

        # Figure 2 per folder: CO-desorption trace vs. the running envelope;
        # COECA integrates the gap between them over the 0.5-0.95 V grid.
        plt.figure(index)
        index += 1
        plt.plot(x_CO, oldUpper, 'r--')
        plt.plot(x_CO, COdesorb, 'b-')
        COECA = np.nansum(np.mean(np.diff(x_CO)) * (COdesorb - oldUpper))
        plt.savefig(f'results/ecsa_dry/ECSA_Dry_{i}-2.png')

        results[f"dir_{str(i)}"]["data"] = data
        results[f"dir_{str(i)}"]["COECA"] = COECA
        # Re-written every iteration so partial results survive a crash.
        with open('results/ecsa_dry/ecsa_results.json', 'w') as results_file:
            json.dump(results, results_file, indent=2, cls=NumpyEncoder)

    log.close()
np.diff(updData[:, 0]))) > 0) + double1 = updData[mask1, :] + + mask2 = (updData[:, 0] > 0.3) & (updData[:, 0] < 0.6) & \ + (np.concatenate(([0], np.diff(updData[:, 0]))) < 0) + double2 = updData[mask2, :] + + # ranking + double1 = double1[np.argsort(double1[:, 0])] + double2 = double2[np.argsort(double2[:, 0])] + double1 = double1[np.concatenate(([True], np.diff(double1[:, 0]) != 0)), :] + double2 = double2[np.concatenate(([True], np.diff(double2[:, 0]) != 0)), :] + + # Interpolation + x_new = np.arange(0.35, 0.451, 0.001) + f1 = interpolate.interp1d(double1[:, 0], double1[:, 1], bounds_error=False) + double1_interp = f1(x_new) + f2 = interpolate.interp1d(double2[:, 0], double2[:, 1], bounds_error=False) + double2_interp = f2(x_new) + + ddouble = np.abs(double1_interp - double2_interp) + ddouble = ddouble[~np.isnan(ddouble)] + doubleMean = np.median(ddouble) + + + # calculating ECSA + updData = updData[np.concatenate(([True], np.diff(updData[:, 0]) > 0)), :] + base = mstats.mquantiles(updData[(updData[:, 0] > 0.4) & (updData[:, 0] < 0.6), 1], 0.25) + updData[:, 1] -= base + updData = updData[updData[:, 1] > 0, :] + updData = updData[(updData[:, 0] <= 0.4) & (updData[:, 0] > ECAcutoff), :] + updData = updData[np.argsort(updData[:, 0]), :] + + area = np.sum(np.diff(updData[:, 0]) * updData[:-1, 1]) # in mAV + QH = area / scanRate + ECA = QH / 2.1e-4 + + # print(f'rate={scanRate} V/s') + # print(f'Vmin={np.min(A[:, 2])} V') + # print(f"dd: {doubleMean}") + # print(f"ECA: {ECA}") + + dump={ + "rate (V/s)":scanRate, + "Vmin (V)":np.min(A[:, 2]), + "dd":doubleMean, + "ECA":ECA + } + + return dump + +def parse_dta_format1(filepath, ECAcutoff): + """running when len(u) < 2""" + + with open(filepath, 'r') as file: + lines = file.readlines() + + # Find the line where CURVE1 starts + curve_start = next((i for i, line in enumerate(lines) if line.strip().startswith('CURVE')), None) + data_lines = lines[curve_start + 3:] + data = [line.split()[-1] for line in data_lines if 
def parse_dta_format2(filepath, ECAcutoff):
    """Parse a multi-CURVE DTA file (used when the file splits into >= 2
    segments) and return per-curve analysis results keyed ``curve_<j>``.

    Plots every segment into the current figure; first and last segments are
    partial sweeps and are plotted but not analysed.
    """
    # Skip the 65-line DTA header, then split the remainder on the CURVE
    # markers so each chunk holds one voltammetry sweep.
    with open(filepath, 'r') as handle:
        for _ in range(65):
            next(handle)
        tail = handle.read()
    segments = re.split('CURVE', tail)

    data_dump = {}
    last = len(segments) - 1
    for idx, segment in enumerate(segments):
        # Round-trip through a scratch file so np.loadtxt does the parsing.
        with open('temp', 'w') as scratch:
            scratch.write(segment)
        table = np.loadtxt('temp', skiprows=2, usecols=range(8))
        plt.subplot(2, 3, idx + 1)
        plt.plot(table[:, 2], table[:, 3])
        if 0 < idx < last:
            data_dump[f"curve_{str(idx)}"] = process_curve_data(table, ECAcutoff)
    os.remove('temp')
    data_dump["file_path"] = filepath
    return data_dump
class r_total_calc():
    """Compute total oxygen-transport resistance from limiting-current CSVs.

    ``root_path`` is expected to contain one directory per O2 fraction (named
    like ``...<x>%O2...``), each holding CSV files whose names carry the
    absolute pressure (``...<N>kPa...``). All outputs are written under
    ``results/impedence/``.
    """

    def __init__(self, root_path, temp=80):
        # temp is given in degrees Celsius and stored in Kelvin.
        self.root = os.path.abspath(root_path)
        self.temp = temp + 273.15

    def parse_data_title(self):
        """Scan the directory tree and index every CSV by O2 fraction and pressure.

        Returns (and stores as ``self.init_results``) a nested plain dict:
        ``{o2_fraction: {pressure_str: {'data': {}, 'file_path': str}}}``.
        """
        results = defaultdict(lambda: defaultdict(dict))
        for o2_dir in os.listdir(self.root):
            # Skip macOS resource-fork artifacts ('._*').
            if o2_dir.startswith('._'):
                continue
            o2_path = os.path.join(self.root, o2_dir)
            if os.path.isdir(o2_path):
                # e.g. "5%O2" or "2.5%O2" -> fraction 0.05 / 0.025
                match = re.search(r"(\d+(?:\.\d+)?)%O2", o2_path)
                if match:
                    o2_fraction = float(match.group(1)) / 100
                    for file in os.listdir(o2_path):
                        if file.startswith('._') or not file.endswith('.csv'):
                            continue
                        # e.g. "250kPa" / "250 kPaa" -> "250" (kept as a string key)
                        match = re.search(r"(\d+)\s*kPa[a]?", file, re.IGNORECASE)
                        if match:
                            pressure = match.group(1)
                            filepath = os.path.join(o2_path, file)
                            results[o2_fraction][pressure]['data'] = {}
                            results[o2_fraction][pressure]['file_path'] = filepath

        self.init_results = self.recursive_to_dict(results)

        return self.init_results

    def parse_data(self, long_out=False):
        """Load each indexed CSV (via ``edf``) and fill in current density and
        oxygen concentration per (fraction, pressure) entry.

        When ``long_out`` is true, the raw nested structure is also dumped to
        ``results/impedence/raw_data.json``. Requires ``parse_data_title`` to
        have run first.
        """
        for conc, info in self.init_results.items():
            for pressure, data in info.items():
                file_path = data['file_path']
                results = edf(file_path)
                data['data']['current'] = list(results['data']['current'])
                # current density = current / cell active area, element-wise
                data['data']['current_density'] = list(
                    np.array(results['data']['current']) /
                    np.array(results['data']['cell_active_area'])
                )
                o_conc, dry = self.concentration_calc(self.temp, float(pressure), float(conc))
                data['data']['o_concentration'] = o_conc
                data['data']['dry_pressure'] = dry
        if long_out:
            with open('results/impedence/raw_data.json', 'w') as f:
                json.dump(self.init_results, f, indent=2)
                f.close()  # redundant: the with-block already closes f

        return self.init_results

    def r_calc(self, fit_plot=False):
        """Fit current density vs. O2 concentration per pressure and derive the
        total transport resistance from each fitted slope.

        Stores/returns ``{pressure: {..., 'r_total': [value]}}`` and writes it
        to ``results/impedence/fitted_r_total.json``.
        """
        results = defaultdict(lambda: defaultdict(list))
        for conc, info in self.init_results.items():
            for pressure, data in info.items():
                cd = np.array(data['data']['current_density'])
                # average over the first 80% of samples (tail discarded)
                cut_off = int(len(cd) * 0.8)
                cd_avg = np.mean(cd[:cut_off])
                results[pressure]['current_density'].append(cd_avg)
                results[pressure]['o_concentration'].append(data['data']['o_concentration'])
                results[pressure]['dry_pressure'].append(data['data']['dry_pressure'])
        # print(results)
        for pressure, v in results.items():
            o_conc = v['o_concentration']
            cur_d = v['current_density']
            fitted = self.fitting(o_conc, cur_d, prefix=pressure, plot=fit_plot)
            # r_total = 4F / slope (slope in kA units -> the /1000 factor)
            r_total = 4 * 96485 / 1000 / fitted['a']
            results[pressure]['r_total'].append(r_total)
        with open('results/impedence/fitted_r_total.json', 'w') as f:
            json.dump(results, f, indent=2)
            f.close()  # redundant: the with-block already closes f

        self.fitted_results = results

        return self.fitted_results

    def run_calc(self, long_out=False, fit_plot=False):
        """Full pipeline: index files, load data, fit per-pressure resistances,
        then fit r_total vs. dry pressure and split it into diffusion and
        pressure-independent parts. Writes results/impedence/final_results.json;
        returns None.
        """
        self.parse_data_title()
        self.parse_data(long_out)
        self.r_calc(fit_plot)
        pressures, rs_total = [], []
        for pressure, data in self.fitted_results.items():
            pressures.append(data['dry_pressure'][0])
            rs_total.extend(data['r_total'])
        results = self.fitting(pressures, rs_total, prefix='final', plot=fit_plot)
        # NOTE(review): the 101/100 factors look like an evaluation of the fit
        # at ~101 kPa with unit scaling; derivation not shown here — confirm.
        r_diff = (101 * results['a'] + results['b']) * 100
        r_other = 101 * results['b']
        results["r_diff (s m^-1)"] = r_diff
        results["r_other (s m^-1)"] = r_other
        results = self.recursive_to_dict(results)
        print(results)
        with open('results/impedence/final_results.json', 'w') as f:
            json.dump(results, f, indent=2)
            f.close()  # redundant: the with-block already closes f

    @staticmethod
    def recursive_to_dict(d):
        """Convert a (possibly nested) defaultdict tree into plain dicts so it
        JSON-serializes and stops auto-creating keys."""
        if isinstance(d, defaultdict):
            d = {k: r_total_calc.recursive_to_dict(v) for k, v in d.items()}
        elif isinstance(d, dict):
            d = {k: r_total_calc.recursive_to_dict(v) for k, v in d.items()}
        return d

    @staticmethod
    def concentration_calc(temp, pressure, ratio):
        """Return (O2 concentration, dry pressure in kPa) for a humidified gas
        at ``temp`` (K), total ``pressure`` (kPa) and O2 mole ``ratio``.
        """
        # water=ct.Water()
        # water.TQ=temp,0
        # water_pressure=water.vapor_pressure
        # NOTE(review): hard-coded water saturation pressure (~80 C value); the
        # cantera computation above is disabled, so the temp argument does NOT
        # affect this number — confirm if other temperatures are ever used.
        water_pressure = 47379  # unit-> Pa
        dry_pressure = pressure * 1000 - water_pressure  # unit-> Pa
        gas = ct.Solution('air.yaml')
        gas.TPX = temp, dry_pressure, {'O2': ratio, 'N2': 1 - ratio}
        conc = gas.concentrations[gas.species_index('O2')]  # unit-> mol/L
        return conc, dry_pressure / 1000

    @staticmethod
    def fitting(x, y, prefix, plot=False):
        """Least-squares linear fit y = a*x + b.

        Returns a dict with the coefficients, fit-quality metrics and (when
        ``plot`` is true) saves a scatter+fit figure named
        ``results/impedence/<prefix>_fitting.png``.
        """
        linear_func = lambda x, a, b: a * x + b
        x, y = np.array(x), np.array(y)
        popt, pcov = curve_fit(linear_func, x, y)
        a, b = popt
        y_pred = linear_func(x, a, b)
        residuals = y - y_pred
        mse = np.mean(residuals ** 2)
        r2 = 1 - np.sum(residuals ** 2) / np.sum((y - np.mean(y)) ** 2)
        # NOTE(review): 'a_err'/'b_err' are the diagonal *variances* of pcov,
        # not standard errors (no sqrt applied) — confirm which is intended.
        results = {'function': 'a*x+b',
                   'a': a,
                   'b': b,
                   'a_err': pcov[0][0],
                   'b_err': pcov[1][1],
                   'mse': mse,
                   'r2': r2
                   }
        if plot:
            plt.figure()
            plt.scatter(x, y, label='Data')
            plt.plot(x, y_pred, 'r-', label=f'Fit:y={a:.3f}x+{b:.3f}')
            plt.xlabel("x")
            plt.ylabel("y")
            plt.title(f"Linear Fit (MSE={mse:.4g},R2={r2:.4f})")
            plt.grid()
            plt.legend()
            plt.tight_layout()
            plt.savefig(f'results/impedence/{prefix}_fitting.png')

        return results
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that degrades numpy arrays/scalars to plain Python types."""

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, np.generic):
            return obj.item()
        return super().default(obj)


def find_and_sort_load_dta_files(root_folder='.'):
    """Recursively collect every *.DTA file under root_folder.

    Returns a list of (mtime, filepath) tuples sorted oldest-first.
    """
    files = glob(os.path.join(root_folder, '**', '*.DTA'), recursive=True)

    # Create list of tuples (mtime, filepath)
    file_info = [(os.path.getmtime(f), f) for f in files]

    # Sort by mtime (first element of tuple)
    file_info.sort(reverse=False)

    return file_info


def find_cv_subfolders(root_dir):
    """Return the subfolders of root_dir (relative paths, sorted) that contain
    files matching the module-level ``searchKey`` glob, logging each one to the
    module-level ``log`` file.
    """
    root_path = Path(root_dir)
    csv_folders = {str(p.parent.relative_to(root_path))
                   for p in root_path.glob('**/' + searchKey)}

    log.write("Subfolders containing CV DTA files:\n")
    csv_folders = sorted(csv_folders)
    for folder in csv_folders:
        # BUGFIX: the non-root branch previously wrote f"- {folder}" with no
        # trailing newline (unlike the ecsa_dry/ecsa_normal copies of this
        # helper), running successive folder names together in the log.
        log.write(f"- {folder}\n" if folder != '.' else "- (root directory)\n")
    return csv_folders
def read_json(filename, index=None):
    """Load ``results/<filename>`` as JSON, optionally drilling into one
    top-level key ``index``.

    Returns {} when the file is missing/unreadable or the key is absent, so
    partial result sets simply contribute nothing to the summary.
    """
    try:
        # BUGFIX: the filename argument was previously ignored (the open()
        # path was hard-coded), so every caller read the same nonexistent
        # file and silently got {}.
        with open(f'results/{filename}', 'r') as f:
            data = json.load(f)
        if index is not None:
            data = data[index]
    except Exception:  # narrowed from a bare except: (kept broad on purpose)
        data = {}
    return data


def extract_row(data, key, target):
    """Collapse every list-valued column of ``data`` to the single element at
    the row where ``data[key]`` is closest to ``target``.

    Scalar entries are left untouched. Mutates ``data`` in place and returns it.
    """
    arr = np.array(data[key])
    idx = np.argmin(np.abs(arr - target))
    for k, v in data.items():
        if isinstance(v, list):
            data[k] = v[idx]
    return data


def strip_filename(filename: str) -> str:
    """
    sample:
    'HRL_D048_05_Polarisation02_80C_H2_60%O2_2_5_250kpa - 20250816 1528.csv'
    -> 'HRL_D048_05_Polarisation02_80C_H2_60%O2_2_5_250kpa'
    """
    return re.split(r" - \d{8} \d{4}", filename)[0]


def pairwise_average(big_dict):
    """Average consecutive pairs of repeat measurements keyed by filename,
    collapsing each pair under the timestamp-stripped name.

    List values are averaged element-wise; scalars are averaged to float.
    BUGFIX: an odd number of entries previously *raised* Warning (discarding
    every result); now the unpaired trailing entry is kept as-is and a
    UserWarning is emitted instead.
    """
    import warnings  # local import keeps this edit self-contained

    items = list(big_dict.items())
    length = len(items)
    results = {}

    for i in range(0, length, 2):
        if i + 1 >= length:
            name, data = items[i]
            results[strip_filename(name)] = data
            warnings.warn("Odd files number is detected, please check the test results of polarization")
            break

        name1, dict1 = items[i]
        name2, dict2 = items[i + 1]
        new_name = strip_filename(name1)

        avg_dict = {}
        for key in dict1.keys():
            v1, v2 = dict1[key], dict2[key]
            if isinstance(v1, list):
                arr = np.array([v1, v2])
                avg_dict[key] = arr.mean(axis=0).tolist()
            else:
                avg_dict[key] = float((v1 + v2) / 2)

        results[new_name] = avg_dict

    return results
info["avg_dd"]=avg_dd + + lsv=read_json('lsv/lsv_results.json') + for dir,results in lsv.items(): + slopereg=[] + for file,info in results.items(): + for curve,value in info["data"].items(): + if isinstance(value, dict) and "slopeReg" in value: + slopereg.append(value["slopeReg"]) + avg_reg=float(np.mean(slopereg)) + results["avg_slopereg"]=avg_reg + + pol=read_json('polarization/polarization_results.json') + for file,data in pol.items(): + try: + data=extract_row(data,"current density (A cm^(-2))",1) + except: + pass + try: + pol=pairwise_average(pol) + except: + pass + + otr=read_json('impedence/final_results.json') + + ecsa_dry=read_json('ecsa_dry/ecsa_results.json') + for dir,results in ecsa_dry.items(): + for file,info in results["data"].items(): + eca,dd=[],[] + for curve,value in info.items(): + if isinstance(value, dict) and "ECA" in value: + eca.append(value["ECA"]) + dd.append(value["dd"]) + avg_eca=float(np.mean(eca)) + avg_dd=float(np.mean(dd)) + info["avg_ECA"]=avg_eca + info["avg_dd"]=avg_dd + + eis=read_json('eis/eis_results.json') + + + result_tol["sample"]=sample_name + result_tol["station_num."]=station + result_tol["sample_area (cm^2)"]=sample_area + result_tol["Test_Sequence"]=test_seq + result_tol["ECSA"]=esca + result_tol["ECSA_Dry"]=ecsa_dry + result_tol["LSV"]=lsv + result_tol["Polarization"]=pol + result_tol["O_Transfer_Resistance"]=otr + result_tol["EIS"]=eis + + + with open('results.json','w') as f: + json.dump(result_tol,f,indent=2) + + diff --git a/meatools/meatools/subcomands/ecsa_dry.py b/meatools/meatools/subcomands/ecsa_dry.py new file mode 100644 index 0000000..eb9f7d4 --- /dev/null +++ b/meatools/meatools/subcomands/ecsa_dry.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python +import numpy as np +import matplotlib.pyplot as plt +import re,os,json +import os +from scipy import interpolate +from scipy.stats import mstats +from glob import glob +from datetime import datetime +from pathlib import Path + +class NumpyEncoder(json.JSONEncoder): 
def find_and_sort_load_dta_files(root_folder='.'):
    """Return (mtime, path) tuples for every *cv*.DTA file found recursively
    under ``root_folder``, ordered oldest to newest."""
    pattern = os.path.join(root_folder, '**', '*cv*.DTA')
    stamped = [(os.path.getmtime(path), path) for path in glob(pattern, recursive=True)]
    return sorted(stamped)

def find_cv_subfolders(root_dir):
    """List (and log) the subfolders of ``root_dir`` that hold files matching
    the module-level ``searchKey`` glob, as sorted paths relative to root_dir."""
    base = Path(root_dir)
    found = sorted({str(hit.parent.relative_to(base))
                    for hit in base.glob('**/' + searchKey)})
    log.write("Subfolders containing CV DTA files:\n")
    for name in found:
        entry = "- (root directory)\n" if name == '.' else f"- {name}\n"
        log.write(entry)
    return found
double2[np.argsort(double2[:, 0])] + double1 = double1[np.concatenate(([True], np.diff(double1[:, 0]) != 0)), :] + double2 = double2[np.concatenate(([True], np.diff(double2[:, 0]) != 0)), :] + + # CO stripping area + maskCO = (updData[:, 0] > 0.5) & (np.concatenate(([0], np.diff(updData[:, 0]))) > 0) + topLimits = updData[maskCO, :] + fCO = interpolate.interp1d(topLimits[:, 0], topLimits[:, 1], bounds_error=False) + newUpper = fCO(x_CO) + plt.subplot(2,3,1) + plt.plot(x_CO,newUpper) + oldUpper =np.maximum(oldUpper,newUpper) + + # Interpolation + x_new = np.arange(0.35, 0.451, 0.001) + f1 = interpolate.interp1d(double1[:, 0], double1[:, 1], bounds_error=False) + double1_interp = f1(x_new) + f2 = interpolate.interp1d(double2[:, 0], double2[:, 1], bounds_error=False) + double2_interp = f2(x_new) + + ddouble = np.abs(double1_interp - double2_interp) + ddouble = ddouble[~np.isnan(ddouble)] + doubleMean = np.median(ddouble) + # print(f"dd: {doubleMean}") + + # Process updData + updData = updData[np.concatenate(([True], np.diff(updData[:, 0]) > 0)), :] + base = mstats.mquantiles(updData[(updData[:, 0] > 0.4) & (updData[:, 0] < 0.6), 1], 0.25) + updData[:, 1] = updData[:, 1] - base + updData = updData[updData[:, 1] > 0, :] + updData = updData[(updData[:, 0] <= 0.4) & (updData[:, 0] > ECAcutoff), :] + updData = updData[np.argsort(updData[:, 0]), :] + + area = np.sum(np.diff(updData[:, 0]) * updData[:-1, 1]) # in mAV + QH = area / scanRate + ECA = QH / 2.1e-4 + # print(f"ECA: {ECA}") + dump[f"curve_{str(j)}"]={} + dump[f"curve_{str(j)}"]["rate (V/s)"]=scanRate + dump[f"curve_{str(j)}"]["Vmin (V)"]=np.min(A[:,2]) + dump[f"curve_{str(j)}"]["dd"]=doubleMean + dump[f"curve_{str(j)}"]["ECA"]=ECA + + elif j==0: + CVall = A[:, [2, 3]] + updData = CVall[:, :2] + maskCO = (updData[:, 0] > 0.5) & (np.concatenate(([0], np.diff(updData[:, 0]))) > 0) + topLimits = updData[maskCO, :] + fCO = interpolate.interp1d(topLimits[:, 0], topLimits[:, 1], bounds_error=False) + COdesorb = fCO(x_CO) 
+ if os.path.exists('temp'): + os.remove('temp') + data_dump[f"file_{str(i)}"]=dump + + return oldUpper, COdesorb, data_dump + + +if __name__=="__main__": + os.makedirs('logs',exist_ok=True) + os.makedirs('results/ecsa_dry/',exist_ok=True) + log=open('logs/ecsa_normal.log','w') + index=1 + results={} + + searchKey='Cathode CO*/*cv*.DTA' + ECAcutoff = 0.08 + x_CO = np.arange(0.5, 0.95, 0.001) + u_subfolders=find_cv_subfolders('.') + + for i, folderName in enumerate(u_subfolders,start=1): + results[f"dir_{str(i)}"]={} + + plt.figure(index) + index+=1 + file_info = find_and_sort_load_dta_files(u_subfolders[i-1]) + [oldUpper,COdesorb,data]=plot_COtripping(file_info) + plt.savefig(f'results/ecsa_dry/ECSA_Dry_{i}-1.png') + + plt.figure(index) + index+=1 + plt.plot(x_CO,oldUpper,'r--') + plt.plot(x_CO,COdesorb,'b-') + COECA=np.nansum( np.mean(np.diff(x_CO))*(COdesorb-oldUpper)) + plt.savefig(f'results/ecsa_dry/ECSA_Dry_{i}-2.png') + + results[f"dir_{str(i)}"]["data"]=data + results[f"dir_{str(i)}"]["COECA"]=COECA + with open('results/ecsa_dry/ecsa_results.json','w') as results_file: + json.dump(results,results_file,indent=2,cls=NumpyEncoder) + + log.close() \ No newline at end of file diff --git a/meatools/meatools/subcomands/ecsa_normal.py b/meatools/meatools/subcomands/ecsa_normal.py new file mode 100644 index 0000000..addf3d1 --- /dev/null +++ b/meatools/meatools/subcomands/ecsa_normal.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python +import numpy as np +import matplotlib.pyplot as plt +import re,os,json +from scipy import interpolate +from scipy.stats import mstats +from datetime import datetime +from pathlib import Path +from collections import Counter +from glob import glob + +class NumpyEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, np.generic): + return obj.item() + return super().default(obj) + +def find_and_sort_load_dta_files(root_folder): + files = glob(os.path.join(root_folder, 
'**', searchKey), recursive=True) + file_info = [(os.path.getmtime(f), f) for f in files] + file_info.sort(reverse=False) + return file_info + +def find_cv_subfolders(root_dir='/ECSA'): + root_path = Path(root_dir).resolve() + csv_folders = {str(p.parent) for p in root_path.glob('**/' + searchKey)} + csv_folders = sorted(csv_folders) + log.write("Subfolders containing CV DTA files:\n") + for folder in csv_folders: + log.write(f"\t{folder}\n") + log.write("**" * 80+'\n') + return csv_folders + +def process_curve_data(A, ECAcutoff): + scanRate = np.median(np.abs(np.diff(A[:, 2]) / np.diff(A[:, 1]))) + CVall = A[:, [2, 3]] + updData = CVall[:, :2] + + # extract double1&double2 + mask1 = (updData[:, 0] > 0.3) & (updData[:, 0] < 0.6) & \ + (np.concatenate(([0], np.diff(updData[:, 0]))) > 0) + double1 = updData[mask1, :] + + mask2 = (updData[:, 0] > 0.3) & (updData[:, 0] < 0.6) & \ + (np.concatenate(([0], np.diff(updData[:, 0]))) < 0) + double2 = updData[mask2, :] + + # ranking + double1 = double1[np.argsort(double1[:, 0])] + double2 = double2[np.argsort(double2[:, 0])] + double1 = double1[np.concatenate(([True], np.diff(double1[:, 0]) != 0)), :] + double2 = double2[np.concatenate(([True], np.diff(double2[:, 0]) != 0)), :] + + # Interpolation + x_new = np.arange(0.35, 0.451, 0.001) + f1 = interpolate.interp1d(double1[:, 0], double1[:, 1], bounds_error=False) + double1_interp = f1(x_new) + f2 = interpolate.interp1d(double2[:, 0], double2[:, 1], bounds_error=False) + double2_interp = f2(x_new) + + ddouble = np.abs(double1_interp - double2_interp) + ddouble = ddouble[~np.isnan(ddouble)] + doubleMean = np.median(ddouble) + + + # calculating ECSA + updData = updData[np.concatenate(([True], np.diff(updData[:, 0]) > 0)), :] + base = mstats.mquantiles(updData[(updData[:, 0] > 0.4) & (updData[:, 0] < 0.6), 1], 0.25) + updData[:, 1] -= base + updData = updData[updData[:, 1] > 0, :] + updData = updData[(updData[:, 0] <= 0.4) & (updData[:, 0] > ECAcutoff), :] + updData = 
updData[np.argsort(updData[:, 0]), :] + + area = np.sum(np.diff(updData[:, 0]) * updData[:-1, 1]) # in mAV + QH = area / scanRate + ECA = QH / 2.1e-4 + + # print(f'rate={scanRate} V/s') + # print(f'Vmin={np.min(A[:, 2])} V') + # print(f"dd: {doubleMean}") + # print(f"ECA: {ECA}") + + dump={ + "rate (V/s)":scanRate, + "Vmin (V)":np.min(A[:, 2]), + "dd":doubleMean, + "ECA":ECA + } + + return dump + +def parse_dta_format1(filepath, ECAcutoff): + """running when len(u) < 2""" + + with open(filepath, 'r') as file: + lines = file.readlines() + + # Find the line where CURVE1 starts + curve_start = next((i for i, line in enumerate(lines) if line.strip().startswith('CURVE')), None) + data_lines = lines[curve_start + 3:] + data = [line.split()[-1] for line in data_lines if line.strip() and len(line.split()) >= 10] + + value_counts = Counter(data) + log.write("\nValue counts in the last column:\n") + for value, count in value_counts.items(): + log.write(f"Value {value}: {count} occurrences\n") + + data_dump={} + for value in value_counts.keys(): + if 0 < float(value) < len(value_counts) - 1: + with open('temp', 'w') as f: + for line in data_lines: + if line.strip() and len(line.split()) >= 10 and line.split()[-1] == value: + f.write(line.strip() + '\n') + + A = np.loadtxt('temp', skiprows=2, usecols=range(8)) + plt.subplot(2, 3, int(value)) + plt.plot(A[:, 2], A[:, 3]) + dump=process_curve_data(A, ECAcutoff) + data_dump[f"curve_{str(value)}"]=dump + os.remove('temp') + data_dump["file_path"]=filepath + return data_dump + +def parse_dta_format2(filepath, ECAcutoff): + """running when len(u) >= 2 """ + with open(filepath, 'r') as f: + for _ in range(65): + next(f) + content_from_line65 = f.read() + + u = re.split('CURVE', content_from_line65) + + data_dump={} + for j in range(len(u)): + with open('temp', 'w') as fileID: + fileID.write(u[j]) + A = np.loadtxt('temp', skiprows=2, usecols=range(8)) + plt.subplot(2, 3, j + 1) + plt.plot(A[:, 2], A[:, 3]) + if 0 < j < len(u) - 1: + 
dump=process_curve_data(A, ECAcutoff) + data_dump[f"curve_{str(j)}"]=dump + os.remove('temp') + data_dump["file_path"]=filepath + return data_dump + +if __name__ == "__main__": + ECAcutoff = 0.08 + searchKey = '*cv*.DTA' + os.makedirs('logs',exist_ok=True) + os.makedirs('results/ecsa_normal/',exist_ok=True) + log=open('logs/ecsa_normal.log','w') + u_subfolders = find_cv_subfolders('./ECSA/') + results={} + for jk, file_path in enumerate(u_subfolders,start=1): + file_info = find_and_sort_load_dta_files(file_path) + plt.figure(jk) + results[f"dir_{str(jk)}"]={} + + for i, (mtime, filepath) in enumerate(file_info, 1): + with open(filepath, 'r') as f: + for _ in range(65): + next(f) + content_from_line65 = f.read() + + if len(re.split('CURVE', content_from_line65)) < 2: + log.write('different format'+str(len(re.split('CURVE', content_from_line65)))) + data_dump=parse_dta_format1(filepath, ECAcutoff) + else: + data_dump=parse_dta_format2(filepath, ECAcutoff) + + readable_time = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S') + results_jk_i={ + "time_stamp":readable_time, + "data":data_dump + } + results[f"dir_{str(jk)}"][f"file_{str(i)}"]=results_jk_i + plt.savefig(f'results/ecsa_normal/ECSA_{jk}.png') + + + with open('results/ecsa_normal/ecsa_results.json','w') as results_file: + json.dump(results,results_file,indent=2,cls=NumpyEncoder) + log.close() \ No newline at end of file diff --git a/meatools/meatools/subcomands/eis.py b/meatools/meatools/subcomands/eis.py new file mode 100644 index 0000000..a2beb9b --- /dev/null +++ b/meatools/meatools/subcomands/eis.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python +import numpy as np +import matplotlib.pyplot as plt +import os,json +from glob import glob +from datetime import datetime +import matplotlib.pylab as plt +from sklearn.linear_model import HuberRegressor +from sklearn.metrics import r2_score +from pathlib import Path + +class NumpyEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, 
np.ndarray): + return obj.tolist() + elif isinstance(obj, np.generic): + return obj.item() + return super().default(obj) + +# def find_and_sort_load_dta_files(root_folder='.'): +# files = glob(os.path.join(root_folder, '**', '*.DTA'), recursive=True) +# file_info = [(os.path.getmtime(f), f) for f in files] +# file_info.sort(reverse=False) +# return file_info + +def find_and_sort_load_dta_files(root_folder='.', candidates=('EIS','PEIS','GEIS')): + root = Path(root_folder) + files = [] + for folder in candidates: + target_dirs = list(root.rglob(folder)) # 找到叫这些名字的目录 + for d in target_dirs: + files.extend(d.rglob("*.DTA")) + files.extend(d.rglob("*.dta")) + + file_info = [(f.stat().st_mtime, str(f)) for f in files if f.stat().st_size > 0] + file_info.sort(key=lambda x: x[0]) + return file_info + + +def read_dta_data(file_path): + data=[] + with open(file_path,'r',encoding='ISO-8859-1') as f: + txt_line=f.readlines() + i=0 + while i0]) + hfr_idx_neg=np.argmin(np.abs(zimag[zimag<0])) if np.any(zimag<0) else hfr_idx_pos + hfr_idx_pos,hfr_idx_neg=np.where(zimag>0)[0][hfr_idx_pos],np.where(zimag<0)[0][hfr_idx_neg] + hfr=(zreal[hfr_idx_pos]+zreal[hfr_idx_neg])/2 + + ### R_ion calculation + # mask=(freq>=1)&(freq<=4) + # zreal_sel,zimag_sel=zreal[mask],zimag[mask] + # diffirencials=[0] + # for i in range(1,len(zreal_sel)): + # diff=(zimag[i]-zimag[i-1])/(zreal[i]-zreal[i-1]) + # diffirencials.append(diff) + # diffirencials=np.array(diffirencials) + # slope_mask=np.where(np.abs(diffirencials[1:]-diffirencials[:-1])=0.999 + mask1=p1_1[2,:]>=np.quantile(p1_1[2,:],0.9) + mask2=p1_1[4,:]<=20 + mask=(mask1 | mask0) & mask2 + + plt.subplot(2,3,4) + plt.plot(p1_1[4, :],p1_1[0, :],'o')# y intercept + plt.xscale('log') + plt.subplot(2,3,5) + plt.plot(p1_1[4, :],p1_1[3, :],'o', alpha=0.3)# x intercept + plt.xscale('log') + plt.yscale('log') + plt.plot(p1_1[4, mask],p1_1[3, mask],'o') + plt.grid(True, which='major', linestyle='-', alpha=0.8) + plt.grid(True, which='minor', 
linestyle=':', alpha=0.4) + plt.subplot(2,3,6) + plt.plot(p1_1[4, :],p1_1[2, :],'o') + plt.xscale('log') + plt.savefig(f'results/eis/curve{index}.png') + + median=np.median(p1_1[2, mask]) + std=np.std(p1_1[2, mask]) + r_ion=(median-hfr)*3 + r_ion_std=std*3 + sample_num=len(p1_1[2, mask]) + + # ### Cdl calculation + # Freq = np.logspace(freq_range[0],freq_range[1],int((freq_range[1]-freq_range[0])/0.1)+1) + # try: + # f_interp=interp1d(freq,zimag,kind='linear',bounds_error=False,fill_value=np.nan) + # img_low=f_interp(Freq) + # except Exception as e: + # print("There is something wrong with interpotation for Freq and Zimag") + # img_low=np.full_like(Freq,np.nan) + # with np.errstate(divide='ignore',invalid='ignore'): + # Cdl=1.0/(img_low*Freq*2*np.pi) + + return hfr,r_ion,r_ion_std,sample_num + +def main(): + os.makedirs('results/eis/',exist_ok=True) + results={} + file_info = find_and_sort_load_dta_files('.',candidates=('EIS','PEIS')) + for i,(filetime,filepath) in enumerate(file_info): + readable_time=datetime.fromtimestamp(filetime).strftime('%Y-%m-%d %H:%M:%S') + data=read_dta_data(filepath) + hfr,r_ion,r_ion_std,sample_num=EIS_calc(data,i,filepath) + data_dump={"filename":filepath, + "filetime":readable_time, + "HFR (ohm)":hfr, + "R_ion (ohm)":f"{r_ion}±{r_ion_std},sample_num={sample_num}", + } + results[f"file_{i+1}"]=data_dump + with open('results/eis/eis_results.json','w') as results_file: + json.dump(results,results_file,indent=2,cls=NumpyEncoder) + +if __name__=="__main__": + # os.makedirs('logs',exist_ok=True) + # log=open('logs/ecsa_normal.log','w') + main() \ No newline at end of file diff --git a/meatools/meatools/subcomands/impedence_calc.py b/meatools/meatools/subcomands/impedence_calc.py new file mode 100644 index 0000000..9dc9c24 --- /dev/null +++ b/meatools/meatools/subcomands/impedence_calc.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python +import numpy as np +from test_squence import extract_data_from_file as edf +import os,re,json +from collections 
from collections import defaultdict
import cantera as ct
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt


class r_total_calc():
    """Compute total oxygen-transfer resistance from limiting-current CSVs.

    Expects subdirectories of root_path named like '<x>%O2' containing CSVs
    named with a '<p> kPa' pressure tag -- TODO confirm naming convention.
    """

    def __init__(self, root_path, temp=80):
        self.root = os.path.abspath(root_path)
        self.temp = temp + 273.15  # store in Kelvin

    def parse_data_title(self):
        """Index data files by O2 fraction and pressure parsed from names."""
        results = defaultdict(lambda: defaultdict(dict))
        for o2_dir in os.listdir(self.root):
            if o2_dir.startswith('._'):  # skip macOS resource-fork files
                continue
            o2_path = os.path.join(self.root, o2_dir)
            if os.path.isdir(o2_path):
                match = re.search(r"(\d+(?:\.\d+)?)%O2", o2_path)
                if match:
                    o2_fraction = float(match.group(1)) / 100
                    for file in os.listdir(o2_path):
                        if file.startswith('._') or not file.endswith('.csv'):
                            continue
                        match = re.search(r"(\d+)\s*kPa[a]?", file, re.IGNORECASE)
                        if match:
                            pressure = match.group(1)
                            filepath = os.path.join(o2_path, file)
                            results[o2_fraction][pressure]['data'] = {}
                            results[o2_fraction][pressure]['file_path'] = filepath

        self.init_results = self.recursive_to_dict(results)
        return self.init_results

    def parse_data(self, long_out=False):
        """Load currents via test_squence.extract_data_from_file and attach
        the computed O2 concentration / dry pressure per file."""
        for conc, info in self.init_results.items():
            for pressure, data in info.items():
                file_path = data['file_path']
                results = edf(file_path)
                data['data']['current'] = list(results['data']['current'])
                data['data']['current_density'] = list(
                    np.array(results['data']['current']) /
                    np.array(results['data']['cell_active_area'])
                )
                o_conc, dry = self.concentration_calc(self.temp, float(pressure), float(conc))
                data['data']['o_concentration'] = o_conc
                data['data']['dry_pressure'] = dry
        if long_out:
            # FIX: dropped redundant f.close() -- the with-block already closes f
            with open('results/impedence/raw_data.json', 'w') as f:
                json.dump(self.init_results, f, indent=2)

        return self.init_results

    def r_calc(self, fit_plot=False):
        """Fit current density vs O2 concentration per pressure and derive
        r_total = 4F / (1000 * slope) for each pressure group."""
        results = defaultdict(lambda: defaultdict(list))
        for conc, info in self.init_results.items():
            for pressure, data in info.items():
                cd = np.array(data['data']['current_density'])
                # average the first 80% of the trace -- presumably drops the
                # tail transient; TODO confirm intent
                cut_off = int(len(cd) * 0.8)
                cd_avg = np.mean(cd[:cut_off])
                results[pressure]['current_density'].append(cd_avg)
                results[pressure]['o_concentration'].append(data['data']['o_concentration'])
                results[pressure]['dry_pressure'].append(data['data']['dry_pressure'])
        for pressure, v in results.items():
            o_conc = v['o_concentration']
            cur_d = v['current_density']
            fitted = self.fitting(o_conc, cur_d, prefix=pressure, plot=fit_plot)
            r_total = 4 * 96485 / 1000 / fitted['a']  # 4-electron reduction, F=96485 C/mol
            results[pressure]['r_total'].append(r_total)
        with open('results/impedence/fitted_r_total.json', 'w') as f:
            json.dump(results, f, indent=2)

        self.fitted_results = results
        return self.fitted_results

    def run_calc(self, long_out=False, fit_plot=False):
        """Full pipeline: index files, parse data, fit r_total vs pressure,
        split into diffusion / other resistance and dump final_results.json."""
        self.parse_data_title()
        self.parse_data(long_out)
        self.r_calc(fit_plot)
        pressures, rs_total = [], []
        for pressure, data in self.fitted_results.items():
            pressures.append(data['dry_pressure'][0])
            rs_total.extend(data['r_total'])
        results = self.fitting(pressures, rs_total, prefix='final', plot=fit_plot)
        # evaluate at 101 kPa; x100 unit conversion -- TODO confirm constants
        r_diff = (101 * results['a'] + results['b']) * 100
        r_other = 101 * results['b']
        results["r_diff (s m^-1)"] = r_diff
        results["r_other (s m^-1)"] = r_other
        results = self.recursive_to_dict(results)
        with open('results/impedence/final_results.json', 'w') as f:
            json.dump(results, f, indent=2)

    @staticmethod
    def recursive_to_dict(d):
        """Convert nested defaultdicts into plain dicts for JSON dumping."""
        if isinstance(d, defaultdict):
            d = {k: r_total_calc.recursive_to_dict(v) for k, v in d.items()}
        elif isinstance(d, dict):
            d = {k: r_total_calc.recursive_to_dict(v) for k, v in d.items()}
        return d

    @staticmethod
    def concentration_calc(temp, pressure, ratio):
        """Return (O2 concentration [mol/L], dry pressure [kPa]) for a humidified
        O2/N2 mix at temp [K] and total pressure [kPa]."""
        # water=ct.Water()
        # water.TQ=temp,0
        # water_pressure=water.vapor_pressure
        # NOTE(review): hard-coded saturation pressure is labelled 85 C but the
        # class default temperature is 80 C -- confirm which is intended.
        water_pressure = 57875  # unit-> Pa
        dry_pressure = pressure * 1000 - water_pressure  # unit-> Pa
        gas = ct.Solution('air.yaml')
        gas.TPX = temp, dry_pressure, {'O2': ratio, 'N2': 1 - ratio}
        conc = gas.concentrations[gas.species_index('O2')]  # unit-> mol/L
        return conc, dry_pressure / 1000

    @staticmethod
    def fitting(x, y, prefix, plot=False):
        """Least-squares linear fit y = a*x + b with error metrics.

        Returns a dict with a, b, their 1-sigma uncertainties, mse and r2;
        optionally saves a scatter+fit plot under results/impedence/.
        """
        linear_func = lambda x, a, b: a * x + b
        x, y = np.array(x), np.array(y)
        popt, pcov = curve_fit(linear_func, x, y)
        a, b = popt
        y_pred = linear_func(x, a, b)
        residuals = y - y_pred
        mse = np.mean(residuals ** 2)
        r2 = 1 - np.sum(residuals ** 2) / np.sum((y - np.mean(y)) ** 2)
        # FIX: pcov's diagonal holds parameter *variances*; the 1-sigma errors
        # are their square roots (scipy.optimize.curve_fit documentation).
        a_err, b_err = np.sqrt(np.diag(pcov))
        results = {'function': 'a*x+b',
                   'a': a,
                   'b': b,
                   'a_err': a_err,
                   'b_err': b_err,
                   'mse': mse,
                   'r2': r2
                   }
        if plot:
            plt.figure()
            plt.scatter(x, y, label='Data')
            plt.plot(x, y_pred, 'r-', label=f'Fit:y={a:.3f}x+{b:.3f}')
            plt.xlabel("x")
            plt.ylabel("y")
            plt.title(f"Linear Fit (MSE={mse:.4g},R2={r2:.4f})")
            plt.grid()
            plt.legend()
            plt.tight_layout()
            plt.savefig(f'results/impedence/{prefix}_fitting.png')

        return results


if __name__ == "__main__":
    os.makedirs('logs', exist_ok=True)
    os.makedirs('results/impedence', exist_ok=True)
    root_path = 'OTR/'
    calc = r_total_calc(root_path=root_path)
    # calc.parse_data_title()
    # calc.parse_data()
    # calc.r_calc()
    calc.run_calc(long_out=True, fit_plot=True)

# ---- diff: new file b/meatools/meatools/subcomands/lsv.py ----
#!/usr/bin/env python
import numpy as np
import matplotlib.pyplot as plt
import re, os, json
from scipy import interpolate
from scipy.stats import mstats
from glob import glob
from datetime import datetime
from pathlib import Path

# plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False


class NumpyEncoder(json.JSONEncoder):
    """json.JSONEncoder that also serializes numpy arrays and scalars."""
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, np.generic):
            return obj.item()
        return super().default(obj)


def find_and_sort_load_dta_files(root_folder='.'):
    files = glob(os.path.join(root_folder, '**', '*.DTA'), recursive=True)


    # Create list of tuples (mtime, filepath)
file_info = [(os.path.getmtime(f), f) for f in files] + + # Sort by mtime (first element of tuple) + file_info.sort(reverse=False) + return file_info + +def find_cv_subfolders(root_dir): + root_path = Path(root_dir) + csv_folders = {str(p.parent.relative_to(root_path)) + for p in root_path.glob('**/'+searchKey)} + + log.write("Subfolders containing CV DTA files:\n") + csv_folders=sorted(csv_folders) + # print(csv_folders) + for folder in csv_folders: + log.write(f"- {folder}\n") + return csv_folders + +def lsv_calc(u_subfolders): + results={} + for jk in range(len(u_subfolders)): + file_path=u_subfolders[jk] + file_info = find_and_sort_load_dta_files(file_path) + plt.figure(jk+1) + results[f"dir_{jk}"]={} + for i, (mtime, filepath) in enumerate(file_info, 1): + results[f"dir_{jk}"][str(i)]={} + readable_time = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S') + results[f"dir_{jk}"][str(i)]["file"]=filepath + results[f"dir_{jk}"][str(i)]["time_stamp"]=readable_time + + with open(filepath, 'r') as f: + for _ in range(59): + next(f) + content_from_line65 = f.read() + + u = re.split('CURVE', content_from_line65) + plt.subplot(2,3,i) + dump={} + for j in range(min(5,len(u))): + with open(os.path.join(os.getcwd(),'temp'), 'w') as fileID: + # print(os.path.join(os.getcwd(),'temp')) + fileID.write(u[j]) + A = np.loadtxt(os.path.join(os.getcwd(),'temp'), skiprows=2, usecols=range(8)) + plt.plot(A[:,2],A[:,3]) + if j>0 : + scanRate=np.median(np.abs(np.diff(A[:,2])/np.diff(A[:,1]))) + # print('rate='+str(scanRate)+' V/s') + #print('Vmin='+str(np.min(A[:,2]))+' V') + CVall = A[:, [2, 3]] + startV = CVall[0, 0] + updData = CVall[:, :2] + mask1 = (updData[:, 0] > 0.3) & (updData[:, 0] < 0.6) & (np.concatenate(([0], np.diff(updData[:, 0]))) > 0) + double1 = updData[mask1, :] + + vol=updData[:,0] + index=np.argmin(abs(vol-0.4)) + H2cx3=updData[:,1][index] + + + # Interpolation + x_new = np.arange(0.35, 0.55, 0.001) + f1 = interpolate.interp1d(double1[:, 0], double1[:, 
1], bounds_error=False) + double1_interp = f1(x_new) + + H2cx = np.quantile(double1_interp,0.99) + + # print(f"H2cx99%: {H2cx}") + n=len(double1[:,0]) + m =(n*np.sum(double1[:,0]*double1[:,1]) - np.sum(double1[:,0])*np.sum(double1[:,1])) /(n*np.sum(double1[:,0]*double1[:,0]) - np.sum(double1[:,0])**2) + b = (np.sum(double1[:,1]) - m*np.sum(double1[:,0])) / n + + H2cx2=m*0.8+b + + + # print(f"slopeReg: {m}") + # print(f"H2cxReg: {H2cx2}") + + voltage,cur=A[:,2],A[:,3] + lsv_membrane=cur[np.argmin(np.abs(voltage-0.4))] + + + dump[f"curve_{str(j)}"]={ + "rate (V/s)":scanRate, + "H2cx99%":H2cx, + "slopeReg":m, + "H2cx_800mV":H2cx2, + "H2cx_0mV":b, + "H2cx_400mV":H2cx3, + "lsv_membrane*area":lsv_membrane + } + + + #plt.title(f"dd: {ECA:.1f}") + else: + + plt.title(f"{i}. {filepath}") + #if os.path.exists('temp'): + # os.remove('temp') + # + + results[f"dir_{jk}"][str(i)]["data"]=dump + + plt.savefig(f'results/lsv/lsv_results.png') + return results + +if __name__=="__main__": + os.makedirs('logs',exist_ok=True) + os.makedirs('results/lsv/',exist_ok=True) + log=open('logs/lsv.log','w') + + searchKey='LSV/**/*.DTA' + ECAcutoff = 0.08 + u_subfolders=find_cv_subfolders('.') + results=lsv_calc(u_subfolders) + + with open('results/lsv/lsv_results.json','w') as results_file: + json.dump(results,results_file,indent=2,cls=NumpyEncoder) + + + diff --git a/meatools/meatools/subcomands/mea_proccess.py b/meatools/meatools/subcomands/mea_proccess.py new file mode 100644 index 0000000..1d64ac9 --- /dev/null +++ b/meatools/meatools/subcomands/mea_proccess.py @@ -0,0 +1,44 @@ +import subprocess,sys,os + +# def run_evt(args=None): +# path=os.path.join(os.path.dirname(__file__),'lasp_train_evt') +# subprocess.run(['bash',path]) + +def run_test_sequence(args=None): + path=os.path.join(os.path.dirname(__file__),'test_squence.py') + subprocess.run([sys.executable,path]) + +def run_otr(args=None): + path=os.path.join(os.path.dirname(__file__),'impedence_calc.py') + 
subprocess.run([sys.executable,path]) + +def run_ecsa(args=None): + path=os.path.join(os.path.dirname(__file__),'ecsa_normal.py') + subprocess.run([sys.executable,path]) + +def run_ecsa_dry(args=None): + path=os.path.join(os.path.dirname(__file__),'ecsa_dry.py') + subprocess.run([sys.executable,path]) + +def run_lsv(args=None): + path=os.path.join(os.path.dirname(__file__),'lsv.py') + subprocess.run([sys.executable,path]) + +def run_conclude(args=None): + path=os.path.join(os.path.dirname(__file__),'conclude.py') + subprocess.run([sys.executable,path]) + +def run_eis(args=None): + path=os.path.join(os.path.dirname(__file__),'eis.py') + subprocess.run([sys.executable,path]) + + +def run_all(args=None): + dirs=[dir for dir in os.listdir() if os.path.isdir(dir)] + run_test_sequence() + if "OTR" in dirs: + run_otr() + run_ecsa() + run_ecsa_dry() + run_lsv() + run_conclude() \ No newline at end of file diff --git a/meatools/meatools/subcomands/test_squence.py b/meatools/meatools/subcomands/test_squence.py new file mode 100644 index 0000000..91b904a --- /dev/null +++ b/meatools/meatools/subcomands/test_squence.py @@ -0,0 +1,571 @@ +#!/usr/bin/env python +import os,json +from datetime import datetime, timedelta +import numpy as np +import matplotlib.pyplot as plt +import matplotlib.dates as mdates +from glob import glob +from datetime import datetime +import pandas as pd +# plt.rcParams['font.sans-serif'] = ['SimHei'] +plt.rcParams['font.sans-serif'] = ['Arial Unicode MS'] +plt.rcParams['axes.unicode_minus'] = False + + +""" +When I wrote this code, only God and I know it. +Now ... +ONLY God knows... 
+""" + + +class NumpyEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, np.generic): + return obj.item() + return super().default(obj) + +def find_csv_files(root_path): + """Find all CSV files recursively with relative paths""" + return [os.path.join(root, f) + for root, _, files in os.walk(root_path) + for f in files if f.lower().endswith('.csv')] + +# def find_csv_files(): +# """Find all CSV files recursively in the current working directory""" +# root_path = os.getcwd() # Use current working directory +# return [os.path.join(root, f) +# for root, _, files in os.walk(root_path) +# for f in files if f.lower().endswith('.csv')] + +def find_csv_pol_files(root_path): + files = glob(os.path.join(root_path, '**', searchKey), recursive=True) + + # Create list of tuples (mtime, filepath) + #file_info = [(os.path.getmtime(f), f) for f in files] + + # Sort by mtime (first element of tuple) + #file_info.sort(reverse=False) + + return files #file_info + +def extract_start_times(csv_files): + """Extract start times from files with validation""" + results = [] + + for filepath in csv_files: + try: + with open(filepath, 'r', encoding='ISO-8859-1') as f: + lines = [line.strip() for line in f.readlines()] + + for line_num in [5, 6, 7]: # Lines 6-8 (0-indexed 5-7) + if line_num >= len(lines): + continue + + parts = lines[line_num].split(',') or lines[line_num].split('\\t') + if len(parts) >= 2 and parts[0].strip() == 'Start time': + time_str = parts[1].strip() + try: + time_obj = datetime.strptime(time_str, '%m/%d/%y %H:%M:%S') + results.append({ + 'path': filepath, + 'time': time_obj, + 'time_str': time_str, + 'line': line_num + 1 + }) + break + except ValueError: + continue + except Exception as e: + log.write(f"\nError processing {os.path.basename(filepath)}: {e}\n") + + return sorted(results, key=lambda x: x['time']) + +def extract_data_from_file(filepath): + #print('active area col 
#'+str(find_column_number(filepath,'cell_active_area'))) + with open(filepath, 'r', encoding='ISO-8859-1') as f: + lines = [line.strip() for line in f.readlines()] + + # Find "Time stamp" line (case-insensitive) + data_start = None + for i, line in enumerate(lines): + if line.lower().startswith('time stamp'): + data_start = i + break + # print(data_start) + if data_start is None: + log.write(f"\nNo 'Time stamp' line found in {filepath}\n") + return None + + # Load data with numpy + data = np.genfromtxt(filepath, + delimiter=',', + skip_header=data_start+1, invalid_raise='false',encoding='ISO-8859-1') + + # Get header line to find column indices + header = lines[data_start].lower().split(',') + col_indices = {} + + # Find our target columns + targets = [ + 'elapsed time', + 'current', + 'current_set', + 'cell_voltage_001', + lambda x: x.endswith('.resistance'), # For any resistance column + 'temp_coolant_inlet', + 'temp_cathode_dewpoint_gas', + 'temp_anode_dewpoint_gas', + 'pressure_cathode_inlet', + 'cell_active_area' + ] + + for i, col in enumerate(header): + col = col.strip() + for target in targets: + if (callable(target) and target(col)) or (col == target): + col_name = 'resistance' if callable(target) else col.replace(' ', '_') + col_indices[col_name] = i + break + + # Extract the columns we found + extracted = {} + for name, idx in col_indices.items(): + extracted[name] = data[:, idx] + + return { + 'file_info': filepath, + 'data': extracted, + 'columns_found': list(col_indices.keys()) + } + +def plot_voltages(all_results): + """Generate subplots for each file's current vs elapsed time""" + num_files = len(all_results) + if num_files == 0: + return + + # Determine subplot grid size + cols = min(3, num_files) + rows = (num_files + cols - 1) // cols + + # Create figure with subplots + #plt.figure() + fig, axs = plt.subplots(rows, cols, figsize=(15, 5*rows)) + fig.subplots_adjust(hspace=0.5, wspace=0.3) + + # Flatten axes array if needed + if num_files > 1: + 
def plot_Tcells(all_results):
    """Plot dewpoint/coolant temperatures (left axis) and cathode inlet
    pressure (right axis) versus elapsed time, one subplot per file.

    Args:
        all_results: mapping key -> {'file_info': path, 'data': dict of
            column-name -> 1-D numpy array}, as produced by
            extract_data_from_file.
    """
    num_files = len(all_results)
    if num_files == 0:
        return

    # Grid: up to 3 columns, as many rows as needed.
    cols = min(3, num_files)
    rows = (num_files + cols - 1) // cols

    fig, axs = plt.subplots(rows, cols, figsize=(15, 5 * rows))
    fig.subplots_adjust(hspace=0.5, wspace=0.3)

    # Normalise to a flat list of Axes regardless of grid shape.
    axs = axs.flatten() if num_files > 1 else [axs]

    for idx, (key, result) in enumerate(all_results.items()):
        ax = axs[idx]
        data = result['data']
        filename = os.path.basename(result['file_info'])

        if 'elapsed_time' in data and 'current' in data:
            ax.plot(data['elapsed_time'], data['temp_anode_dewpoint_gas'], 'b-')
            ax.plot(data['elapsed_time'], data['temp_cathode_dewpoint_gas'], 'y-')
            ax.plot(data['elapsed_time'], data['temp_coolant_inlet'], 'r-')
            ax.set_title(filename, fontsize=10)
            ax.set_xlabel('Elapsed Time')
            ax.set_ylabel('Temperature')
            ax.grid(True)
            # Pressure trace on a secondary y axis sharing the time axis.
            ax1 = ax.twinx()
            ax1.plot(data['elapsed_time'], data['pressure_cathode_inlet'], 'g-')
        else:
            ax.text(0.5, 0.5, 'Missing required columns',
                    ha='center', va='center')
            ax.set_title(filename, fontsize=10)

    # Hide unused subplots
    for j in range(len(all_results), len(axs)):
        axs[j].axis('off')

    plt.tight_layout()
    plt.savefig('results/test_sequence/TP_vs_time.png')


def plot_Pol(all_results, sampleArea):
    """Extract polarization points from each Pol run and plot the curves.

    For every file: detect load-step boundaries from drops in the current
    set-point, average V/I/HFR over the window just before each step, and
    write the tabulated points to results/polarization/polarization_results.json.

    Args:
        all_results: mapping key -> {'file_info', 'data'} per Pol file.
        sampleArea: active area in cm^2, used for current density.
    """
    num_files = len(all_results)
    if num_files == 0:
        return

    # One row of 5 diagnostic panels per file.
    rows = num_files
    cols = 5

    fig, axs = plt.subplots(rows, cols, figsize=(15, 5 * rows))
    fig.subplots_adjust(hspace=0.5, wspace=0.3)
    axs = axs.flatten() if num_files > 1 else [axs]

    Pols = {}
    for idx, (key, result) in enumerate(all_results.items()):
        ax = axs[idx]
        data = result['data']
        filename = os.path.basename(result['file_info'])

        if 'elapsed_time' in data and 'current' in data:
            time = data['elapsed_time']
            voltage = data['cell_voltage_001']
            ocv = data['cell_voltage_001'][-1]
            current = data['current']
            HFR = data['resistance']
            currentSet = data['current_set']
            # IR-corrected voltage; HFR appears to be in mOhm, hence /1000.
            voltageIR = voltage + HFR * current / 1000

            plt.subplot(rows, 5, 5 * idx + 1)
            plt.plot(current, voltage)
            plt.title('IV curve')

            plt.subplot(rows, 5, 5 * idx + 2)
            plt.plot(time, current)
            plt.title('current v.s. time curve')

            # A load step is a drop (> 0.95 A) in the set-point; keep only
            # steps separated by more than 50 samples so each plateau counts once.
            positions = np.where(np.diff(currentSet) < -0.95)[0] + 1
            diffs = np.diff(positions)
            split_indices = np.where(diffs > 50)[0] + 1
            segs = positions[split_indices]

            plt.subplot(rows, 5, 5 * idx + 3)
            plt.plot(time, voltage)
            plt.plot(time[segs], voltage[segs], 'o')
            plt.title('voltage v.s. time curve')

            # Columns: current, voltage, voltage+IR, IR, current density.
            segsVol = np.zeros((len(segs), 5))
            for s, steps in enumerate(segs):
                # Average over the polReportAVG seconds preceding each step.
                # NOTE(review): mask reconstructed from garbled source
                # (comparison operators were stripped) — confirm window direction.
                window = (time < time[steps]) & (time > time[steps] - polReportAVG)
                segsVol[s, 0] = np.average(current[window])
                segsVol[s, 1] = np.average(voltage[window])
                segsVol[s, 2] = np.average(voltageIR[window])
                segsVol[s, 3] = segsVol[s, 2] - segsVol[s, 1]
            segsVol[:, 4] = segsVol[:, 0] / sampleArea

            Pols[filename] = {
                "OCV (V)": ocv,
                "current (A)": segsVol[:, 0],
                "voltage (V)": segsVol[:, 1],
                "voltage+IR (V)": segsVol[:, 2],
                "IR (V)": segsVol[:, 3],
                "current density (A cm^(-2))": segsVol[:, 4]
            }

            # Log points near 1 A/cm^2 for quick comparison between runs.
            for row in segsVol:
                if abs(row[-1] - 1) < 0.1:
                    log.write(' '.join(map(str, row)) + '\n')

            plt.subplot(rows, 5, 5 * idx + 4)
            plt.plot(segsVol[:, 0], segsVol[:, 1])
            plt.title('voltage v.s. current curve')

            plt.subplot(rows, 5, 5 * idx + 5)
            plt.plot(segsVol[:, 3], segsVol[:, 2])
            plt.title('resistance v.s. IR voltage curve')
            plt.xscale('log')
            plt.grid(True, which='both')
        else:
            ax.text(0.5, 0.5, 'Missing required columns',
                    ha='center', va='center')
            ax.set_title(filename, fontsize=10)

    plt.tight_layout()
    plt.savefig('results/polarization/polarization_curves.png')

    with open('results/polarization/polarization_results.json', 'w') as Polresults:
        json.dump(Pols, Polresults, indent=2, cls=NumpyEncoder)
def find_column_number(file_path, target_column):
    """Return the 1-based column index of *target_column* in the data header,
    or None if the column is absent.

    The header is the first line beginning with 'Time stamp'; everything
    above it is instrument preamble and is skipped.
    """
    with open(file_path, 'r', encoding='ISO-8859-1') as f:
        lines = f.readlines()

    # Find the header line
    header_line = None
    for i, line in enumerate(lines):
        if line.startswith('Time stamp'):
            header_line = i
            break

    if header_line is None:
        raise ValueError("Could not find header line starting with 'Time stamp'")

    # Read just the header row to get the column names.
    df = pd.read_csv(file_path, skiprows=header_line, nrows=0, encoding='ISO-8859-1')
    column_names = df.columns.tolist()

    try:
        # BUG FIX: the original printed df[10, col] (a KeyError on a DataFrame)
        # and never returned the index; return the 1-based position instead.
        return column_names.index(target_column) + 1
    except ValueError:
        return None  # Column not found


def areaDetermine(all_results):
    """Return the mean over files of each file's median cell_active_area.

    Returns None when *all_results* is empty.
    """
    num_files = len(all_results)
    if num_files == 0:
        return
    areaAll = np.zeros(num_files)
    for idx, (key, result) in enumerate(all_results.items()):
        data = result['data']
        # nanmedian: the area column may contain NaNs from malformed rows.
        areaAll[idx] = np.nanmedian(data['cell_active_area'])
    return np.average(areaAll)


def plot_step_timeline(steps, title="Process Timeline", figsize=(20, 10)):
    """
    Plots a clean linear timeline of steps with durations.

    Args:
        steps (list): step dicts with:
            - "step" (int/str): step identifier
            - "start" (str): '%Y-%m-%d %H:%M:%S' datetime string
            - "duration" (float): duration in minutes
            - "name" (str): legend label
        title (str): plot title
        figsize (tuple): figure dimensions

    Side effect: adds 'start_dt'/'end_dt' keys to each step dict.
    """
    # Convert strings to datetime objects (mutates the step dicts in place).
    for step in steps:
        step['start_dt'] = datetime.strptime(step['start'], '%Y-%m-%d %H:%M:%S')
        step['end_dt'] = step['start_dt'] + timedelta(minutes=step['duration'])

    fig, ax = plt.subplots(figsize=figsize)

    for i, step in enumerate(steps):
        # One horizontal segment per step, at its own y level.
        ax.plot([step['start_dt'], step['end_dt']], [i, i],
                'o-', linewidth=3, markersize=8, label=step['name'])

        # Duration label centred above the segment.
        mid_point = step['start_dt'] + timedelta(minutes=step['duration'] / 2)
        ax.text(mid_point, i + 0.15, f"{step['duration']} mins",
                ha='center', bbox=dict(facecolor='white', alpha=0.8))

    # Formatting: hourly ticks, hidden y axis, light vertical grid.
    ax.set_title(title, pad=20)
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
    ax.xaxis.set_major_locator(mdates.HourLocator(interval=1))
    ax.yaxis.set_visible(False)
    ax.grid(axis='x', linestyle='--', alpha=0.4)
    ax.legend(loc='upper left', ncol=1, frameon=False)

    plt.tight_layout()
    plt.savefig('results/test_sequence/test_sequence.png')
def main():
    """Run the full analysis: test-sequence timeline, then polarization.

    Reads CSVs under the current working directory; writes logs to `log`
    (module-level file handle) and results under results/.
    """
    log.write("CSV Start Time Analyzer\n")
    log.write("=" * 40)
    script_dir = os.getcwd()

    ##############################################################
    ############### test sequence analyzer #######################
    ## detect all CSV files and read their start times          ##
    ##############################################################
    csv_files = find_csv_files(script_dir)
    if not csv_files:
        log.write("\nNo CSV files found in directory tree\n")
    else:
        log.write(f"\nFound {len(csv_files)} CSV files\n")

    results = extract_start_times(csv_files)
    if not results:
        log.write("No valid start times found in lines 6-8\n")
    else:
        log.write(f"Found {len(results)} files with valid start times:\n")
        log.write("=" * 80)

    all_results = {}
    steps_data = []
    for i, res in enumerate(results, 1):
        rel_path = os.path.relpath(res['path'], script_dir)
        result = extract_data_from_file(rel_path)
        all_results[f"A{i}"] = result
        data = result['data']
        duration = np.max(data['elapsed_time']) / 60

        log.write(f"\n{i}. {rel_path}")
        log.write(f"\tParsed: {res['time'].strftime('%Y-%m-%d %H:%M:%S')}\n")
        log.write(f"\tDuration: {duration:.1f} mins\n")
        log.write(f"\tFound columns: {', '.join(result['columns_found'])}\n")
        log.write("-" * 80)
        # Append structured data for the timeline plot below.
        steps_data.append({
            "step": i,
            "start": res['time'].strftime('%Y-%m-%d %H:%M:%S'),
            "duration": float(f"{duration:.1f}"),
            "name": f"Step {i}: {rel_path}"
        })
    # `with` closes the file; the original's explicit close() was redundant.
    with open('results/test_sequence/all_csv_results_in_timeline.json', 'w') as all_json:
        json.dump(all_results, all_json, indent=2, cls=NumpyEncoder)

    time_line = {}
    total_time = 0
    for step in steps_data:
        starttime = datetime.strptime(step['start'], '%Y-%m-%d %H:%M:%S')
        endtime = starttime + timedelta(minutes=step['duration'])
        step["end"] = endtime.strftime('%Y-%m-%d %H:%M:%S')
        time_line[str(step["step"])] = {"start_time": step["start"],
                                        "end_time": step["end"],
                                        "duration (mins)": step["duration"],
                                        "file_path": step["name"]}
        total_time += step["duration"]
    time_line["total_time (mins)"] = total_time
    with open('results/test_sequence/test_order_in_timeline.json', 'w') as timeline:
        json.dump(time_line, timeline, indent=2, cls=NumpyEncoder)

    # plot test sequence
    plot_step_timeline(steps_data, title="Test sequence")
    plot_voltages(all_results)
    plot_Tcells(all_results)

    ##########################################################
    ############### polarization analyzer ####################
    ##########################################################
    csv_files = find_csv_pol_files(script_dir)
    log.write('\n' + "**" * 80)
    log.write(f"\nFound {len(csv_files)} Pol CSV files")
    results = extract_start_times(csv_files)

    all_pol_results = {}
    step_pol_data = {}
    log.write(f"Found {len(results)} Pol files with valid start times:\n")
    log.write("=" * 80)
    for i, res in enumerate(results, 1):
        rel_path = os.path.relpath(res['path'], script_dir)
        result = extract_data_from_file(rel_path)
        data = result['data']
        duration = np.max(data['elapsed_time']) / 60
        all_pol_results[f"A{i}"] = result
        log.write(f"\n{i}. {rel_path}")
        log.write(f"\tLine {res['line']}: Start time, {res['time_str']}\n")
        log.write(f"\tParsed: {res['time'].strftime('%Y-%m-%d %H:%M:%S')}\n")
        log.write(f"\tFound columns: {', '.join(result['columns_found'])}\n")
        log.write("-" * 80)
        endtime = res['time'] + timedelta(minutes=duration)
        step_pol_data[str(i)] = {
            "start_time": res['time'].strftime('%Y-%m-%d %H:%M:%S'),
            "end_time": endtime.strftime('%Y-%m-%d %H:%M:%S'),
            "duration (mins)": float(f"{duration:.1f}"),
            "file_path": f"Step {i}: {rel_path}"
        }
    sampleSize = areaDetermine(all_pol_results)
    step_pol_data["sampleSize (cm2)"] = sampleSize

    log.write(f"\nConfirmed the sample size is {sampleSize} cm2")
    # BUG FIX: the original called all_json.close() here, closing the
    # unrelated (already closed) handle from the test-sequence section.
    with open('results/polarization/all_pol_results.json', 'w') as all_pol_json:
        json.dump(all_pol_results, all_pol_json, indent=2, cls=NumpyEncoder)

    with open('results/polarization/polarization.json', 'w') as pol:
        json.dump(step_pol_data, pol, indent=2)

    plot_Pol(all_pol_results, sampleSize)


if __name__ == '__main__':
    # input zone
    searchKey = '*Pol*-*.csv'   # glob pattern for polarization CSVs
    polReportAVG = 30           # averaging window before each load step, secs
    os.makedirs('logs', exist_ok=True)
    os.makedirs('results/test_sequence', exist_ok=True)
    os.makedirs('results/polarization', exist_ok=True)
    log = open('logs/test_sequence.log', 'w')
    main()
    log.close()
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that serialises numpy arrays and scalars."""

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()   # arrays -> nested lists
        elif isinstance(obj, np.generic):
            return obj.item()     # numpy scalars -> native Python scalars
        return super().default(obj)


def find_csv_files(root_path):
    """Find all CSV files recursively (case-insensitive extension match)."""
    return [os.path.join(root, f)
            for root, _, files in os.walk(root_path)
            for f in files if f.lower().endswith('.csv')]


def find_csv_pol_files(root_path):
    """Find polarization CSVs matching the module-level glob `searchKey`."""
    return glob(os.path.join(root_path, '**', searchKey), recursive=True)


def extract_start_times(csv_files):
    """Extract 'Start time' stamps from lines 6-8 of each file.

    Returns a list of {'path', 'time', 'time_str', 'line'} dicts sorted by
    parsed time; files without a parseable stamp are skipped (errors logged).
    """
    results = []

    for filepath in csv_files:
        try:
            with open(filepath, 'r', encoding='ISO-8859-1') as f:
                lines = [line.strip() for line in f.readlines()]

            for line_num in [5, 6, 7]:  # Lines 6-8 (0-indexed 5-7)
                if line_num >= len(lines):
                    continue

                raw = lines[line_num]
                # BUG FIX: the original `split(',') or split('\\t')` never took
                # the fallback (str.split always returns a non-empty list) and
                # used a literal backslash-t; fall back to tab when no comma.
                parts = raw.split(',') if ',' in raw else raw.split('\t')
                if len(parts) >= 2 and parts[0].strip() == 'Start time':
                    time_str = parts[1].strip()
                    try:
                        time_obj = datetime.strptime(time_str, '%m/%d/%y %H:%M:%S')
                        results.append({
                            'path': filepath,
                            'time': time_obj,
                            'time_str': time_str,
                            'line': line_num + 1
                        })
                        break
                    except ValueError:
                        continue
        except Exception as e:
            # Best-effort: log the failure and move on to the next file.
            log.write(f"\nError processing {os.path.basename(filepath)}: {e}\n")

    return sorted(results, key=lambda x: x['time'])


def extract_data_from_file(filepath):
    """Load the numeric data table of an instrument CSV.

    Returns {'file_info', 'data': {col: 1-D array}, 'columns_found'}, or
    None when no 'Time stamp' header line is found.
    """
    with open(filepath, 'r', encoding='ISO-8859-1') as f:
        lines = [line.strip() for line in f.readlines()]

    # Find "Time stamp" line (case-insensitive) marking the table header.
    data_start = None
    for i, line in enumerate(lines):
        if line.lower().startswith('time stamp'):
            data_start = i
            break
    if data_start is None:
        log.write(f"\nNo 'Time stamp' line found in {filepath}\n")
        return None

    # BUG FIX: invalid_raise was the string 'false', which is truthy and
    # therefore *enabled* raising on malformed rows; pass the boolean.
    data = np.genfromtxt(filepath,
                         delimiter=',',
                         skip_header=data_start + 1,
                         invalid_raise=False,
                         encoding='ISO-8859-1')
    data = np.atleast_2d(data)  # single-row files come back 1-D

    # Map interesting header names to their column indices.
    header = lines[data_start].lower().split(',')
    col_indices = {}

    # The callable matches any '*.resistance' column regardless of prefix.
    targets = [
        'elapsed time',
        'current',
        'current_set',
        'cell_voltage_001',
        lambda x: x.endswith('.resistance'),
        'temp_coolant_inlet',
        'temp_cathode_dewpoint_gas',
        'temp_anode_dewpoint_gas',
        'pressure_cathode_inlet',
        'cell_active_area'
    ]

    for i, col in enumerate(header):
        col = col.strip()
        for target in targets:
            if (callable(target) and target(col)) or (col == target):
                col_name = 'resistance' if callable(target) else col.replace(' ', '_')
                col_indices[col_name] = i
                break

    extracted = {name: data[:, idx] for name, idx in col_indices.items()}

    return {
        'file_info': filepath,
        'data': extracted,
        'columns_found': list(col_indices.keys())
    }
def plot_voltages(all_results):
    """Plot cell voltage (left axis) and current (right axis) versus elapsed
    time, one subplot per file; saved to results/test_sequence/IV_vs_time.png.

    Args:
        all_results: mapping key -> {'file_info': path, 'data': dict of
            column-name -> 1-D numpy array}.
    """
    num_files = len(all_results)
    if num_files == 0:
        return

    # Grid: up to 3 columns, as many rows as needed.
    cols = min(3, num_files)
    rows = (num_files + cols - 1) // cols

    fig, axs = plt.subplots(rows, cols, figsize=(15, 5 * rows))
    fig.subplots_adjust(hspace=0.5, wspace=0.3)
    axs = axs.flatten() if num_files > 1 else [axs]

    for idx, (key, result) in enumerate(all_results.items()):
        ax = axs[idx]
        data = result['data']
        filename = os.path.basename(result['file_info'])

        if 'elapsed_time' in data and 'current' in data:
            ax.plot(data['elapsed_time'], data['cell_voltage_001'])
            ax.set_title(filename, fontsize=10)
            ax.set_xlabel('Elapsed Time')
            ax.set_ylabel('Voltage')
            ax.grid(True)
            # Current trace on a secondary y axis sharing the time axis.
            ax1 = ax.twinx()
            ax1.plot(data['elapsed_time'], data['current'], 'r-')
        else:
            ax.text(0.5, 0.5, 'Missing required columns',
                    ha='center', va='center')
            ax.set_title(filename, fontsize=10)

    # Hide unused subplots
    for j in range(len(all_results), len(axs)):
        axs[j].axis('off')

    plt.tight_layout()
    plt.savefig('results/test_sequence/IV_vs_time.png')


def plot_Tcells(all_results):
    """Plot dewpoint/coolant temperatures (left axis) and cathode inlet
    pressure (right axis) versus elapsed time, one subplot per file."""
    num_files = len(all_results)
    if num_files == 0:
        return

    cols = min(3, num_files)
    rows = (num_files + cols - 1) // cols

    fig, axs = plt.subplots(rows, cols, figsize=(15, 5 * rows))
    fig.subplots_adjust(hspace=0.5, wspace=0.3)
    axs = axs.flatten() if num_files > 1 else [axs]

    for idx, (key, result) in enumerate(all_results.items()):
        ax = axs[idx]
        data = result['data']
        filename = os.path.basename(result['file_info'])

        if 'elapsed_time' in data and 'current' in data:
            ax.plot(data['elapsed_time'], data['temp_anode_dewpoint_gas'], 'b-')
            ax.plot(data['elapsed_time'], data['temp_cathode_dewpoint_gas'], 'y-')
            ax.plot(data['elapsed_time'], data['temp_coolant_inlet'], 'r-')
            ax.set_title(filename, fontsize=10)
            ax.set_xlabel('Elapsed Time')
            ax.set_ylabel('Temperature')
            ax.grid(True)
            ax1 = ax.twinx()
            ax1.plot(data['elapsed_time'], data['pressure_cathode_inlet'], 'g-')
        else:
            ax.text(0.5, 0.5, 'Missing required columns',
                    ha='center', va='center')
            ax.set_title(filename, fontsize=10)

    # Hide unused subplots
    for j in range(len(all_results), len(axs)):
        axs[j].axis('off')

    plt.tight_layout()
    plt.savefig('results/test_sequence/TP_vs_time.png')


def plot_Pol(all_results, sampleArea):
    """Extract polarization points from each Pol run and plot the curves.

    Detects load-step boundaries from drops in the current set-point,
    averages V/I/HFR over the window just before each step, and writes the
    tabulated points to results/polarization/polarization_results.json.

    Args:
        all_results: mapping key -> {'file_info', 'data'} per Pol file.
        sampleArea: active area in cm^2, used for current density.
    """
    num_files = len(all_results)
    if num_files == 0:
        return

    rows = num_files   # one row of 5 diagnostic panels per file
    cols = 5

    fig, axs = plt.subplots(rows, cols, figsize=(15, 5 * rows))
    fig.subplots_adjust(hspace=0.5, wspace=0.3)
    axs = axs.flatten() if num_files > 1 else [axs]

    Pols = {}
    for idx, (key, result) in enumerate(all_results.items()):
        ax = axs[idx]
        data = result['data']
        filename = os.path.basename(result['file_info'])

        if 'elapsed_time' in data and 'current' in data:
            time = data['elapsed_time']
            voltage = data['cell_voltage_001']
            ocv = data['cell_voltage_001'][-1]
            current = data['current']
            HFR = data['resistance']
            currentSet = data['current_set']
            # IR-corrected voltage; HFR appears to be in mOhm, hence /1000.
            voltageIR = voltage + HFR * current / 1000

            plt.subplot(rows, 5, 5 * idx + 1)
            plt.plot(current, voltage)
            plt.title('IV curve')

            plt.subplot(rows, 5, 5 * idx + 2)
            plt.plot(time, current)
            plt.title('current v.s. time curve')

            # A load step is a drop (> 0.95 A) in the set-point; keep only
            # steps separated by more than 50 samples (one per plateau).
            positions = np.where(np.diff(currentSet) < -0.95)[0] + 1
            diffs = np.diff(positions)
            split_indices = np.where(diffs > 50)[0] + 1
            segs = positions[split_indices]

            plt.subplot(rows, 5, 5 * idx + 3)
            plt.plot(time, voltage)
            plt.plot(time[segs], voltage[segs], 'o')
            plt.title('voltage v.s. time curve')

            # Columns: current, voltage, voltage+IR, IR, current density.
            segsVol = np.zeros((len(segs), 5))
            for s, steps in enumerate(segs):
                # Average over the polReportAVG seconds preceding each step.
                # NOTE(review): mask reconstructed from garbled source
                # (comparison operators were stripped) — confirm window direction.
                window = (time < time[steps]) & (time > time[steps] - polReportAVG)
                segsVol[s, 0] = np.average(current[window])
                segsVol[s, 1] = np.average(voltage[window])
                segsVol[s, 2] = np.average(voltageIR[window])
                segsVol[s, 3] = segsVol[s, 2] - segsVol[s, 1]
            segsVol[:, 4] = segsVol[:, 0] / sampleArea

            Pols[filename] = {
                "OCV (V)": ocv,
                "current (A)": segsVol[:, 0],
                "voltage (V)": segsVol[:, 1],
                "voltage+IR (V)": segsVol[:, 2],
                "IR (V)": segsVol[:, 3],
                "current density (A cm^(-2))": segsVol[:, 4]
            }

            # Log points near 1 A/cm^2 for quick comparison between runs.
            for row in segsVol:
                if abs(row[-1] - 1) < 0.1:
                    log.write(' '.join(map(str, row)) + '\n')

            plt.subplot(rows, 5, 5 * idx + 4)
            plt.plot(segsVol[:, 0], segsVol[:, 1])
            plt.title('voltage v.s. current curve')

            plt.subplot(rows, 5, 5 * idx + 5)
            plt.plot(segsVol[:, 3], segsVol[:, 2])
            plt.title('resistance v.s. IR voltage curve')
            plt.xscale('log')
            plt.grid(True, which='both')
        else:
            ax.text(0.5, 0.5, 'Missing required columns',
                    ha='center', va='center')
            ax.set_title(filename, fontsize=10)

    plt.tight_layout()
    plt.savefig('results/polarization/polarization_curves.png')

    with open('results/polarization/polarization_results.json', 'w') as Polresults:
        json.dump(Pols, Polresults, indent=2, cls=NumpyEncoder)
df.columns.tolist() + + # Find the target column (case-sensitive) + try: + column_number = column_names.index(target_column) + 1 # +1 because Python uses 0-based indexing + print(df[10,column_number-1]) + #return column_number + except ValueError: + return None # Column not found + +def areaDetermine(all_results): + num_files = len(all_results) + if num_files == 0: + return + areaAll=np.zeros(num_files) + for idx, (key, result) in enumerate(all_results.items()): + data = result['data'] + areaAll[idx]=np.nanmedian(data['cell_active_area']) + # print(areaAll) + return np.average(areaAll) + +def plot_step_timeline(steps, title="Process Timeline", figsize=(20,10)): + """ + Plots a clean linear timeline of steps with durations. + + Args: + steps (list): List of step dictionaries with: + - "step" (int/str): Step identifier + - "start" (str): ISO format datetime string + - "duration" (float): Duration in minutes + title (str): Plot title + figsize (tuple): Figure dimensions + """ + # Convert strings to datetime objects + for step in steps: + step['start_dt'] = datetime.strptime(step['start'], '%Y-%m-%d %H:%M:%S') + step['end_dt'] = step['start_dt'] + timedelta(minutes=step['duration']) + + # Create figure + fig, ax = plt.subplots(figsize=figsize) + + # Plot each step + for i, step in enumerate(steps): + # Main timeline segment + ax.plot([step['start_dt'], step['end_dt']], + [i, i], + 'o-', + linewidth=3, + markersize=8, + label=step['name']) + + # Duration label + mid_point = step['start_dt'] + timedelta(minutes=step['duration']/2) + ax.text(mid_point, i+0.15, + f"{step['duration']} mins", + ha='center', + bbox=dict(facecolor='white', alpha=0.8)) + + # Formatting + ax.set_title(title, pad=20) + ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M')) + ax.xaxis.set_major_locator(mdates.HourLocator(interval=1)) + ax.yaxis.set_visible(False) + ax.grid(axis='x', linestyle='--', alpha=0.4) + # Move legend to bottom (with 2 columns if many steps) + #ncol = 2 if len(steps) > 
def plot_step_timeline(steps, title="Process Timeline", figsize=(20, 10)):
    """
    Plots a clean linear timeline of steps with durations.

    Args:
        steps (list): step dicts with:
            - "step" (int/str): step identifier
            - "start" (str): '%Y-%m-%d %H:%M:%S' datetime string
            - "duration" (float): duration in minutes
            - "name" (str): legend label
        title (str): plot title
        figsize (tuple): figure dimensions

    Side effect: adds 'start_dt'/'end_dt' keys to each step dict.
    """
    # Convert strings to datetime objects (mutates the step dicts in place).
    for step in steps:
        step['start_dt'] = datetime.strptime(step['start'], '%Y-%m-%d %H:%M:%S')
        step['end_dt'] = step['start_dt'] + timedelta(minutes=step['duration'])

    fig, ax = plt.subplots(figsize=figsize)

    for i, step in enumerate(steps):
        # One horizontal segment per step, at its own y level.
        ax.plot([step['start_dt'], step['end_dt']], [i, i],
                'o-', linewidth=3, markersize=8, label=step['name'])

        # Duration label centred above the segment.
        mid_point = step['start_dt'] + timedelta(minutes=step['duration'] / 2)
        ax.text(mid_point, i + 0.15, f"{step['duration']} mins",
                ha='center', bbox=dict(facecolor='white', alpha=0.8))

    # Formatting: hourly ticks, hidden y axis, light vertical grid.
    ax.set_title(title, pad=20)
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
    ax.xaxis.set_major_locator(mdates.HourLocator(interval=1))
    ax.yaxis.set_visible(False)
    ax.grid(axis='x', linestyle='--', alpha=0.4)
    ax.legend(loc='upper left', ncol=1, frameon=False)

    plt.tight_layout()
    plt.savefig('results/test_sequence/test_sequence.png')


def main():
    """Run the full analysis: test-sequence timeline, then polarization.

    Reads CSVs under the script's directory; writes logs to `log`
    (module-level file handle) and results under results/.
    """
    log.write("CSV Start Time Analyzer\n")
    log.write("=" * 40)
    script_dir = os.path.dirname(os.path.abspath(__file__))

    ##############################################################
    ############### test sequence analyzer #######################
    ## detect all CSV files and read their start times          ##
    ##############################################################
    csv_files = find_csv_files(script_dir)
    if not csv_files:
        log.write("\nNo CSV files found in directory tree\n")
    else:
        log.write(f"\nFound {len(csv_files)} CSV files\n")

    results = extract_start_times(csv_files)
    if not results:
        log.write("No valid start times found in lines 6-8\n")
    else:
        log.write(f"Found {len(results)} files with valid start times:\n")
        log.write("=" * 80)

    all_results = {}
    steps_data = []
    for i, res in enumerate(results, 1):
        rel_path = os.path.relpath(res['path'], script_dir)
        result = extract_data_from_file(rel_path)
        all_results[f"A{i}"] = result
        data = result['data']
        duration = np.max(data['elapsed_time']) / 60

        log.write(f"\n{i}. {rel_path}")
        log.write(f"\tParsed: {res['time'].strftime('%Y-%m-%d %H:%M:%S')}\n")
        log.write(f"\tDuration: {duration:.1f} mins\n")
        log.write(f"\tFound columns: {', '.join(result['columns_found'])}\n")
        log.write("-" * 80)
        # Append structured data for the timeline plot below.
        steps_data.append({
            "step": i,
            "start": res['time'].strftime('%Y-%m-%d %H:%M:%S'),
            "duration": float(f"{duration:.1f}"),
            "name": f"Step {i}: {rel_path}"
        })
    # `with` closes the file; the original's explicit close() was redundant.
    with open('results/test_sequence/all_csv_results_in_timeline.json', 'w') as all_json:
        json.dump(all_results, all_json, indent=2, cls=NumpyEncoder)

    time_line = {}
    total_time = 0
    for step in steps_data:
        starttime = datetime.strptime(step['start'], '%Y-%m-%d %H:%M:%S')
        endtime = starttime + timedelta(minutes=step['duration'])
        step["end"] = endtime.strftime('%Y-%m-%d %H:%M:%S')
        time_line[str(step["step"])] = {"start_time": step["start"],
                                        "end_time": step["end"],
                                        "duration (mins)": step["duration"],
                                        "file_path": step["name"]}
        total_time += step["duration"]
    time_line["total_time (mins)"] = total_time
    with open('results/test_sequence/test_order_in_timeline.json', 'w') as timeline:
        json.dump(time_line, timeline, indent=2, cls=NumpyEncoder)

    # plot test sequence
    plot_step_timeline(steps_data, title="Test sequence")
    plot_voltages(all_results)
    plot_Tcells(all_results)

    ##########################################################
    ############### polarization analyzer ####################
    ##########################################################
    csv_files = find_csv_pol_files(script_dir)
    log.write('\n' + "**" * 80)
    log.write(f"\nFound {len(csv_files)} Pol CSV files")
    results = extract_start_times(csv_files)

    all_pol_results = {}
    step_pol_data = {}
    log.write(f"Found {len(results)} Pol files with valid start times:\n")
    log.write("=" * 80)
    for i, res in enumerate(results, 1):
        rel_path = os.path.relpath(res['path'], script_dir)
        result = extract_data_from_file(rel_path)
        data = result['data']
        duration = np.max(data['elapsed_time']) / 60
        all_pol_results[f"A{i}"] = result
        log.write(f"\n{i}. {rel_path}")
        log.write(f"\tLine {res['line']}: Start time, {res['time_str']}\n")
        log.write(f"\tParsed: {res['time'].strftime('%Y-%m-%d %H:%M:%S')}\n")
        log.write(f"\tFound columns: {', '.join(result['columns_found'])}\n")
        log.write("-" * 80)
        endtime = res['time'] + timedelta(minutes=duration)
        step_pol_data[str(i)] = {
            "start_time": res['time'].strftime('%Y-%m-%d %H:%M:%S'),
            "end_time": endtime.strftime('%Y-%m-%d %H:%M:%S'),
            "duration (mins)": float(f"{duration:.1f}"),
            "file_path": f"Step {i}: {rel_path}"
        }
    sampleSize = areaDetermine(all_pol_results)
    step_pol_data["sampleSize (cm2)"] = sampleSize

    log.write(f"\nConfirmed the sample size is {sampleSize} cm2")
    # BUG FIX: the original called all_json.close() here, closing the
    # unrelated (already closed) handle from the test-sequence section.
    with open('results/polarization/all_pol_results.json', 'w') as all_pol_json:
        json.dump(all_pol_results, all_pol_json, indent=2, cls=NumpyEncoder)

    with open('results/polarization/polarization.json', 'w') as pol:
        json.dump(step_pol_data, pol, indent=2)

    plot_Pol(all_pol_results, sampleSize)


if __name__ == '__main__':
    # input zone
    searchKey = '*Pol*-*.csv'   # glob pattern for polarization CSVs
    polReportAVG = 30           # averaging window before each load step, secs
    os.makedirs('logs', exist_ok=True)
    os.makedirs('results/test_sequence', exist_ok=True)
    os.makedirs('results/polarization', exist_ok=True)
    log = open('logs/test_sequence.log', 'w')
    main()
    log.close()
packages=find_packages(), + entry_points={ + "console_scripts": [ + "mea = meatools.cli:main" + ] + } +) \ No newline at end of file diff --git a/setup.py b/setup.py index 5effed9..bc0305a 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="meatools", - version="0.1.0", + version="0.3.2", author='Shicheng Xu,Jiangyuan John Feng', description='', requires=[