diff --git a/Framework/Built_In_Automation/Sequential_Actions/common_functions.py b/Framework/Built_In_Automation/Sequential_Actions/common_functions.py
index ed616398..c2b361a6 100755
--- a/Framework/Built_In_Automation/Sequential_Actions/common_functions.py
+++ b/Framework/Built_In_Automation/Sequential_Actions/common_functions.py
@@ -5900,37 +5900,6 @@ def data_store_read(data_set):
 
     sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
 
-    import logging
-    from datetime import datetime
-    from settings import ZEUZ_NODE_ARTIFACTS_DIR
-
-    log_dir = ZEUZ_NODE_ARTIFACTS_DIR / "data_store_logs"
-    log_dir.mkdir(parents=True, exist_ok=True)
-
-    current_time = datetime.now()
-    log_filename = f"data_store_read_{current_time.strftime('%Y-%m-%d_%H')}.log"
-    log_file = log_dir / log_filename
-
-    logger = logging.getLogger(f"data_store_read_{current_time.strftime('%Y%m%d%H')}")
-    logger.setLevel(logging.DEBUG)
-
-    # Remove existing handlers to avoid duplicate logs
-    if logger.handlers:
-        logger.handlers.clear()
-
-    handler = logging.FileHandler(str(log_file), mode='a')
-    handler.setFormatter(logging.Formatter(
-        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
-    ))
-    logger.addHandler(handler)
-
-    logger.info("="*80)
-    logger.info(f"Function data_store_read called")
-    logger.info(f"Module Info: {sModuleInfo}")
-    logger.info(f"Received data_set: {data_set}")
-
-
     try:
         test_id = None
         node_id = sr.Get_Shared_Variables("node_id") or None
@@ -5942,104 +5911,71 @@ def data_store_read(data_set):
             test_id = current_tc["testcase_no"]
         table_name = columns = var_name = ""
         params = {
-            'test_id': test_id,
-            'node_id': node_id,
-            'action_no': action_no,
-            'step_no': step_no
+            "test_id": test_id,
+            "node_id": node_id,
+            "action_no": action_no,
+            "step_no": step_no,
         }
-        logger.debug("Starting to parse data_set")
         for left, mid, right in data_set:
-            logger.debug(f"Processing row: left='{left}', mid='{mid}', right='{right}'")
-            if left.strip() == 'table name':
+            if left.strip() == "table name":
                 table_name = right.strip()
-                params['table_name'] = table_name
-                logger.info(f"Table name set to: {table_name}")
-            if left.strip() == 'where':
+                params["table_name"] = table_name
+            if left.strip() == "where":
                 q = right.strip()
-                logger.info(f"Processing WHERE clause: {q}")
-                # q = re.sub(r"\band\b",",",q)
-                # q = re.sub(r"\bor\b",",",q)
-                logic=[]
+                logic = []
                 for s in q.split(" "):
-                    if s=='and':
-                        logic.append('and')
-                    elif s=='or':
-                        logic.append('or')
-                logger.debug(f"Detected logic operators: {logic}")
+                    if s == "and":
+                        logic.append("and")
+                    elif s == "or":
+                        logic.append("or")
                 q = right.strip()
-                q = re.sub(r"\band\b",",",q)
-                q = re.sub(r"\bor\b",",",q)
-                temp= q.split(',')
-                logger.debug(f"Split WHERE clause into: {temp}")
-                t = temp[0].split('=')
-                params['and_' + t[0].strip()] = [t[1].strip()]
+                q = re.sub(r"\band\b", ",", q)
+                q = re.sub(r"\bor\b", ",", q)
+                temp = q.split(",")
+                t = temp[0].split("=")
+                params["and_" + t[0].strip()] = [t[1].strip()]
                 i = 1
-                j=0
+                j = 0
                 for s in temp[1:]:
