diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..043ccaa --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,24 @@ +version: 2 + +updates: + - package-ecosystem: uv + cooldown: + default-days: 7 + directory: / + groups: + python: + patterns: + - "*" + schedule: + interval: daily + + - package-ecosystem: github-actions + cooldown: + default-days: 7 + directory: / + groups: + actions: + patterns: + - "*" + schedule: + interval: daily diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml new file mode 100644 index 0000000..3c1e215 --- /dev/null +++ b/.github/workflows/docs.yml @@ -0,0 +1,44 @@ +name: Deploy Documentation + +on: + push: + branches: + - main + +permissions: {} + +jobs: + build: + runs-on: ubuntu-latest + permissions: + contents: read # needed for private repos + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + + - name: Install uv + uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # v7.6.0 + + - name: build docs + run: | + make doc + + - name: upload docs artifact + uses: actions/upload-pages-artifact@7b1f4a764d45c48632c6b24a0339c27f5614fb0b # v4.0.0 + with: + path: ./html/ + + deploy: + needs: build + runs-on: ubuntu-latest + permissions: + # NOTE: Needed to push to the repository. + pages: write + id-token: write + environment: + name: github-pages + url: ${{ steps.deployment.outputs.page_url }} + steps: + - id: deployment + uses: actions/deploy-pages@d6db90164ac5ed86f2b6aed7e0febac5b3c0c03e # v4.0.5 diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..b129a51 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,25 @@ +name: Lint + +on: + push: + branches: + - main + pull_request: + +permissions: {} + +jobs: + lint: + runs-on: ubuntu-latest + permissions: + contents: read # needed for private repos + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + + - name: Install uv + uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # v7.6.0 + + - name: lint + run: make lint diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..abfbce1 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,32 @@ +name: Unit tests + +on: + push: + branches: + - main + pull_request: + +permissions: {} + +jobs: + test: + strategy: + matrix: + python: + - "3.13" + - "3.14" + runs-on: ubuntu-latest + permissions: + contents: read # needed for private repos + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + + - name: Install uv + uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78 # v7.6.0 + with: + python-version: ${{ matrix.python }} + + - name: test + run: make test diff --git a/.github/workflows/zizmor.yml b/.github/workflows/zizmor.yml new file mode 100644 index 0000000..3dc6e1d --- /dev/null +++ b/.github/workflows/zizmor.yml @@ -0,0 +1,25 @@ +name: GitHub Actions Security Analysis with zizmor 🌈 + +on: + push: + branches: ["main"] + pull_request: + branches: ["**"] + +permissions: {} + +jobs: + zizmor: + runs-on: ubuntu-latest + permissions: + security-events: write + contents: read # only needed for private repos + actions: read # only needed for private repos + steps: + - name: Checkout repository + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + + - name: Run zizmor 🌈 + uses: zizmorcore/zizmor-action@71321a20a9ded102f6e9ce5718a2fcec2c4f70d8 # v0.5.2 diff --git a/.gitignore b/.gitignore index 68bc17f..64d49ae 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ # Byte-compiled / optimized / DLL files __pycache__/ -*.py[cod] +*.py[codz] *$py.class # C extensions @@ -27,8 +27,8 @@ share/python-wheels/ MANIFEST # PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. *.manifest *.spec @@ -46,7 +46,7 @@ htmlcov/ nosetests.xml coverage.xml *.cover -*.py,cover +*.py.cover .hypothesis/ .pytest_cache/ cover/ @@ -92,22 +92,37 @@ ipython_config.py # However, in case of collaboration, if having platform-specific dependencies or dependencies # having no cross-platform support, pipenv may install dependencies that don't work, or not # install all needed dependencies. -#Pipfile.lock +# Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# uv.lock # poetry # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock +# poetry.lock +# poetry.toml # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. -#pdm.lock -# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it -# in version control. -# https://pdm.fming.dev/#use-with-ide -.pdm.toml +# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python. +# https://pdm-project.org/en/latest/usage/project/#working-with-version-control +# pdm.lock +# pdm.toml +.pdm-python +.pdm-build/ + +# pixi +# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control. +# pixi.lock +# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one +# in the .venv directory. It is recommended not to include this directory in version control. +.pixi # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ @@ -116,11 +131,25 @@ __pypackages__/ celerybeat-schedule celerybeat.pid +# Redis +*.rdb +*.aof +*.pid + +# RabbitMQ +mnesia/ +rabbitmq/ +rabbitmq-data/ + +# ActiveMQ +activemq-data/ + # SageMath parsed files *.sage.py # Environments .env +.envrc .venv env/ venv/ @@ -153,8 +182,35 @@ dmypy.json cython_debug/ # PyCharm -# JetBrains specific template is maintained in a separate JetBrains.gitignore that can -# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore -# and can be added to the global gitignore or merged into this file. For a more nuclear -# option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +# .idea/ + +# Abstra +# Abstra is an AI-powered process automation framework. +# Ignore directories containing user credentials, local state, and settings. +# Learn more at https://abstra.io/docs +.abstra/ + +# Visual Studio Code +# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore +# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore +# and can be added to the global gitignore or merged into this file. However, if you prefer, +# you could uncomment the following to ignore the entire vscode folder +# .vscode/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc + +# Marimo +marimo/_static/ +marimo/_lsp/ +__marimo__/ + +# Streamlit +.streamlit/secrets.toml \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..d0458e8 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,42 @@ +# AGENTS.md + +## Project Overview + +Rip DVDs quickly and easily from the command line. +This is a command-line application with entry point `dvdrip`. + +## Stack + +- Package manager: uv +- Build backend: uv_build +- Linting/formatting: ruff +- Type checking: ty +- Testing: pytest +- CI: GitHub Actions (lint, test, release, docs) + +## Commands + +Use Makefile targets, not tool commands directly: + +- `make format` - Fix formatting issues +- `make lint` - Run all static checks (ruff, ty) +- `make test` - Run tests with coverage +- `make doc` - Generate documentation +- `make build` - Build the package +- `make run` - Run the CLI (use `ARGS="..."` for arguments) + +## Verification + +Run `make lint && make test` before committing. + +## Commit Messages + +- Summary line: max 50 chars (hard limit 72) +- Body lines: max 72 chars +- Use Markdown formatting in description +- Reference issues where relevant (e.g., `See #123`) + +## Project Layout + +- `src/dvdrip/` - Source code +- `test/` - Test files diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..43c994c --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +@AGENTS.md diff --git a/LICENSE b/LICENSE index f288702..b571e2f 100644 --- a/LICENSE +++ b/LICENSE @@ -1,5 +1,6 @@ - GNU GENERAL PUBLIC LICENSE - Version 3, 29 June 2007 + + GNU AFFERO GENERAL PUBLIC LICENSE + Version 3, 19 November 2007 Copyright (C) 2007 Free Software Foundation, Inc. Everyone is permitted to copy and distribute verbatim copies @@ -7,17 +8,15 @@ Preamble - The GNU General Public License is a free, copyleft license for -software and other kinds of works. + The GNU Affero General Public License is a free, copyleft license for +software and other kinds of works, specifically designed to ensure +cooperation with the community in the case of network server software. The licenses for most software and other practical works are designed to take away your freedom to share and change the works. By contrast, -the GNU General Public License is intended to guarantee your freedom to +our General Public Licenses are intended to guarantee your freedom to share and change all versions of a program--to make sure it remains free -software for all its users. We, the Free Software Foundation, use the -GNU General Public License for most of our software; it applies also to -any other work released this way by its authors. You can apply it to -your programs, too. +software for all its users. When we speak of free software, we are referring to freedom, not price. Our General Public Licenses are designed to make sure that you @@ -26,44 +25,34 @@ them if you wish), that you receive source code or can get it if you want it, that you can change the software or use pieces of it in new free programs, and that you know you can do these things. - To protect your rights, we need to prevent others from denying you -these rights or asking you to surrender the rights. Therefore, you have -certain responsibilities if you distribute copies of the software, or if -you modify it: responsibilities to respect the freedom of others. - - For example, if you distribute copies of such a program, whether -gratis or for a fee, you must pass on to the recipients the same -freedoms that you received. You must make sure that they, too, receive -or can get the source code. And you must show them these terms so they -know their rights. - - Developers that use the GNU GPL protect your rights with two steps: -(1) assert copyright on the software, and (2) offer you this License -giving you legal permission to copy, distribute and/or modify it. - - For the developers' and authors' protection, the GPL clearly explains -that there is no warranty for this free software. For both users' and -authors' sake, the GPL requires that modified versions be marked as -changed, so that their problems will not be attributed erroneously to -authors of previous versions. - - Some devices are designed to deny users access to install or run -modified versions of the software inside them, although the manufacturer -can do so. This is fundamentally incompatible with the aim of -protecting users' freedom to change the software. The systematic -pattern of such abuse occurs in the area of products for individuals to -use, which is precisely where it is most unacceptable. Therefore, we -have designed this version of the GPL to prohibit the practice for those -products. If such problems arise substantially in other domains, we -stand ready to extend this provision to those domains in future versions -of the GPL, as needed to protect the freedom of users. - - Finally, every program is threatened constantly by software patents. -States should not allow patents to restrict development and use of -software on general-purpose computers, but in those that do, we wish to -avoid the special danger that patents applied to a free program could -make it effectively proprietary. To prevent this, the GPL assures that -patents cannot be used to render the program non-free. + Developers that use our General Public Licenses protect your rights +with two steps: (1) assert copyright on the software, and (2) offer +you this License which gives you legal permission to copy, distribute +and/or modify the software. + + A secondary benefit of defending all users' freedom is that +improvements made in alternate versions of the program, if they +receive widespread use, become available for other developers to +incorporate. Many developers of free software are heartened and +encouraged by the resulting cooperation. However, in the case of +software used on network servers, this result may fail to come about. +The GNU General Public License permits making a modified version and +letting the public access it on a server without ever releasing its +source code to the public. + + The GNU Affero General Public License is designed specifically to +ensure that, in such cases, the modified source code becomes available +to the community. It requires the operator of a network server to +provide the source code of the modified version running there to the +users of that server. Therefore, public use of a modified version, on +a publicly accessible server, gives the public access to the source +code of the modified version. + + An older license, called the Affero General Public License and +published by Affero, was designed to accomplish similar goals. This is +a different license, not a version of the Affero GPL, but Affero has +released a new version of the Affero GPL which permits relicensing under +this license. The precise terms and conditions for copying, distribution and modification follow. @@ -72,7 +61,7 @@ modification follow. 0. Definitions. - "This License" refers to version 3 of the GNU General Public License. + "This License" refers to version 3 of the GNU Affero General Public License. "Copyright" also means copyright-like laws that apply to other kinds of works, such as semiconductor masks. @@ -549,35 +538,45 @@ to collect a royalty for further conveying from those to whom you convey the Program, the only way you could satisfy both those terms and this License would be to refrain entirely from conveying the Program. - 13. Use with the GNU Affero General Public License. + 13. Remote Network Interaction; Use with the GNU General Public License. + + Notwithstanding any other provision of this License, if you modify the +Program, your modified version must prominently offer all users +interacting with it remotely through a computer network (if your version +supports such interaction) an opportunity to receive the Corresponding +Source of your version by providing access to the Corresponding Source +from a network server at no charge, through some standard or customary +means of facilitating copying of software. This Corresponding Source +shall include the Corresponding Source for any work covered by version 3 +of the GNU General Public License that is incorporated pursuant to the +following paragraph. Notwithstanding any other provision of this License, you have permission to link or combine any covered work with a work licensed -under version 3 of the GNU Affero General Public License into a single +under version 3 of the GNU General Public License into a single combined work, and to convey the resulting work. The terms of this License will continue to apply to the part which is the covered work, -but the special requirements of the GNU Affero General Public License, -section 13, concerning interaction through a network will apply to the -combination as such. +but the work with which it is combined will remain governed by version +3 of the GNU General Public License. 14. Revised Versions of this License. The Free Software Foundation may publish revised and/or new versions of -the GNU General Public License from time to time. Such new versions will -be similar in spirit to the present version, but may differ in detail to +the GNU Affero General Public License from time to time. Such new versions +will be similar in spirit to the present version, but may differ in detail to address new problems or concerns. Each version is given a distinguishing version number. If the -Program specifies that a certain numbered version of the GNU General +Program specifies that a certain numbered version of the GNU Affero General Public License "or any later version" applies to it, you have the option of following the terms and conditions either of that numbered version or of any later version published by the Free Software Foundation. If the Program does not specify a version number of the -GNU General Public License, you may choose any version ever published +GNU Affero General Public License, you may choose any version ever published by the Free Software Foundation. If the Program specifies that a proxy can decide which future -versions of the GNU General Public License can be used, that proxy's +versions of the GNU Affero General Public License can be used, that proxy's public statement of acceptance of a version permanently authorizes you to choose that version for the Program. @@ -635,40 +634,29 @@ the "copyright" line and a pointer to where the full notice is found. Copyright (C) This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by + it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. + GNU Affero General Public License for more details. - You should have received a copy of the GNU General Public License + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . Also add information on how to contact you by electronic and paper mail. - If the program does terminal interaction, make it output a short -notice like this when it starts in an interactive mode: - - Copyright (C) - This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'. - This is free software, and you are welcome to redistribute it - under certain conditions; type `show c' for details. - -The hypothetical commands `show w' and `show c' should show the appropriate -parts of the General Public License. Of course, your program's commands -might be different; for a GUI interface, you would use an "about box". + If your software can interact with users remotely through a computer +network, you should also make sure that it provides a way for users to +get its source. For example, if your program is a web application, its +interface could display a "Source" link that leads users to an archive +of the code. There are many ways you could offer source, and different +solutions will be better for different programs; see section 13 for the +specific requirements. You should also get your employer (if you work as a programmer) or school, if any, to sign a "copyright disclaimer" for the program, if necessary. -For more information on this, and how to apply and follow the GNU GPL, see +For more information on this, and how to apply and follow the GNU AGPL, see . - - The GNU General Public License does not permit incorporating your program -into proprietary programs. If your program is a subroutine library, you -may consider it more useful to permit linking proprietary applications with -the library. If this is what you want to do, use the GNU Lesser General -Public License instead of this License. But first, please read -. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ff8c8b6 --- /dev/null +++ b/Makefile @@ -0,0 +1,62 @@ +SHELL := /bin/bash + +PY_IMPORT = dvdrip + +# Optionally overridden by the user in the `test` target. +TESTS := + +# If the user selects a specific test pattern to run, set `pytest` to fail fast +# and only run tests that match the pattern. +# Otherwise, run all tests and enable coverage assertions, since we expect +# complete test coverage. +ifneq ($(TESTS),) + TEST_ARGS := -x -k $(TESTS) + COV_ARGS := +else + TEST_ARGS := + COV_ARGS := --fail-under 100 +endif + +.PHONY: all +all: + @echo "Run my targets individually!" + +.PHONY: dev +dev: + uv sync --group dev + uv run prek install +.PHONY: run +run: + uv run dvdrip $(ARGS) + +.PHONY: lint +lint: + uv sync --group lint + uv run ruff format --check && \ + uv run ruff check && \ + uv run ty check && \ + uv run interrogate -c pyproject.toml . + +.PHONY: format +format: + uv sync --group lint + uv run ruff format && \ + uv run ruff check --fix + +.PHONY: test +test: + uv sync --group test + uv run pytest -svv --timeout=300 --cov=$(PY_IMPORT) $(T) $(TEST_ARGS) + uv run coverage report -m $(COV_ARGS) + +.PHONY: doc +doc: + uv run pdoc -o html $(PY_IMPORT) + +.PHONY: build +build: + uv sync + +.PHONY: clean +clean: + rm -rf .venv/ dist/ .ruff_cache/ .pytest_cache/ .coverage diff --git a/README.md b/README.md index 11dc19a..a1cad71 100644 --- a/README.md +++ b/README.md @@ -1,44 +1,32 @@ # dvdrip -Rip DVDs quickly and easily from the commandline. - -## Dependencies - - [Python3](https://www.python.org/) - - [HandBrakeCLI](https://handbrake.fr/) - -## NOTE -This script has been tested on both Linux and Mac OS X with Python 3, -HandBrakeCLI and VLC installed (and also MacPorts in the case of OS X). - -## Features - - With minimal configuration: - - Encodes videos in mp4 files with h.264 video and aac audio. - (compatible with a wide variety of media players without - additional transcoding, including PS3, Roku, and most smart - phones, smart TVs and tablets). - - Preserves all audio tracks, all subtitle tracks, and chapter - markers. - - Intelligently chooses output filename based on a provided prefix. - - Generates one video file per DVD title, or optionally one per - chapter. - - Easy to read "scan" mode tells you what you need need to know about - a disk to decide on how to rip it. + +[![CI](https://github.com/evandowning/dvdrip/actions/workflows/tests.yml/badge.svg)](https://github.com/evandowning/dvdrip/actions/workflows/tests.yml) +[![PyPI version](https://badge.fury.io/py/dvdrip.svg)](https://pypi.org/project/dvdrip) +[![Packaging status](https://repology.org/badge/tiny-repos/python:dvdrip.svg)](https://repology.org/project/python:dvdrip/versions) + -## Usage +Rip DVDs quickly and easily from the command line. Encodes to mp4 (h.265 video, AAC audio) with all audio tracks, subtitles, and chapter markers preserved. + +## Requirements + +* [uv](https://docs.astral.sh/uv/) +* [HandBrakeCLI](https://handbrake.fr/downloads2.php) + +## Build + +```shell +make build ``` -$ python3 dvdrip.py -h + +## Usage + +```shell +uv run dvdrip -i /dev/cdrom -o output ``` -## Examples - - Determine number of chapters - ``` - $ python3 dvdrip.py --scan -i /path/to/cdrom - ``` - - Rip Movie (one file for the movie) - ``` - $ python3 dvdrip.py -i /path/to/cdrom -o output_name - ``` - - Rip TV Show (one file per episode) - ``` - $ python3 dvdrip.py -c -i /path/to/cdrom -o output_name - ``` +## Development + +```shell +make format lint test +``` diff --git a/dvdrip.py b/dvdrip.py deleted file mode 100644 index 2837841..0000000 --- a/dvdrip.py +++ /dev/null @@ -1,703 +0,0 @@ -#!/usr/bin/env python3 -# coding=utf-8 - -""" -Rip DVDs quickly and easily from the commandline. - -Features: - - With minimal configuration: - - Encodes videos in mp4 files with h.264 video and aac audio. - (compatible with a wide variety of media players without - additional transcoding, including PS3, Roku, and most smart - phones, smart TVs and tablets). - - Preserves all audio tracks, all subtitle tracks, and chapter - markers. - - Intelligently chooses output filename based on a provided prefix. - - Generates one video file per DVD title, or optionally one per - chapter. - - Easy to read "scan" mode tells you what you need need to know about - a disk to decide on how to rip it. - -Why I wrote this: - This script exists because I wanted a simple way to back up DVDs with - reasonably good compression and quality settings, and in a format I could - play on the various media players I own including PS3, Roku, smart TVs, - smartphones and tablets. Using mp4 files with h.264 video and aac audio seems - to be the best fit for these constraints. - - I also wanted it to preserve as much as possible: chapter markers, subtitles, - and (most of all) *all* of the audio tracks. My kids have a number of - bilingual DVDs, and I wanted to back these up so they don't have to handle - the physical disks, but can still watch their shows in either language. For - some reason HandBrakeCLI doesn't have a simple β€œencode all audio tracks” - option. - - This script also tries to be smart about the output name. You just tell it - the pathname prefix, eg: "/tmp/AwesomeVideo", and it'll decide whether to - produce a single file, "/tmp/AwesomeVideo.mp4", or a directory - "/tmp/AwesomeVideo/" which will contain separate files for each title, - depending on whether you're ripping a single title or multiple titles. - - -Using it, Step 1: - - The first step is to scan your DVD and decide whether or not you want - to split chapters. Here's an example of a disc with 6 episodes of a TV - show, plus a "bump", all stored as a single title. - - $ dvdrip --scan -i /dev/cdrom - Reading from '/media/EXAMPLE1' - Title 1/ 1: 02:25:33 720Γ—576 4:3 25 fps - audio 1: Chinese (5.1ch) [48000Hz, 448000bps] - chapter 1: 00:24:15 β—–β– β– β– β– β– β– β– β– β– β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β—— - chapter 2: 00:24:15 β—–β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β– β– β– β– β– β– β– β– β– β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β—— - chapter 3: 00:24:14 β—–β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β– β– β– β– β– β– β– β– β– β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β—— - chapter 4: 00:24:15 β—–β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β– β– β– β– β– β– β– β– β– β– β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β—— - chapter 5: 00:24:15 β—–β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β– β– β– β– β– β– β– β– β– β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β—— - chapter 6: 00:24:14 β—–β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β– β– β– β– β– β– β– β– β– β—— - chapter 7: 00:00:05 β—–β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β– β—— - - Knowing that this is 6 episodes of a TV show, I'd choose to split the - chapters. If it was a movie with 6 chapters, I would choose to not - split it. - - Here's a disc with 3 2-segment episodes of a show, plus two "bumps", - stored as 8 titles. - - Reading from '/media/EXAMPLE2' - Title 1/ 5: 00:23:22 720Γ—576 4:3 25 fps - audio 1: Chinese (2.0ch) [48000Hz, 192000bps] - audio 2: English (2.0ch) [48000Hz, 192000bps] - sub 1: English [(Bitmap)(VOBSUB)] - chapter 1: 00:11:41 β—–β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β—— - chapter 2: 00:11:41 β—–β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β—— - - Title 2/ 5: 00:22:40 720Γ—576 4:3 25 fps - audio 1: Chinese (2.0ch) [48000Hz, 192000bps] - audio 2: English (2.0ch) [48000Hz, 192000bps] - sub 1: English [(Bitmap)(VOBSUB)] - chapter 1: 00:11:13 β—–β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β—— - chapter 2: 00:11:28 β—–β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β—— - - Title 3/ 5: 00:22:55 720Γ—576 4:3 25 fps - audio 1: Chinese (2.0ch) [48000Hz, 192000bps] - audio 2: English (2.0ch) [48000Hz, 192000bps] - sub 1: English [(Bitmap)(VOBSUB)] - chapter 1: 00:15:56 β—–β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β—— - chapter 2: 00:06:59 β—–β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β€₯β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β– β—— - - Title 4/ 5: 00:00:08 720Γ—576 4:3 25 fps - audio 1: English (2.0ch) [None] - chapter 1: 00:00:08 β—–β—— - - Title 5/ 5: 00:00:05 720Γ—576 4:3 25 fps - chapter 1: 00:00:05 β—–β—— - - Given that these are 2-segment episodes (it's pretty common for kids' - shows to have two segments per episode -- essentially 2 "mini-episodes") you - can choose whether to do the default one video per title (episodes) or - split by chapter (segments / mini-episodes). - -Using it, Step 2: - - If you've decided to split by chapter, execute: - - dvdrip.py -c -i /dev/cdrom -o Output_Name - - Otherwise, leave out the -c flag. - - If there is only one video being ripped, it will be named Output_Name.mp4. If - there are multiple files, they will be placed in a new directory called - Output_Name. - -Limitations: - - This script has been tested on both Linux and Mac OS X with Python 3, - HandBrakeCLI and VLC installed (and also MacPorts in the case of OS X). -""" - -# TODO: Detect if HandBrakeCLI is burning in vobsubs. -# TODO: Support half-open ranges in title specs (DVD title numbers range from -# 1-99) -# TODO: Deal with failed scan of first title better. - -import ctypes -import argparse -import os -import re -import stat -import subprocess -import sys -import time - -from pprint import pprint -from collections import namedtuple -from math import gcd - - -class UserError(Exception): - def __init__(self, message): - self.message = message - -CHAR_ENCODING = 'UTF-8' - -def check_err(*popenargs, **kwargs): - process = subprocess.Popen(stderr=subprocess.PIPE, *popenargs, **kwargs) - _, stderr = process.communicate() - retcode = process.poll() - if retcode: - cmd = kwargs.get("args") - if cmd is None: - cmd = popenargs[0] - raise subprocess.CalledProcessError(retcode, cmd, output=stderr) - return stderr.decode(CHAR_ENCODING, 'replace') - -def check_output(*args, **kwargs): - s = subprocess.check_output(*args, **kwargs).decode(CHAR_ENCODING) - return s.replace(os.linesep, '\n') - -HANDBRAKE = 'HandBrakeCLI' - -TITLE_COUNT_REGEXES = [ - re.compile(r'^Scanning title \d+ of (\d+)\.\.\.$'), - re.compile(r'^\[\d\d:\d\d:\d\d] scan: DVD has (\d+) title\(s\)$'), -] - -def FindTitleCount(scan, verbose): - for regex in TITLE_COUNT_REGEXES: - for line in scan: - m = regex.match(line) - if m: break - if m: - return int(m.group(1)) - if verbose: - for line in scan: - print(line) - raise AssertionError("Can't find TITLE_COUNT_REGEX in scan") - - -STRUCTURED_LINE_RE = re.compile(r'( *)\+ (([a-z0-9 ]+):)?(.*)') - -def ExtractTitleScan(scan): - result = [] - in_title_scan = False - for line in scan: - if not in_title_scan: - if line.startswith('+'): - in_title_scan = True - if in_title_scan: - m = STRUCTURED_LINE_RE.match(line) - if m: - result.append(line) - else: - break - return tuple(result) - - -TRACK_VALUE_RE = re.compile(r'(\d+), (.*)') - -def MassageTrackData(node, key): - if key in node: - track_data = node[key] - if type(track_data) is list: - new_track_data = {} - for track in track_data: - k, v = TRACK_VALUE_RE.match(track).groups() - new_track_data[k] = v - node[key] = new_track_data - -def ParseTitleScan(scan): - pos, result = ParseTitleScanHelper(scan, pos=0, indent=0) - - # HandBrakeCLI inexplicably uses a comma instead of a colon to - # separate the track identifier from the track data in the "audio - # tracks" and "subtitle tracks" nodes, so we "massage" these parsed - # nodes to get a consistent parsed reperesentation. - for value in result.values(): - MassageTrackData(value, 'audio tracks') - MassageTrackData(value, 'subtitle tracks') - return result - -def ParseTitleScanHelper(scan, pos, indent): - result = {} - cruft = [] - while True: - pos, node = ParseNode(scan, pos=pos, indent=indent) - if node: - if type(node) is tuple: - k, v = node - result[k] = v - else: - cruft.append(node) - result[None] = cruft - else: - break - if len(result) == 1 and None in result: - result = result[None] - return pos, result - -def ParseNode(scan, pos, indent): - if pos >= len(scan): - return pos, None - line = scan[pos] - spaces, colon, name, value = STRUCTURED_LINE_RE.match(line).groups() - spaces = len(spaces) / 2 - if spaces < indent: - return pos, None - assert spaces == indent, '%d <> %r' % (indent, line) - pos += 1 - if colon: - if value: - node = (name, value) - else: - pos, children = ParseTitleScanHelper(scan, pos, indent + 1) - node = (name, children) - else: - node = value - return pos, node - -def only(iterable): - """ - Return the one and only element in iterable. - - Raises an ValueError if iterable does not have exactly one item. - """ - result, = iterable - return result - -Title = namedtuple('Title', ['number', 'info']) -Task = namedtuple('Task', ['title', 'chapter']) - -TOTAL_EJECT_SECONDS = 5 -EJECT_ATTEMPTS_PER_SECOND = 10 - -class DVD: - def __init__(self, mountpoint, verbose, mount_timeout=0): - if stat.S_ISBLK(os.stat(mountpoint).st_mode): - mountpoint = FindMountPoint(mountpoint, mount_timeout) - if not os.path.isdir(mountpoint): - raise UserError('%r is not a directory' % mountpoint) - self.mountpoint = mountpoint - self.verbose = verbose - - def RipTitle(self, task, output, dry_run, verbose): - if verbose: - print('Title Scan:') - pprint(task.title.info) - print('-' * 78) - - audio_tracks = task.title.info['audio tracks'].keys() - audio_encoders = ['faac'] * len(audio_tracks) - subtitles = task.title.info['subtitle tracks'].keys() - - args = [ - HANDBRAKE, - '--title', str(task.title.number), - '--preset', "Production Standard", - '--encoder', 'x264', - '--audio', ','.join(audio_tracks), - '--aencoder', ','.join(audio_encoders), - ] - if task.chapter is not None: - args += [ - '--chapters', str(task.chapter), - ] - if subtitles: - args += [ - '--subtitle', ','.join(subtitles), - ] - args += [ - '--markers', - '--optimize', - #'--no-dvdnav', # TODO: turn this on as a fallback - '--input', self.mountpoint, - '--output', output, - ] - if verbose: - print(' '.join(('\n ' + a) - if a.startswith('-') else a for a in args)) - print('-' * 78) - if not dry_run: - if verbose: - subprocess.call(args) - else: - check_err(args) - - def ScanTitle(self, i): - for line in check_err([ - HANDBRAKE, - #'--no-dvdnav', # TODO: turn this on as a fallback - '--scan', - '--title', str(i), - '-i', - self.mountpoint], stdout=subprocess.PIPE).split(os.linesep): - if self.verbose: - print('< %s' % line.rstrip()) - yield line - - def ScanTitles(self, title_numbers, verbose): - """ - Returns an iterable of parsed titles. - """ - first = title_numbers[0] if title_numbers else 1 - raw_scan = tuple(self.ScanTitle(first)) - title_count = FindTitleCount(raw_scan, verbose) - print('Disc claims to have %d titles.' % title_count) - title_name, title_info = only( - ParseTitleScan(ExtractTitleScan(raw_scan)).items()) - del raw_scan - - def MakeTitle(name, number, info): - assert ('title %d' % number) == name - info['duration'] = ExtractDuration('duration ' + info['duration']) - return Title(number, info) - - yield MakeTitle(title_name, first, title_info) - - to_scan = [x for x in range(1, title_count + 1) - if x != first - and ((not title_numbers) - or x in title_numbers)] - for i in to_scan: - try: - scan = ExtractTitleScan(self.ScanTitle(i)) - except subprocess.CalledProcessError as exc: - warn("Cannot scan title %d." % i) - else: - title_info_names = ParseTitleScan(scan).items() - if title_info_names: - title_name, title_info = only(title_info_names) - yield MakeTitle(title_name, i, title_info) - else: - warn("Cannot parse scan of title %d." % i) - - def Eject(self): - if os.name == 'nt': - if len(self.mountpoint) < 4 and self.mountpoint[1] == ':': - # mountpoint is only a drive letter like "F:" or "F:\" not a subdirectory - drive_letter = self.mountpoint[0] - ctypes.windll.WINMM.mciSendStringW("open %s: type CDAudio alias %s_drive" % (drive_letter, drive_letter), None, 0, None) - ctypes.windll.WINMM.mciSendStringW("set %s_drive door open" % drive_letter, None, 0, None) - return - - # TODO: this should really be a while loop that terminates once a - # deadline is met. - for i in range(TOTAL_EJECT_SECONDS * EJECT_ATTEMPTS_PER_SECOND): - if not subprocess.call(['eject', self.mountpoint]): - return - time.sleep(1.0 / EJECT_ATTEMPTS_PER_SECOND) - -def ParseDuration(s): - result = 0 - for field in s.strip().split(':'): - result *= 60 - result += int(field) - return result - -def FindMountPoint(dev, timeout): - regex = re.compile(r'^' + re.escape(os.path.realpath(dev)) + r'\b') - - now = time.time() - end_time = now + timeout - while end_time >= now: - for line in check_output(['df', '-P']).split('\n'): - m = regex.match(line) - if m: - line = line.split(None, 5) - if len(line) > 1: - return line[-1] - time.sleep(0.1) - now = time.time() - raise UserError('%r not mounted.' % dev) - -def FindMainFeature(titles, verbose=False): - if verbose: - print('Attempting to determine main feature of %d titles...' - % len(titles)) - main_feature = max(titles, - key=lambda title: ParseDuration(title.info['duration'])) - if verbose: - print('Selected %r as main feature.' % main_feature.number) - print() - -def ConstructTasks(titles, chapter_split): - for title in titles: - num_chapters = len(title.info['chapters']) - if chapter_split and num_chapters > 1: - for chapter in range(1, num_chapters + 1): - yield Task(title, chapter) - else: - yield Task(title, None) - -def TaskFilenames(tasks, output, dry_run=False): - if (len(tasks) > 1): - def ComputeFileName(task): - if task.chapter is None: - return os.path.join(output, - 'Title%02d.mp4' % task.title.number) - else: - return os.path.join(output, - 'Title%02d_%02d.mp4' - % (task.title.number, task.chapter)) - if not dry_run: - os.makedirs(output) - else: - def ComputeFileName(task): - return '%s.mp4' % output - result = [ComputeFileName(task) for task in tasks] - if len(set(result)) != len(result): - raise UserError("multiple tasks use same filename") - return result - -def PerformTasks(dvd, tasks, title_count, filenames, - dry_run=False, verbose=False): - for task, filename in zip(tasks, filenames): - print('=' * 78) - if task.chapter is None: - print('Title %s / %s => %r' - % (task.title.number, title_count, filename)) - else: - num_chapters = len(task.title.info['chapters']) - print('Title %s / %s , Chapter %s / %s=> %r' - % (task.title.number, title_count, task.chapter, - num_chapters, filename)) - print('-' * 78) - dvd.RipTitle(task, filename, dry_run, verbose) - -Size = namedtuple('Size', - ['width', 'height', 'pix_aspect_width', 'pix_aspect_height', 'fps']) - -SIZE_REGEX = re.compile( - r'^\s*(\d+)x(\d+),\s*' - r'pixel aspect: (\d+)/(\d+),\s*' - r'display aspect: (?:\d+(?:\.\d+)),\s*' - r'(\d+(?:\.\d+)) fps\s*$') - -SIZE_CTORS = [int] * 4 + [float] - -def ParseSize(s): - return Size(*(f(x) - for f, x in zip(SIZE_CTORS, SIZE_REGEX.match(s).groups()))) - -def ComputeAspectRatio(size): - w = size.width * size.pix_aspect_width - h = size.height * size.pix_aspect_height - d = gcd(w, h) - return (w // d, h // d) - -DURATION_REGEX = re.compile( - r'^(?:.*,)?\s*duration\s+(\d\d):(\d\d):(\d\d)\s*(?:,.*)?$') - -class Duration(namedtuple('Duration', 'hours minutes seconds')): - def __str__(self): - return '%02d:%02d:%02d' % (self) - - def in_seconds(self): - return 60 * (60 * self.hours + self.minutes) + self.seconds - -def ExtractDuration(s): - return Duration(*map(int, DURATION_REGEX.match(s).groups())) - -Chapter = namedtuple('Chapter', 'number duration') - -def ParseChapters(d): - """ - Parses dictionary of (str) chapter numbers to chapter. - - Result will be an iterable of Chapter objects, sorted by number. - """ - for number, info in sorted(((int(n), info) for (n, info) in d.items())): - yield Chapter(number, ExtractDuration(info)) - -AUDIO_TRACK_REGEX = re.compile( - r'^(\S+)\s*((?:\([^)]*\)\s*)*)(?:,\s*(.*))?$') - -AUDIO_TRACK_FIELD_REGEX = re.compile( - r'^\(([^)]*)\)\s*\(([^)]*?)\s*ch\)\s*' + - r'((?:\([^()]*\)\s*)*)\(iso639-2:\s*([^)]+)\)$') - -AudioTrack = namedtuple('AudioTrack', - 'number lang codec channels iso639_2 extras') - -def ParseAudioTracks(d): - for number, info in sorted(((int(n), info) for (n, info) in d.items())): - m = AUDIO_TRACK_REGEX.match(info) - if m: - lang, field_string, extras = m.groups() - m2 = AUDIO_TRACK_FIELD_REGEX.match(field_string) - if m2: - codec, channels, more_extras, iso639_2 = m2.groups() - if more_extras: - extras = more_extras + extras - yield AudioTrack(number, lang, codec, channels, - iso639_2, extras) - else: - warn('Cannot parse audio track fields %r' % field_string) - else: - warn('Cannot parse audio track info %r' % info) - -SubtitleTrack = namedtuple('SubtitleTrack', - 'number info') - -def ParseSubtitleTracks(d): - for number, info in sorted(((int(n), info) for (n, info) in d.items())): - yield SubtitleTrack(number, info) - -def RenderBar(start, length, total, width): - end = start + length - start = int(round(start * (width - 1) / total)) - length = int(round(end * (width - 1) / total)) - start + 1 - return ('β€₯' * start + - 'β– ' * length + - 'β€₯' * (width - start - length)) - -MAX_BAR_WIDTH = 50 - -def DisplayScan(titles): - max_title_seconds = max( - title.info['duration'].in_seconds() - for title in titles) - - for title in titles: - info = title.info - size = ParseSize(info['size']) - xaspect, yaspect = ComputeAspectRatio(size) - duration = info['duration'] - title_seconds = duration.in_seconds() - print('Title % 3d/% 3d: %s %dΓ—%d %d:%d %3g fps' % - (title.number, len(titles), duration, size.width, - size.height, xaspect, yaspect, size.fps)) - for at in ParseAudioTracks(info['audio tracks']): - print(' audio % 3d: %s (%sch) [%s]' % - (at.number, at.lang, at.channels, at.extras)) - for sub in ParseSubtitleTracks(info['subtitle tracks']): - print(' sub % 3d: %s' % - (sub.number, sub.info)) - position = 0 - if title_seconds > 0: - for chapter in ParseChapters(info['chapters']): - seconds = chapter.duration.in_seconds() - bar_width = int(round( - MAX_BAR_WIDTH * title_seconds / max_title_seconds)) - bar = RenderBar(position, seconds, title_seconds, bar_width) - print(' chapter % 3d: %s β—–%sβ——' - % (chapter.number, chapter.duration, bar)) - position += seconds - print() - -def ParseArgs(): - description, epilog = __doc__.strip().split('\n', 1) - parser = argparse.ArgumentParser(description=description, epilog=epilog, - formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument('-v', '--verbose', - action='store_true', - help="Increase verbosity.") - parser.add_argument('-c', '--chapter_split', - action='store_true', - help="Split each chapter out into a separate file.") - parser.add_argument('-n', '--dry-run', - action='store_true', - help="Don't actually write anything.") - parser.add_argument('--scan', - action='store_true', - help="Display scan of disc; do not rip.") - parser.add_argument('--main-feature', - action='store_true', - help="Rip only the main feature title.") - parser.add_argument('-t', '--titles', - default="*", - help="""Comma-separated list of title numbers to consider - (starting at 1) or * for all titles.""") - parser.add_argument('-i', '--input', - help="Volume to rip (must be a directory).", required=True) - parser.add_argument('-o', '--output', - help="""Output location. Extension is added if only one title - being ripped, otherwise, a directory will be created to contain - ripped titles.""") - parser.add_argument('--mount-timeout', - default=15, - help="Amount of time to wait for a mountpoint to be mounted", - type=float) - args = parser.parse_args() - if not args.scan and args.output is None: - raise UserError("output argument is required") - return args - -# TODO: make it possible to have ranges with no end (meaning they end at last -# title) -NUM_RANGE_REGEX = re.compile(r'^(\d*)-(\d+)|(\d+)$') -def parse_titles_arg(titles_arg): - if titles_arg == '*': - return None # all titles - else: - def str_to_ints(s): - m = NUM_RANGE_REGEX.match(s) - if not m : - raise UserError( - "--titles must be * or list of integer ranges, found %r" % - titles_arg) - else: - start,end,only = m.groups() - if only is not None: - return [int(only)] - else: - start = int(start) if start else 1 - end = int(end) - return range(start, end + 1) - result = set() - for s in titles_arg.split(','): - result.update(str_to_ints(s)) - result = sorted(list(result)) - return result - -def main(): - args = ParseArgs() - dvd = DVD(args.input, args.verbose, args.mount_timeout) - print('Reading from %r' % dvd.mountpoint) - title_numbers = parse_titles_arg(args.titles) - titles = tuple(dvd.ScanTitles(title_numbers, args.verbose)) - - if args.scan: - DisplayScan(titles) - else: - if args.main_feature and len(titles) > 1: - # TODO: make this affect scan as well - titles = [FindMainFeature(titles, args.verbose)] - - if not titles: - raise UserError("No titles to rip") - else: - if not args.output: - raise UserError("No output specified") - print('Writing to %r' % args.output) - tasks = tuple(ConstructTasks(titles, args.chapter_split)) - - filenames = TaskFilenames(tasks, args.output, dry_run=args.dry_run) - # Don't stomp on existing files - for filename in filenames: - if os.path.exists(filename): - raise UserError('%r already exists' % filename) - - PerformTasks(dvd, tasks, len(titles), filenames, - dry_run=args.dry_run, verbose=args.verbose) - - print('=' * 78) - if not args.dry_run: - dvd.Eject() - -def warn(msg): - print('warning: %s' % (msg,), file=sys.stderr) - -if __name__ == '__main__': - error = None - try: - main() - except FileExistsError as exc: - error = '%s: %r' % (exc.strerror, exc.filename) - except UserError as exc: - error = exc.message - - if error is not None: - print('%s: error: %s' - % (os.path.basename(sys.argv[0]), error), file=sys.stderr) - sys.exit(1) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..87b1f9f --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,116 @@ +[project] +name = "dvdrip" +version = "0.0.1" +description = "Rip DVDs quickly and easily from the command line." +readme = "README.md" +license-files = ["LICENSE"] +license = "AGPL-3.0-or-later" + +authors = [ + { name = "Laurence Gonsalves" }, + { name = "Evan Downing" }, +] +classifiers = [ + "Programming Language :: Python :: 3", +] +dependencies = ["pydantic-settings>=2.9,<3"] +requires-python = ">=3.13" + +[build-system] +requires = ["uv_build>=0.11.0,<0.12.0"] +build-backend = "uv_build" + +[dependency-groups] +doc = ["pdoc"] +test = ["pytest", "pytest-cov", "pytest-timeout", "pretend", "coverage[toml]"] +lint = [ + # NOTE: ruff is under active development, so we pin conservatively here + # and let Dependabot periodically perform this update. + "ruff ~= 0.14.0", + "ty >=0.0.8", + "interrogate", +] +dev = [ + {include-group = "doc"}, + {include-group = "test"}, + {include-group = "lint"}, + "prek", +] + +[project.scripts] +"dvdrip" = "dvdrip._cli:main" + +[project.urls] +Issues = "https://github.com/xenomachina/dvdrip/issues" +Source = "https://github.com/xenomachina/dvdrip" + +[tool.coverage.run] +# don't attempt code coverage for the CLI entrypoints +omit = ["src/dvdrip/_cli.py"] + +[tool.ty.terminal] +error-on-warning = true + +[tool.ty.environment] +python-version = "3.13" + +[tool.ty.src] +include = ["src", "test"] + +[tool.ruff] +line-length = 100 +target-version = "py313" + +[tool.ruff.format] +line-ending = "lf" +quote-style = "double" + +[tool.ruff.lint] +select = ["ALL"] +ignore = [ + "D203", # Incompatible with D211 + "D213", # Incompatible with D212 + "COM812", # Can conflict with formatter + "ISC001", # Can conflict with formatter +] + +[tool.ruff.lint.mccabe] +# Maximum cyclomatic complexity +max-complexity = 8 + +[tool.ruff.lint.pydocstyle] +# Use Google-style docstrings +convention = "google" + +[tool.ruff.lint.pylint] +# Maximum number of branches for function or method +max-branches = 12 +# Maximum number of return statements in function or method +max-returns = 6 +# Maximum number of positional arguments for function or method +max-positional-args = 5 + +[tool.ruff.lint.per-file-ignores] +"src/dvdrip/_cli.py" = [ + "TRY400", # user-facing errors should not include tracebacks +] +"test/**/*.py" = [ + "D", # no docstrings in tests + "S101", # asserts are expected in tests + "PLR2004", # Allow magic values in tests +] +"**/conftest.py" = ["D"] # No docstrings in pytest config +[tool.interrogate] +# don't enforce documentation coverage for packaging, testing, the virtual +# environment, or the CLI (which is documented separately). +exclude = ["env", "test", "src/dvdrip/_cli.py"] +ignore-semiprivate = true +fail-under = 100 + +[tool.pytest.ini_options] +testpaths = ["test"] +python_files = ["test_*.py"] +addopts = "--durations=10" + +[tool.uv.sources] +dvdrip = { workspace = true } diff --git a/src/dvdrip/__init__.py b/src/dvdrip/__init__.py new file mode 100644 index 0000000..bc048d4 --- /dev/null +++ b/src/dvdrip/__init__.py @@ -0,0 +1,31 @@ +"""The ``dvdrip`` APIs.""" + +import importlib.metadata + +from dvdrip._dvd import DVD +from dvdrip._errors import UserError +from dvdrip._models import ( + AudioTrack, + Chapter, + Duration, + Size, + SubtitleTrack, + Task, + Title, + TitleInfo, +) + +__all__ = [ + "DVD", + "AudioTrack", + "Chapter", + "Duration", + "Size", + "SubtitleTrack", + "Task", + "Title", + "TitleInfo", + "UserError", +] + +__version__ = importlib.metadata.version("dvdrip") diff --git a/src/dvdrip/__main__.py b/src/dvdrip/__main__.py new file mode 100644 index 0000000..56561b8 --- /dev/null +++ b/src/dvdrip/__main__.py @@ -0,0 +1,6 @@ +"""The `python -m dvdrip` entrypoint.""" + +if __name__ == "__main__": # pragma: no cover + from dvdrip._cli import main + + main() diff --git a/src/dvdrip/_cli.py b/src/dvdrip/_cli.py new file mode 100644 index 0000000..8843d73 --- /dev/null +++ b/src/dvdrip/_cli.py @@ -0,0 +1,204 @@ +"""The ``dvdrip`` CLI entrypoint.""" + +import logging +import re +import sys +from pathlib import Path + +from pydantic import AliasChoices, Field +from pydantic_settings import BaseSettings, CliImplicitFlag + +from dvdrip._display import display_scan +from dvdrip._dvd import DVD +from dvdrip._errors import UserError +from dvdrip._tasks import ( + construct_tasks, + find_main_feature, + perform_tasks, + task_filenames, +) + +_NUM_RANGE_REGEX = re.compile(r"^(\d*)-(\d+)|(\d+)$") + +_logger = logging.getLogger(__name__) + + +class Settings(BaseSettings, cli_parse_args=True, cli_exit_on_error=True): + """Rip DVDs quickly and easily from the command line.""" + + verbose: CliImplicitFlag[bool] = Field( + default=False, + validation_alias=AliasChoices("v", "verbose"), + description="Increase verbosity.", + ) + chapter_split: CliImplicitFlag[bool] = Field( + default=False, + validation_alias=AliasChoices("c", "chapter-split", "chapter_split"), + description="Split each chapter out into a separate file.", + ) + dry_run: CliImplicitFlag[bool] = Field( + default=False, + validation_alias=AliasChoices("n", "dry-run", "dry_run"), + description="Don't actually write anything.", + ) + scan: CliImplicitFlag[bool] = Field( + default=False, + validation_alias=AliasChoices("scan"), + description="Display scan of disc; do not rip.", + ) + main_feature: CliImplicitFlag[bool] = Field( + default=False, + validation_alias=AliasChoices("main-feature", "main_feature"), + description="Rip only the main feature title.", + ) + preset: str = Field( + default="HQ 2160p60 4K AV1 Surround", + validation_alias=AliasChoices("p", "preset"), + description="HandBrakeCLI preset name.", + ) + titles: str = Field( + default="*", + validation_alias=AliasChoices("t", "titles"), + description=( + "Comma-separated list of title numbers to consider (starting at 1) or * for all titles." + ), + ) + input: str = Field( + validation_alias=AliasChoices("i", "input"), + description="Volume to rip (must be a directory).", + ) + output: str | None = Field( + default=None, + validation_alias=AliasChoices("o", "output"), + description=( + "Output location. Extension is added if only one title " + "being ripped, otherwise, a directory will be created " + "to contain ripped titles." + ), + ) + mount_timeout: float = Field( + default=15, + validation_alias=AliasChoices("mount-timeout", "mount_timeout"), + description="Amount of time to wait for a mountpoint to be mounted.", + ) + + +def _parse_titles_arg(titles_arg: str) -> list[int] | None: + """Parse the --titles argument into a list of title numbers. + + Args: + titles_arg: A string like "*", "1,3-5,7", or "2". + + Raises: + UserError: If the format is invalid. + + Returns: + List of title numbers, or None for all titles. + """ + if titles_arg == "*": + return None + + result: set[int] = set() + for s in titles_arg.split(","): + m = _NUM_RANGE_REGEX.match(s) + if not m: + msg = f"--titles must be * or list of integer ranges, found {titles_arg!r}" + raise UserError(msg) + start, end, only_val = m.groups() + if only_val is not None: + result.add(int(only_val)) + else: + start_int = int(start) if start else 1 + result.update(range(start_int, int(end) + 1)) + return sorted(result) + + +def _run(settings: Settings) -> None: + """Execute the main dvdrip workflow. + + Args: + settings: Parsed CLI settings. + """ + if not settings.scan and settings.output is None: + msg = "output argument is required" + raise UserError(msg) + + dvd = DVD( + settings.input, + verbose=settings.verbose, + mount_timeout=settings.mount_timeout, + ) + _logger.info("Reading from %r", dvd.mountpoint) + title_numbers = _parse_titles_arg(settings.titles) + titles = dvd.scan_titles(title_numbers) + + if settings.scan: + display_scan(titles) + return + + _rip(settings, dvd, titles) + + +def _rip(settings: Settings, dvd: DVD, titles: list) -> None: + """Rip titles from a DVD based on settings. + + Args: + settings: Parsed CLI settings. + dvd: The DVD to rip from. + titles: List of scanned titles. + """ + if settings.main_feature and len(titles) > 1: + titles = [find_main_feature(titles, verbose=settings.verbose)] + + if not titles: + msg = "No titles to rip" + raise UserError(msg) + + if not settings.output: + msg = "No output specified" + raise UserError(msg) + + _logger.info("Writing to %r", settings.output) + tasks = construct_tasks(titles, chapter_split=settings.chapter_split) + + filenames = task_filenames( + tasks, + settings.output, + dry_run=settings.dry_run, + ) + for filename in filenames: + if Path(filename).exists(): + msg = f"{filename!r} already exists" + raise UserError(msg) + + perform_tasks( + dvd, + tasks, + filenames, + preset=settings.preset, + dry_run=settings.dry_run, + verbose=settings.verbose, + ) + + if not settings.dry_run: + dvd.eject() + + +def main() -> None: + """CLI entrypoint for dvdrip.""" + settings = Settings() # type: ignore[call-arg] # ty: ignore[missing-argument] + + logging.basicConfig( + level=logging.DEBUG if settings.verbose else logging.INFO, + format="%(message)s", + stream=sys.stdout, + ) + + try: + _run(settings) + except FileExistsError as exc: + _logger.error("%s: %r", exc.strerror, exc.filename) + sys.exit(1) + except UserError as exc: + _logger.error("%s", exc.message) + sys.exit(1) diff --git a/src/dvdrip/_display.py b/src/dvdrip/_display.py new file mode 100644 index 0000000..9baf9a9 --- /dev/null +++ b/src/dvdrip/_display.py @@ -0,0 +1,99 @@ +"""Display functions for DVD scan results.""" + +import logging +from typing import Final + +from dvdrip._models import Title +from dvdrip._parsing import ( + compute_aspect_ratio, + parse_audio_tracks, + parse_chapters, + parse_size, + parse_subtitle_tracks, +) + +MAX_BAR_WIDTH: Final[int] = 50 + +_logger = logging.getLogger(__name__) + + +def render_bar( + start: int, + length: int, + total: int, + width: int, +) -> str: + """Render a Unicode progress bar segment. + + Args: + start: Start position in the total range. + length: Length of this segment. + total: Total range. + width: Character width of the bar. + + Returns: + A string of Unicode block and dot characters. + """ + end = start + length + bar_start = round(start * (width - 1) / total) + bar_length = round(end * (width - 1) / total) - bar_start + 1 + return ( + "\u2025" * bar_start + "\u25a0" * bar_length + "\u2025" * (width - bar_start - bar_length) + ) + + +def display_scan(titles: list[Title]) -> None: + """Display a formatted scan of DVD titles. + + Args: + titles: List of Title models to display. + """ + max_title_seconds = max(title.info.duration.in_seconds() for title in titles) + + for title in titles: + info = title.info + size = parse_size(info.size) + xaspect, yaspect = compute_aspect_ratio(size) + duration = info.duration + title_seconds = duration.in_seconds() + _logger.info( + "Title %3d/%3d: %s %d\u00d7%d %d:%d %3g fps", + title.number, + len(titles), + duration, + size.width, + size.height, + xaspect, + yaspect, + size.fps, + ) + for at in parse_audio_tracks(info.audio_tracks): + _logger.info( + " audio %3d: %s (%sch) [%s]", + at.number, + at.lang, + at.channels, + at.extras, + ) + for sub in parse_subtitle_tracks(info.subtitle_tracks): + _logger.info(" sub %3d: %s", sub.number, sub.info) + position = 0 + if title_seconds > 0: + for chapter in parse_chapters(info.chapters): + seconds = chapter.duration.in_seconds() + bar_width = round( + MAX_BAR_WIDTH * title_seconds / max_title_seconds, + ) + bar = render_bar( + position, + seconds, + title_seconds, + bar_width, + ) + _logger.info( + " chapter %3d: %s \u25d6%s\u25d7", + chapter.number, + chapter.duration, + bar, + ) + position += seconds diff --git a/src/dvdrip/_dvd.py b/src/dvdrip/_dvd.py new file mode 100644 index 0000000..99c710a --- /dev/null +++ b/src/dvdrip/_dvd.py @@ -0,0 +1,276 @@ +"""DVD scanning, ripping, and ejection.""" + +import ctypes +import logging +import os +import re +import stat +import subprocess +import sys +import time +from pathlib import Path +from pprint import pformat +from typing import Final + +from dvdrip._errors import UserError +from dvdrip._models import Task, Title, TitleInfo +from dvdrip._parsing import ( + extract_duration, + extract_title_scan, + parse_title_scan, +) +from dvdrip._subprocess import check_err, check_output + +_logger = logging.getLogger(__name__) + +HANDBRAKE: Final[str] = "HandBrakeCLI" +TOTAL_EJECT_SECONDS: Final[int] = 5 +EJECT_ATTEMPTS_PER_SECOND: Final[int] = 10 + + +def find_mount_point(dev: str, timeout: float) -> str: + """Find the mount point for a block device. + + Args: + dev: Path to the block device. + timeout: Maximum seconds to wait for the device to be mounted. + + Raises: + UserError: If the device is not mounted within the timeout. + """ + regex = re.compile(r"^" + re.escape(os.path.realpath(dev)) + r"\b") + + now = time.time() + end_time = now + timeout + while end_time >= now: + for line in check_output(["df", "-P"]).split("\n"): + m = regex.match(line) + if m: + parts = line.split(None, 5) + if len(parts) > 1: + return parts[-1] + time.sleep(0.1) + now = time.time() + msg = f"{dev!r} not mounted." + raise UserError(msg) + + +class DVD: + """Represents a DVD volume for scanning and ripping.""" + + def __init__( + self, + mountpoint: str, + *, + verbose: bool, + mount_timeout: float = 0, + ) -> None: + """Initialize a DVD from a mountpoint or block device. + + Args: + mountpoint: Path to the DVD mount or block device. + verbose: If True, print scan output. + mount_timeout: Seconds to wait for device mount. + """ + if stat.S_ISBLK(Path(mountpoint).stat().st_mode): + mountpoint = find_mount_point(mountpoint, mount_timeout) + if not Path(mountpoint).is_dir(): + msg = f"{mountpoint!r} is not a directory" + raise UserError(msg) + self.mountpoint = mountpoint + self.verbose = verbose + + def rip_title( + self, + task: Task, + output: str, + *, + preset: str, + dry_run: bool, + verbose: bool, + ) -> None: + """Rip a single DVD title or chapter using HandBrakeCLI. + + Args: + task: The ripping task containing title and chapter info. + output: Output file path. + preset: HandBrakeCLI preset name. + dry_run: If True, do not actually write files. + verbose: If True, print detailed progress. + """ + if verbose: + _logger.debug("Title Scan:\n%s", pformat(task.title.info.model_dump())) + + audio_tracks = list(task.title.info.audio_tracks.keys()) + audio_encoders = ["faac"] * len(audio_tracks) + subtitles = list(task.title.info.subtitle_tracks.keys()) + + args = [ + HANDBRAKE, + "--title", + str(task.title.number), + "--preset", + preset, + "--audio", + ",".join(audio_tracks), + "--aencoder", + ",".join(audio_encoders), + ] + if task.chapter is not None: + args += ["--chapters", str(task.chapter)] + if subtitles: + args += ["--subtitle", ",".join(subtitles)] + args += [ + "--markers", + "--optimize", + "--input", + self.mountpoint, + "--output", + output, + ] + if verbose: + _logger.debug( + "HandBrakeCLI args:\n%s", + " ".join(("\n " + a) if a.startswith("-") else a for a in args), + ) + if not dry_run: + if verbose: + subprocess.call(args) # noqa: S603 + else: + check_err(args) + + def scan_title(self, i: int) -> list[str]: + """Scan a single DVD title and return output lines. + + Args: + i: Title number to scan. + + Returns: + List of scan output lines. + """ + lines = [] + for line in check_err( + [ + HANDBRAKE, + "--scan", + "--title", + str(i), + "-i", + self.mountpoint, + ], + stdout=subprocess.PIPE, + ).split(os.linesep): + if self.verbose: + _logger.debug("< %s", line.rstrip()) + lines.append(line) + return lines + + def scan_all(self) -> list[str]: + """Scan all DVD titles at once and return output lines. + + Returns: + List of scan output lines. + """ + lines = [] + for line in check_err( + [ + HANDBRAKE, + "--scan", + "--title", + "0", + "-i", + self.mountpoint, + ], + stdout=subprocess.PIPE, + ).split(os.linesep): + if self.verbose: + _logger.debug("< %s", line.rstrip()) + lines.append(line) + return lines + + def scan_titles( + self, + title_numbers: list[int] | None, + ) -> list[Title]: + """Scan multiple DVD titles and return parsed Title models. + + Args: + title_numbers: List of title numbers to scan, or None for all. + + Returns: + List of parsed Title models. + """ + raw_scan = self.scan_all() + scan_lines = extract_title_scan(raw_scan) + parsed = parse_title_scan(scan_lines) + _logger.info("Disc has %d title(s).", len(parsed)) + + titles = [] + for title_name, title_info in parsed.items(): + title = _make_title(title_name, title_info) + if title_numbers and title.number not in title_numbers: + continue + titles.append(title) + return titles + + def eject(self) -> None: + """Eject the DVD drive.""" + if os.name == "nt": + self._eject_windows() + return + + cmd = ( + ["diskutil", "eject", self.mountpoint] + if sys.platform == "darwin" + else ["eject", self.mountpoint] + ) + for _ in range( + TOTAL_EJECT_SECONDS * EJECT_ATTEMPTS_PER_SECOND, + ): + if not subprocess.call(cmd): # noqa: S603 + return + time.sleep(1.0 / EJECT_ATTEMPTS_PER_SECOND) + + def _eject_windows(self) -> None: + """Eject DVD drive on Windows using MCI commands.""" + mp = self.mountpoint + if len(mp) < 4 and mp[1] == ":": # noqa: PLR2004 + letter = mp[0] + ctypes.windll.WINMM.mciSendStringW( # type: ignore[attr-defined] # ty: ignore[unresolved-attribute] + f"open {letter}: type CDAudio alias {letter}_drive", + None, + 0, + None, + ) + ctypes.windll.WINMM.mciSendStringW( # type: ignore[attr-defined] # ty: ignore[unresolved-attribute] + f"set {letter}_drive door open", + None, + 0, + None, + ) + + +_TITLE_NAME_RE = re.compile(r"^title (\d+)$") + + +def _make_title(name: str, info: dict) -> Title: + """Create a Title model from raw parsed data. + + Args: + name: Title name from scan (e.g. "title 2"). + info: Raw parsed info dictionary. + + Raises: + ValueError: If the title name format is unexpected. + + Returns: + A Title model. + """ + m = _TITLE_NAME_RE.match(name) + if not m: + msg = f"Unexpected title name format: {name!r}" + raise ValueError(msg) + number = int(m.group(1)) + info = {k: v for k, v in info.items() if k is not None} + info["duration"] = extract_duration("duration " + info["duration"]) + return Title(number=number, info=TitleInfo(**info)) diff --git a/src/dvdrip/_errors.py b/src/dvdrip/_errors.py new file mode 100644 index 0000000..5b42337 --- /dev/null +++ b/src/dvdrip/_errors.py @@ -0,0 +1,14 @@ +"""Error types for dvdrip.""" + + +class UserError(Exception): + """An error caused by invalid user input or configuration.""" + + def __init__(self, message: str) -> None: + """Initialize with an error message. + + Args: + message: Human-readable error description. + """ + super().__init__(message) + self.message = message diff --git a/src/dvdrip/_models.py b/src/dvdrip/_models.py new file mode 100644 index 0000000..5377e17 --- /dev/null +++ b/src/dvdrip/_models.py @@ -0,0 +1,98 @@ +"""Pydantic data models for DVD ripping.""" + +from pydantic import BaseModel, ConfigDict, Field + + +class Duration(BaseModel): + """A time duration with hours, minutes, and seconds.""" + + model_config = ConfigDict(frozen=True) + + hours: int + minutes: int + seconds: int + + def __str__(self) -> str: + """Format as HH:MM:SS.""" + return f"{self.hours:02d}:{self.minutes:02d}:{self.seconds:02d}" + + def in_seconds(self) -> int: + """Convert to total seconds.""" + return 60 * (60 * self.hours + self.minutes) + self.seconds + + +class Size(BaseModel): + """Video frame dimensions and pixel aspect ratio.""" + + model_config = ConfigDict(frozen=True) + + width: int + height: int + pix_aspect_width: int + pix_aspect_height: int + fps: float + + +class Chapter(BaseModel): + """A DVD chapter with its number and duration.""" + + model_config = ConfigDict(frozen=True) + + number: int + duration: Duration + + +class AudioTrack(BaseModel): + """An audio track on a DVD title.""" + + model_config = ConfigDict(frozen=True) + + number: int + lang: str + codec: str + channels: str + iso639_2: str + extras: str + + +class SubtitleTrack(BaseModel): + """A subtitle track on a DVD title.""" + + model_config = ConfigDict(frozen=True) + + number: int + info: str + + +class TitleInfo(BaseModel): + """Parsed information about a DVD title from HandBrakeCLI scan output.""" + + model_config = ConfigDict( + frozen=True, + populate_by_name=True, + extra="ignore", + ) + + duration: Duration + size: str + chapters: dict[str, str] + audio_tracks: dict[str, str] = Field(validation_alias="audio tracks") + subtitle_tracks: dict[str, str] = Field(validation_alias="subtitle tracks") + + +class Title(BaseModel): + """A DVD title with its number and parsed info.""" + + model_config = ConfigDict(frozen=True) + + number: int + info: TitleInfo + + +class Task(BaseModel): + """A ripping task: one title, optionally one chapter.""" + + model_config = ConfigDict(frozen=True) + + title: Title + chapter: int | None = None diff --git a/src/dvdrip/_parsing.py b/src/dvdrip/_parsing.py new file mode 100644 index 0000000..deb207f --- /dev/null +++ b/src/dvdrip/_parsing.py @@ -0,0 +1,337 @@ +"""Parsing functions for HandBrakeCLI scan output.""" + +import logging +import re +from collections.abc import Iterable +from math import gcd +from typing import Any + +from dvdrip._models import ( + AudioTrack, + Chapter, + Duration, + Size, + SubtitleTrack, +) + +_TITLE_COUNT_REGEXES = [ + re.compile(r"^Scanning title \d+ of (\d+)\.\.\.$"), + re.compile(r"^\[\d\d:\d\d:\d\d] scan: DVD has (\d+) title\(s\)$"), +] + +_STRUCTURED_LINE_RE = re.compile(r"( *)\+ (([a-z0-9 ]+):)?(.*)") + +_TRACK_VALUE_RE = re.compile(r"(\d+), (.*)") + +_SIZE_REGEX = re.compile( + r"^\s*(\d+)x(\d+),\s*" + r"pixel aspect: (\d+)/(\d+),\s*" + r"display aspect: (?:\d+(?:\.\d+)),\s*" + r"(\d+(?:\.\d+)) fps\s*$", +) + +_DURATION_REGEX = re.compile( + r"^(?:.*,)?\s*duration\s+(\d\d):(\d\d):(\d\d)\s*(?:,.*)?$", +) + +_AUDIO_TRACK_REGEX = re.compile( + r"^(\S+)\s*((?:\([^)]*\)\s*)*)(?:,\s*(.*))?$", +) + +_AUDIO_TRACK_FIELD_REGEX = re.compile( + r"^\(([^),]+)\)\s*\(([^)]*?)\s*ch\)\s*" + r"((?:\([^()]*\)\s*)*)\(iso639-2:\s*([^)]+)\)$", +) + +_AUDIO_TRACK_FIELD_COMBINED_REGEX = re.compile( + r"^\(([^,]+),\s*([^,]*?)\s*ch(?:,\s*[^)]*)?\)\s*" + r"((?:\([^()]*\)\s*)*)\(iso639-2:\s*([^)]+)\)$", +) + +_logger = logging.getLogger(__name__) + + +def only[T](iterable: Iterable[T]) -> T: + """Return the one and only element in an iterable. + + Raises: + ValueError: If iterable does not have exactly one item. + """ + (result,) = iterable + return result + + +def find_title_count( + scan: tuple[str, ...], + *, + verbose: bool, +) -> int: + """Find the total number of titles on a DVD from scan output. + + Args: + scan: Lines of HandBrakeCLI scan output. + verbose: If True, print scan lines on failure. + + Raises: + AssertionError: If title count cannot be found. + """ + for regex in _TITLE_COUNT_REGEXES: + m = None + for line in scan: + m = regex.match(line) + if m: + break + if m: + return int(m.group(1)) + if verbose: + for line in scan: + _logger.debug(line) + msg = "Can't find TITLE_COUNT_REGEX in scan" + raise AssertionError(msg) + + +def extract_title_scan(scan: Iterable[str]) -> tuple[str, ...]: + """Extract structured title scan lines from raw scan output. + + Args: + scan: Iterable of scan output lines. + + Returns: + Tuple of structured lines from the title scan section. + """ + result: list[str] = [] + in_title_scan = False + for line in scan: + if not in_title_scan and line.startswith("+"): + in_title_scan = True + if in_title_scan: + m = _STRUCTURED_LINE_RE.match(line) + if m: + result.append(line) + else: + break + return tuple(result) + + +def _massage_track_data(node: dict[str, Any], key: str) -> None: + """Normalize track data from list format to dict format.""" + if key in node: + track_data = node[key] + if isinstance(track_data, list): + new_track_data = {} + for track in track_data: + m = _TRACK_VALUE_RE.match(track) + assert m is not None # noqa: S101 + k, v = m.groups() + new_track_data[k] = v + node[key] = new_track_data + + +def parse_title_scan(scan: tuple[str, ...]) -> dict[str, Any]: + """Parse structured title scan lines into a nested dictionary. + + Args: + scan: Tuple of structured scan lines. + + Returns: + Dictionary mapping title names to their parsed info. + """ + _pos, result = _parse_title_scan_helper(scan, pos=0, indent=0) + + for value in result.values(): + _massage_track_data(value, "audio tracks") + _massage_track_data(value, "subtitle tracks") + return result + + +def _parse_title_scan_helper( + scan: tuple[str, ...], + pos: int, + indent: int, +) -> tuple[int, Any]: + """Recursively parse scan lines at a given indentation level.""" + result: dict[str | None, Any] = {} + cruft: list[str] = [] + while True: + pos, node = _parse_node(scan, pos=pos, indent=indent) + if node: + if isinstance(node, tuple): + k, v = node + result[k] = v + else: + cruft.append(node) + result[None] = cruft + else: + break + if len(result) == 1 and None in result: + return pos, result[None] + return pos, result + + +def _parse_node( + scan: tuple[str, ...], + pos: int, + indent: int, +) -> tuple[int, Any]: + """Parse a single node from scan lines.""" + if pos >= len(scan): + return pos, None + line = scan[pos] + m = _STRUCTURED_LINE_RE.match(line) + assert m is not None # noqa: S101 + spaces_str, colon, name, value = m.groups() + spaces = len(spaces_str) / 2 + if spaces < indent: + return pos, None + assert spaces == indent, f"{indent} <> {line!r}" # noqa: S101 + pos += 1 + if colon: + if value: + node = (name, value) + else: + pos, children = _parse_title_scan_helper(scan, pos, indent + 1) + node = (name, children) + else: + node = value + return pos, node + + +def parse_size(s: str) -> Size: + """Parse a size string from HandBrakeCLI into a Size model. + + Args: + s: Size string like "720x576, pixel aspect: 16/15, ...". + + Returns: + A Size model with parsed dimensions. + """ + match = _SIZE_REGEX.match(s) + assert match is not None # noqa: S101 + w, h, paw, pah, fps_str = match.groups() + return Size( + width=int(w), + height=int(h), + pix_aspect_width=int(paw), + pix_aspect_height=int(pah), + fps=float(fps_str), + ) + + +def compute_aspect_ratio(size: Size) -> tuple[int, int]: + """Compute the display aspect ratio from a Size model. + + Args: + size: A Size model with pixel aspect information. + + Returns: + Tuple of (width_ratio, height_ratio). + """ + w = size.width * size.pix_aspect_width + h = size.height * size.pix_aspect_height + d = gcd(w, h) + return (w // d, h // d) + + +def parse_duration(s: str) -> int: + """Parse a colon-separated duration string into total seconds. + + Args: + s: Duration string like "02:25:33". + + Returns: + Total number of seconds. + """ + result = 0 + for field in s.strip().split(":"): + result *= 60 + result += int(field) + return result + + +def extract_duration(s: str) -> Duration: + """Extract a Duration model from a HandBrakeCLI duration string. + + Args: + s: String containing "duration HH:MM:SS". + + Returns: + A Duration model. + """ + match = _DURATION_REGEX.match(s) + assert match is not None # noqa: S101 + hours, minutes, seconds = (int(x) for x in match.groups()) + return Duration(hours=hours, minutes=minutes, seconds=seconds) + + +def parse_chapters(d: dict[str, str]) -> list[Chapter]: + """Parse a dictionary of chapter data into Chapter models. + + Args: + d: Dictionary mapping chapter number strings to info strings. + + Returns: + List of Chapter models sorted by number. + """ + chapters = [] + for number, info in sorted( + ((int(n), info) for n, info in d.items()), + ): + chapters.append( + Chapter(number=number, duration=extract_duration(info)), + ) + return chapters + + +def parse_audio_tracks(d: dict[str, str]) -> list[AudioTrack]: + """Parse a dictionary of audio track data into AudioTrack models. + + Args: + d: Dictionary mapping track number strings to info strings. + + Returns: + List of AudioTrack models sorted by number. + """ + tracks = [] + for number, info in sorted( + ((int(n), info) for n, info in d.items()), + ): + m = _AUDIO_TRACK_REGEX.match(info) + if m: + lang, field_string, extras = m.groups() + m2 = _AUDIO_TRACK_FIELD_REGEX.match(field_string) + if not m2: + m2 = _AUDIO_TRACK_FIELD_COMBINED_REGEX.match(field_string) + if m2: + codec, channels, more_extras, iso639_2 = m2.groups() + if more_extras: + extras = more_extras + extras + tracks.append( + AudioTrack( + number=number, + lang=lang, + codec=codec, + channels=channels, + iso639_2=iso639_2, + extras=extras if extras else "", + ), + ) + else: + _logger.warning("Cannot parse audio track fields %r", field_string) + else: + _logger.warning("Cannot parse audio track info %r", info) + return tracks + + +def parse_subtitle_tracks(d: dict[str, str]) -> list[SubtitleTrack]: + """Parse a dictionary of subtitle track data into SubtitleTrack models. + + Args: + d: Dictionary mapping track number strings to info strings. + + Returns: + List of SubtitleTrack models sorted by number. + """ + return [ + SubtitleTrack(number=int(n), info=info) + for n, info in sorted(((int(n), info) for n, info in d.items())) + ] diff --git a/src/dvdrip/_subprocess.py b/src/dvdrip/_subprocess.py new file mode 100644 index 0000000..ecf3790 --- /dev/null +++ b/src/dvdrip/_subprocess.py @@ -0,0 +1,45 @@ +"""Subprocess wrappers for running external commands.""" + +import os +import subprocess +from typing import Final + +CHAR_ENCODING: Final[str] = "UTF-8" + + +def check_err( + args: list[str], + *, + stdout: int | None = None, +) -> str: + """Run a subprocess and return its decoded stderr output. + + Args: + args: Command and arguments to run. + stdout: How to handle stdout (e.g. subprocess.PIPE). + + Raises: + subprocess.CalledProcessError: If the process exits with a non-zero code. + """ + process = subprocess.Popen( # noqa: S603 + args, + stderr=subprocess.PIPE, + stdout=stdout, + ) + _, stderr = process.communicate() + retcode = process.poll() + if retcode: + raise subprocess.CalledProcessError(retcode, args, output=stderr) + assert stderr is not None # noqa: S101 + return stderr.decode(CHAR_ENCODING, "replace") + + +def check_output(args: list[str]) -> str: + """Run a subprocess and return its decoded stdout output. + + Args: + args: Command and arguments to run. + """ + raw = subprocess.check_output(args) # noqa: S603 + s = raw.decode(CHAR_ENCODING) + return s.replace(os.linesep, "\n") diff --git a/src/dvdrip/_tasks.py b/src/dvdrip/_tasks.py new file mode 100644 index 0000000..3472aa2 --- /dev/null +++ b/src/dvdrip/_tasks.py @@ -0,0 +1,149 @@ +"""Task construction, filename generation, and execution.""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING + +from dvdrip._errors import UserError +from dvdrip._models import Task, Title +from dvdrip._parsing import parse_duration + +if TYPE_CHECKING: + from dvdrip._dvd import DVD + +_logger = logging.getLogger(__name__) + + +def find_main_feature( + titles: list[Title], + *, + verbose: bool = False, +) -> Title: + """Find the longest title on the disc (the main feature). + + Args: + titles: List of titles to search. + verbose: If True, print selection details. + + Returns: + The Title with the longest duration. + """ + if verbose: + _logger.debug("Attempting to determine main feature of %d titles...", len(titles)) + main_feature = max( + titles, + key=lambda title: parse_duration(str(title.info.duration)), + ) + if verbose: + _logger.debug("Selected %r as main feature.", main_feature.number) + return main_feature + + +def construct_tasks( + titles: list[Title], + *, + chapter_split: bool, +) -> list[Task]: + """Create ripping tasks from titles. + + Args: + titles: List of titles to create tasks for. + chapter_split: If True, create one task per chapter. + + Returns: + List of Task models. + """ + tasks: list[Task] = [] + for title in titles: + num_chapters = len(title.info.chapters) + if chapter_split and num_chapters > 1: + tasks.extend( + Task(title=title, chapter=chapter) for chapter in range(1, num_chapters + 1) + ) + else: + tasks.append(Task(title=title)) + return tasks + + +def task_filenames( + tasks: list[Task], + output: str, + *, + dry_run: bool = False, +) -> list[str]: + """Generate output filenames for a list of tasks. + + Args: + tasks: List of ripping tasks. + output: Base output path. + dry_run: If True, do not create directories. + + Raises: + UserError: If multiple tasks would produce the same filename. + + Returns: + List of output file paths. + """ + if len(tasks) > 1: + result = [_multi_filename(task, output) for task in tasks] + if not dry_run: + Path(output).mkdir(parents=True) + else: + result = [f"{output}.mp4" for _ in tasks] + + if len(set(result)) != len(result): + msg = "multiple tasks use same filename" + raise UserError(msg) + return result + + +def _multi_filename(task: Task, output: str) -> str: + """Compute filename for a task when ripping multiple titles.""" + if task.chapter is None: + return str(Path(output) / f"Title{task.title.number:02d}.mp4") + return str( + Path(output) / f"Title{task.title.number:02d}_{task.chapter:02d}.mp4", + ) + + +def perform_tasks( # noqa: PLR0913 + dvd: DVD, + tasks: list[Task], + filenames: list[str], + *, + preset: str, + dry_run: bool = False, + verbose: bool = False, +) -> None: + """Execute all ripping tasks. + + Args: + dvd: A DVD instance. + tasks: List of ripping tasks. + filenames: Output filenames for each task. + preset: HandBrakeCLI preset name. + dry_run: If True, do not actually rip. + verbose: If True, print detailed progress. + """ + title_count = len({t.title.number for t in tasks}) + for task, filename in zip(tasks, filenames, strict=True): + if task.chapter is None: + _logger.info( + "Title %d / %d => %r", + task.title.number, + title_count, + filename, + ) + else: + num_chapters = len(task.title.info.chapters) + _logger.info( + "Title %d / %d , Chapter %d / %d => %r", + task.title.number, + title_count, + task.chapter, + num_chapters, + filename, + ) + dvd.rip_title(task, filename, preset=preset, dry_run=dry_run, verbose=verbose) diff --git a/src/dvdrip/py.typed b/src/dvdrip/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/test_cli.py b/test/test_cli.py new file mode 100644 index 0000000..09676e0 --- /dev/null +++ b/test/test_cli.py @@ -0,0 +1,33 @@ +"""Tests for dvdrip._cli.""" + +import pytest + +from dvdrip._cli import _parse_titles_arg +from dvdrip._errors import UserError + + +class TestParseTitlesArg: + def test_all_titles(self) -> None: + assert _parse_titles_arg("*") is None + + def test_single_number(self) -> None: + assert _parse_titles_arg("3") == [3] + + def test_comma_separated(self) -> None: + assert _parse_titles_arg("1,3,5") == [1, 3, 5] + + def test_range(self) -> None: + assert _parse_titles_arg("2-5") == [2, 3, 4, 5] + + def test_range_no_start(self) -> None: + assert _parse_titles_arg("-3") == [1, 2, 3] + + def test_mixed(self) -> None: + assert _parse_titles_arg("1,3-5,7") == [1, 3, 4, 5, 7] + + def test_deduplicates(self) -> None: + assert _parse_titles_arg("1,1,2") == [1, 2] + + def test_invalid_raises(self) -> None: + with pytest.raises(UserError, match="integer ranges"): + _parse_titles_arg("abc") diff --git a/test/test_display.py b/test/test_display.py new file mode 100644 index 0000000..f360a6c --- /dev/null +++ b/test/test_display.py @@ -0,0 +1,47 @@ +"""Tests for dvdrip._display.""" + +import logging + +import pytest + +from dvdrip._display import display_scan, render_bar +from dvdrip._models import Duration, Title, TitleInfo + + +class TestRenderBar: + def test_full_bar(self) -> None: + bar = render_bar(0, 100, 100, 10) + assert "\u25a0" in bar + assert len(bar) == 10 + + def test_partial_bar(self) -> None: + bar = render_bar(0, 50, 100, 10) + assert "\u25a0" in bar + assert "\u2025" in bar + + def test_end_segment(self) -> None: + bar = render_bar(50, 50, 100, 10) + assert "\u25a0" in bar + + +class TestDisplayScan: + def test_basic_output(self, caplog: pytest.LogCaptureFixture) -> None: + title = Title( + number=1, + info=TitleInfo( + duration=Duration(hours=1, minutes=30, seconds=0), + size=" 720x576, pixel aspect: 16/15, display aspect: 1.33, 25.0 fps", + chapters={"1": "duration 00:45:00", "2": "duration 00:45:00"}, + audio_tracks={ + "1": "English (AC3) (5.1 ch) (iso639-2: eng), 48000Hz, 448000bps", + }, + subtitle_tracks={"1": "English (Bitmap)(VOBSUB)"}, + ), + ) + with caplog.at_level(logging.INFO): + display_scan([title]) + assert "Title" in caplog.text + assert "720" in caplog.text + assert "audio" in caplog.text + assert "chapter" in caplog.text + assert "\u25d6" in caplog.text diff --git a/test/test_dvd.py b/test/test_dvd.py new file mode 100644 index 0000000..dfc38c5 --- /dev/null +++ b/test/test_dvd.py @@ -0,0 +1,350 @@ +"""Tests for dvdrip._dvd.""" + +import logging +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from dvdrip._dvd import DVD, _make_title, find_mount_point +from dvdrip._errors import UserError +from dvdrip._models import Duration, Task, Title, TitleInfo + + +def _make_title_info() -> TitleInfo: + return TitleInfo( + duration=Duration(hours=1, minutes=0, seconds=0), + size=" 720x576, pixel aspect: 16/15, display aspect: 1.33, 25.0 fps", + chapters={"1": "duration 00:30:00", "2": "duration 00:30:00"}, + audio_tracks={"1": "English (AC3) (5.1 ch) (iso639-2: eng), 48000Hz"}, + subtitle_tracks={"1": "English (Bitmap)(VOBSUB)"}, + ) + + +class TestDVDInit: + def test_directory_mountpoint(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + assert dvd.mountpoint == str(tmp_path) + + def test_not_directory_raises(self, tmp_path: Path) -> None: + filepath = tmp_path / "file.txt" + filepath.write_text("test") + with pytest.raises(UserError, match="not a directory"): + DVD(str(filepath), verbose=False) + + def test_block_device_looks_up_mount(self, tmp_path: Path) -> None: + with ( + patch("dvdrip._dvd.Path") as mock_path_cls, + patch("dvdrip._dvd.stat") as mock_stat, + patch("dvdrip._dvd.find_mount_point") as mock_find, + ): + mock_path_instance = MagicMock() + mock_path_instance.stat.return_value.st_mode = 0o060000 + mock_path_instance.is_dir.return_value = True + mock_path_cls.return_value = mock_path_instance + mock_stat.S_ISBLK.return_value = True + mock_find.return_value = str(tmp_path) + + dvd = DVD("/dev/sr0", verbose=False, mount_timeout=5) + assert dvd.mountpoint == str(tmp_path) + + +class TestFindMountPoint: + def test_finds_mount(self) -> None: + with patch("dvdrip._dvd.check_output") as mock: + mock.return_value = "/dev/sr0 1024 512 512 50% /media/dvd\n" + with patch("dvdrip._dvd.os.path.realpath", return_value="/dev/sr0"): + result = find_mount_point("/dev/sr0", timeout=1) + assert result == "/media/dvd" + + def test_timeout_raises(self) -> None: + with ( + patch("dvdrip._dvd.check_output") as mock, + patch("dvdrip._dvd.os.path.realpath", return_value="/dev/sr0"), + patch("dvdrip._dvd.time.sleep"), + patch("dvdrip._dvd.time.time", side_effect=[0, 0, 100]), + ): + mock.return_value = "no matching device\n" + with pytest.raises(UserError, match="not mounted"): + find_mount_point("/dev/sr0", timeout=0.01) + + +class TestDVDRipTitle: + def test_dry_run_does_not_call(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + info = _make_title_info() + title = Title(number=1, info=info) + task = Task(title=title, chapter=None) + + with patch("dvdrip._dvd.subprocess.call") as mock_call: + dvd.rip_title(task, "output.mp4", preset="Test", dry_run=True, verbose=False) + mock_call.assert_not_called() + + def test_verbose_logs( + self, + tmp_path: Path, + caplog: pytest.LogCaptureFixture, + ) -> None: + dvd = DVD(str(tmp_path), verbose=False) + info = _make_title_info() + title = Title(number=1, info=info) + task = Task(title=title, chapter=None) + + with caplog.at_level(logging.DEBUG): + dvd.rip_title(task, "output.mp4", preset="Test", dry_run=True, verbose=True) + assert "Title Scan:" in caplog.text + + def test_calls_handbrake(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + info = _make_title_info() + title = Title(number=1, info=info) + task = Task(title=title, chapter=None) + + with patch("dvdrip._dvd.check_err") as mock: + dvd.rip_title(task, "output.mp4", preset="Test", dry_run=False, verbose=False) + mock.assert_called_once() + args = mock.call_args[0][0] + assert args[0] == "HandBrakeCLI" + + def test_verbose_calls_subprocess(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + info = _make_title_info() + title = Title(number=1, info=info) + task = Task(title=title, chapter=None) + + with patch("dvdrip._dvd.subprocess.call") as mock_call: + dvd.rip_title(task, "output.mp4", preset="Test", dry_run=False, verbose=True) + mock_call.assert_called_once() + + def test_with_chapter(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + info = _make_title_info() + title = Title(number=1, info=info) + task = Task(title=title, chapter=2) + + with patch("dvdrip._dvd.check_err") as mock: + dvd.rip_title(task, "output.mp4", preset="Test", dry_run=False, verbose=False) + args = mock.call_args[0][0] + assert "--chapters" in args + assert "2" in args + + def test_with_subtitles(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + info = _make_title_info() + title = Title(number=1, info=info) + task = Task(title=title) + + with patch("dvdrip._dvd.check_err") as mock: + dvd.rip_title(task, "output.mp4", preset="Test", dry_run=False, verbose=False) + args = mock.call_args[0][0] + assert "--subtitle" in args + + +class TestDVDScanTitle: + def test_returns_lines(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + with patch("dvdrip._dvd.check_err") as mock: + mock.return_value = "line1\nline2\n" + lines = dvd.scan_title(1) + assert "line1" in lines + + def test_verbose_logs( + self, + tmp_path: Path, + caplog: pytest.LogCaptureFixture, + ) -> None: + dvd = DVD(str(tmp_path), verbose=True) + with patch("dvdrip._dvd.check_err") as mock, caplog.at_level(logging.DEBUG): + mock.return_value = "scan output\n" + dvd.scan_title(1) + assert "< scan output" in caplog.text + + +class TestDVDScanAll: + def test_returns_lines(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + with patch("dvdrip._dvd.check_err") as mock: + mock.return_value = "line1\nline2\n" + lines = dvd.scan_all() + assert "line1" in lines + args = mock.call_args[0][0] + assert "--title" in args + assert "0" in args + + def test_verbose_logs( + self, + tmp_path: Path, + caplog: pytest.LogCaptureFixture, + ) -> None: + dvd = DVD(str(tmp_path), verbose=True) + with patch("dvdrip._dvd.check_err") as mock, caplog.at_level(logging.DEBUG): + mock.return_value = "scan output\n" + dvd.scan_all() + assert "< scan output" in caplog.text + + +class TestDVDScanTitles: + def test_single_title(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + scan_output = [ + "Scanning title 1 of 1...", + "+ title 1:", + " + duration: 01:30:00", + " + size: 720x480, pixel aspect: 8/9, display aspect: 1.33, 29.97 fps", + " + chapters:", + " + 1: duration 00:45:00", + " + audio tracks:", + " + 1, English (AC3) (5.1 ch) (iso639-2: eng), 48000Hz", + " + subtitle tracks:", + " + 1, English (Bitmap)(VOBSUB)", + ] + with patch.object(dvd, "scan_all", return_value=scan_output): + titles = dvd.scan_titles(None) + assert len(titles) == 1 + assert titles[0].number == 1 + + def test_multiple_titles(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + scan_output = [ + "Scanning title 1 of 2...", + "+ title 1:", + " + duration: 01:00:00", + " + size: 720x480, pixel aspect: 8/9, display aspect: 1.33, 29.97 fps", + " + chapters:", + " + 1: duration 01:00:00", + " + audio tracks:", + " + 1, English (AC3) (5.1 ch) (iso639-2: eng), 48000Hz", + " + subtitle tracks:", + " + 1, English (Bitmap)(VOBSUB)", + "+ title 2:", + " + duration: 00:30:00", + " + size: 720x480, pixel aspect: 8/9, display aspect: 1.33, 29.97 fps", + " + chapters:", + " + 1: duration 00:30:00", + " + audio tracks:", + " + 1, English (AC3) (5.1 ch) (iso639-2: eng), 48000Hz", + " + subtitle tracks:", + " + 1, English (Bitmap)(VOBSUB)", + ] + with patch.object(dvd, "scan_all", return_value=scan_output): + titles = dvd.scan_titles(None) + assert len(titles) == 2 + assert titles[0].number == 1 + assert titles[1].number == 2 + + def test_skipped_title_numbers(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + scan_output = [ + "Scanning title 2 of 8...", + "+ title 2:", + " + duration: 03:00:18", + " + size: 720x480, pixel aspect: 8/9, display aspect: 1.33, 29.97 fps", + " + chapters:", + " + 1: duration 03:00:18", + " + audio tracks:", + " + 1, English (AC3) (5.1 ch) (iso639-2: eng), 48000Hz", + " + subtitle tracks:", + " + 1, English (Bitmap)(VOBSUB)", + "+ title 3:", + " + duration: 00:30:00", + " + size: 720x480, pixel aspect: 8/9, display aspect: 1.33, 29.97 fps", + " + chapters:", + " + 1: duration 00:30:00", + " + audio tracks:", + " + 1, English (AC3) (5.1 ch) (iso639-2: eng), 48000Hz", + " + subtitle tracks:", + " + 1, English (Bitmap)(VOBSUB)", + ] + with patch.object(dvd, "scan_all", return_value=scan_output): + titles = dvd.scan_titles(None) + assert len(titles) == 2 + assert titles[0].number == 2 + assert titles[1].number == 3 + + def test_filter_by_title_numbers(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + scan_output = [ + "+ title 2:", + " + duration: 01:00:00", + " + size: 720x480, pixel aspect: 8/9, display aspect: 1.33, 29.97 fps", + " + chapters:", + " + 1: duration 01:00:00", + " + audio tracks:", + " + 1, English (AC3) (5.1 ch) (iso639-2: eng), 48000Hz", + " + subtitle tracks:", + " + 1, English (Bitmap)(VOBSUB)", + "+ title 3:", + " + duration: 00:30:00", + " + size: 720x480, pixel aspect: 8/9, display aspect: 1.33, 29.97 fps", + " + chapters:", + " + 1: duration 00:30:00", + " + audio tracks:", + " + 1, English (AC3) (5.1 ch) (iso639-2: eng), 48000Hz", + " + subtitle tracks:", + " + 1, English (Bitmap)(VOBSUB)", + ] + with patch.object(dvd, "scan_all", return_value=scan_output): + titles = dvd.scan_titles([3]) + assert len(titles) == 1 + assert titles[0].number == 3 + + def test_extra_fields_ignored(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + scan_output = [ + "+ title 2:", + " + Main Feature", + " + index 2", + " + duration: 03:00:18", + " + size: 720x480, pixel aspect: 8/9, display aspect: 1.33, 29.970 fps", + " + autocrop: 0/0/8/8", + " + chapters:", + " + 1: duration 03:00:18", + " + audio tracks:", + " + 1, English (AC3, 2.0 ch, 192 kbps) (iso639-2: eng), 48000Hz, 192000bps", + " + subtitle tracks:", + " + 1, English (4:3) [VOBSUB]", + ] + with patch.object(dvd, "scan_all", return_value=scan_output): + titles = dvd.scan_titles(None) + assert len(titles) == 1 + assert titles[0].number == 2 + + +class TestDVDEject: + def test_unix_eject(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + with patch("dvdrip._dvd.subprocess.call", return_value=0) as mock: + dvd.eject() + mock.assert_called_once() + + def test_unix_eject_retries(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + with ( + patch("dvdrip._dvd.subprocess.call", side_effect=[1, 0]) as mock, + patch("dvdrip._dvd.time.sleep") as mock_sleep, + ): + dvd.eject() + assert mock.call_count == 2 + mock_sleep.assert_called_once() + + def test_windows_eject_non_drive_letter(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + dvd.mountpoint = "C:\\Users\\test" + with patch("dvdrip._dvd.os.name", "nt"): + dvd.eject() + + def test_windows_eject_drive_letter(self, tmp_path: Path) -> None: + dvd = DVD(str(tmp_path), verbose=False) + dvd.mountpoint = "F:" + with ( + patch("dvdrip._dvd.os.name", "nt"), + patch("dvdrip._dvd.ctypes") as mock_ctypes, + ): + dvd.eject() + assert mock_ctypes.windll.WINMM.mciSendStringW.call_count == 2 + + +class TestMakeTitle: + def test_invalid_name_raises(self) -> None: + with pytest.raises(ValueError, match="Unexpected title name"): + _make_title("not a title", {}) diff --git a/test/test_errors.py b/test/test_errors.py new file mode 100644 index 0000000..a7a1b94 --- /dev/null +++ b/test/test_errors.py @@ -0,0 +1,17 @@ +"""Tests for dvdrip._errors.""" + +import pytest + +from dvdrip._errors import UserError + + +class TestUserError: + def test_message(self) -> None: + err = UserError("something went wrong") + assert err.message == "something went wrong" + assert str(err) == "something went wrong" + + def test_is_exception(self) -> None: + msg = "bad input" + with pytest.raises(UserError, match=msg): + raise UserError(msg) diff --git a/test/test_init.py b/test/test_init.py new file mode 100644 index 0000000..e8193d4 --- /dev/null +++ b/test/test_init.py @@ -0,0 +1,9 @@ +"""Initial testing module.""" + +import dvdrip + + +def test_version() -> None: + version = getattr(dvdrip, "__version__", None) + assert version is not None + assert isinstance(version, str) diff --git a/test/test_models.py b/test/test_models.py new file mode 100644 index 0000000..794edf9 --- /dev/null +++ b/test/test_models.py @@ -0,0 +1,135 @@ +"""Tests for dvdrip._models.""" + +import pytest +from pydantic import ValidationError + +from dvdrip._models import ( + AudioTrack, + Chapter, + Duration, + Size, + SubtitleTrack, + Task, + Title, + TitleInfo, +) + + +class TestDuration: + def test_str(self) -> None: + d = Duration(hours=2, minutes=5, seconds=3) + assert str(d) == "02:05:03" + + def test_in_seconds(self) -> None: + d = Duration(hours=1, minutes=30, seconds=15) + assert d.in_seconds() == 5415 + + def test_zero(self) -> None: + d = Duration(hours=0, minutes=0, seconds=0) + assert d.in_seconds() == 0 + assert str(d) == "00:00:00" + + def test_frozen(self) -> None: + d = Duration(hours=1, minutes=2, seconds=3) + with pytest.raises(ValidationError): + d.hours = 5 + + +class TestSize: + def test_fields(self) -> None: + s = Size(width=720, height=576, pix_aspect_width=16, pix_aspect_height=15, fps=25.0) + assert s.width == 720 + assert s.height == 576 + assert s.fps == 25.0 + + def test_frozen(self) -> None: + s = Size(width=720, height=576, pix_aspect_width=16, pix_aspect_height=15, fps=25.0) + with pytest.raises(ValidationError): + s.width = 1920 + + +class TestChapter: + def test_fields(self) -> None: + d = Duration(hours=0, minutes=24, seconds=15) + c = Chapter(number=1, duration=d) + assert c.number == 1 + assert c.duration.in_seconds() == 1455 + + +class TestAudioTrack: + def test_fields(self) -> None: + at = AudioTrack( + number=1, + lang="English", + codec="AC3", + channels="5.1", + iso639_2="eng", + extras="48000Hz, 448000bps", + ) + assert at.lang == "English" + assert at.channels == "5.1" + + +class TestSubtitleTrack: + def test_fields(self) -> None: + st = SubtitleTrack(number=1, info="English (Bitmap)(VOBSUB)") + assert st.number == 1 + + +class TestTitleInfo: + def test_with_aliases(self) -> None: + info = TitleInfo( + duration=Duration(hours=1, minutes=0, seconds=0), + size=" 720x576, pixel aspect: 16/15, display aspect: 1.33, 25 fps", + chapters={"1": "duration 00:30:00"}, + **{"audio tracks": {"1": "track info"}, "subtitle tracks": {"1": "sub info"}}, + ) + assert info.audio_tracks == {"1": "track info"} + assert info.subtitle_tracks == {"1": "sub info"} + + def test_populate_by_name(self) -> None: + info = TitleInfo( + duration=Duration(hours=1, minutes=0, seconds=0), + size="test", + chapters={"1": "ch"}, + audio_tracks={"1": "at"}, + subtitle_tracks={"1": "st"}, + ) + assert info.chapters == {"1": "ch"} + + +class TestTitle: + def test_fields(self) -> None: + info = TitleInfo( + duration=Duration(hours=1, minutes=0, seconds=0), + size="test", + chapters={}, + audio_tracks={}, + subtitle_tracks={}, + ) + title = Title(number=1, info=info) + assert title.number == 1 + + +class TestTask: + def test_default_chapter(self) -> None: + info = TitleInfo( + duration=Duration(hours=1, minutes=0, seconds=0), + size="test", + chapters={}, + audio_tracks={}, + subtitle_tracks={}, + ) + task = Task(title=Title(number=1, info=info)) + assert task.chapter is None + + def test_with_chapter(self) -> None: + info = TitleInfo( + duration=Duration(hours=1, minutes=0, seconds=0), + size="test", + chapters={}, + audio_tracks={}, + subtitle_tracks={}, + ) + task = Task(title=Title(number=1, info=info), chapter=3) + assert task.chapter == 3 diff --git a/test/test_parsing.py b/test/test_parsing.py new file mode 100644 index 0000000..a1fce1f --- /dev/null +++ b/test/test_parsing.py @@ -0,0 +1,192 @@ +"""Tests for dvdrip._parsing.""" + +import logging + +import pytest + +from dvdrip._models import Duration, Size +from dvdrip._parsing import ( + compute_aspect_ratio, + extract_duration, + extract_title_scan, + find_title_count, + only, + parse_audio_tracks, + parse_chapters, + parse_duration, + parse_size, + parse_subtitle_tracks, + parse_title_scan, +) + + +class TestOnly: + def test_single_element(self) -> None: + assert only([42]) == 42 + + def test_empty_raises(self) -> None: + with pytest.raises(ValueError, match="not enough"): + only([]) + + def test_multiple_raises(self) -> None: + with pytest.raises(ValueError, match="too many"): + only([1, 2]) + + +class TestFindTitleCount: + def test_scanning_format(self) -> None: + scan = ("Scanning title 1 of 5...",) + assert find_title_count(scan, verbose=False) == 5 + + def test_dvd_has_format(self) -> None: + scan = ("[12:34:56] scan: DVD has 3 title(s)",) + assert find_title_count(scan, verbose=False) == 3 + + def test_not_found_raises(self) -> None: + with pytest.raises(AssertionError, match="TITLE_COUNT"): + find_title_count(("no match here",), verbose=False) + + def test_not_found_verbose_logs(self, caplog: pytest.LogCaptureFixture) -> None: + with caplog.at_level(logging.DEBUG), pytest.raises(AssertionError): + find_title_count(("some line",), verbose=True) + assert "some line" in caplog.text + + +class TestExtractTitleScan: + def test_basic(self) -> None: + scan = [ + "some preamble", + "+ title 1:", + " + duration: 01:30:00", + "end of scan", + ] + result = extract_title_scan(scan) + assert result == ("+ title 1:", " + duration: 01:30:00") + + def test_empty(self) -> None: + assert extract_title_scan([]) == () + + +class TestParseTitleScan: + def test_basic_title(self) -> None: + scan = ( + "+ title 1:", + " + duration: 01:30:00", + " + size: 720x480, pixel aspect: 8/9, display aspect: 1.33, 29.97 fps", + " + chapters:", + " + 1: duration 00:45:00", + " + 2: duration 00:45:00", + " + audio tracks:", + " + 1, English (AC3) (5.1 ch) (iso639-2: eng)", + " + subtitle tracks:", + " + 1, English (Bitmap)(VOBSUB)", + ) + result = parse_title_scan(scan) + assert "title 1" in result + info = result["title 1"] + assert info["duration"] == " 01:30:00" + assert "1" in info["audio tracks"] + assert "1" in info["subtitle tracks"] + + +class TestParseSize: + def test_standard(self) -> None: + s = " 720x576, pixel aspect: 16/15, display aspect: 1.33, 25.0 fps" + size = parse_size(s) + assert size == Size( + width=720, + height=576, + pix_aspect_width=16, + pix_aspect_height=15, + fps=25.0, + ) + + +class TestComputeAspectRatio: + def test_4_3(self) -> None: + size = Size(width=720, height=576, pix_aspect_width=16, pix_aspect_height=15, fps=25.0) + assert compute_aspect_ratio(size) == (4, 3) + + def test_16_9(self) -> None: + size = Size(width=1920, height=1080, pix_aspect_width=1, pix_aspect_height=1, fps=24.0) + assert compute_aspect_ratio(size) == (16, 9) + + +class TestParseDuration: + def test_hhmmss(self) -> None: + assert parse_duration("02:25:33") == 8733 + + def test_mmss(self) -> None: + assert parse_duration("25:33") == 1533 + + def test_zero(self) -> None: + assert parse_duration("00:00:00") == 0 + + +class TestExtractDuration: + def test_basic(self) -> None: + d = extract_duration("duration 01:30:15") + assert d == Duration(hours=1, minutes=30, seconds=15) + + def test_with_surrounding(self) -> None: + d = extract_duration("cells 0->0, duration 00:24:15, something") + assert d == Duration(hours=0, minutes=24, seconds=15) + + +class TestParseChapters: + def test_basic(self) -> None: + d = {"2": "duration 00:10:00", "1": "duration 00:20:00"} + chapters = parse_chapters(d) + assert len(chapters) == 2 + assert chapters[0].number == 1 + assert chapters[0].duration.in_seconds() == 1200 + assert chapters[1].number == 2 + + +class TestParseAudioTracks: + def test_basic(self) -> None: + d = {"1": "English (AC3) (5.1 ch) (iso639-2: eng), 48000Hz, 448000bps"} + tracks = parse_audio_tracks(d) + assert len(tracks) == 1 + assert tracks[0].lang == "English" + assert tracks[0].channels == "5.1" + assert tracks[0].iso639_2 == "eng" + + def test_combined_format(self) -> None: + d = {"1": "English (AC3, 2.0 ch, 192 kbps) (iso639-2: eng), 48000Hz, 192000bps"} + tracks = parse_audio_tracks(d) + assert len(tracks) == 1 + assert tracks[0].lang == "English" + assert tracks[0].codec == "AC3" + assert tracks[0].channels == "2.0" + assert tracks[0].iso639_2 == "eng" + + def test_with_more_extras(self) -> None: + d = {"1": "English (AC3) (5.1 ch) (extra info) (iso639-2: eng), 48000Hz"} + tracks = parse_audio_tracks(d) + assert len(tracks) == 1 + assert "(extra info)" in tracks[0].extras + + def test_unparseable_info_warns(self, caplog: pytest.LogCaptureFixture) -> None: + with caplog.at_level(logging.WARNING): + d = {"1": ""} + tracks = parse_audio_tracks(d) + assert len(tracks) == 0 + assert "Cannot parse audio track info" in caplog.text + + def test_unparseable_fields_warns(self, caplog: pytest.LogCaptureFixture) -> None: + with caplog.at_level(logging.WARNING): + d = {"1": "English (bad fields format)"} + tracks = parse_audio_tracks(d) + assert len(tracks) == 0 + assert "Cannot parse audio track fields" in caplog.text + + +class TestParseSubtitleTracks: + def test_basic(self) -> None: + d = {"2": "French (VOBSUB)", "1": "English (VOBSUB)"} + tracks = parse_subtitle_tracks(d) + assert len(tracks) == 2 + assert tracks[0].number == 1 + assert tracks[0].info == "English (VOBSUB)" + assert tracks[1].number == 2 diff --git a/test/test_subprocess.py b/test/test_subprocess.py new file mode 100644 index 0000000..d5f0bb1 --- /dev/null +++ b/test/test_subprocess.py @@ -0,0 +1,53 @@ +"""Tests for dvdrip._subprocess.""" + +import subprocess +from unittest.mock import MagicMock, patch + +import pytest + +from dvdrip._subprocess import check_err, check_output + + +class TestCheckErr: + def test_returns_decoded_stderr(self) -> None: + with patch("dvdrip._subprocess.subprocess.Popen") as mock_popen: + process = MagicMock() + process.communicate.return_value = (None, b"stderr output") + process.poll.return_value = 0 + mock_popen.return_value = process + + result = check_err(["echo", "test"]) + assert result == "stderr output" + + def test_raises_on_nonzero_exit(self) -> None: + with patch("dvdrip._subprocess.subprocess.Popen") as mock_popen: + process = MagicMock() + process.communicate.return_value = (None, b"error") + process.poll.return_value = 1 + mock_popen.return_value = process + + with pytest.raises(subprocess.CalledProcessError) as exc_info: + check_err(["bad", "cmd"]) + assert exc_info.value.returncode == 1 + + def test_passes_stdout_kwarg(self) -> None: + with patch("dvdrip._subprocess.subprocess.Popen") as mock_popen: + process = MagicMock() + process.communicate.return_value = (None, b"") + process.poll.return_value = 0 + mock_popen.return_value = process + + check_err(["cmd"], stdout=subprocess.PIPE) + mock_popen.assert_called_once_with( + ["cmd"], + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + +class TestCheckOutput: + def test_returns_decoded_stdout(self) -> None: + with patch("dvdrip._subprocess.subprocess.check_output") as mock: + mock.return_value = b"hello world" + result = check_output(["echo", "hello"]) + assert result == "hello world" diff --git a/test/test_tasks.py b/test/test_tasks.py new file mode 100644 index 0000000..3b020f3 --- /dev/null +++ b/test/test_tasks.py @@ -0,0 +1,124 @@ +"""Tests for dvdrip._tasks.""" + +import logging +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from dvdrip._errors import UserError +from dvdrip._models import Duration, Task, Title, TitleInfo +from dvdrip._tasks import construct_tasks, find_main_feature, perform_tasks, task_filenames + + +def _make_title(number: int, duration_seconds: int, num_chapters: int = 1) -> Title: + hours = duration_seconds // 3600 + minutes = (duration_seconds % 3600) // 60 + seconds = duration_seconds % 60 + chapters = {str(i): f"duration 00:10:0{i}" for i in range(1, num_chapters + 1)} + return Title( + number=number, + info=TitleInfo( + duration=Duration(hours=hours, minutes=minutes, seconds=seconds), + size=" 720x576, pixel aspect: 16/15, display aspect: 1.33, 25.0 fps", + chapters=chapters, + audio_tracks={"1": "English"}, + subtitle_tracks={}, + ), + ) + + +class TestFindMainFeature: + def test_selects_longest(self) -> None: + titles = [_make_title(1, 3600), _make_title(2, 7200), _make_title(3, 1800)] + result = find_main_feature(titles) + assert result.number == 2 + + def test_verbose(self, caplog: pytest.LogCaptureFixture) -> None: + titles = [_make_title(1, 3600)] + with caplog.at_level(logging.DEBUG): + find_main_feature(titles, verbose=True) + assert "main feature" in caplog.text.lower() + + +class TestConstructTasks: + def test_no_split(self) -> None: + titles = [_make_title(1, 3600, 3)] + tasks = construct_tasks(titles, chapter_split=False) + assert len(tasks) == 1 + assert tasks[0].chapter is None + + def test_chapter_split(self) -> None: + titles = [_make_title(1, 3600, 3)] + tasks = construct_tasks(titles, chapter_split=True) + assert len(tasks) == 3 + assert tasks[0].chapter == 1 + assert tasks[2].chapter == 3 + + def test_single_chapter_no_split(self) -> None: + titles = [_make_title(1, 3600, 1)] + tasks = construct_tasks(titles, chapter_split=True) + assert len(tasks) == 1 + assert tasks[0].chapter is None + + +class TestTaskFilenames: + def test_single_task(self) -> None: + title = _make_title(1, 3600) + tasks = [Task(title=title)] + filenames = task_filenames(tasks, "output/test", dry_run=True) + assert filenames == ["output/test.mp4"] + + def test_multiple_tasks(self) -> None: + t1 = _make_title(1, 3600) + t2 = _make_title(2, 1800) + tasks = [Task(title=t1), Task(title=t2)] + filenames = task_filenames(tasks, "output/test", dry_run=True) + assert "output/test/Title01.mp4" in filenames[0] + assert "output/test/Title02.mp4" in filenames[1] + + def test_chapter_filenames(self) -> None: + title = _make_title(1, 3600) + tasks = [Task(title=title, chapter=1), Task(title=title, chapter=2)] + filenames = task_filenames(tasks, "output/test", dry_run=True) + assert "Title01_01.mp4" in filenames[0] + assert "Title01_02.mp4" in filenames[1] + + def test_duplicate_filenames_raises(self) -> None: + title = _make_title(1, 3600) + tasks = [Task(title=title), Task(title=title)] + with pytest.raises(UserError, match="same filename"): + task_filenames(tasks, "output/test", dry_run=True) + + def test_creates_directory(self, tmp_path: Path) -> None: + out = tmp_path / "dvd_out" + t1 = _make_title(1, 3600) + t2 = _make_title(2, 1800) + tasks = [Task(title=t1), Task(title=t2)] + task_filenames(tasks, str(out), dry_run=False) + assert out.is_dir() + + +class TestPerformTasks: + def test_calls_rip_title(self, caplog: pytest.LogCaptureFixture) -> None: + dvd = MagicMock() + title = _make_title(1, 3600, 2) + tasks = [Task(title=title, chapter=1), Task(title=title, chapter=2)] + filenames = ["out_01.mp4", "out_02.mp4"] + + with caplog.at_level(logging.INFO): + perform_tasks(dvd, tasks, filenames, preset="Test", dry_run=False, verbose=False) + + assert dvd.rip_title.call_count == 2 + assert "Title 1" in caplog.text + assert "Chapter" in caplog.text + + def test_no_chapter(self, caplog: pytest.LogCaptureFixture) -> None: + dvd = MagicMock() + title = _make_title(1, 3600) + tasks = [Task(title=title)] + filenames = ["out.mp4"] + + with caplog.at_level(logging.INFO): + perform_tasks(dvd, tasks, filenames, preset="Test", dry_run=True, verbose=False) + assert "Title 1" in caplog.text