diff --git a/src/caelestia/parser.py b/src/caelestia/parser.py index 840ead5c..4f5afca2 100644 --- a/src/caelestia/parser.py +++ b/src/caelestia/parser.py @@ -1,32 +1,61 @@ import argparse -from caelestia.subcommands import clipboard, emoji, record, resizer, scheme, screenshot, shell, toggle, wallpaper +from caelestia.subcommands import ( + clipboard, + emoji, + record, + resizer, + scheme, + screenshot, + shell, + toggle, + wallpaper, +) from caelestia.utils.paths import wallpapers_dir from caelestia.utils.scheme import get_scheme_names, scheme_variants from caelestia.utils.wallpaper import get_wallpaper def parse_args() -> (argparse.ArgumentParser, argparse.Namespace): - parser = argparse.ArgumentParser(prog="caelestia", description="Main control script for the Caelestia dotfiles") - parser.add_argument("-v", "--version", action="store_true", help="print the current version") + parser = argparse.ArgumentParser( + prog="caelestia", description="Main control script for the Caelestia dotfiles" + ) + parser.add_argument( + "-v", "--version", action="store_true", help="print the current version" + ) # Add subcommand parsers command_parser = parser.add_subparsers( - title="subcommands", description="valid subcommands", metavar="COMMAND", help="the subcommand to run" + title="subcommands", + description="valid subcommands", + metavar="COMMAND", + help="the subcommand to run", ) # Create parser for shell opts shell_parser = command_parser.add_parser("shell", help="start or message the shell") shell_parser.set_defaults(cls=shell.Command) - shell_parser.add_argument("message", nargs="*", help="a message to send to the shell") - shell_parser.add_argument("-d", "--daemon", action="store_true", help="start the shell detached") - shell_parser.add_argument("-s", "--show", action="store_true", help="print all shell IPC commands") - shell_parser.add_argument("-l", "--log", action="store_true", help="print the shell log") - shell_parser.add_argument("-k", "--kill", action="store_true", help="kill the shell") + shell_parser.add_argument( + "message", nargs="*", help="a message to send to the shell" + ) + shell_parser.add_argument( + "-d", "--daemon", action="store_true", help="start the shell detached" + ) + shell_parser.add_argument( + "-s", "--show", action="store_true", help="print all shell IPC commands" + ) + shell_parser.add_argument( + "-l", "--log", action="store_true", help="print the shell log" + ) + shell_parser.add_argument( + "-k", "--kill", action="store_true", help="kill the shell" + ) shell_parser.add_argument("--log-rules", metavar="RULES", help="log rules to apply") # Create parser for toggle opts - toggle_parser = command_parser.add_parser("toggle", help="toggle a special workspace") + toggle_parser = command_parser.add_parser( + "toggle", help="toggle a special workspace" + ) toggle_parser.set_defaults(cls=toggle.Command) toggle_parser.add_argument("workspace", help="the workspace to toggle") @@ -34,66 +63,162 @@ def parse_args() -> (argparse.ArgumentParser, argparse.Namespace): scheme_parser = command_parser.add_parser("scheme", help="manage the colour scheme") scheme_command_parser = scheme_parser.add_subparsers(title="subcommands") - list_parser = scheme_command_parser.add_parser("list", help="list available schemes") + list_parser = scheme_command_parser.add_parser( + "list", help="list available schemes" + ) list_parser.set_defaults(cls=scheme.List) - list_parser.add_argument("-n", "--names", action="store_true", help="list scheme names") - list_parser.add_argument("-f", "--flavours", action="store_true", help="list scheme flavours") - list_parser.add_argument("-m", "--modes", action="store_true", help="list scheme modes") - list_parser.add_argument("-v", "--variants", action="store_true", help="list scheme variants") + list_parser.add_argument( + "-n", "--names", action="store_true", help="list scheme names" + ) + list_parser.add_argument( + "-f", "--flavours", action="store_true", help="list scheme flavours" + ) + list_parser.add_argument( + "-m", "--modes", action="store_true", help="list scheme modes" + ) + list_parser.add_argument( + "-v", "--variants", action="store_true", help="list scheme variants" + ) get_parser = scheme_command_parser.add_parser("get", help="get scheme properties") get_parser.set_defaults(cls=scheme.Get) - get_parser.add_argument("-n", "--name", action="store_true", help="print the current scheme name") - get_parser.add_argument("-f", "--flavour", action="store_true", help="print the current scheme flavour") - get_parser.add_argument("-m", "--mode", action="store_true", help="print the current scheme mode") - get_parser.add_argument("-v", "--variant", action="store_true", help="print the current scheme variant") + get_parser.add_argument( + "-n", "--name", action="store_true", help="print the current scheme name" + ) + get_parser.add_argument( + "-f", "--flavour", action="store_true", help="print the current scheme flavour" + ) + get_parser.add_argument( + "-m", "--mode", action="store_true", help="print the current scheme mode" + ) + get_parser.add_argument( + "-v", "--variant", action="store_true", help="print the current scheme variant" + ) set_parser = scheme_command_parser.add_parser("set", help="set the current scheme") set_parser.set_defaults(cls=scheme.Set) - set_parser.add_argument("--notify", action="store_true", help="send a notification on error") - set_parser.add_argument("-r", "--random", action="store_true", help="switch to a random scheme") - set_parser.add_argument("-n", "--name", choices=get_scheme_names(), help="the name of the scheme to switch to") + set_parser.add_argument( + "--notify", action="store_true", help="send a notification on error" + ) + set_parser.add_argument( + "-r", "--random", action="store_true", help="switch to a random scheme" + ) + set_parser.add_argument( + "-n", + "--name", + choices=get_scheme_names(), + help="the name of the scheme to switch to", + ) set_parser.add_argument("-f", "--flavour", help="the flavour to switch to") - set_parser.add_argument("-m", "--mode", choices=["dark", "light"], help="the mode to switch to") - set_parser.add_argument("-v", "--variant", choices=scheme_variants, help="the variant to switch to") + set_parser.add_argument( + "-m", "--mode", choices=["dark", "light"], help="the mode to switch to" + ) + set_parser.add_argument( + "-v", "--variant", choices=scheme_variants, help="the variant to switch to" + ) # Create parser for screenshot opts - screenshot_parser = command_parser.add_parser("screenshot", help="take a screenshot") + screenshot_parser = command_parser.add_parser( + "screenshot", help="take a screenshot" + ) screenshot_parser.set_defaults(cls=screenshot.Command) - screenshot_parser.add_argument("-r", "--region", nargs="?", const="slurp", help="take a screenshot of a region") screenshot_parser.add_argument( - "-f", "--freeze", action="store_true", help="freeze the screen while selecting a region" + "-r", "--region", nargs="?", const="slurp", help="take a screenshot of a region" + ) + screenshot_parser.add_argument( + "-f", + "--freeze", + action="store_true", + help="freeze the screen while selecting a region", ) # Create parser for record opts record_parser = command_parser.add_parser("record", help="start a screen recording") record_parser.set_defaults(cls=record.Command) - record_parser.add_argument("-r", "--region", nargs="?", const="slurp", help="record a region") - record_parser.add_argument("-s", "--sound", action="store_true", help="record audio") - record_parser.add_argument("-p", "--pause", action="store_true", help="pause/resume the recording") + + # Recording mode options - separate video and audio modes + record_parser.add_argument( + "-m", + "--mode", + choices=["fullscreen", "region", "window"], + default="fullscreen", + help="video recording mode (default: fullscreen)", + ) + + record_parser.add_argument( + "-a", + "--audio", + choices=["none", "mic", "system", "combined"], + default="none", + help="Audio recording mode", + ) + + # Region option (only used with region mode) + record_parser.add_argument( + "-r", + "--region", + nargs="?", + const="slurp", + help="record a region (only for region mode)", + ) + + # Control options + record_parser.add_argument( + "-p", "--pause", action="store_true", help="pause/resume the recording" + ) + record_parser.add_argument( + "-s", "--stop", action="store_true", help="stop the current recording" + ) + record_parser.add_argument( + "--status", action="store_true", help="check recording status" + ) # Create parser for clipboard opts - clipboard_parser = command_parser.add_parser("clipboard", help="open clipboard history") + clipboard_parser = command_parser.add_parser( + "clipboard", help="open clipboard history" + ) clipboard_parser.set_defaults(cls=clipboard.Command) - clipboard_parser.add_argument("-d", "--delete", action="store_true", help="delete from clipboard history") + clipboard_parser.add_argument( + "-d", "--delete", action="store_true", help="delete from clipboard history" + ) # Create parser for emoji-picker opts emoji_parser = command_parser.add_parser("emoji", help="emoji/glyph utilities") emoji_parser.set_defaults(cls=emoji.Command) - emoji_parser.add_argument("-p", "--picker", action="store_true", help="open the emoji/glyph picker") - emoji_parser.add_argument("-f", "--fetch", action="store_true", help="fetch emoji/glyph data from remote") + emoji_parser.add_argument( + "-p", "--picker", action="store_true", help="open the emoji/glyph picker" + ) + emoji_parser.add_argument( + "-f", "--fetch", action="store_true", help="fetch emoji/glyph data from remote" + ) # Create parser for wallpaper opts - wallpaper_parser = command_parser.add_parser("wallpaper", help="manage the wallpaper") + wallpaper_parser = command_parser.add_parser( + "wallpaper", help="manage the wallpaper" + ) wallpaper_parser.set_defaults(cls=wallpaper.Command) wallpaper_parser.add_argument( - "-p", "--print", nargs="?", const=get_wallpaper(), metavar="PATH", help="print the scheme for a wallpaper" + "-p", + "--print", + nargs="?", + const=get_wallpaper(), + metavar="PATH", + help="print the scheme for a wallpaper", + ) + wallpaper_parser.add_argument( + "-r", + "--random", + nargs="?", + const=wallpapers_dir, + metavar="DIR", + help="switch to a random wallpaper", ) wallpaper_parser.add_argument( - "-r", "--random", nargs="?", const=wallpapers_dir, metavar="DIR", help="switch to a random wallpaper" + "-f", "--file", help="the path to the wallpaper to switch to" + ) + wallpaper_parser.add_argument( + "-n", "--no-filter", action="store_true", help="do not filter by size" ) - wallpaper_parser.add_argument("-f", "--file", help="the path to the wallpaper to switch to") - wallpaper_parser.add_argument("-n", "--no-filter", action="store_true", help="do not filter by size") wallpaper_parser.add_argument( "-t", "--threshold", @@ -110,7 +235,9 @@ def parse_args() -> (argparse.ArgumentParser, argparse.Namespace): # Create parser for resizer opts resizer_parser = command_parser.add_parser("resizer", help="window resizer daemon") resizer_parser.set_defaults(cls=resizer.Command) - resizer_parser.add_argument("-d", "--daemon", action="store_true", help="start the resizer daemon") + resizer_parser.add_argument( + "-d", "--daemon", action="store_true", help="start the resizer daemon" + ) resizer_parser.add_argument( "pattern", nargs="?", @@ -125,6 +252,8 @@ def parse_args() -> (argparse.ArgumentParser, argparse.Namespace): ) resizer_parser.add_argument("width", nargs="?", help="width to resize to") resizer_parser.add_argument("height", nargs="?", help="height to resize to") - resizer_parser.add_argument("actions", nargs="?", help="comma-separated actions to apply (float,center,pip)") + resizer_parser.add_argument( + "actions", nargs="?", help="comma-separated actions to apply (float,center,pip)" + ) return parser, parser.parse_args() diff --git a/src/caelestia/subcommands/record.py b/src/caelestia/subcommands/record.py index 867eb1b5..c63b0e91 100644 --- a/src/caelestia/subcommands/record.py +++ b/src/caelestia/subcommands/record.py @@ -7,10 +7,21 @@ from datetime import datetime from caelestia.utils.notify import close_notification, notify -from caelestia.utils.paths import recording_notif_path, recording_path, recordings_dir, user_config_path +from caelestia.utils.paths import ( + recording_notif_path, + recording_path, + recordings_dir, + user_config_path, +) RECORDER = "gpu-screen-recorder" +AUDIO_MODES = { + "mic": "default_input", + "system": "default_output", + "combined": "default_output|default_input", +} + class Command: args: Namespace @@ -19,26 +30,142 @@ def __init__(self, args: Namespace) -> None: self.args = args def run(self) -> None: - if self.args.pause: - subprocess.run(["pkill", "-USR2", "-f", RECORDER], stdout=subprocess.DEVNULL) + if hasattr(self.args, "status") and self.args.status: + self.status() + elif hasattr(self.args, "stop") and self.args.stop: + self.stop() + elif self.args.pause: + subprocess.run( + ["pkill", "-USR2", "-f", RECORDER], stdout=subprocess.DEVNULL + ) elif self.proc_running(): self.stop() else: self.start() + def status(self) -> None: + """Check and display recording status""" + if self.proc_running(): + print("Recording: RUNNING") + else: + print("Recording: STOPPED") + def proc_running(self) -> bool: - return subprocess.run(["pidof", RECORDER], stdout=subprocess.DEVNULL).returncode == 0 + return ( + subprocess.run(["pidof", RECORDER], stdout=subprocess.DEVNULL).returncode + == 0 + ) + + def intersects( + self, a: tuple[int, int, int, int], b: tuple[int, int, int, int] + ) -> bool: + return ( + a[0] < b[0] + b[2] + and a[0] + a[2] > b[0] + and a[1] < b[1] + b[3] + and a[1] + a[3] > b[1] + ) + + def get_audio_device(self, audio_mode: str) -> str: + """Get the appropriate audio device for the given mode with fallback handling.""" + if not audio_mode: + return "none" + + device = AUDIO_MODES.get(audio_mode, "") + + # Check if the device is available + if audio_mode in ["mic", "system", "combined"]: + try: + result = subprocess.run( + ["pactl", "list", "sources", "short"], + capture_output=True, + text=True, + check=True, + ) + available_devices = [ + line.split("\t")[1] + for line in result.stdout.strip().split("\n") + if line + ] - def intersects(self, a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> bool: - return a[0] < b[0] + b[2] and a[0] + a[2] > b[0] and a[1] < b[1] + b[3] and a[1] + a[3] > b[1] + if device and device not in available_devices: + print( + f"Warning: Audio device '{device}' not available, falling back to default" + ) + if audio_mode == "mic": + input_devices = [ + d + for d in available_devices + if "input" in d.lower() or "mic" in d.lower() + ] + device = input_devices[0] if input_devices else "" + elif audio_mode == "system": + output_devices = [ + d + for d in available_devices + if "output" in d.lower() or "monitor" in d.lower() + ] + device = output_devices[0] if output_devices else "" + except (subprocess.CalledProcessError, FileNotFoundError): + print( + "Warning: Could not check audio devices, audio recording may fail" + ) + device = "" + + return device + + def get_window_region(self) -> str | None: + """Get window region using Hyprland's client list and slurp for selection.""" + try: + # Get all windows from Hyprland + clients = json.loads(subprocess.check_output(["hyprctl", "clients", "-j"])) + + if not clients: + print("No windows found") + return None + + # Create slurp format strings for each window + slurp_regions = [] + for client in clients: + x = client["at"][0] + y = client["at"][1] + w = client["size"][0] + h = client["size"][1] + slurp_regions.append(f"{x},{y} {w}x{h}") + + # Use slurp with predefined regions to pick a window + slurp_input = "\n".join(slurp_regions) + result = subprocess.run( + ["slurp", "-f", "%wx%h+%x+%y"], + input=slurp_input, + capture_output=True, + text=True, + ) + + if result.returncode != 0: + return None + + return result.stdout.strip() + + except (subprocess.CalledProcessError, json.JSONDecodeError, KeyError) as e: + print(f"Error getting window region: {e}") + return None def start(self) -> None: args = ["-w"] + # Get video mode and audio mode from args + video_mode = getattr(self.args, "mode", "fullscreen") + audio_mode = getattr(self.args, "audio", "none") + monitors = json.loads(subprocess.check_output(["hyprctl", "monitors", "-j"])) - if self.args.region: - if self.args.region == "slurp": - region = subprocess.check_output(["slurp", "-f", "%wx%h+%x+%y"], text=True) + + # Handle video modes + if video_mode == "region" or self.args.region: + if self.args.region == "slurp" or not self.args.region: + region = subprocess.check_output( + ["slurp", "-f", "%wx%h+%x+%y"], text=True + ).strip() else: region = self.args.region.strip() args += ["region", "-region", region] @@ -51,18 +178,65 @@ def start(self) -> None: r = x, y, w, h max_rr = 0 for monitor in monitors: - if self.intersects((monitor["x"], monitor["y"], monitor["width"], monitor["height"]), r): + if self.intersects( + (monitor["x"], monitor["y"], monitor["width"], monitor["height"]), r + ): rr = round(monitor["refreshRate"]) max_rr = max(max_rr, rr) args += ["-f", str(max_rr)] - else: - focused_monitor = next(monitor for monitor in monitors if monitor["focused"]) + + elif video_mode == "window": + window_info = self.get_window_region() + if not window_info: + print("Window selection canceled") + return + + args += ["region", "-region", window_info] + m = re.match(r"(\d+)x(\d+)\+(\d+)\+(\d+)", window_info) + if not m: + raise ValueError(f"Invalid window region: {window_info}") + + w, h, x, y = map(int, m.groups()) + r = x, y, w, h + + # Calculate max refresh rate for the window region + max_rr = 0 + for monitor in monitors: + if self.intersects( + ( + monitor["x"], + monitor["y"], + monitor["width"], + monitor["height"], + ), + r, + ): + rr = round(monitor["refreshRate"]) + max_rr = max(max_rr, rr) + args += ["-f", str(max_rr)] + + else: # fullscreen + focused_monitor = next( + (monitor for monitor in monitors if monitor["focused"]), None + ) if focused_monitor: - args += [focused_monitor["name"], "-f", str(round(focused_monitor["refreshRate"]))] + args += [ + focused_monitor["name"], + "-f", + str(round(focused_monitor["refreshRate"])), + ] - if self.args.sound: + # Handle audio modes + audio_device = self.get_audio_device(audio_mode) + if audio_device: + args += ["-a", audio_device, "-ac", "opus", "-ab", "192k"] + print(f"Recording with audio: {audio_device} ({audio_mode})") + elif hasattr(self.args, "sound") and self.args.sound: args += ["-a", "default_output"] + else: + print("Recording without audio") + # Load extra args from config try: config = json.loads(user_config_path.read_text()) if "record" in config and "extraArgs" in config["record"]: @@ -70,12 +244,18 @@ def start(self) -> None: except (json.JSONDecodeError, FileNotFoundError): pass except TypeError as e: - raise ValueError(f"Config option 'record.extraArgs' should be an array: {e}") + raise ValueError( + f"Config option 'record.extraArgs' should be an array: {e}" + ) recording_path.parent.mkdir(parents=True, exist_ok=True) - proc = subprocess.Popen([RECORDER, *args, "-o", str(recording_path)], start_new_session=True) + proc = subprocess.Popen( + [RECORDER, *args, "-o", str(recording_path)], start_new_session=True + ) - notif = notify("-p", "Recording started", "Recording...") + # Show notification with mode info + mode_text = f"{video_mode} with {audio_mode if audio_device else 'no'} audio" + notif = notify("-p", "Recording started", f"Recording {mode_text}...") recording_notif_path.write_text(notif) try: @@ -94,11 +274,26 @@ def stop(self) -> None: subprocess.run(["pkill", "-f", RECORDER], stdout=subprocess.DEVNULL) # Wait for recording to finish to avoid corrupted video file - while self.proc_running(): + max_wait = 50 # Max 5 seconds + wait_count = 0 + while self.proc_running() and wait_count < max_wait: time.sleep(0.1) + wait_count += 1 + + # Check if file exists before trying to move it + if not recording_path.exists(): + print("Warning: No recording file found") + try: + close_notification(recording_notif_path.read_text()) + except IOError: + pass + return # Move to recordings folder - new_path = recordings_dir / f"recording_{datetime.now().strftime('%Y%m%d_%H-%M-%S')}.mp4" + new_path = ( + recordings_dir + / f"recording_{datetime.now().strftime('%Y%m%d_%H-%M-%S')}.mp4" + ) recordings_dir.mkdir(exist_ok=True, parents=True) shutil.move(recording_path, new_path) @@ -108,30 +303,22 @@ def stop(self) -> None: except IOError: pass - action = notify( - "--action=watch=Watch", - "--action=open=Open", - "--action=delete=Delete", - "Recording stopped", - f"Recording saved in {new_path}", - ) - - if action == "watch": - subprocess.Popen(["app2unit", "-O", new_path], start_new_session=True) - elif action == "open": - p = subprocess.run( + # Show completion notification in background (non-blocking) + try: + subprocess.Popen( [ - "dbus-send", - "--session", - "--dest=org.freedesktop.FileManager1", - "--type=method_call", - "/org/freedesktop/FileManager1", - "org.freedesktop.FileManager1.ShowItems", - f"array:string:file://{new_path}", - "string:", - ] + "notify-send", + "-a", + "caelestia-cli", + "--action=watch=Watch", + "--action=open=Open", + "--action=delete=Delete", + "Recording stopped", + f"Recording saved in {new_path}", + ], + start_new_session=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, ) - if p.returncode != 0: - subprocess.Popen(["app2unit", "-O", new_path.parent], start_new_session=True) - elif action == "delete": - new_path.unlink() + except Exception as e: + print(f"Could not show notification: {e}")