-                    if logic[j] == 'and':
-                        t = temp[i].split('=')
-                        if 'and_' + t[0].strip() not in params:
-                            params['and_' + t[0].strip()] = [t[1].strip()]
-                        else:params['and_' + t[0].strip()].append(t[1].strip())
-                        i+=1
-                        j+=1
-                    elif logic[j] == 'or':
-                        t = temp[i].split('=')
-                        if 'or_' + t[0].strip() not in params:
-                            params['or_' + t[0].strip()] = [t[1].strip()]
-                        else:params['or_' + t[0].strip()].append(t[1].strip())
-
+                    if logic[j] == "and":
+                        t = temp[i].split("=")
+                        if "and_" + t[0].strip() not in params:
params["and_" + t[0].strip()] = [t[1].strip()] + else: + params["and_" + t[0].strip()].append(t[1].strip()) i += 1 - j+=1 - logger.info(f"Final WHERE params: {params}") + j += 1 + elif logic[j] == "or": + t = temp[i].split("=") + if "or_" + t[0].strip() not in params: + params["or_" + t[0].strip()] = [t[1].strip()] + else: + params["or_" + t[0].strip()].append(t[1].strip()) + i += 1 + j += 1 if mid.strip() == "action": var_name = right.strip() - logger.info(f"Variable name set to: {var_name}") - - logger.info(f"Final table_name: {table_name}") - logger.info(f"Final params: {params}") - logger.info(f"Final var_name: {var_name}") - - logger.debug("Preparing request headers") + headers = RequestFormatter.add_api_key_to_headers({}) - headers['headers']['content-type'] = 'application/json' - headers['headers']['X-API-KEY'] = ConfigModule.get_config_value("Authentication", "api-key") - logger.debug(f"Headers prepared (API key masked)") - - logger.info(f"Making GET request to data_store/data_store/custom_operation/") - logger.debug(f"Request params: {json.dumps(params)}") - - res = RequestFormatter.request("get", - RequestFormatter.form_uri('data_store/data_store/custom_operation/'), + headers["headers"]["content-type"] = "application/json" + headers["headers"]["X-API-KEY"] = ConfigModule.get_config_value("Authentication", "api-key") + + res = RequestFormatter.request( + "get", + RequestFormatter.form_uri("data_store/data_store/custom_operation/"), params=json.dumps(params), verify=False, - **headers + **headers, ) - - logger.info(f"Response status code: {res.status_code}") - logger.debug(f"Response text: {res.text}") - + if res.status_code == 200: response_json = json.loads(res.text) response_json = response_json["data"] - logger.info(f"Successfully retrieved {len(response_json) if isinstance(response_json, (list, dict)) else 'N/A'} items from datastore") - logger.debug(f"Response data: {response_json}") - result = sr.Set_Shared_Variables(var_name, response_json, pretty=True) - logger.info(f"Data stored in variable '{var_name}' successfully") - logger.info("="*80) - return result - else: - CommonUtil.ExecLog(sModuleInfo, "No data found, please check your dataset", 1) - logger.warning("No data found in datastore, please check your dataset") - logger.info("="*80) - return "zeuz_failed" - return "passed" + return sr.Set_Shared_Variables(var_name, response_json, pretty=True) + return "zeuz_failed" - except Exception as e: - logger.error(f"Exception occurred: {str(e)}", exc_info=True) - logger.error(f"Traceback: {traceback.format_exc()}") - logger.info("="*80) - return CommonUtil.Exception_Handler(sys.exc_info()) + except Exception: + return "zeuz_failed" def data_store_get_data(data_set): try: @@ -6141,35 +6077,6 @@ def data_store_write(data_set): sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME - import logging - from datetime import datetime - from settings import ZEUZ_NODE_ARTIFACTS_DIR - - log_dir = ZEUZ_NODE_ARTIFACTS_DIR / "data_store_logs" - log_dir.mkdir(parents=True, exist_ok=True) - - current_time = datetime.now() - log_filename = f"data_store_write_{current_time.strftime('%Y-%m-%d_%H')}.log" - log_file = log_dir / log_filename - - logger = logging.getLogger(f"data_store_write_{current_time.strftime('%Y%m%d%H')}") - logger.setLevel(logging.DEBUG) - - # Remove existing handlers to avoid duplicate logs - if logger.handlers: - logger.handlers.clear() - - handler = logging.FileHandler(str(log_file), mode='a') - handler.setFormatter(logging.Formatter( - '%(asctime)s - 
-        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
-    ))
-    logger.addHandler(handler)
-
-    logger.info("="*80)
-    logger.info("Function data_store_write called")
-    logger.info(f"Module Info: {sModuleInfo}")
-    logger.info(f"Received data_set: {data_set}")
-
     try:
         test_id = None
         node_id = sr.Get_Shared_Variables("node_id") or None
@@ -6181,122 +6088,76 @@ def data_store_write(data_set):
             test_id = current_tc["testcase_no"]
         table_name = columns = var_name = ""
         params = {
-            'test_id': test_id,
-            'node_id': node_id,
-            'action_no': action_no,
-            'step_no': step_no
+            "test_id": test_id,
+            "node_id": node_id,
+            "action_no": action_no,
+            "step_no": step_no,
         }
