diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 0d3111e..c2f9f82 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,5 +1,7 @@ +--- name: Continuous Integration +# yamllint disable-line rule:truthy on: pull_request: branches: [main] @@ -14,12 +16,12 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v5 + uses: actions/checkout@v6 - - name: Set up Python 3.12 - uses: actions/setup-python@v5 + - name: Set up Python 3.11 + uses: actions/setup-python@v6 with: - python-version: '3.12' + python-version: '3.11' - name: Run pre-commit hooks uses: pre-commit/action@v3.0.1 @@ -29,33 +31,44 @@ jobs: needs: lint strategy: matrix: - python-version: ['3.9', '3.10', '3.11', '3.12', '3.13'] + python-version: ['3.9', '3.10', '3.11', '3.12', '3.13', '3.14'] steps: - name: Checkout code - uses: actions/checkout@v5 + uses: actions/checkout@v6 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} - name: Install test dependencies - run: pip install pytest + run: pip install -e ".[dev]" - - name: Run tests - run: python -m pytest + - name: Run tests with coverage + run: >- + python -m pytest tests/ + --cov=timerun + --cov-branch + --cov-report=xml + --cov-report=term + + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} + flags: python${{ matrix.python-version }} build: runs-on: ubuntu-latest needs: test steps: - name: Checkout code - uses: actions/checkout@v5 + uses: actions/checkout@v6 - - name: Set up Python 3.12 - uses: actions/setup-python@v5 + - name: Set up Python 3.11 + uses: actions/setup-python@v6 with: - python-version: '3.12' + python-version: '3.11' - name: Install build dependencies run: pip install build twine diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 09ad364..c5113cc 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -12,12 +12,12 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v5 + uses: actions/checkout@v6 - - name: Set up Python 3.12 - uses: actions/setup-python@v5 + - name: Set up Python 3.11 + uses: actions/setup-python@v6 with: - python-version: '3.12' + python-version: '3.11' - name: Install build dependencies run: pip install build diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 43dfaf4..098c0ca 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,35 +11,24 @@ repos: - id: check-yaml - id: check-toml - - repo: https://github.com/adrienverge/yamllint - rev: v1.37.1 - hooks: - - id: yamllint - - - repo: https://github.com/pycqa/isort - rev: 6.0.1 - hooks: - - id: isort - - - repo: https://github.com/psf/black - rev: 25.1.0 - hooks: - - id: black - - - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.17.1 + - repo: https://github.com/HH-MWB/pyenforce + rev: v0.1.0 hooks: + - id: ruff-format + - id: ruff-check - id: mypy - - - repo: https://github.com/pylint-dev/pylint - rev: v3.3.8 - hooks: + additional_dependencies: + - ".[mypy]" # Required to re-adds mypy as a dependency + - pytest - id: pylint additional_dependencies: + - ".[pylint]" # Required to re-adds Pylint as a dependency - pytest + - id: bandit + - id: semgrep + - id: vulture - - repo: https://github.com/pycqa/bandit - rev: '1.8.6' + - repo: https://github.com/adrienverge/yamllint + rev: v1.37.1 hooks: - - id: bandit - args: ['-c', 'pyproject.toml'] + - id: yamllint diff --git a/LICENSE b/LICENSE index 179f8f7..91e918a 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2019-2025 HH-MWB +Copyright (c) 2019-2026 HH-MWB Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 3139015..45a9db9 100644 --- a/README.md +++ b/README.md @@ -7,11 +7,11 @@

TimeRun - Python library for elapsed time measurement.

- License - PyPI Latest Release - Package Status - Code style: black - Imports: isort + Version + Status + License + Coverage + Total Downloads

