diff --git a/openad/app/main.py b/openad/app/main.py index 578ad34b..9e4c6c04 100644 --- a/openad/app/main.py +++ b/openad/app/main.py @@ -21,6 +21,7 @@ from openad.gui.ws_server import ws_server # Web socket server for gui - experimental from openad.helpers.output import output_table from openad.helpers.plugins import display_plugin_overview +from openad.helpers.history import init_history, update_history_file, add_history_entry # Core import openad.core.help as openad_help @@ -508,27 +509,11 @@ def do_help(self, inp, display_info=True, starts_with_only=False, disable_catego def preloop(self): """CMD class called function: Preloop is called by cmd to get an update the history file each History File""" - if readline and os.path.exists(self.histfile): - # note history files can get corrupted so using try to compensate - try: - readline.read_history_file(self.histfile) - except Exception: # pylint: disable=broad-exception-caught # do not need to know exception - # Create history file in case it doesn't exist yet. - # - - - - # To trigger: - # >> create new workspace foobar - # >> ctrl+c - # (Reboot) - readline.write_history_file(self.histfile) + init_history(self) def postloop(self): """CMD class called function: Post loop is called by cmd to get an update the history file""" - readline.set_history_length(self.histfile_size) - readline.write_history_file(self.histfile) - - def add_history(self, inp): - """CMD class called function: adds history file""" - readline.add_history(inp) + update_history_file(self) def complete(self, text, state): """CMD class called function: @@ -748,10 +733,6 @@ def emptyline(self): def default(self, line): """Default method call on hitting of the return Key, it tries to parse and execute the statements.""" - # Prevent the history from growing too large - if readline.get_current_history_length() > self.histfile_size: - readline.remove_history_item(0) - inp = line # assigning line to input value x = None @@ -1031,7 +1012,6 @@ def api_remote( """ global MAGIC_PROMPT - # GLOBAL_SETTINGS["display"] = "notebook" initialise() @@ -1066,14 +1046,7 @@ def api_remote( set_context(magic_prompt, x) magic_prompt.api_variables = api_var_list - # We now manage history. The history sometimes gets corrupted through no fault of ours. - # If so, we just reset it. - try: - readline.read_history_file(magic_prompt.histfile) - except Exception: # pylint: disable=broad-exception-caught # could be a number of errors - readline.add_history("") - readline.write_history_file(magic_prompt.histfile) - readline.read_history_file(magic_prompt.histfile) + for i in arguments: inp = inp + a_space + i a_space = " " @@ -1105,9 +1078,8 @@ def api_remote( # Note, may be possible add code completion here #revisit else: magic_prompt.preloop() - magic_prompt.add_history(inp) + add_history_entry(magic_prompt, inp) magic_prompt.postloop() - readline.write_history_file(magic_prompt.histfile) result = magic_prompt.default(inp) @@ -1171,7 +1143,7 @@ def cmd_line(): and command_line.settings["context"] == words[2 + word_increment].upper() ): command_line.preloop() - command_line.add_history(str(" ".join(words[3 + word_increment :])).strip()) + add_history_entry(command_line, str(" ".join(words[3 + word_increment :])).strip()) command_line.postloop() result = command_line.default(str(" ".join(words[3 + word_increment :])).strip()) else: @@ -1179,7 +1151,7 @@ def cmd_line(): # Note, may be possible add code completion here #revisit command_line.preloop() - command_line.add_history(inp[+increment:].strip()) + add_history_entry(command_line, inp[+increment:].strip()) command_line.postloop() result = command_line.default(inp[+increment:].strip()) command_line.do_exit("dummy do not remove") diff --git a/openad/core/lang_runs.py b/openad/core/lang_runs.py index 9efdb838..043c9734 100644 --- a/openad/core/lang_runs.py +++ b/openad/core/lang_runs.py @@ -7,6 +7,7 @@ # Global variables from openad.app.global_var_lib import GLOBAL_SETTINGS +from openad.helpers.history import update_history_file # Helpers from openad.helpers.output import output_text, output_error, output_success, output_table @@ -23,7 +24,7 @@ def _create_workspace_dir_if_nonexistent(cmd_pointer, dir_name): def save_run(cmd_pointer, parser): """Saves a Run""" _create_workspace_dir_if_nonexistent(cmd_pointer, "_runs") - readline.write_history_file(cmd_pointer.histfile) + update_history_file(cmd_pointer) # f =_meta_workspaces+'/'+ cmd_pointer.settings['workspace'].upper()+'/.cmd_history' runlist = [] @@ -111,9 +112,6 @@ def display_run(cmd_pointer, parser): # Create _runs directory if it does not exist yet. _create_workspace_dir_if_nonexistent(cmd_pointer, "_runs") - # import readline - # readline.write_history_file(cmd_pointer.histfile) # @Phil, I put this back but it was commented out - # Read the run file. commands = [] run_file_path = ( diff --git a/openad/core/lang_workspaces.py b/openad/core/lang_workspaces.py index aa34894e..966ec89e 100644 --- a/openad/core/lang_workspaces.py +++ b/openad/core/lang_workspaces.py @@ -3,8 +3,6 @@ import os from time import sleep -import readline - # Core from openad.core.lang_sessions_and_registry import write_registry, update_main_registry_env_var @@ -16,12 +14,13 @@ from openad.helpers.output_msgs import msg from openad.helpers.general import other_sessions_exist, user_input from openad.helpers.spinner import spinner +from openad.helpers.history import clear_memory_history, init_history # Sets the current workspace from the fgiven workspaces available def set_workspace(cmd_pointer, parser): """Sets the current Workspace""" - readline.write_history_file(cmd_pointer.histfile) + current_workspace_name = cmd_pointer.settings["workspace"].upper() new_workspace_name = parser["Workspace_Name"].upper() if new_workspace_name not in cmd_pointer.settings["workspaces"]: @@ -29,19 +28,17 @@ def set_workspace(cmd_pointer, parser): elif new_workspace_name == current_workspace_name: return output_warning(msg("warn_workspace_already_active", new_workspace_name)) + + # New workspace else: cmd_pointer.settings["workspace"] = new_workspace_name write_registry(cmd_pointer.settings, cmd_pointer) cmd_pointer.histfile = os.path.expanduser(cmd_pointer.workspace_path(new_workspace_name) + "/.cmd_history") - readline.clear_history() - try: # Open history file if not corrupt - if readline and os.path.exists(cmd_pointer.histfile): - readline.read_history_file(cmd_pointer.histfile) - except Exception: - readline.write_history_file(cmd_pointer.histfile) + # Switch history + clear_memory_history(cmd_pointer) + init_history(cmd_pointer) - readline.write_history_file(cmd_pointer.histfile) return output_success(msg("success_workspace_set", new_workspace_name)) @@ -131,11 +128,9 @@ def remove_workspace(cmd_pointer, parser): def create_workspace(cmd_pointer, parser): """Creates a Workspace""" - # Make sure existing workspace history file is saved. - readline.write_history_file(cmd_pointer.histfile) + cmd_pointer.refresh_vector = True cmd_pointer.refresh_train = True - cmd_pointer.settings["env_vars"]["refresh_help_ai"] = True update_main_registry_env_var(cmd_pointer, "refresh_help_ai", True) @@ -167,7 +162,7 @@ def create_workspace(cmd_pointer, parser): cmd_pointer.settings["descriptions"][workspace_name] = description write_registry(cmd_pointer.settings, cmd_pointer, True) # Create registry write_registry(cmd_pointer.settings, cmd_pointer) # Create session registry - except Exception as err: + except Exception as err: # pylint: disable=broad-exception-caught return output_error(msg("err_workspace_description", err)) # Create workspace. @@ -198,16 +193,17 @@ def create_workspace(cmd_pointer, parser): if not os.path.exists(dir_path): os.mkdir(dir_path) else: - # This currently happens when you remove a workspace and then try to recreate it. - # @Phil - we probably should move or archive the workspace folder when removing the workspace. + # When you remove a workspace and then recreate it os.chdir(dir_path) error_creating_dir = msg("warn_workspace_folder_already_exists", workspace_name) + # Main and session registry writes write_registry(cmd_pointer.settings, cmd_pointer, True) write_registry(cmd_pointer.settings, cmd_pointer) - readline.clear_history() - readline.write_history_file(cmd_pointer.histfile) + # Switch history + clear_memory_history(cmd_pointer) + # raise ValueError('This is a test error.\n') @later this causes the app to break permamenently. except Exception as err: error_other = msg("err_workspace_create", err) diff --git a/openad/helpers/general.py b/openad/helpers/general.py index 505d77f3..eb5dcfc7 100644 --- a/openad/helpers/general.py +++ b/openad/helpers/general.py @@ -29,7 +29,7 @@ def refresh_prompt(settings): def is_notebook_mode(): """Return True if we are running inside a Jupyter Notebook or Jupyter Lab.""" try: - get_ipython() # pylint: disable=undefined-variable + get_ipython() # pylint: disable=undefined-variable # noqa: F821 return True except Exception: # pylint: disable=broad-exception-caught return False @@ -191,7 +191,7 @@ def load_module_from_path(module_name, file_path): sys.modules[module_name] = module spec.loader.exec_module(module) return module - except Exception as err: + except Exception: # Silent fail - only enable this for debugging # output_error(f"load_module_from_path('{module_name}', {file_path})\n{err}") return None @@ -201,7 +201,7 @@ def load_module_from_path(module_name, file_path): def print_separator(style=None, width=None, return_val=False): from openad.app.global_var_lib import GLOBAL_SETTINGS - if GLOBAL_SETTINGS["display"] == "terminal" or GLOBAL_SETTINGS["display"] == None: + if GLOBAL_SETTINGS["display"] == "terminal" or GLOBAL_SETTINGS["display"] is None: cli_width = get_print_width(full=True) width = cli_width if not width or cli_width < width else width if style: @@ -439,19 +439,3 @@ async def _loop(text="", i=0, line_length=0): # _loop(text=text) # Sync asyncio.run(_loop(text=text)) sys.stdout.write("\033[?25h") # Show cursor - - -# NOT USED -# Clear the current line. -# Couldn't get this to work, because I can't clear the buffer. -# As a result, (eg.) when you try ctrl-c twice with n answer, -# the second time it will have the first time's response in the buffer, -# causing it to mess up the layout. -def clear_current_line(): - buffer = readline.get_line_buffer() - line_length = len(buffer) - readline.clear_history() - eraser = "\b \b" * line_length - sys.stdout.write(eraser) - # readline.insert_text(' ') - # print(len(buffer), buffer) diff --git a/openad/helpers/history.py b/openad/helpers/history.py new file mode 100644 index 00000000..328e313e --- /dev/null +++ b/openad/helpers/history.py @@ -0,0 +1,84 @@ +import os +import readline +from openad.helpers.output import output_text, output_error, output_success + +DEBUG_HIST = False + + +def init_history(cmd_pointer): + """ + Load history content from the .cmd_history file: + - On startup + - When switching workspaces + """ + readline.set_history_length(cmd_pointer.histfile_size) + if readline and os.path.exists(cmd_pointer.histfile): + try: + is_startup = readline.get_current_history_length() == 0 + if is_startup: + readline.read_history_file(cmd_pointer.histfile) + if DEBUG_HIST: + output_success(f"load_history_file: {cmd_pointer.histfile}", return_val=False) + except Exception as err: # pylint: disable=broad-exception-caught + if DEBUG_HIST: + output_error(["load_history_file", err], return_val=False) + elif DEBUG_HIST: + output_error( + [ + f"load_history_file - .cmd_history not found in {cmd_pointer.settings['workspace']}", + cmd_pointer.histfile, + ], + return_val=False, + ) + + +def clear_memory_history(cmd_pointer): + """ + Clear the in-memory history without removing the history file. + Used when switching workspaces. + + Workaround needed because readline doesn't let you clear in-memory + history without also deleting the history file. + """ + try: + readline.write_history_file(cmd_pointer.histfile + "--temp") + readline.clear_history() + os.remove(cmd_pointer.histfile + "--temp") + except Exception as err: # pylint: disable=broad-exception-caught + if DEBUG_HIST: + output_error(["refresh_history", err], return_val=False) + + +def add_history_entry(cmd_pointer, inp): + """ + Add the current command to the in-memory history. + This is called when a command is executed. + """ + try: + # Ignore super long commands + if len(str(str(inp).strip())) < int(4096): + readline.add_history(str(str(inp).strip())) + + # Cap the in-memory history + # Without this, it will keep on growing until you switch workspaces or restart kernel + if readline.get_current_history_length() > cmd_pointer.histfile_size: + readline.remove_history_item(0) + + if DEBUG_HIST: + output_text(f"add_history_entry #{cmd_pointer.histfile_size}: {inp}", return_val=False) + except Exception as err: # pylint: disable=broad-exception-caught + if DEBUG_HIST: + output_error(["add_history_entry", err], return_val=False) + + +def update_history_file(cmd_pointer): + """ + Write in-memory history to disk. + """ + try: + readline.write_history_file(cmd_pointer.histfile) + if DEBUG_HIST: + output_text(f"update_history_file: {cmd_pointer.histfile}", return_val=False) + except Exception as err: # pylint: disable=broad-exception-caught + if DEBUG_HIST: + output_error(["update_history_file", err], return_val=False) diff --git a/openad/helpers/spinner.py b/openad/helpers/spinner.py index b8f95bd3..d6ca351a 100644 --- a/openad/helpers/spinner.py +++ b/openad/helpers/spinner.py @@ -26,12 +26,10 @@ class Spinner(Halo): - def __init__(self): - # Fancy spinner, but requires more CPU, blocking the main thread # To do: see if separating thread for spinner resolves this - wave_spinner = { + wave_spinner = { # noqa: F841 "interval": 700, "frames": [ "▉▋▍▎▏▏", diff --git a/openad/user_toolkits/RXN/fn_reactions/fn_predict_retro.py b/openad/user_toolkits/RXN/fn_reactions/fn_predict_retro.py index 1e2abd8a..baf79fd1 100644 --- a/openad/user_toolkits/RXN/fn_reactions/fn_predict_retro.py +++ b/openad/user_toolkits/RXN/fn_reactions/fn_predict_retro.py @@ -261,7 +261,7 @@ def predict_retro(inputs: dict, cmd_pointer): "", return_val=False, ) - if num_results < 4 or GLOBAL_SETTINGS["display"] == "api":: + if num_results < 4 or GLOBAL_SETTINGS["display"] == "api": results[str(index)] = {"confidence": tree["confidence"], "reactions": []} output_text( @@ -273,7 +273,7 @@ def predict_retro(inputs: dict, cmd_pointer): ) for reaction in collect_reactions_from_retrosynthesis(tree): - if num_results < 4 or GLOBAL_SETTINGS["display"] == "api":: + if num_results < 4 or GLOBAL_SETTINGS["display"] == "api": results[str(index)]["reactions"].append(reactions_text[i]) output_text(" Reaction: " + reactions_text[i], return_val=False) i = i + 1