-        data={}
-        logger.debug("Starting to parse data_set")
+        data = {}
         for left, mid, right in data_set:
-            logger.debug(f"Processing row: left='{left}', mid='{mid}', right='{right}'")
-            if left.strip() == 'table name':
+            if left.strip() == "table name":
                 table_name = right.strip()
-                params['table_name'] = table_name
-                logger.info(f"Table name set to: {table_name}")
-            if left.strip() == 'where':
+                params["table_name"] = table_name
+            if left.strip() == "where":
                 q = right.strip()
-                logger.info(f"Processing WHERE clause: {q}")
-                # q = re.sub(r"\band\b",",",q)
-                # q = re.sub(r"\bor\b",",",q)
-                logic=[]
+                logic = []
                 for s in q.split(" "):
-                    if s=='and':
-                        logic.append('and')
-                    elif s=='or':
-                        logic.append('or')
-                logger.debug(f"Detected logic operators: {logic}")
+                    if s == "and":
+                        logic.append("and")
+                    elif s == "or":
+                        logic.append("or")
                 q = right.strip()
-                q = re.sub(r"\band\b",",",q)
-                q = re.sub(r"\bor\b",",",q)
-                temp= q.split(',')
-                logger.debug(f"Split WHERE clause into: {temp}")
-                t = temp[0].split('=')
-                params['and_' + t[0].strip()] = [t[1].strip()]
+                q = re.sub(r"\band\b", ",", q)
+                q = re.sub(r"\bor\b", ",", q)
+                temp = q.split(",")
+                t = temp[0].split("=")
+                params["and_" + t[0].strip()] = [t[1].strip()]
                 i = 1
-                j=0
+                j = 0
                 for s in temp[1:]:
-                    if logic[j] == 'and':
-                        t = temp[i].split('=')
-                        if 'and_' + t[0].strip() not in params:
-                            params['and_' + t[0].strip()] = [t[1].strip()]
-                        else:params['and_' + t[0].strip()].append(t[1].strip())
-                        i+=1
-                        j+=1
-                    elif logic[j] == 'or':
-                        t = temp[i].split('=')
-                        if 'or_' + t[0].strip() not in params:
-                            params['or_' + t[0].strip()] = [t[1].strip()]
-                        else:params['or_' + t[0].strip()].append(t[1].strip())
-
+                    if logic[j] == "and":
+                        t = temp[i].split("=")
+                        if "and_" + t[0].strip() not in params:
+                            params["and_" + t[0].strip()] = [t[1].strip()]
+                        else:
+                            params["and_" + t[0].strip()].append(t[1].strip())
                         i += 1
-                        j+=1
-                logger.info(f"Final WHERE params: {params}")
-            if left.strip() == 'data':
+                        j += 1
+                    elif logic[j] == "or":
+                        t = temp[i].split("=")
+                        if "or_" + t[0].strip() not in params:
+                            params["or_" + t[0].strip()] = [t[1].strip()]
+                        else:
+                            params["or_" + t[0].strip()].append(t[1].strip())
+                        i += 1
+                        j += 1
+            if left.strip() == "data":
                 temp = [right.strip()]
-                logger.debug(f"Processing data field: {temp}")
-                print(temp)
                 for t in temp:
-                    tt = t.split('=', 1)
-                    logger.debug(f"Split data into: {tt}")
-                    print(tt)
-                    data[tt[0].strip()]=tt[1].strip()
-                    logger.debug(f"Data field '{tt[0].strip()}' = '{tt[1].strip()}'")
-                    print(data[tt[0].strip()])
-                logger.info(f"Complete data dict: {data}")
+                    tt = t.split("=", 1)
+                    data[tt[0].strip()] = tt[1].strip()
             if mid.strip() == "action":
                 var_name = right.strip()