TimeRun is a simple, yet elegant elapsed time measurement library for [Python](https://www.python.org). It is distributed as a single file module and has no dependencies other than the [Python Standard Library](https://docs.python.org/3/library/). @@ -45,24 +45,79 @@ pip install git+https://github.com/HH-MWB/timerun.git ### Measure Code Block ```python +>>> import time >>> from timerun import Timer >>> with Timer() as timer: -... pass # put your code here +... time.sleep(0.1) # your code here >>> print(timer.duration) -0:00:00.000000100 +0:00:00.100000000 ``` ### Measure Function ```python +>>> import time >>> from timerun import Timer >>> timer = Timer() >>> @timer ... def func(): -... pass # put your code here +... time.sleep(0.1) # your code here >>> func() >>> print(timer.duration) -0:00:00.000000100 +0:00:00.100000000 +``` + +### Measure Async Function + +```python +>>> import asyncio +>>> from timerun import Timer +>>> timer = Timer() +>>> @timer +... async def async_func(): +... await asyncio.sleep(0.1) # your code here +>>> asyncio.run(async_func()) +>>> print(timer.duration) +0:00:00.100000000 +``` + +### Measure Async Code Block + +```python +>>> import asyncio +>>> from timerun import Timer +>>> async def async_code(): +... async with Timer() as timer: +... await asyncio.sleep(0.1) # your code here +... print(timer.duration) +>>> asyncio.run(async_code()) +0:00:00.100000000 +``` + +### Multiple Measurements + +```python +>>> import time +>>> from timerun import Timer +>>> timer = Timer() +>>> with timer: +... time.sleep(0.1) # your code here +>>> with timer: +... time.sleep(0.1) # your code here +>>> print(timer.duration) # Last duration +0:00:00.100000000 +>>> print(timer.durations) # All durations +(ElapsedTime(nanoseconds=100000000), ElapsedTime(nanoseconds=100000000)) +``` + +### Advanced Options + +```python +>>> from timerun import Timer +>>> # Exclude sleep time from measurements +>>> timer = Timer(count_sleep=False) +>>> # Limit storage to last 10 measurements +>>> timer = Timer(max_len=10) ``` ## Contributing @@ -71,4 +126,4 @@ We welcome contributions! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for guid ## License -This project is licensed under the MIT License - see the [LICENSE](https://github.com/HH-MWB/timerun/blob/master/LICENSE) file for details. +This project is licensed under the MIT License - see the [LICENSE](https://github.com/HH-MWB/timerun/blob/main/LICENSE) file for details. diff --git a/pyproject.toml b/pyproject.toml index 4651e33..67d8443 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,13 +28,14 @@ classifiers = [ "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: System :: Monitoring", ] dynamic = ["version"] [project.optional-dependencies] -dev = ["pytest", "pytest-cov"] +dev = ["pytest", "pytest-asyncio", "pytest-cov"] [project.urls] Homepage = "https://github.com/HH-MWB/timerun" @@ -49,19 +50,3 @@ license-files = ["LICENSE"] [tool.setuptools.dynamic] version = { attr = "timerun.__version__" } - -[tool.bandit] -skips = ["B101"] # Skip assert_used test - -[tool.black] -line-length = 79 -target-version = [ - "py39", - "py310", - "py311", - "py312", - "py313", -] - -[tool.isort] -profile = "black" diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..683ed66 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Test suite for timerun.""" diff --git a/tests/conftest.py b/tests/conftest.py index 9709a79..9a42247 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,36 +1,22 @@ """A collection of shared PyTest fixtures for timerun.""" -from contextlib import contextmanager -from typing import Callable, ContextManager, Iterable, Iterator, Tuple +from collections.abc import Callable, Iterable, Iterator +from contextlib import AbstractContextManager, contextmanager from unittest.mock import Mock -from pytest import MonkeyPatch, fixture +import pytest from timerun import ElapsedTime, Stopwatch, Timer -__all__: Tuple[str, ...] = ( - # -- Patcheres -- - "patch_clock", - "patch_split", - # -- Initiated Instances -- - "stopwatch", - "timer", - # -- Elapsed Time -- - "elapsed_1_ns", - "elapsed_100_ns", - "elapsed_1_ms", - "elapsed_1_pt_5_ms", - "elapsed_1_sec", -) - - # =========================================================================== # # Patcheres # # =========================================================================== # -@fixture -def patch_clock(monkeypatch: MonkeyPatch) -> Callable[[int], ContextManager]: +@pytest.fixture +def patch_clock( + monkeypatch: pytest.MonkeyPatch, +) -> Callable[[int], AbstractContextManager[None]]: """Patch the clock method in Stopwatch. Parameters @@ -40,14 +26,15 @@ def patch_clock(monkeypatch: MonkeyPatch) -> Callable[[int], ContextManager]: Returns ------- - Callable[[int], ContextManager] + Callable[[int], AbstractContextManager[None]] A context manager takes integer argument and patch that value as the return value of the clock method. Examples -------- - >>> with patch_clock(elapsed_ns=1): + >>> with patch_clock(1): ... pass + """ @contextmanager @@ -58,34 +45,41 @@ def patch(elapsed_ns: int) -> Iterator[None]: ---------- elapsed_ns : int The value should be returned by the clock method. + + Yields + ------ + None + Control is yielded back to the caller. + """ - monkeypatch.setattr(Stopwatch, "_clock", lambda self: elapsed_ns) + monkeypatch.setattr(Stopwatch, "_clock", lambda _: elapsed_ns) yield return patch -@fixture +@pytest.fixture def patch_split( - monkeypatch: MonkeyPatch, -) -> Callable[[Iterable[int]], ContextManager]: + monkeypatch: pytest.MonkeyPatch, +) -> Callable[[Iterable[int]], AbstractContextManager[None]]: """Patch the split method in Timer. Parameters ---------- monkeypatch : MonkeyPatch - The fixture has been used to patch the split method. + The fixture has been used to patch the split method. Returns ------- - Callable[[Iterable[int]], ContextManager] + Callable[[Iterable[int]], AbstractContextManager[None]] A context manager takes a list of integers as nanoseconds and patch those as the return values of the elapse method. Examples -------- - >>> with patch_split(elapsed_times=[100, 200, 300]): + >>> with patch_split([100, 200, 300]): ... pass + """ @contextmanager @@ -96,11 +90,17 @@ def patch(elapsed_times: Iterable[int]) -> Iterator[None]: ---------- elapsed_times : Iterable[int] The nanoseconds should be returned by the split method. + + Yields + ------ + None + Control is yielded back to the caller. + """ mock_stopwatch = Mock(spec=["reset", "split"]) - mock_stopwatch.split.side_effect = [ - ElapsedTime(nanoseconds=t) for t in elapsed_times - ] + mock_stopwatch.split.configure_mock( + side_effect=[ElapsedTime(nanoseconds=t) for t in elapsed_times], + ) monkeypatch.setattr(Timer, "_stopwatch", mock_stopwatch) yield @@ -113,17 +113,31 @@ def patch(elapsed_times: Iterable[int]) -> Iterator[None]: # =========================================================================== # -@fixture +@pytest.fixture def stopwatch() -> Stopwatch: - """A newly created Stopwatch started at time ``0``.""" + """Create a Stopwatch started at time ``0``. + + Returns + ------- + Stopwatch + A stopwatch started at time ``0``. + + """ watch: Stopwatch = Stopwatch() - watch._start = 0 # pylint: disable=protected-access + watch._start = 0 # pylint: disable=protected-access # noqa: SLF001 return watch -@fixture +@pytest.fixture def timer() -> Timer: - """A newly created Timer with unlimited storage size.""" + """Create a Timer with unlimited storage size. + + Returns + ------- + Timer + A newly created Timer. + + """ return Timer() @@ -132,31 +146,66 @@ def timer() -> Timer: # =========================================================================== # -@fixture +@pytest.fixture def elapsed_1_ns() -> ElapsedTime: - """Elapsed Time of 1 nanosecond.""" + """Elapsed Time of 1 nanosecond. + + Returns + ------- + ElapsedTime + Elapsed time of 1 nanosecond. + + """ return ElapsedTime(nanoseconds=1) -@fixture +@pytest.fixture def elapsed_100_ns() -> ElapsedTime: - """Elapsed Time of 100 nanoseconds.""" + """Elapsed Time of 100 nanoseconds. + + Returns + ------- + ElapsedTime + Elapsed time of 100 nanoseconds. + + """ return ElapsedTime(nanoseconds=100) -@fixture +@pytest.fixture def elapsed_1_ms() -> ElapsedTime: - """Elapsed Time of 1 microsecond.""" + """Elapsed Time of 1 microsecond. + + Returns + ------- + ElapsedTime + Elapsed time of 1 microsecond. + + """ return ElapsedTime(nanoseconds=1000) -@fixture +@pytest.fixture def elapsed_1_pt_5_ms() -> ElapsedTime: - """Elapsed Time of 1.5 microseconds.""" + """Elapsed Time of 1.5 microseconds. + + Returns + ------- + ElapsedTime + Elapsed time of 1.5 microseconds. + + """ return ElapsedTime(nanoseconds=1500) -@fixture +@pytest.fixture def elapsed_1_sec() -> ElapsedTime: - """Elapsed Time of 1 second.""" + """Elapsed Time of 1 second. + + Returns + ------- + ElapsedTime + Elapsed time of 1 second. + + """ return ElapsedTime(nanoseconds=int(1e9)) diff --git a/tests/test_elapsedtime.py b/tests/test_elapsedtime.py index 99c598c..f54ff70 100644 --- a/tests/test_elapsedtime.py +++ b/tests/test_elapsedtime.py @@ -1,9 +1,11 @@ """A collection of tests for class ``ElapsedTime``.""" +# pylint: disable=no-self-use,magic-value-comparison + from dataclasses import FrozenInstanceError from datetime import timedelta -from pytest import raises +import pytest from timerun import ElapsedTime @@ -35,9 +37,10 @@ def test_modify_after_init(self, elapsed_1_ns: ElapsedTime) -> None: ---------- elapsed_1_ns : ElapsedTime A ElapsedTime instance will be using to update attribute. + """ - with raises(FrozenInstanceError): - elapsed_1_ns.nanoseconds = 0 # type: ignore + with pytest.raises(FrozenInstanceError): + elapsed_1_ns.nanoseconds = 0 # type: ignore[misc] assert elapsed_1_ns.nanoseconds == 1 @@ -84,11 +87,13 @@ def test_microseconds_accuracy(self, elapsed_1_ms: ElapsedTime) -> None: ---------- elapsed_1_ms : ElapsedTime Elapsed Time of 1 microsecond. + """ assert elapsed_1_ms.timedelta == timedelta(microseconds=1) def test_nanoseconds_accuracy( - self, elapsed_1_pt_5_ms: ElapsedTime + self, + elapsed_1_pt_5_ms: ElapsedTime, ) -> None: """Test using ElapsedTime of 1.5 microseconds. @@ -99,6 +104,7 @@ def test_nanoseconds_accuracy( ---------- elapsed_1_pt_5_ms : ElapsedTime Elapsed Time of 1.5 microseconds. + """ assert elapsed_1_pt_5_ms.timedelta == timedelta(microseconds=1) @@ -107,7 +113,8 @@ class TestStr: """Test suite for calling str function on ElapsedTime.""" def test_elapsed_time_seconds_as_decimals( - self, elapsed_100_ns: ElapsedTime + self, + elapsed_100_ns: ElapsedTime, ) -> None: """Test elapsed time in seconds is in decimal. @@ -118,11 +125,13 @@ def test_elapsed_time_seconds_as_decimals( ---------- elapsed_100_ns : ElapsedTime Elapsed Time to be used to call ``str``. + """ assert str(elapsed_100_ns) == "0:00:00.000000100" def test_elapsed_time_seconds_as_integer( - self, elapsed_1_sec: ElapsedTime + self, + elapsed_1_sec: ElapsedTime, ) -> None: """Test elapsed time in seconds is an integer. @@ -133,6 +142,7 @@ def test_elapsed_time_seconds_as_integer( ---------- elapsed_1_sec : ElapsedTime Elapsed Time to be used to call ``str``. + """ assert str(elapsed_1_sec) == "0:00:01" @@ -150,5 +160,6 @@ def test_repr(self, elapsed_100_ns: ElapsedTime) -> None: ---------- elapsed_100_ns : ElapsedTime Elapsed Time to be used to call ``repr``. + """ assert repr(elapsed_100_ns) == "ElapsedTime(nanoseconds=100)" diff --git a/tests/test_stopwatch.py b/tests/test_stopwatch.py index 8d5cca9..1828b1b 100644 --- a/tests/test_stopwatch.py +++ b/tests/test_stopwatch.py @@ -1,10 +1,18 @@ """A collection of tests for class ``Stopwatch``.""" +# pylint: disable=no-self-use + +from __future__ import annotations + from time import perf_counter_ns, process_time_ns -from typing import Callable +from typing import TYPE_CHECKING from timerun import ElapsedTime, Stopwatch +if TYPE_CHECKING: + from collections.abc import Callable + from contextlib import AbstractContextManager + class TestInit: """Test suite for stopwatch initialization.""" @@ -13,7 +21,7 @@ def test_include_sleep(self) -> None: """Test initialize stopwatch take sleep in to count.""" stopwatch: Stopwatch = Stopwatch(count_sleep=True) assert ( - stopwatch._clock # pylint: disable=protected-access + stopwatch._clock # pylint: disable=protected-access # noqa: SLF001 == perf_counter_ns ) @@ -21,7 +29,7 @@ def test_exclude_sleep(self) -> None: """Test initialize stopwatch do not take sleep in to count.""" stopwatch: Stopwatch = Stopwatch(count_sleep=False) assert ( - stopwatch._clock # pylint: disable=protected-access + stopwatch._clock # pylint: disable=protected-access # noqa: SLF001 == process_time_ns ) @@ -30,15 +38,19 @@ def test_default_measurer(self) -> None: default: Stopwatch = Stopwatch() include: Stopwatch = Stopwatch(count_sleep=True) assert ( - default._clock # pylint: disable=protected-access - == include._clock # pylint: disable=protected-access + default._clock # pylint: disable=protected-access # noqa: SLF001 + == include._clock # pylint: disable=protected-access # noqa: SLF001 ) class TestReset: # pylint: disable=too-few-public-methods """Test suite for starting stopwatch.""" - def test_reset(self, patch_clock: Callable, stopwatch: Stopwatch) -> None: + def test_reset( + self, + patch_clock: Callable[[int], AbstractContextManager[None]], + stopwatch: Stopwatch, + ) -> None: """Test to reset a stopwatch. Expected to have a stopwatch whose `_start` attribute is not @@ -50,11 +62,12 @@ def test_reset(self, patch_clock: Callable, stopwatch: Stopwatch) -> None: Patcher has been used to set the starting time at ``1``. stopwatch : Stopwatch A started Stopwatch, which will be reset. + """ - assert stopwatch._start != 1 # pylint: disable=protected-access - with patch_clock(elapsed_ns=1): + assert stopwatch._start != 1 # pylint: disable=protected-access # noqa: SLF001 + with patch_clock(1): stopwatch.reset() - assert stopwatch._start == 1 # pylint: disable=protected-access + assert stopwatch._start == 1 # pylint: disable=protected-access # noqa: SLF001 class TestSplit: @@ -62,7 +75,7 @@ class TestSplit: def test_calculation( self, - patch_clock: Callable, + patch_clock: Callable[[int], AbstractContextManager[None]], stopwatch: Stopwatch, elapsed_100_ns: ElapsedTime, ) -> None: @@ -80,16 +93,17 @@ def test_calculation( A stopwatch started at time ``0``. elapsed_100_ns : ElapsedTime Elapsed Time of 100 nanoseconds. + """ - assert stopwatch._start == 0 # pylint: disable=protected-access + assert not stopwatch._start # pylint: disable=protected-access # noqa: SLF001 - with patch_clock(elapsed_ns=100): + with patch_clock(100): elapsed: ElapsedTime = stopwatch.split() assert elapsed == elapsed_100_ns def test_split_multiple_times( self, - patch_clock: Callable, + patch_clock: Callable[[int], AbstractContextManager[None]], stopwatch: Stopwatch, elapsed_100_ns: ElapsedTime, elapsed_1_ms: ElapsedTime, @@ -111,13 +125,14 @@ def test_split_multiple_times( Elapsed Time of 100 nanoseconds. elapsed_1_ms : ElapsedTime Elapsed Time of 1 microsecond. + """ - assert stopwatch._start == 0 # pylint: disable=protected-access + assert not stopwatch._start # pylint: disable=protected-access # noqa: SLF001 - with patch_clock(elapsed_ns=100): + with patch_clock(100): first_elapsed: ElapsedTime = stopwatch.split() assert first_elapsed == elapsed_100_ns - with patch_clock(elapsed_ns=1000): + with patch_clock(1000): second_elapsed: ElapsedTime = stopwatch.split() assert second_elapsed == elapsed_1_ms diff --git a/tests/test_timer.py b/tests/test_timer.py index 4e7e9ca..af8fd96 100644 --- a/tests/test_timer.py +++ b/tests/test_timer.py @@ -1,10 +1,19 @@ """A collection of tests for class ``Timer``.""" -from typing import Callable, List +# pylint: disable=no-self-use -from pytest import raises +from __future__ import annotations -from timerun import ElapsedTime, NoDurationCaptured, Timer +import asyncio +from typing import TYPE_CHECKING, cast + +import pytest + +from timerun import ElapsedTime, NoDurationCapturedError, Timer + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator, Awaitable, Callable, Iterable + from contextlib import AbstractContextManager # =========================================================================== # # Test suite for using Timer as a context manager. # @@ -12,7 +21,7 @@ def test_use_timer_as_context_manager_single_run( - patch_split: Callable, + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], timer: Timer, elapsed_1_ms: ElapsedTime, ) -> None: @@ -29,16 +38,16 @@ def test_use_timer_as_context_manager_single_run( A newly created Timer with unlimited storage size. elapsed_1_ms : ElapsedTime Elapsed Time of 1 microsecond. + """ - with patch_split(elapsed_times=[1000]): - with timer: - pass + with patch_split([1000]), timer: + pass assert timer.duration == elapsed_1_ms def test_use_timer_as_context_manager_multiple_run( - patch_split: Callable, + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], timer: Timer, elapsed_100_ns: ElapsedTime, elapsed_1_ms: ElapsedTime, @@ -61,8 +70,9 @@ def test_use_timer_as_context_manager_multiple_run( Elapsed Time of 1 microsecond. elapsed_1_pt_5_ms : ElapsedTime Elapsed Time of 1.5 microseconds. + """ - with patch_split(elapsed_times=[100, 1000, 1500]): + with patch_split([100, 1000, 1500]): for _ in range(3): with timer: pass @@ -79,7 +89,7 @@ class TestAsDecorator: def test_single_run( self, - patch_split: Callable, + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], timer: Timer, elapsed_1_ms: ElapsedTime, ) -> None: @@ -96,19 +106,20 @@ def test_single_run( A newly created Timer with unlimited storage size. elapsed_1_ms : ElapsedTime Elapsed Time of 1 microsecond. + """ @timer def func() -> None: pass - with patch_split(elapsed_times=[1000]): + with patch_split([1000]): func() assert timer.duration == elapsed_1_ms def test_multiple_run( # pylint: disable=too-many-arguments,too-many-positional-arguments self, - patch_split: Callable, + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], timer: Timer, elapsed_100_ns: ElapsedTime, elapsed_1_ms: ElapsedTime, @@ -131,13 +142,14 @@ def test_multiple_run( # pylint: disable=too-many-arguments,too-many-positional Elapsed Time of 1 microsecond. elapsed_1_pt_5_ms : ElapsedTime Elapsed Time of 1.5 microseconds. + """ @timer def func() -> None: pass - with patch_split(elapsed_times=[100, 1000, 1500]): + with patch_split([100, 1000, 1500]): for _ in range(3): func() @@ -155,14 +167,15 @@ def test_access_duration_attr_before_run(self, timer: Timer) -> None: """Test access duration attribute before capturing anything. Test tries to access duration attribute before capturing - anything, expected to see ``NoDurationCaptured`` exception. + anything, expected to see ``NoDurationCapturedError`` exception. Parameters ---------- timer : Timer A newly created Timer with unlimited storage size. + """ - with raises(NoDurationCaptured): + with pytest.raises(NoDurationCapturedError): _ = timer.duration @@ -171,15 +184,15 @@ class TestInit: def test_use_customized_duration_list(self) -> None: """Test capture durations into an existing list.""" - durations: List[ElapsedTime] = [] + durations: list[ElapsedTime] = [] timer = Timer(storage=durations) assert ( - timer._durations is durations # pylint: disable=protected-access + timer._durations is durations # pylint: disable=protected-access # noqa: SLF001 ) def test_max_storage_limitation( self, - patch_split: Callable, + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], elapsed_1_ms: ElapsedTime, elapsed_1_pt_5_ms: ElapsedTime, ) -> None: @@ -196,12 +209,274 @@ def test_max_storage_limitation( Elapsed Time of 1 microsecond. elapsed_1_pt_5_ms : ElapsedTime Elapsed Time of 1.5 microseconds. + """ timer = Timer(max_len=2) - with patch_split(elapsed_times=[100, 1000, 1500]): + with patch_split([100, 1000, 1500]): for _ in range(3): with timer: pass assert timer.durations == (elapsed_1_ms, elapsed_1_pt_5_ms) + + +# =========================================================================== # +# Test suite for using Timer as an async context manager. # +# =========================================================================== # + + +@pytest.mark.asyncio +async def test_use_timer_as_async_context_manager_single_run( + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], + timer: Timer, + elapsed_1_ms: ElapsedTime, +) -> None: + """Test using it as an async context manager. + + Test using the timer and ``async with`` to capture the duration time + for async code block. + + Parameters + ---------- + patch_split : Callable + Patcher has been used to set the captured duration time. + timer : Timer + A newly created Timer with unlimited storage size. + elapsed_1_ms : ElapsedTime + Elapsed Time of 1 microsecond. + + """ + with patch_split([1000]): + async with timer: + await asyncio.sleep(0) + + assert timer.duration == elapsed_1_ms + + +@pytest.mark.asyncio +async def test_use_timer_as_async_context_manager_multiple_run( + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], + timer: Timer, + elapsed_100_ns: ElapsedTime, + elapsed_1_ms: ElapsedTime, + elapsed_1_pt_5_ms: ElapsedTime, +) -> None: + """Test run multiple times with the same timer (async). + + Test run timer using ``async with`` ``3`` times and expected to see + all three captured duration times. + + Parameters + ---------- + patch_split : Callable + Patcher has been used to set the captured duration time. + timer : Timer + A newly created Timer with unlimited storage size. + elapsed_100_ns : ElapsedTime + Elapsed Time of 100 nanoseconds. + elapsed_1_ms : ElapsedTime + Elapsed Time of 1 microsecond. + elapsed_1_pt_5_ms : ElapsedTime + Elapsed Time of 1.5 microseconds. + + """ + with patch_split([100, 1000, 1500]): + for _ in range(3): + async with timer: + await asyncio.sleep(0) + + assert timer.durations == ( + elapsed_100_ns, + elapsed_1_ms, + elapsed_1_pt_5_ms, + ) + + +class TestAsAsyncDecorator: + """Test suite for using Timer as an async function decorator.""" + + @pytest.mark.asyncio + async def test_single_run( + self, + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], + timer: Timer, + elapsed_1_ms: ElapsedTime, + ) -> None: + """Test the async function with a single run. + + Test run decorated async function and expected to get the captured + duration afterward. + + Parameters + ---------- + patch_split : Callable + Patcher has been used to set the captured duration time. + timer : Timer + A newly created Timer with unlimited storage size. + elapsed_1_ms : ElapsedTime + Elapsed Time of 1 microsecond. + + """ + + @timer + async def async_func() -> None: + await asyncio.sleep(0) + + with patch_split([1000]): + await cast("Awaitable[None]", async_func()) + assert timer.duration == elapsed_1_ms + + @pytest.mark.asyncio + async def test_multiple_run( # pylint: disable=too-many-arguments,too-many-positional-arguments + self, + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], + timer: Timer, + elapsed_100_ns: ElapsedTime, + elapsed_1_ms: ElapsedTime, + elapsed_1_pt_5_ms: ElapsedTime, + ) -> None: + """Test the async function with multiple runs. + + Test run decorated async function ``3`` times and expected to see all + three captured duration times. + + Parameters + ---------- + patch_split : Callable + Patcher has been used to set the captured duration time. + timer : Timer + A newly created Timer with unlimited storage size. + elapsed_100_ns : ElapsedTime + Elapsed Time of 100 nanoseconds. + elapsed_1_ms : ElapsedTime + Elapsed Time of 1 microsecond. + elapsed_1_pt_5_ms : ElapsedTime + Elapsed Time of 1.5 microseconds. + + """ + + @timer + async def async_func() -> None: + await asyncio.sleep(0) + + with patch_split([100, 1000, 1500]): + for _ in range(3): + await cast("Awaitable[None]", async_func()) + + assert timer.durations == ( + elapsed_100_ns, + elapsed_1_ms, + elapsed_1_pt_5_ms, + ) + + +class TestAsAsyncGeneratorDecorator: + """Test suite for using Timer as an async generator function decorator.""" + + @pytest.mark.asyncio + async def test_single_run( + self, + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], + timer: Timer, + elapsed_1_ms: ElapsedTime, + ) -> None: + """Test the async generator function with a single run. + + Test run decorated async generator function and expected to get the + captured duration afterward. + + Parameters + ---------- + patch_split : Callable + Patcher has been used to set the captured duration time. + timer : Timer + A newly created Timer with unlimited storage size. + elapsed_1_ms : ElapsedTime + Elapsed Time of 1 microsecond. + + """ + + @timer + async def async_gen_func() -> AsyncGenerator[int]: + """Async generator function for testing. + + Yields + ------ + int + Sequential integers for testing. + + """ + await asyncio.sleep(0) + yield 1 + await asyncio.sleep(0) + yield 2 + + with patch_split([1000]): + items: list[int] = [ + item + async for item in cast( + "AsyncGenerator[int]", + async_gen_func(), + ) + ] + + assert items == [1, 2] + assert timer.duration == elapsed_1_ms + + @pytest.mark.asyncio + async def test_multiple_run( # pylint: disable=too-many-arguments,too-many-positional-arguments + self, + patch_split: Callable[[Iterable[int]], AbstractContextManager[None]], + timer: Timer, + elapsed_100_ns: ElapsedTime, + elapsed_1_ms: ElapsedTime, + elapsed_1_pt_5_ms: ElapsedTime, + ) -> None: + """Test the async generator function with multiple runs. + + Test run decorated async generator function ``3`` times and expected + to see all three captured duration times. + + Parameters + ---------- + patch_split : Callable + Patcher has been used to set the captured duration time. + timer : Timer + A newly created Timer with unlimited storage size. + elapsed_100_ns : ElapsedTime + Elapsed Time of 100 nanoseconds. + elapsed_1_ms : ElapsedTime + Elapsed Time of 1 microsecond. + elapsed_1_pt_5_ms : ElapsedTime + Elapsed Time of 1.5 microseconds. + + """ + + @timer + async def async_gen_func() -> AsyncGenerator[int]: + """Async generator function for testing. + + Yields + ------ + int + Sequential integers for testing. + + """ + await asyncio.sleep(0) + yield 1 + + with patch_split([100, 1000, 1500]): + for _ in range(3): + async_gen: AsyncGenerator[int] = cast( + "AsyncGenerator[int]", + async_gen_func(), + ) + async for _ in async_gen: + pass + + assert timer.durations == ( + elapsed_100_ns, + elapsed_1_ms, + elapsed_1_pt_5_ms, + ) diff --git a/timerun.py b/timerun.py index 712f6f8..68aa05b 100644 --- a/timerun.py +++ b/timerun.py @@ -3,24 +3,32 @@ from __future__ import annotations from collections import deque -from collections.abc import Iterator from contextlib import ContextDecorator from dataclasses import dataclass from datetime import timedelta +from inspect import isasyncgenfunction, iscoroutinefunction from time import perf_counter_ns, process_time_ns -from typing import Callable, Protocol, TypeVar +from typing import TYPE_CHECKING, Protocol, TypeVar, cast -__all__: tuple[str, ...] = ( +if TYPE_CHECKING: + from collections.abc import ( + AsyncGenerator, + Awaitable, + Callable, + Iterator, + ) + +__all__: tuple[str, ...] = ( # noqa: RUF022 # -- Core -- "ElapsedTime", "Stopwatch", "Timer", # -- Exceptions -- - "TimeRunException", - "NoDurationCaptured", + "NoDurationCapturedError", + "TimeRunError", ) -__version__: str = "0.3.0" +__version__: str = "0.4.0" # =========================================================================== # @@ -44,10 +52,10 @@ class AppendableSequence(Protocol[T]): """Protocol for sequences that support appending and indexing.""" - def append(self, item: T) -> None: + def append(self, _item: T) -> None: """Add an item to the sequence.""" - def __getitem__(self, index: int) -> T: + def __getitem__(self, _index: int) -> T: """Get item by index (supports negative indexing).""" def __len__(self) -> int: @@ -70,17 +78,18 @@ def __iter__(self) -> Iterator[T]: # =========================================================================== # -class TimeRunException(Exception): - """Base exception for TimeRun""" +class TimeRunError(Exception): + """Base exception for TimeRun.""" -class NoDurationCaptured(TimeRunException, AttributeError): - """No Duration Captured Exception""" +class NoDurationCapturedError(TimeRunError, AttributeError): + """No Duration Captured Exception.""" def __init__(self) -> None: + """Initialize the exception.""" super().__init__( "No duration available. This is likely because the Timer has not " - "been used to measure any code blocks or functions yet." + "been used to measure any code blocks or functions yet.", ) @@ -102,9 +111,7 @@ def __init__(self) -> None: @dataclass(init=True, repr=False, eq=True, order=True, frozen=True) class ElapsedTime: - """Elapsed Time - - An immutable object representing elapsed time in nanoseconds. + """An immutable object representing elapsed time in nanoseconds. Attributes ---------- @@ -126,21 +133,23 @@ class ElapsedTime: ElapsedTime(nanoseconds=10) >>> print(t) 0:00:00.000000010 + """ __slots__ = ["nanoseconds"] nanoseconds: int - def __str__(self) -> str: + def __str__(self) -> str: # type: ignore[explicit-override] + """Return the string representation of the elapsed time.""" integer_part = timedelta(seconds=self.nanoseconds // int(1e9)) - decimal_part = self.nanoseconds % int(1e9) - if decimal_part == 0: + if not (decimal_part := self.nanoseconds % int(1e9)): return str(integer_part) return f"{integer_part}.{decimal_part:09}" - def __repr__(self) -> str: + def __repr__(self) -> str: # type: ignore[explicit-override] + """Return the representation of the elapsed time.""" return f"ElapsedTime(nanoseconds={self.nanoseconds})" @property @@ -166,10 +175,9 @@ def timedelta(self) -> timedelta: class Stopwatch: - """Stopwatch + """A stopwatch with the highest available resolution (in nanoseconds). - A stopwatch with the highest available resolution (in nanoseconds) - to measure elapsed time. It can be set to include or exclude the + It measures elapsed time. It can be set to include or exclude the sleeping time. Parameters @@ -192,11 +200,13 @@ class Stopwatch: >>> stopwatch.reset() >>> stopwatch.split() ElapsedTime(nanoseconds=100) + """ __slots__ = ["_clock", "_start"] - def __init__(self, count_sleep: bool | None = None) -> None: + def __init__(self, *, count_sleep: bool | None = None) -> None: + """Initialize the stopwatch.""" if count_sleep is None: count_sleep = True @@ -211,7 +221,14 @@ def reset(self) -> None: self._start = self._clock() def split(self) -> ElapsedTime: - """Get the elapsed time between now and the starting time.""" + """Get the elapsed time between now and the starting time. + + Returns + ------- + ElapsedTime + The elapsed time captured by the stopwatch. + + """ return ElapsedTime(self._clock() - self._start) @@ -229,10 +246,7 @@ def split(self) -> ElapsedTime: class Timer(ContextDecorator): - """Timer - - A context decorator that can capture and save the measured elapsed - time. + """A context decorator that can capture and save the measured elapsed time. Attributes ---------- @@ -256,41 +270,154 @@ class Timer(ContextDecorator): Examples -------- + >>> import time >>> with Timer() as timer: - ... pass + ... time.sleep(0.1) # your code here >>> print(timer.duration) - 0:00:00.000000100 + >>> import time >>> timer = Timer() >>> @timer ... def func(): - ... pass + ... time.sleep(0.1) # your code here >>> func() >>> print(timer.duration) - 0:00:00.000000100 + + >>> import asyncio + >>> timer = Timer() + >>> @timer + ... async def async_func(): + ... await asyncio.sleep(0.1) # your code here + >>> asyncio.run(async_func()) + >>> print(timer.duration) + + >>> async def async_code(): + ... async with Timer() as timer: + ... await asyncio.sleep(0.1) # your code here + ... print(timer.duration) + >>> asyncio.run(async_code()) + """ - __slots__ = ["_stopwatch", "_durations"] + __slots__ = ["_durations", "_stopwatch"] def __init__( self, + *, count_sleep: bool | None = None, storage: AppendableSequence[ElapsedTime] | None = None, max_len: int | None = None, ) -> None: - self._stopwatch: Stopwatch = Stopwatch(count_sleep) + """Initialize the timer.""" + self._stopwatch: Stopwatch = Stopwatch(count_sleep=count_sleep) self._durations: AppendableSequence[ElapsedTime] = ( storage if storage is not None else deque(maxlen=max_len) ) - def __enter__(self) -> Timer: + def __enter__(self) -> Timer: # noqa: PYI034 + """Start the timer.""" + self._stopwatch.reset() + return self + + def __exit__(self, *_: object) -> None: + """Stop the timer and save the duration.""" + duration: ElapsedTime = self._stopwatch.split() + self._durations.append(duration) + + async def __aenter__(self) -> Timer: # noqa: PYI034 + """Start the timer (async context manager).""" self._stopwatch.reset() return self - def __exit__(self, *exc) -> None: + async def __aexit__(self, *_: object) -> None: + """Stop the timer and save the duration (async context manager).""" duration: ElapsedTime = self._stopwatch.split() self._durations.append(duration) + def _wrap_async_function( # type: ignore[explicit-any] + self, + func: Callable[..., Awaitable[object]], + ) -> Callable[..., Awaitable[object]]: + """Wrap an async function to measure its execution time.""" + + async def async_wrapper(*args: object, **kwargs: object) -> object: + """Wrap async function execution with timing. + + Parameters + ---------- + *args : object + Positional arguments passed to the wrapped function. + **kwargs : object + Keyword arguments passed to the wrapped function. + + Returns + ------- + object + The result of the wrapped async function. + + """ + async with self: + return await func(*args, **kwargs) + + return async_wrapper + + def _wrap_async_generator( # type: ignore[explicit-any] + self, + func: Callable[..., object], + ) -> Callable[..., AsyncGenerator[object]]: + """Wrap an async generator function to measure its execution time.""" + + async def async_gen_wrapper( + *args: object, + **kwargs: object, + ) -> AsyncGenerator[object]: + """Wrap async generator function execution with timing. + + Parameters + ---------- + *args : object + Positional arguments passed to the wrapped function. + **kwargs : object + Keyword arguments passed to the wrapped function. + + Yields + ------ + object + Items yielded from the wrapped async generator function. + + """ + async with self: + async for item in cast( + "AsyncGenerator[object]", + func(*args, **kwargs), + ): + yield item + + return async_gen_wrapper + + def __call__( # type: ignore[override,explicit-override,explicit-any] + self, + func: Callable[..., object] | Callable[..., Awaitable[object]], + ) -> Callable[..., object] | Callable[..., Awaitable[object]]: + """Wrap a function (sync or async) to measure its execution time. + + Parameters + ---------- + func : Callable + The function to be decorated (can be sync or async). + + Returns + ------- + Callable + A wrapped function that measures execution time. + + """ + if iscoroutinefunction(func): + return self._wrap_async_function(func) + if isasyncgenfunction(func): + return self._wrap_async_generator(func) + return super().__call__(func) + @property def durations(self) -> tuple[ElapsedTime, ...]: """The captured duration times as a tuple. @@ -301,6 +428,7 @@ def durations(self) -> tuple[ElapsedTime, ...]: Examples -------- >>> first_duration, second_duration = timer.durations + """ return tuple(self._durations) @@ -310,12 +438,13 @@ def duration(self) -> ElapsedTime: Raises ------ - NoDurationCaptured + NoDurationCapturedError Error that occurs when accessing an empty durations list, which is usually because the measurer has not been triggered yet. + """ try: return self._durations[-1] except IndexError as error: - raise NoDurationCaptured from error + raise NoDurationCapturedError from error