Merged
8 changes: 4 additions & 4 deletions problemtools/judge/__init__.py
@@ -1,17 +1,17 @@
from .cache import CacheKey
from .execute import execute_testcase
from .result import (
    SubmissionResult,
    TimeLimits,
    Verdict,
    classify_result,
)
from .submission_judge import SubmissionJudge
from .validate import validate_output

__all__ = [
    'CacheKey',
    'SubmissionJudge',
    'SubmissionResult',
    'TimeLimits',
    'Verdict',
    'classify_result',
    'execute_testcase',
    'validate_output',
]
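
Reviewer note: these re-exports are the package's public surface. A minimal sketch of downstream usage, assuming the package imports as problemtools.judge (names taken from the __all__ above; the hash values are placeholders):

from problemtools.judge import CacheKey, SubmissionResult

# CacheKey is a frozen dataclass (defined in cache.py below), so it is
# hashable and can key a dict or the result store directly.
key = CacheKey(input_hash=b'\x00' * 32, ans_hash=b'\x01' * 32, validator_flags=('case_sensitive',))
by_key: dict[CacheKey, SubmissionResult] = {}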
114 changes: 114 additions & 0 deletions problemtools/judge/cache.py
@@ -0,0 +1,114 @@
from __future__ import annotations

import copy
from concurrent.futures import Future
from dataclasses import dataclass
from threading import Lock
from typing import TYPE_CHECKING

from .result import SubmissionResult

if TYPE_CHECKING:
    from ..verifyproblem import TestCase


@dataclass(frozen=True)
class CacheKey:
    input_hash: bytes
    ans_hash: bytes
    validator_flags: tuple[str, ...]


@dataclass
class _CacheEntry:
    result: SubmissionResult
    run_timelim: float


def _reclassify(result: SubmissionResult, timelim: float) -> SubmissionResult:
    """Reclassify a cached result against a (possibly lower) time limit."""
    if result.runtime > timelim:
        if result.validator_first and result.verdict == 'WA':
            # Interactive: the validator exited first with WA. This can make the submission
            # run longer than it should. Cap the runtime at timelim so it doesn't inflate
            # the time limit.
            wa = copy.copy(result)
            wa.runtime = timelim
            return wa
        tle = SubmissionResult('TLE')
        tle.runtime = result.runtime
        return tle
    return result


def _with_test_node(result: SubmissionResult, testcase: TestCase) -> SubmissionResult:
    """Return result with test_node and runtime_testcase set to testcase, copying only if needed."""
    if result.test_node is testcase and result.runtime_testcase is testcase:
        return result
    result = copy.copy(result)
    result.test_node = testcase
    result.runtime_testcase = testcase
    return result


class ResultStore:
    """Thread-safe store mapping testcase reuse keys to execution results.

    Background workers populate the store via claim()/complete(); the consumer
    reads results via get(). A key progresses through three states: absent
    (not yet claimed), in-flight (claimed, Future not yet resolved), and
    completed (_CacheEntry).

    Because results are always run at the high time limit, a completed entry
    can serve any query whose time limit is <= the run limit: a result whose
    runtime exceeds the query limit is reclassified as TLE. A query with a
    higher limit than the run limit cannot be served from cache and returns None.
    """

    def __init__(self) -> None:
        self._lock = Lock()
        self._store: dict[CacheKey, Future[SubmissionResult] | _CacheEntry] = {}

    def claim(self, testcase: TestCase) -> bool:
        """Atomically claim testcase for execution.

        Returns True if the key was unclaimed; the caller must eventually call
        complete(). Returns False if the key is already in-flight or completed.
        """
        key = testcase.reuse_key
        with self._lock:
            if key in self._store:
                return False
            self._store[key] = Future()
            return True

    def complete(self, testcase: TestCase, result: SubmissionResult, run_timelim: float) -> None:
        """Store the completed result and wake any consumer waiting on the future."""
        key = testcase.reuse_key
        with self._lock:
            future = self._store[key]
            self._store[key] = _CacheEntry(result=result, run_timelim=run_timelim)
        assert isinstance(future, Future)
        future.set_result(result)  # outside the lock; callbacks may acquire other locks

    def get(self, testcase: TestCase, timelim: float) -> SubmissionResult | Future[SubmissionResult] | None:
        """Look up a result for testcase at timelim.

        Returns:
            SubmissionResult: completed result, already reclassified for timelim; use directly.
            Future: in-flight; resolves to a reclassified SubmissionResult.
            None: not present, or was run at a lower limit than timelim and
                cannot be reused; caller must run the testcase synchronously.
        """
        key = testcase.reuse_key
        with self._lock:
            val = self._store.get(key)
            if val is None:
                return None
            if isinstance(val, Future):
                chained: Future[SubmissionResult] = Future()
                val.add_done_callback(lambda f: chained.set_result(_with_test_node(_reclassify(f.result(), timelim), testcase)))
                return chained
            if timelim > val.run_timelim:
                # Entry was produced at a lower limit; cannot safely reclassify upward.
                return None
            return _with_test_node(_reclassify(val.result, timelim), testcase)
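
Reviewer sketch of the downward-only reclassification that get() relies on: a stand-alone copy of the _reclassify rule above, with SubmissionResult stubbed (field names follow result.py) so it runs outside the package:

import copy
from dataclasses import dataclass


@dataclass
class SubResult:  # minimal stand-in for SubmissionResult
    verdict: str
    runtime: float = -1.0
    validator_first: bool = False


def reclassify(result: SubResult, timelim: float) -> SubResult:
    # Mirrors _reclassify: a cached run that exceeds the queried limit becomes
    # TLE, except the interactive validator-first WA case, whose runtime is
    # capped so it cannot inflate the time limit.
    if result.runtime > timelim:
        if result.validator_first and result.verdict == 'WA':
            wa = copy.copy(result)
            wa.runtime = timelim
            return wa
        return SubResult('TLE', runtime=result.runtime)
    return result


cached = SubResult('AC', runtime=2.5)   # produced at the high limit, say 6.0s
print(reclassify(cached, 3.0).verdict)  # AC: runtime fits the queried limit
print(reclassify(cached, 2.0).verdict)  # TLE: over the lower queried limit
# Queries above the 6.0s run limit are the case get() refuses (returns None):
# a run capped at a lower ceiling says nothing about behavior with more time.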
16 changes: 9 additions & 7 deletions problemtools/judge/execute.py
@@ -32,7 +32,7 @@

if TYPE_CHECKING:
    from ..verifyproblem import TestCase
from .result import SubmissionResult, TimeLimits, classify_result
from .result import SubmissionResult
from .validate import _parse_validator_result, _validate_output

_INTERACTIVE_OUTPUT_RE = re.compile(r'\d+ \d+\.\d+ \d+ \d+\.\d+ (validator|submission)')
@@ -206,16 +206,18 @@ def execute_testcase(
    sub: Program,
    output_validator: Program,
    metadata: Metadata,
    timelimits: TimeLimits,
    timelim: float,
    base_dir: Path,
    diag: Diagnostics,
) -> tuple[SubmissionResult, SubmissionResult, SubmissionResult]:
    """Run sub on testcase and return (nominal, low, high) SubmissionResults."""
) -> SubmissionResult:
    """Run sub on a single testcase."""
    with tempfile.TemporaryDirectory(dir=base_dir) as exec_dir:
        execution_dir = Path(exec_dir)
        (execution_dir / 'feedback').mkdir()
        if metadata.is_multi_pass():
            raw = _run_multipass(testcase, sub, output_validator, metadata, timelimits.high, execution_dir, diag)
            result = _run_multipass(testcase, sub, output_validator, metadata, timelim, execution_dir, diag)
        else:
            raw = _run_pass(testcase.infile_path, testcase, sub, output_validator, metadata, timelimits.high, execution_dir, diag)
        return classify_result(raw, timelimits)
            result = _run_pass(testcase.infile_path, testcase, sub, output_validator, metadata, timelim, execution_dir, diag)
        result.test_node = testcase
        result.runtime_testcase = testcase
        return result
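
How this composes with ResultStore: a hypothetical background-worker loop. Only claim(), complete(), and execute_testcase() come from this PR; the parameter order for execute_testcase is read off the diff above, and store, run_queue, and high_limit are invented names for illustration:

def worker(store, run_queue, sub, output_validator, metadata, high_limit, base_dir, diag):
    """Claim each testcase once, run it at the high limit, publish the result."""
    for testcase in run_queue:
        if not store.claim(testcase):
            continue  # already in-flight or completed under the same reuse key
        result = execute_testcase(testcase, sub, output_validator, metadata, high_limit, base_dir, diag)
        # complete() resolves the Future handed out to any concurrent get(),
        # which then reclassifies the result for its own (lower) time limit.
        store.complete(testcase, result, run_timelim=high_limit)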
68 changes: 68 additions & 0 deletions problemtools/judge/grade.py
@@ -0,0 +1,68 @@
from __future__ import annotations

import os
import re
import tempfile
from pathlib import Path
from typing import cast

from ..diagnostics import Diagnostics
from ..run import Program
from .result import SubmissionResult, Verdict

_GRADER_OUTPUT_RE = re.compile(r'^((AC)|(WA)|(TLE)|(RTE)|(JE))\s+-?[0-9.]+\s*$')


def grade_group(
    sub_results: list[SubmissionResult],
    grader: Program,
    grader_flags: list[str],
    base_dir: Path,
    diag: Diagnostics,
) -> tuple[Verdict, float | None]:
    """Run grader on sub_results and return (verdict, score).

    Returns ('AC', 0.0) immediately if sub_results is empty.
    Returns ('JE', None) on any grader error.
    """
    if not sub_results:
        return ('AC', 0.0)

    if not grader.compile()[0]:
        diag.error(f'Failed to compile grader {grader}')
        return ('JE', None)

    grader_input = ''.join(f'{r.verdict} {0 if r.score is None else r.score}\n' for r in sub_results)
    diag.debug(f'Grading {len(sub_results)} results:\n{grader_input}')
    diag.debug(f'Grader flags: {grader_flags}')

    with tempfile.TemporaryDirectory(dir=base_dir) as tmpdir:
        infile = Path(tmpdir) / 'grader_in'
        outfile = Path(tmpdir) / 'grader_out'
        errfile = Path(tmpdir) / 'grader_err'
        infile.write_text(grader_input)

        status, _runtime = grader.run(str(infile), str(outfile), str(errfile), args=grader_flags)

        grader_output = outfile.read_text(errors='replace') if outfile.exists() else ''
        stderr_content = errfile.read_text(errors='replace') if errfile.exists() else ''

        if not os.WIFEXITED(status) or os.WEXITSTATUS(status) != 0:
            if not os.WIFEXITED(status):
                diag.error(f'Judge error: {grader} crashed')
            else:
                diag.error(f'Judge error: exit code {os.WEXITSTATUS(status)} for grader {grader}, expected 0')
            if stderr_content:
                diag.error(f'Grader stderr:\n{stderr_content}')
            diag.debug(f'Grader input:\n{grader_input}')
            return ('JE', None)

        if not _GRADER_OUTPUT_RE.match(grader_output):
            diag.error('Judge error: invalid format of grader output')
            diag.debug(f'Output must match: "{_GRADER_OUTPUT_RE.pattern}"')
            diag.debug(f'Output was: "{grader_output}"')
            return ('JE', None)

        verdict_str, score_str = grader_output.split()
        diag.debug(f'Grader result: {verdict_str} ({score_str})')
        return cast(Verdict, verdict_str), float(score_str)
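
The grader contract enforced here: grade_group writes one '<verdict> <score>' line per testcase to the grader's stdin and expects exactly one '<verdict> <score>' line on stdout matching _GRADER_OUTPUT_RE. A minimal grader honoring that contract might look as follows; the worst-verdict/sum-scores aggregation is an assumption of this sketch, not taken from the PR:

#!/usr/bin/env python3
# Minimal illustrative grader. Reads lines like "AC 25" or "WA 0" on stdin
# and prints a single line such as "AC 50.0" or "WA 0" on stdout.
import sys

worst = 'AC'
total = 0.0
for line in sys.stdin:
    verdict, score = line.split()
    total += float(score)
    if worst == 'AC' and verdict != 'AC':
        # Only verdicts accepted by _GRADER_OUTPUT_RE may be emitted.
        worst = verdict if verdict in ('WA', 'TLE', 'RTE', 'JE') else 'WA'

print(f'{worst} {total if worst == "AC" else 0}')

For input "AC 25\nAC 25\n" this prints "AC 50.0"; for "AC 25\nTLE 0\n" it prints "TLE 0". Both match _GRADER_OUTPUT_RE, so grade_group would parse them back without a judge error.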
58 changes: 5 additions & 53 deletions problemtools/judge/result.py
@@ -1,10 +1,9 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal

if TYPE_CHECKING:
    from ..verifyproblem import TestCase
    from ..verifyproblem import TestCase, TestCaseGroup

Verdict = Literal['AC', 'TLE', 'OLE', 'MLE', 'RTE', 'WA', 'PAC', 'JE']

@@ -21,18 +20,10 @@ def __init__(
        self.score = score
        self.reason = reason
        self.additional_info = additional_info
        self.testcase: TestCase | None = None
        self.test_node: TestCase | TestCaseGroup | None = None
        self.runtime_testcase: TestCase | None = None
        self.runtime = -1.0
        self.ac_runtime = -1.0
        self.ac_runtime_testcase: TestCase | None = None
        self.validator_first = False
        self.sample_failures: list[SubmissionResult] = []

    def set_ac_runtime(self) -> None:
        if self.verdict == 'AC':
            self.ac_runtime = self.runtime
            self.ac_runtime_testcase = self.runtime_testcase
        self.validator_first = False  # Needed to work around interactive giving unreliable runtime on WA

    def __str__(self) -> str:
        verdict = self.verdict
@@ -41,47 +32,8 @@ def __str__(self) -> str:
            verdict += f' ({self.score:.0f})'
        if self.reason is not None:
            details.append(self.reason)
        if self.testcase is not None:
            details.append(f'testcase: {self.testcase}')
        if self.test_node is not None and not self.test_node.is_group:
            details.append(f'testcase: {self.test_node}')
        if self.runtime != -1:
            details.append(f'CPU: {self.runtime:.2f}s @ {self.runtime_testcase}')
        return verdict if not details else f'{verdict} [{", ".join(details)}]'


@dataclass
class TimeLimits:
    nominal: float  # official limit; verdict based on this
    low: float  # below this is comfortably AC; above is "sensitive to time limit"
    high: float  # wall-clock ceiling enforced on the process


def classify_result(
    result: SubmissionResult,
    tl: TimeLimits,
) -> tuple[SubmissionResult, SubmissionResult, SubmissionResult]:
    """Map a raw high-limit result into the (nominal, low, high) triple."""
    runtime = result.runtime
    if runtime <= tl.low:
        nominal = low = high = result
    elif runtime <= tl.nominal:
        tle = SubmissionResult('TLE')
        tle.runtime = runtime
        nominal, low, high = result, tle, result
    elif result.validator_first and result.verdict == 'WA':
        # Interactive: validator exited first with WA. This can cause the submission to run
        # longer than it should. Cap runtimes at tl.low so this doesn't inflate the time limit.
        import copy

        high = copy.copy(result)
        high.runtime = min(runtime, tl.low)
        wa = SubmissionResult('WA')
        wa.validator_first = True
        wa.runtime = high.runtime
        nominal = low = wa
    else:
        tle = SubmissionResult('TLE')
        tle.runtime = runtime
        nominal, low, high = tle, tle, result
    for r in (nominal, low, high):
        r.set_ac_runtime()
    return nominal, low, high
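
For reference, how the reshaped result renders under the new test_node fields: a sketch assuming SubmissionResult imports from problemtools.judge.result, with the test node stubbed (the real TestCase lives in verifyproblem):

from problemtools.judge.result import SubmissionResult


class FakeCase:
    is_group = False  # __str__ only labels non-group nodes as 'testcase:'

    def __str__(self) -> str:
        return 'secret/huge-random-01'


result = SubmissionResult('TLE')
result.runtime = 2.73
result.test_node = FakeCase()
result.runtime_testcase = result.test_node
# Expected shape: TLE [testcase: secret/huge-random-01, CPU: 2.73s @ secret/huge-random-01]
print(result)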