-                logger.info(f"Variable name set to: {var_name}")
-
-        logger.info(f"Final table_name: {table_name}")
-        logger.info(f"Final params: {params}")
-        logger.info(f"Final data: {data}")
{var_name}") - logger.debug("Preparing request headers") headers = RequestFormatter.add_api_key_to_headers({}) - headers['headers']['content-type'] = 'application/json' - headers['headers']['X-API-KEY'] = ConfigModule.get_config_value("Authentication", "api-key") - logger.debug("Headers prepared (API key masked)") - - logger.info("Making PATCH request to data_store/data_store/custom_operation/") - logger.debug(f"Request params: {json.dumps(params)}") - logger.debug(f"Request data: {json.dumps(data)}") - + headers["headers"]["content-type"] = "application/json" + headers["headers"]["X-API-KEY"] = ConfigModule.get_config_value("Authentication", "api-key") + res = requests.patch( - RequestFormatter.form_uri('data_store/data_store/custom_operation/'), + RequestFormatter.form_uri("data_store/data_store/custom_operation/"), params=json.dumps(params), data=json.dumps(data), verify=False, - **headers + **headers, ) - - logger.info(f"Response status code: {res.status_code}") - logger.debug(f"Response text: {res.text}") - # - # print(res.text) if res.status_code == 200: - # CommonUtil.ExecLog(sModuleInfo, f"Captured following output:\n{res.text}", 1) response_json = json.loads(res.text) - logger.info(f"Successfully updated datastore") - logger.debug(f"Response data: {response_json}") - result = sr.Set_Shared_Variables(var_name, response_json, pretty=True) - logger.info(f"Response stored in variable '{var_name}' successfully") - logger.info("="*80) - return result - else: - CommonUtil.ExecLog(sModuleInfo, "No data found to update , please check your dataset", 1) - logger.warning("No data found to update in datastore, please check your dataset") - logger.info("="*80) - return "zeuz_failed" - return "passed" + return sr.Set_Shared_Variables(var_name, response_json, pretty=True) + return "zeuz_failed" - except Exception as e: - logger.error(f"Exception occurred: {str(e)}", exc_info=True) - logger.error(f"Traceback: {traceback.format_exc()}") - logger.info("="*80) - return CommonUtil.Exception_Handler(sys.exc_info()) + except Exception: + return "zeuz_failed" def data_store_overwrite(data_set): diff --git a/Framework/Built_In_Automation/Web/Selenium/utils.py b/Framework/Built_In_Automation/Web/Selenium/utils.py index 22431b76..3c5de425 100644 --- a/Framework/Built_In_Automation/Web/Selenium/utils.py +++ b/Framework/Built_In_Automation/Web/Selenium/utils.py @@ -17,7 +17,9 @@ from datetime import timedelta import struct import urllib.request +from contextlib import contextmanager from rich.progress import Progress +from filelock import FileLock from settings import ZEUZ_NODE_DOWNLOADS_DIR try: @@ -58,6 +60,48 @@ def __init__(self): if not self.CHROME_INFO_FILE.exists(): self._init_info_file() + @contextmanager + def _info_lock(self): + """Serialize access to the chrome-for-testing cache files.""" + self.CHROME_BASE_DIR.mkdir(parents=True, exist_ok=True) + lock_path = str(self.CHROME_INFO_FILE) + ".lock" + with FileLock(lock_path): + yield + + def _read_info_unlocked(self): + """Read info.json without taking the lock.""" + defaults = { + "latest": {"version": "", "last_check": ""}, + "installed_versions": {}, + "settings": {"days_before_fetch": 15, "days_before_cleanup": 50}, + } + + if not self.CHROME_INFO_FILE.exists(): + return defaults + + with open(self.CHROME_INFO_FILE, "r") as f: + info = json.load(f) + + if "settings" not in info: + info["settings"] = defaults["settings"] + + if "latest" not in info: + info["latest"] = defaults["latest"] + + if "installed_versions" not in info: + 
info["installed_versions"] = defaults["installed_versions"] + + return info + + def _write_info_unlocked(self, info): + """Atomically write info.json without taking the lock.""" + temp_path = self.CHROME_INFO_FILE.with_suffix(".json.tmp") + with open(temp_path, "w") as f: + json.dump(info, f, indent=4) + f.flush() + os.fsync(f.fileno()) + os.replace(temp_path, self.CHROME_INFO_FILE) + def _install_linux_dependencies(self): """Install Chrome dependencies for Ubuntu 24.04 and newer""" try: @@ -182,35 +226,28 @@ def _init_info_file(self): "days_before_cleanup": 50, # set default cleanup old versions after 50 days }, } - with open(self.CHROME_INFO_FILE, "w") as f: - json.dump(info, f, indent=4) + with self._info_lock(): + self._write_info_unlocked(info) def _load_info(self): """Load the info.json content""" - # modification here to use defaults with settings - defaults = { - "latest": {"version": "", "last_check": ""}, - "installed_versions": {}, - "settings": {"days_before_fetch": 15, "days_before_cleanup": 50}, - } - - if not self.CHROME_INFO_FILE.exists(): - return defaults - - with open(self.CHROME_INFO_FILE, "r") as f: - info = json.load(f) - - # adds settings if missing - if "settings" not in info: - info["settings"] = defaults["settings"] - self._save_info(info) - - return info + with self._info_lock(): + try: + info = self._read_info_unlocked() + except json.JSONDecodeError: + info = { + "latest": {"version": "", "last_check": ""}, + "installed_versions": {}, + "settings": {"days_before_fetch": 15, "days_before_cleanup": 50}, + } + self._write_info_unlocked(info) + + return info def _save_info(self, info): """Save data to info.json""" - with open(self.CHROME_INFO_FILE, "w") as f: - json.dump(info, f, indent=4) + with self._info_lock(): + self._write_info_unlocked(info) def get_latest_version(self, channel="Stable", force_check=False): """Get the latest Chrome version with caching""" @@ -318,11 +355,12 @@ def is_version_installed(self, version): def _update_installed_version_date(self, version): """Update the last used date for an installed version""" - info = self._load_info() - today = datetime.date.today().isoformat() + with self._info_lock(): + info = self._read_info_unlocked() + today = datetime.date.today().isoformat() - info["installed_versions"][version] = today - self._save_info(info) + info["installed_versions"][version] = today + self._write_info_unlocked(info) def get_chrome_binary_path(self, version_dir): """Get path to Chrome binary""" @@ -413,45 +451,46 @@ def set_execute_permissions(self, version_dir): def cleanup_old_versions(self): """Remove versions not used in the last X days (from settings)""" - info = self._load_info() - installed_versions = info.get("installed_versions", {}) - today = datetime.date.today() + with self._info_lock(): + info = self._read_info_unlocked() + installed_versions = info.get("installed_versions", {}) + today = datetime.date.today() + + # get days_before_cleanup from settings + settings = info.get("settings", {}) + + # Check environment variable first + env_fetch = os.environ.get("CHROME_DAYS_BEFORE_CLEANUP") + if env_fetch: + days_before_cleanup = int(env_fetch) + print(f"Using days_before_cleanup from env: {days_before_cleanup}") + else: + # otherwise use info.json or default + days_before_cleanup = settings.get("days_before_cleanup", 50) - # get days_before_cleanup from settings - settings = info.get("settings", {}) + # modification here to use settings for days_before_cleanup + cutoff_date = today - timedelta(days=days_before_cleanup) - 
-        # Check environment variable first
-        env_fetch = os.environ.get("CHROME_DAYS_BEFORE_CLEANUP")
-        if env_fetch:
-            days_before_cleanup = int(env_fetch)
-            print(f"Using days_before_cleanup from env: {days_before_cleanup}")
-        else:
-            # otherwise use info.json or default
-            days_before_cleanup = settings.get("days_before_cleanup", 50)
-
-        # modification here to use settings for days_before_cleanup
-        cutoff_date = today - timedelta(days=days_before_cleanup)
-
-        versions_to_remove = []
-        for version, date_str in installed_versions.items():
-            if not date_str:
-                continue
-
-            last_used = datetime.date.fromisoformat(date_str)
-            if last_used < cutoff_date:
-                versions_to_remove.append(version)
-
-        for version in versions_to_remove:
-            version_dir = self.CHROME_VERSIONS_DIR / version
-            if version_dir.exists():
-                print(f"Cleaning up unused CfT version: {version}")
-                shutil.rmtree(version_dir, ignore_errors=True)
-            del installed_versions[version]
-
-        if versions_to_remove:
-            info["installed_versions"] = installed_versions
-            self._save_info(info)
-            print(f"Removed {len(versions_to_remove)} old versions of CfT")
+            versions_to_remove = []
+            for version, date_str in installed_versions.items():
+                if not date_str:
+                    continue
+
+                last_used = datetime.date.fromisoformat(date_str)
+                if last_used < cutoff_date:
+                    versions_to_remove.append(version)
+
+            for version in versions_to_remove:
+                version_dir = self.CHROME_VERSIONS_DIR / version
+                if version_dir.exists():
+                    print(f"Cleaning up unused CfT version: {version}")
+                    shutil.rmtree(version_dir, ignore_errors=True)
+                del installed_versions[version]
+
+            if versions_to_remove:
+                info["installed_versions"] = installed_versions
+                self._write_info_unlocked(info)
+                print(f"Removed {len(versions_to_remove)} old versions of CfT")
 
     def install_version(self, version):
         """Install a specific Chrome version"""