diff --git a/problemtools/judge/__init__.py b/problemtools/judge/__init__.py index 10972ded..e338bc62 100644 --- a/problemtools/judge/__init__.py +++ b/problemtools/judge/__init__.py @@ -1,17 +1,17 @@ +from .cache import CacheKey from .execute import execute_testcase from .result import ( SubmissionResult, - TimeLimits, Verdict, - classify_result, ) +from .submission_judge import SubmissionJudge from .validate import validate_output __all__ = [ + 'CacheKey', + 'SubmissionJudge', 'SubmissionResult', - 'TimeLimits', 'Verdict', - 'classify_result', 'execute_testcase', 'validate_output', ] diff --git a/problemtools/judge/cache.py b/problemtools/judge/cache.py new file mode 100644 index 00000000..f3f0c77a --- /dev/null +++ b/problemtools/judge/cache.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import copy +from concurrent.futures import Future +from dataclasses import dataclass +from threading import Lock +from typing import TYPE_CHECKING + +from .result import SubmissionResult + +if TYPE_CHECKING: + from ..verifyproblem import TestCase + + +@dataclass(frozen=True) +class CacheKey: + input_hash: bytes + ans_hash: bytes + validator_flags: tuple[str, ...] + + +@dataclass +class _CacheEntry: + result: SubmissionResult + run_timelim: float + + +def _reclassify(result: SubmissionResult, timelim: float) -> SubmissionResult: + """Reclassify a cached result against a (possibly lower) time limit.""" + if result.runtime > timelim: + if result.validator_first and result.verdict == 'WA': + # Interactive: validator exited first with WA. This can cause the submission to run + # longer than it should. Cap runtimes at timelim so this doesn't inflate the time limit. + wa = copy.copy(result) + wa.runtime = timelim + return wa + tle = SubmissionResult('TLE') + tle.runtime = result.runtime + return tle + return result + + +def _with_test_node(result: SubmissionResult, testcase: TestCase) -> SubmissionResult: + """Return result with test_node and runtime_testcase set to testcase, copying only if needed.""" + if result.test_node is testcase and result.runtime_testcase is testcase: + return result + result = copy.copy(result) + result.test_node = testcase + result.runtime_testcase = testcase + return result + + +class ResultStore: + """Thread-safe store mapping testcase reuse keys to execution results. + + Background workers populate the store via claim()/complete(); the consumer + reads results via get(). A key progresses through three states: absent + (not yet claimed), in-flight (claimed, Future not yet resolved), and + completed (_CacheEntry). + + Because results are always run at the high time limit, a completed entry + can serve any query whose time limit is <= the run limit: a result whose + runtime exceeds the query limit is reclassified as TLE. A query with a + higher limit than the run limit cannot be served from cache and returns None. + """ + + def __init__(self) -> None: + self._lock = Lock() + self._store: dict[CacheKey, Future[SubmissionResult] | _CacheEntry] = {} + + def claim(self, testcase: TestCase) -> bool: + """Atomically claim testcase for execution. + + Returns True if the key was unclaimed; the caller must eventually call + complete(). Returns False if the key is already in-flight or completed. 
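+
+        Typical worker pairing (sketch; `run_one` is a stand-in for the
+        caller's execution routine, not part of this module):
+
+            if store.claim(tc):
+                store.complete(tc, run_one(tc), timelim)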
+ """ + key = testcase.reuse_key + with self._lock: + if key in self._store: + return False + self._store[key] = Future() + return True + + def complete(self, testcase: TestCase, result: SubmissionResult, run_timelim: float) -> None: + """Store the completed result and wake any consumer waiting on the future.""" + key = testcase.reuse_key + with self._lock: + future = self._store[key] + self._store[key] = _CacheEntry(result=result, run_timelim=run_timelim) + assert isinstance(future, Future) + future.set_result(result) # outside lock — callbacks may acquire other locks + + def get(self, testcase: TestCase, timelim: float) -> SubmissionResult | Future[SubmissionResult] | None: + """Look up a result for testcase at timelim. + + Returns: + SubmissionResult — completed result, already reclassified for timelim; use directly. + Future — in-flight; resolves to a reclassified SubmissionResult. + None — not present, or was run at a lower limit than timelim and + cannot be reused; caller must run the testcase synchronously. + """ + key = testcase.reuse_key + with self._lock: + val = self._store.get(key) + if val is None: + return None + if isinstance(val, Future): + chained: Future[SubmissionResult] = Future() + val.add_done_callback(lambda f: chained.set_result(_with_test_node(_reclassify(f.result(), timelim), testcase))) + return chained + if timelim > val.run_timelim: + # Entry was produced at a lower limit; cannot safely reclassify upward. + return None + return _with_test_node(_reclassify(val.result, timelim), testcase) diff --git a/problemtools/judge/execute.py b/problemtools/judge/execute.py index d9396f33..220d94a4 100644 --- a/problemtools/judge/execute.py +++ b/problemtools/judge/execute.py @@ -32,7 +32,7 @@ if TYPE_CHECKING: from ..verifyproblem import TestCase -from .result import SubmissionResult, TimeLimits, classify_result +from .result import SubmissionResult from .validate import _parse_validator_result, _validate_output _INTERACTIVE_OUTPUT_RE = re.compile(r'\d+ \d+\.\d+ \d+ \d+\.\d+ (validator|submission)') @@ -206,16 +206,18 @@ def execute_testcase( sub: Program, output_validator: Program, metadata: Metadata, - timelimits: TimeLimits, + timelim: float, base_dir: Path, diag: Diagnostics, -) -> tuple[SubmissionResult, SubmissionResult, SubmissionResult]: - """Run sub on testcase and return (nominal, low, high) SubmissionResults.""" +) -> SubmissionResult: + """Run sub on a single testcase.""" with tempfile.TemporaryDirectory(dir=base_dir) as exec_dir: execution_dir = Path(exec_dir) (execution_dir / 'feedback').mkdir() if metadata.is_multi_pass(): - raw = _run_multipass(testcase, sub, output_validator, metadata, timelimits.high, execution_dir, diag) + result = _run_multipass(testcase, sub, output_validator, metadata, timelim, execution_dir, diag) else: - raw = _run_pass(testcase.infile_path, testcase, sub, output_validator, metadata, timelimits.high, execution_dir, diag) - return classify_result(raw, timelimits) + result = _run_pass(testcase.infile_path, testcase, sub, output_validator, metadata, timelim, execution_dir, diag) + result.test_node = testcase + result.runtime_testcase = testcase + return result diff --git a/problemtools/judge/grade.py b/problemtools/judge/grade.py new file mode 100644 index 00000000..6a62c6fb --- /dev/null +++ b/problemtools/judge/grade.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import os +import re +import tempfile +from pathlib import Path +from typing import cast + +from ..diagnostics import Diagnostics +from ..run import Program +from 
.result import SubmissionResult, Verdict + +_GRADER_OUTPUT_RE = re.compile(r'^((AC)|(WA)|(TLE)|(RTE)|(JE))\s+-?[0-9.]+\s*$') + + +def grade_group( + sub_results: list[SubmissionResult], + grader: Program, + grader_flags: list[str], + base_dir: Path, + diag: Diagnostics, +) -> tuple[Verdict, float | None]: + """Run grader on sub_results and return (verdict, score). + + Returns ('AC', 0.0) immediately if sub_results is empty. + Returns ('JE', None) on any grader error. + """ + if not sub_results: + return ('AC', 0.0) + + if not grader.compile()[0]: + diag.error(f'Failed to compile grader {grader}') + return ('JE', None) + + grader_input = ''.join(f'{r.verdict} {0 if r.score is None else r.score}\n' for r in sub_results) + diag.debug(f'Grading {len(sub_results)} results:\n{grader_input}') + diag.debug(f'Grader flags: {grader_flags}') + + with tempfile.TemporaryDirectory(dir=base_dir) as tmpdir: + infile = Path(tmpdir) / 'grader_in' + outfile = Path(tmpdir) / 'grader_out' + errfile = Path(tmpdir) / 'grader_err' + infile.write_text(grader_input) + + status, _runtime = grader.run(str(infile), str(outfile), str(errfile), args=grader_flags) + + grader_output = outfile.read_text(errors='replace') if outfile.exists() else '' + stderr_content = errfile.read_text(errors='replace') if errfile.exists() else '' + + if not os.WIFEXITED(status) or os.WEXITSTATUS(status) != 0: + if not os.WIFEXITED(status): + diag.error(f'Judge error: {grader} crashed') + else: + diag.error(f'Judge error: exit code {os.WEXITSTATUS(status)} for grader {grader}, expected 0') + if stderr_content: + diag.error(f'Grader stderr:\n{stderr_content}') + diag.debug(f'Grader input:\n{grader_input}') + return ('JE', None) + + if not _GRADER_OUTPUT_RE.match(grader_output): + diag.error('Judge error: invalid format of grader output') + diag.debug(f'Output must match: "{_GRADER_OUTPUT_RE.pattern}"') + diag.debug(f'Output was: "{grader_output}"') + return ('JE', None) + + verdict_str, score_str = grader_output.split() + diag.debug(f'Grader result: {verdict_str} ({score_str})') + return cast(Verdict, verdict_str), float(score_str) diff --git a/problemtools/judge/result.py b/problemtools/judge/result.py index 7a2530c2..3aacc0a4 100644 --- a/problemtools/judge/result.py +++ b/problemtools/judge/result.py @@ -1,10 +1,9 @@ from __future__ import annotations -from dataclasses import dataclass from typing import TYPE_CHECKING, Literal if TYPE_CHECKING: - from ..verifyproblem import TestCase + from ..verifyproblem import TestCase, TestCaseGroup Verdict = Literal['AC', 'TLE', 'OLE', 'MLE', 'RTE', 'WA', 'PAC', 'JE'] @@ -21,18 +20,10 @@ def __init__( self.score = score self.reason = reason self.additional_info = additional_info - self.testcase: TestCase | None = None + self.test_node: TestCase | TestCaseGroup | None = None self.runtime_testcase: TestCase | None = None self.runtime = -1.0 - self.ac_runtime = -1.0 - self.ac_runtime_testcase: TestCase | None = None - self.validator_first = False - self.sample_failures: list[SubmissionResult] = [] - - def set_ac_runtime(self) -> None: - if self.verdict == 'AC': - self.ac_runtime = self.runtime - self.ac_runtime_testcase = self.runtime_testcase + self.validator_first = False # Needed to work around interactive giving unreliable runtime on WA def __str__(self) -> str: verdict = self.verdict @@ -41,47 +32,8 @@ def __str__(self) -> str: verdict += f' ({self.score:.0f})' if self.reason is not None: details.append(self.reason) - if self.testcase is not None: - details.append(f'testcase: {self.testcase}') + if 
self.test_node is not None and not self.test_node.is_group: + details.append(f'testcase: {self.test_node}') if self.runtime != -1: details.append(f'CPU: {self.runtime:.2f}s @ {self.runtime_testcase}') return verdict if not details else f'{verdict} [{", ".join(details)}]' - - -@dataclass -class TimeLimits: - nominal: float # official limit; verdict based on this - low: float # below this is comfortably AC; above is "sensitive to time limit" - high: float # wall-clock ceiling enforced on the process - - -def classify_result( - result: SubmissionResult, - tl: TimeLimits, -) -> tuple[SubmissionResult, SubmissionResult, SubmissionResult]: - """Map a raw high-limit result into the (nominal, low, high) triple.""" - runtime = result.runtime - if runtime <= tl.low: - nominal = low = high = result - elif runtime <= tl.nominal: - tle = SubmissionResult('TLE') - tle.runtime = runtime - nominal, low, high = result, tle, result - elif result.validator_first and result.verdict == 'WA': - # Interactive: validator exited first with WA. This can cause the submission to run - # longer than it should. Cap runtimes at tl.low so this doesn't inflate the time limit. - import copy - - high = copy.copy(result) - high.runtime = min(runtime, tl.low) - wa = SubmissionResult('WA') - wa.validator_first = True - wa.runtime = high.runtime - nominal = low = wa - else: - tle = SubmissionResult('TLE') - tle.runtime = runtime - nominal, low, high = tle, tle, result - for r in (nominal, low, high): - r.set_ac_runtime() - return nominal, low, high diff --git a/problemtools/judge/submission_judge.py b/problemtools/judge/submission_judge.py new file mode 100644 index 00000000..ad0a5d4f --- /dev/null +++ b/problemtools/judge/submission_judge.py @@ -0,0 +1,224 @@ +from __future__ import annotations + +import copy +import sys +from concurrent.futures import Future +from pathlib import Path +from threading import Lock +from typing import TYPE_CHECKING + +from ..context import Context +from ..diagnostics import Diagnostics +from ..metadata import Metadata +from ..run import Program, get_tool +from .cache import ResultStore +from .execute import execute_testcase +from .grade import grade_group +from .result import SubmissionResult + +if TYPE_CHECKING: + from ..verifyproblem import TestCase, TestCaseGroup + + +class _Cancelled: + """Thread-safe set of cancelled testcase identities (by Path to infile).""" + + def __init__(self) -> None: + self._lock = Lock() + self._ids: set[Path] = set() + + def __contains__(self, testcase: TestCase) -> bool: + with self._lock: + return testcase.infile_path in self._ids + + def add(self, testcase: TestCase) -> None: + with self._lock: + self._ids.add(testcase.infile_path) + + +class SubmissionJudge: + """Run a submission against a test case group tree and collect results. + + The typical flow uses two phases: + + 1. precompute(timelim) — submits all filtered testcases as background jobs that + execute the submission and populate a result cache. Returns immediately. + 2. judge(timelim) — walks the test tree in DFS order, consuming cached results + (blocking on any still in-flight) or running synchronously if a worker missed + a testcase. Returns a flat list of SubmissionResults, one per testcase plus + one aggregate per group, with the root group's result last. + + This lets the submission run on all testcases in parallel while the consumer + processes results in order for grading and early-exit logic. 
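+
+    A typical call sequence (sketch; constructor arguments abbreviated):
+
+        judge = SubmissionJudge(sub, output_validator, metadata, root, base_dir, context, diag)
+        judge.precompute(timelim_high)        # fan out background execution
+        results = judge.judge(timelim_high)   # DFS order; results[-1] is the root group's result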
+ + When an on_reject:break group encounters a non-AC result, pending (not-yet-started) + background jobs for the remaining testcases in that subtree are skipped. In-flight + jobs complete normally; their results are simply not consumed by judge(). + """ + + _default_grader: Program | None = get_tool('default_grader') + + def __init__( + self, + sub: Program, + output_validator: Program, + metadata: Metadata, + root: TestCaseGroup, + base_dir: Path, + context: Context, + diag: Diagnostics, + custom_grader: Program | None = None, + ) -> None: + self._sub = sub + self._output_validator = output_validator + self._metadata = metadata + self._base_dir = base_dir + self._context = context + self._diag = diag + self._custom_grader = custom_grader + self._store = ResultStore() + self._root = root + self._cancelled = _Cancelled() + self._precompute_started = False + + def precompute(self, timelim: float) -> None: + """Submit all filtered testcases as background jobs. + + Returns immediately; workers run concurrently and deposit results into the + cache as they finish. Call judge() afterwards to consume results in DFS order. + May be called at most once. + """ + assert not self._precompute_started, 'precompute() called more than once' + self._precompute_started = True + filtered_testcases = (item for item in self._root.get_all_testcases() if item.matches_filter(self._context.data_filter)) + for testcase in filtered_testcases: + self._context.submit_background_work(self._populate_cache_for_testcase, testcase, timelim) + + def judge(self, timelim: float) -> list[SubmissionResult]: + """Walk the test tree in DFS order and return results as a flat list. + + Each SubmissionResult has test_node set to the TestCase or TestCaseGroup it + covers. Group results immediately follow all their descendants; the root + group's result is the last element. Returns an empty list if all testcases + were filtered out. + + Blocks on any cache entry still being computed by a precompute() worker. + Testcases not yet claimed by a worker are run synchronously. Safe to call + multiple times with different timelim values; subsequent calls almost always + hit the cache without new work. When querying multiple time limits, call + with the largest first so that cached results can be reused for smaller limits. + """ + return self._judge_group(self._root, timelim) + + def _run(self, testcase: TestCase, timelim: float) -> SubmissionResult: + return execute_testcase( + testcase, + self._sub, + self._output_validator, + self._metadata, + timelim, + self._base_dir, + self._diag, + ) + + def _populate_cache_for_testcase(self, testcase: TestCase, timelim: float) -> None: + if testcase in self._cancelled: + return + if not self._store.claim(testcase): + return # duplicate testcase (same reuse_key) or already in store + try: + result = self._run(testcase, timelim) + except Exception as e: + result = SubmissionResult('JE', reason=f'Internal error: {e}') + self._store.complete(testcase, result, timelim) + + def _judge_testcase(self, testcase: TestCase, timelim: float) -> SubmissionResult: + val = self._store.get(testcase, timelim) + if isinstance(val, Future): + return val.result() # block until worker finishes + if val is not None: + return val + # Synchronous fallback: worker hasn't claimed this testcase yet, or second + # judge() call with a timelim the store can't serve. Claim so any pending + # worker for it bails out rather than duplicating work. 
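+        # Note: if claim() returns False here, the key is already owned (an in-flight worker,
+        # or a completed entry this timelim couldn't reuse); we run locally and skip publishing.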
+        claimed = self._store.claim(testcase)
+        result = self._run(testcase, timelim)
+        if claimed:
+            self._store.complete(testcase, result, timelim)
+        return result
+
+    def _cancel_subtree(self, group: TestCaseGroup) -> None:
+        for testcase in group.get_all_testcases():
+            self._cancelled.add(testcase)
+
+    def _grader_for(self, group: TestCaseGroup) -> Program | None:
+        if group.config.get('grading') == 'custom':
+            return self._custom_grader
+        return self._default_grader
+
+    def _judge_group(self, group: TestCaseGroup, timelim: float) -> list[SubmissionResult]:
+        all_results: list[SubmissionResult] = []  # Results of all children, groups and test cases, in DFS order. Our return value.
+        child_results: list[SubmissionResult] = []  # Results of our direct children; what we pass to the grader.
+
+        filtered_items = (item for item in group._items if item.matches_filter(self._context.data_filter))
+        for item in filtered_items:
+            if item.is_group:
+                sub = self._judge_group(item, timelim)
+                if not sub:  # If everything in a group is filtered, it returns an empty list.
+                    continue
+                all_results.extend(sub)
+                result = sub[-1]  # The last element is the subgroup's own result.
+            else:
+                if sys.stdout.isatty():
+                    msg = f'Running {self._sub} on {item}...'
+                    sys.stdout.write(msg)
+                    sys.stdout.flush()
+                result = self._judge_testcase(item, timelim)
+                if sys.stdout.isatty():
+                    sys.stdout.write('\b \b' * len(msg))
+
+                # Apply the default score here, after the result has been entered into the cache:
+                # the same cached result may be reused by other groups with different defaults.
+                if result.score is None:
+                    result = copy.copy(result)
+                    if result.verdict == 'AC':
+                        result.score = group.config['accept_score']
+                    else:
+                        result.score = group.config['reject_score']
+                all_results.append(result)
+
+            child_results.append(result)
+            if result.verdict != 'AC' and group.config.get('on_reject') == 'break':
+                self._cancel_subtree(group)  # Stop starting precomputations for the remaining testcases in this group or below.
+                break
+
+        if not all_results:  # All our children were filtered out.
+            return []
+
+        group_result = self._aggregate_group_result(child_results, group)
+        all_results.append(group_result)
+        return all_results
+
+    def _aggregate_group_result(self, child_results: list[SubmissionResult], group: TestCaseGroup) -> SubmissionResult:
+        judge_error = next((r for r in child_results if r.verdict == 'JE'), None)
+        if judge_error:
+            result = copy.copy(judge_error)
+        else:
+            grader = self._grader_for(group)
+            if grader is None:
+                result = SubmissionResult('JE', reason='grader not found')
+            else:
+                grader_flags = group.config.get('grader_flags', '').split()
+                verdict, score = grade_group(child_results, grader, grader_flags, self._base_dir, self._diag)
+                result = SubmissionResult(verdict, score=score)
+        slowest = max(child_results, key=lambda r: r.runtime)
+        result.runtime = slowest.runtime
+        result.runtime_testcase = slowest.runtime_testcase
+        verdict = result.verdict  # Bind `verdict` on all branches (the judge-error paths above skip the grader).
+        # The grader doesn't tell us why it gave a certain result. We still want to propagate reason
+        # and additional_info. As a heuristic, look for the last entry with the same verdict as the
+        # group got, and copy from there.
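+        # For example, child verdicts (AC, WA, AC, WA) grading to WA copy reason and
+        # additional_info from the final WA child.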
+ matching = next((r for r in reversed(child_results) if r.verdict == verdict), None) + if matching: + result.reason = matching.reason + result.additional_info = matching.additional_info + result.test_node = group + return result diff --git a/problemtools/verifyproblem.py b/problemtools/verifyproblem.py index 4b3d1fdb..ac53c53f 100644 --- a/problemtools/verifyproblem.py +++ b/problemtools/verifyproblem.py @@ -4,8 +4,6 @@ import argparse import math -import threading -import queue import glob import string import hashlib @@ -16,7 +14,6 @@ import logging import tempfile import sys -import copy import random import traceback import uuid @@ -35,11 +32,12 @@ from .context import Context, PROBLEM_PARTS from .diagnostics import Diagnostics, LoggingDiagnostics, VerifyError from .formatversion import FormatVersion, get_format_version -from .judge import SubmissionResult, Verdict, TimeLimits, validate_output, execute_testcase +from .judge import CacheKey, SubmissionJudge, SubmissionResult, Verdict, validate_output from .version import add_version_arg from abc import ABC -from typing import Any, Callable, ClassVar, Pattern, Match, ParamSpec, TypeVar, cast +from functools import cached_property +from typing import Any, Callable, ClassVar, Literal, Pattern, Match, ParamSpec, TypeVar from pydantic import ValidationError random.seed(42) @@ -128,7 +126,7 @@ def check(self, context: Context) -> bool: class TestCase(ProblemAspect): - Result = tuple[SubmissionResult, SubmissionResult, SubmissionResult] + is_group: Literal[False] = False # Temporary workaround for a circular import in judge/submission_judge.py def __init__(self, problem: Problem, base: str, testcasegroup: TestCaseGroup) -> None: super().__init__(f'test.{testcasegroup.name}.{os.path.basename(base)}', problem) @@ -137,7 +135,6 @@ def __init__(self, problem: Problem, base: str, testcasegroup: TestCaseGroup) -> self.ansfile = f'{base}.ans' self._problem = problem self.testcasegroup = testcasegroup - self.reuse_result_from: TestCase | None = None self.counter = len(problem.testcase_by_infile) problem.testcase_by_infile[self.infile] = self @@ -182,6 +179,14 @@ def output_validator_flags(self) -> list[str]: + self.testcasegroup.config.get('output_validator_flags', '').split() ) + @cached_property + def reuse_key(self) -> CacheKey: + return CacheKey( + input_hash=hashlib.sha256(self.infile_path.read_bytes()).digest(), + ans_hash=hashlib.sha256(self.ansfile_path.read_bytes()).digest(), + validator_flags=tuple(self.output_validator_flags), + ) + def is_in_sample_group(self) -> bool: return self.strip_path_prefix(self.infile).startswith('sample') @@ -218,7 +223,6 @@ def check(self, context: Context) -> bool: self.error(f'judge answer file got {val_res} on testcase {self.strip_path_prefix(self.ansfile)}') else: self.warning(f'judge answer file got {val_res} on testcase {self.strip_path_prefix(self.ansfile)}') - self._check_symlinks() return self._check_res def __str__(self) -> str: @@ -227,83 +231,15 @@ def __str__(self) -> str: def matches_filter(self, filter_re: Pattern[str]) -> bool: return filter_re.search(self.strip_path_prefix(self._base)) is not None - def set_symlinks(self) -> None: - if not os.path.islink(self.infile): - return - target = os.path.realpath(self.infile) - if target in self._problem.testcase_by_infile: - self.reuse_result_from = self._problem.testcase_by_infile[target] - - def _check_symlinks(self) -> bool: - if not os.path.islink(self.infile): - return True - nicepath = os.path.relpath(self.infile, self._problem.probdir) - 
in_target = os.path.realpath(self.infile) - ans_target = os.path.realpath(self.ansfile) - if not in_target.endswith('.in'): - self.error(f"Symbolic link does not point to a .in file for input '{nicepath}'") - return False - if ans_target != f'{in_target[:-3]}.ans': - self.error(f"Symbolic link '{nicepath}' must have a corresponding link for answer file") - return False - if self.reuse_result_from is None: - self.error(f"Symbolic link points outside data/ directory for file '{nicepath}'") - return False - if ( - self.testcasegroup.config['output_validator_flags'] - != self.reuse_result_from.testcasegroup.config['output_validator_flags'] - ): - self.error(f"Symbolic link '{nicepath}' points to testcase with different output validator flags") - return False - return True - - def run_submission(self, sub, runner: Runner, context: Context) -> Result: - (res, res_low, res_high), reused = runner.run(self) - res = self._init_result_for_testcase(res) - res_low = self._init_result_for_testcase(res_low) - res_high = self._init_result_for_testcase(res_high) - msg = 'Reused test file result' if reused else 'Test file result' - self.info(f'{msg}: {res}') - if res.verdict != 'AC' and self.is_in_sample_group(): - res.sample_failures.append(res) - - return (res, res_low, res_high) - - def run_submission_real(self, sub, context: Context, timelim: float, timelim_low: float, timelim_high: float) -> Result: - # This may be called off-main thread. - timelimits = TimeLimits(nominal=timelim, low=timelim_low, high=timelim_high) - return execute_testcase( - testcase=self, - sub=sub, - output_validator=self._problem.output_validators.output_validator, - metadata=self._problem.metadata, - timelimits=timelimits, - base_dir=Path(self.problem.tmpdir), - diag=self._diag, - ) - - def _init_result_for_testcase(self, res: SubmissionResult) -> SubmissionResult: - res = copy.copy(res) - res.testcase = self - res.runtime_testcase = self - if res.score is None: - if res.verdict == 'AC': - res.score = self.testcasegroup.config['accept_score'] - else: - res.score = self.testcasegroup.config['reject_score'] - return res - def get_all_testcases(self) -> list[TestCase]: return [self] - def all_datasets(self) -> list[str]: - return [self._base] - class TestCaseGroup(ProblemAspect): name: str _DEFAULT_CONFIG = config.load_config('testdata.yaml') _SCORING_ONLY_KEYS = ['accept_score', 'reject_score', 'range'] + is_group: Literal[True] = True # Temporary workaround for a circular import in judge/submission_judge.py def __init__(self, problem: Problem, datadir: str | None = None, parent: TestCaseGroup | None = None): self._parent = parent @@ -366,23 +302,16 @@ def __init__(self, problem: Problem, datadir: str | None = None, parent: TestCas if ext == '.ans' and os.path.isfile(f'{base}.in'): self._items.append(TestCase(problem, base, self)) - if not parent: - self.set_symlinks() - def start_background_work(self, context: Context) -> None: pass def __str__(self) -> str: return f'testcase group {self.name}' - def set_symlinks(self) -> None: - for sub in self._items: - sub.set_symlinks() - def matches_filter(self, filter_re: Pattern[str]) -> bool: return True - def get_all_testcases(self) -> list: + def get_all_testcases(self) -> list[TestCase]: res: list = [] for child in self._items: res += child.get_all_testcases() @@ -394,12 +323,6 @@ def get_testcases(self) -> list[TestCase]: def get_subgroups(self) -> list[TestCaseGroup]: return [child for child in self._items if isinstance(child, TestCaseGroup)] - def get_subgroup(self, name: str) -> 
TestCaseGroup | None: - return next( - (child for child in self._items if isinstance(child, TestCaseGroup) and os.path.basename(child._datadir) == name), - None, - ) - def has_custom_groups(self) -> bool: return any(group.get_subgroups() for group in self.get_subgroups()) @@ -411,6 +334,17 @@ def get_score_range(self) -> tuple[float, float]: except Exception: return (float('-inf'), float('inf')) + def check_score_in_bounds(self, sub: run.Program, score: float) -> None: + # Don't warn twice on the same subgroup, since every submission is likely + # to have the same error. + min_score, max_score = self.get_score_range() + if not (min_score <= score <= max_score) and not self._seen_oob_scores: + self._seen_oob_scores = True + groupname = os.path.relpath(self._datadir, self._problem.probdir) + self.error( + f'submission {sub} got score {score} on group {groupname}, which is outside of expected score range [{min_score}, {max_score}]' + ) + def check(self, context: Context) -> bool: if self._check_res is not None: return self._check_res @@ -556,80 +490,6 @@ def parse_num(s: str, i: int) -> tuple[int, int]: return self._check_res - def run_submission(self, sub, runner: Runner, context: Context) -> TestCase.Result: - self.info(f'Running on {self}') - subres: list[SubmissionResult] = [] - subres_low: list[SubmissionResult] = [] - subres_high: list[SubmissionResult] = [] - active_low, active = True, True - on_reject = self.config['on_reject'] - broken = False - for child in self._items: - if not child.matches_filter(context.data_filter): - continue - res, res_low, res_high = child.run_submission(sub, runner, context) - subres_high.append(res_high) - if active: - subres.append(res) - if active_low: - subres_low.append(res_low) - if on_reject == 'break': - active_low &= res_low.verdict == 'AC' - active &= res.verdict == 'AC' - if res_high.verdict != 'AC': - broken = True - break - - runner.mark_group_done(self, broken) - - return ( - self.aggregate_results(sub, subres), - self.aggregate_results(sub, subres_low, shadow_result=True), - self.aggregate_results(sub, subres_high, shadow_result=True), - ) - - def aggregate_results(self, sub, sub_results: list[SubmissionResult], shadow_result: bool = False) -> SubmissionResult: - res = SubmissionResult('JE') - - for r in sub_results: - if r.runtime > res.runtime: - res.runtime = r.runtime - res.runtime_testcase = r.runtime_testcase - if r.ac_runtime > res.ac_runtime: - res.ac_runtime = r.ac_runtime - res.ac_runtime_testcase = r.ac_runtime_testcase - res.sample_failures.extend(r.sample_failures) - - judge_error = next((r for r in sub_results if r.verdict == 'JE'), None) - if judge_error: - res.verdict = judge_error.verdict - res.reason = judge_error.reason - res.additional_info = judge_error.additional_info - res.testcase = judge_error.testcase - else: - res.verdict, score = self._problem.graders.grade(sub_results, self, shadow_result) - if sub_results: - res.testcase = sub_results[-1].testcase - res.additional_info = sub_results[-1].additional_info - if self._problem.is_scoring(): - res.score = score - min_score, max_score = self.get_score_range() - if score is not None and not (min_score <= score <= max_score) and not self._seen_oob_scores: - # Don't warn twice on the same subgroup, since every submission is likely - # to have the same error. 
- self._seen_oob_scores = True - groupname = os.path.relpath(self._datadir, self._problem.probdir) - self.error( - f'submission {sub} got {res} on group {groupname}, which is outside of expected score range [{min_score}, {max_score}]' - ) - return res - - def all_datasets(self) -> list: - res: list = [] - for child in self._items: - res += child.all_datasets() - return res - class ProblemStatement(ProblemPart): statements: dict[str, list[Path]] # Maps language code -> statement(s) @@ -1082,94 +942,6 @@ def check(self, context: Context) -> bool: self.fatal(f'Compile error for {self._grader}', msg) return self._check_res - def grade( - self, sub_results: list[SubmissionResult], testcasegroup: TestCaseGroup, shadow_result: bool = False - ) -> tuple[Verdict, float | None]: - if testcasegroup.config['grading'] == 'default': - if not self._default_grader: - self.fatal('Failed to locate default grader') - return ('JE', None) - grader = self._default_grader - else: - if not self._grader: - self.fatal('Problem has grading: custom without any custom grader') - return ('JE', None) - grader = self._grader - - if not grader.compile()[0]: - self.fatal(f'Failed to compile grader {grader}', grader.compile()[1]) - return ('JE', None) - - grader_input = ''.join([f'{r.verdict} {0 if r.score is None else r.score}\n' for r in sub_results]) - grader_output_re = r'^((AC)|(WA)|(TLE)|(RTE)|(JE))\s+-?[0-9.]+\s*$' - verdict: Verdict = 'AC' - score: float = 0 - - if not sub_results: - self.info(f'No results on {testcasegroup}, so no grader ran') - return (verdict, score) - - grader_flags = testcasegroup.config['grader_flags'].split() - self.debug(f'Grading {len(sub_results)} results:\n{grader_input}') - self.debug(f'Grader flags: {grader_flags}') - - infile_path = outfile_path = errfile_path = None - try: - # Create input and output files for grader - # We do it in this awkward way because the files need to be closed before reading/writing - with tempfile.NamedTemporaryFile(mode='w', delete=False) as infile: - infile.write(grader_input) - infile_path = infile.name - - with tempfile.NamedTemporaryFile(delete=False) as outfile: - outfile_path = outfile.name - - with tempfile.NamedTemporaryFile(delete=False) as errfile: - errfile_path = errfile.name - - status, runtime = grader.run(infile_path, outfile_path, errfile_path, args=grader_flags) - - with open(outfile_path, 'r') as fh: - grader_output = fh.read() - - with open(errfile_path, 'r') as errfile: - stderr_content = errfile.read() - - if not os.WIFEXITED(status) or os.WEXITSTATUS(status) != 0: - if not os.WIFEXITED(status): - self.error(f'Judge error: {grader} crashed') - else: - self.error(f'Judge error: exit code {os.WEXITSTATUS(status)} for grader {grader}, expected 0') - self.error(f'Grader stderr:\n{stderr_content}\n') - self.debug(f'Grader input:\n{grader_input}') - return ('JE', None) - - if not re.match(grader_output_re, grader_output): - self.error('Judge error: invalid format of grader output') - self.debug(f'Output must match: "{grader_output_re}"') - self.debug(f'Output was: "{grader_output}"') - return ('JE', None) - - verdict_str, score_str = grader_output.split() - # Make mypy happy by explicitly using cast - verdict = cast(Verdict, verdict_str) - score = float(score_str) - - if not shadow_result: - self.debug(f'Grade on {testcasegroup} is {verdict} ({score})') - - return (verdict, score) - except Exception as e: - self.error(f'Grader failed with exception {e}') - return ('JE', None) - finally: - for path in [infile_path, outfile_path, errfile_path]: 
- if path: - try: - os.remove(path) - except OSError: - pass - class OutputValidators(ProblemPart): _default_validator = run.get_tool('default_validator') @@ -1280,115 +1052,6 @@ def run_junk_case(case_desc: str, junk_content: bytes, testcases: list[TestCase] return self._check_res -class Runner: - def __init__(self, problem: Problem, sub, context: Context, timelim: float, timelim_low: float, timelim_high: float) -> None: - self._problem = problem - self._sub = sub - self._context = context - self._multithreaded = context.executor is not None - self._timelim = timelim - self._timelim_low = timelim_low - self._timelim_high = timelim_high - self._cache: dict[TestCase, TestCase.Result] = {} - if self._multithreaded: - self._queues: dict[TestCase, queue.Queue[TestCase.Result]] = {} - self._lock = threading.Lock() - self._started_jobs: set[TestCase] = set() - self._done_groups: set[TestCaseGroup] = set() - self._remaining_jobs: list[TestCase] = [] - self._recompute_jobs() - - def __enter__(self) -> Runner: - if self._multithreaded: - for i in range(len(self._remaining_jobs)): - self._context.submit_background_work(self._work) - return self - - def __exit__(self, *exc) -> None: - if self._multithreaded: - with self._lock: - self._remaining_jobs = [] - - def run(self, testcase: TestCase) -> tuple[TestCase.Result, bool]: - while testcase.reuse_result_from: - testcase = testcase.reuse_result_from - - if testcase in self._cache: - return (self._cache[testcase], True) - - if sys.stdout.isatty(): - msg = f'Running {self._sub} on {testcase}...' - sys.stdout.write(msg) - sys.stdout.flush() - - if self._multithreaded: - result = self._queues[testcase].get() - else: - result = self._run_submission_real(testcase) - - if sys.stdout.isatty(): - sys.stdout.write('\b \b' * len(msg)) - - self._cache[testcase] = result - return (result, False) - - def mark_group_done(self, group: TestCaseGroup, broken: bool) -> None: - if self._multithreaded: - self._done_groups.add(group) - if broken: - # Since a group was broken out of, some test cases may no - # longer be relevant to run. Recompute the work list. 
- self._recompute_jobs() - - def _run_submission_real(self, item: TestCase) -> TestCase.Result: - return item.run_submission_real(self._sub, self._context, self._timelim, self._timelim_low, self._timelim_high) - - def _work(self) -> None: - item = self._next_job() - if item: - res = self._run_submission_real(item) - self._queues[item].put(res) - - def _gather_testcases(self, item: TestCase | TestCaseGroup) -> list[TestCase]: - if not item.matches_filter(self._context.data_filter): - return [] - if isinstance(item, TestCase): - # If testcase is symlink, recursively follow the symlinks until we get a real testcase, ignoring - # whether the name of testcases pointed to matches the filter - while item.reuse_result_from: - item = item.reuse_result_from - - return [item] - elif item not in self._done_groups: - ret = [] - for child in item.get_testcases() + item.get_subgroups(): - ret.extend(self._gather_testcases(child)) - return ret - else: - return [] - - def _next_job(self) -> TestCase | None: - with self._lock: - if self._remaining_jobs: - job = self._remaining_jobs.pop() - self._started_jobs.add(job) - return job - else: - return None - - def _recompute_jobs(self) -> None: - with self._lock: - seen = set(self._started_jobs) - self._remaining_jobs = [] - for testcase in self._gather_testcases(self._problem.testdata): - if testcase not in seen: - seen.add(testcase) - self._remaining_jobs.append(testcase) - if testcase not in self._queues: - self._queues[testcase] = queue.Queue(maxsize=1) - self._remaining_jobs.reverse() - - class Submissions(ProblemPart): # (verdict, directory, required) _VERDICTS: list[tuple[Verdict, str, bool]] = [ @@ -1421,48 +1084,81 @@ def check_submission( self, sub, context: Context, expected_verdict: Verdict, timelim: float, timelim_high: float ) -> SubmissionResult: desc = f'{expected_verdict} submission {sub}' - partial = False - if expected_verdict == 'PAC': - expected_verdict = 'AC' - partial = True - # For partially accepted, we don't want to use them to lower bound the time limit, but we do want - # to warn if they're slow enough that they would have affected the time limit, had they been used - # to compute it. 
-            timelim_low = timelim / self.problem.metadata.limits.time_multipliers.ac_to_time_limit
-        else:
-            timelim_low = timelim
-
-        with Runner(self.problem, sub, context, timelim, timelim_low, timelim_high) as runner:
-            result, result_low, result_high = self.problem.testdata.run_submission(sub, runner, context)
+        partial = expected_verdict == 'PAC'
 
-        if result.verdict == 'AC' and expected_verdict == 'AC' and not partial and result.sample_failures:
-            res = result.sample_failures[0]
-            self.warning(f'{desc} got {res.verdict} on sample: {res}')
-
-        if result_low.verdict != result_high.verdict or result_low.score != result_high.score:
-            r1, r2 = (
-                (result_low, result_high)
-                if result_low.verdict == result_high.verdict
-                else (result_low.verdict, result_high.verdict)
-            )
+        judge = SubmissionJudge(
+            sub=sub,
+            output_validator=self.problem.output_validators.output_validator,
+            metadata=self.problem.metadata,
+            root=self.problem.testdata,
+            base_dir=Path(self.problem.tmpdir),
+            context=context,
+            diag=self._diag,
+            custom_grader=self.problem.graders._grader,
+        )
+        if context.executor is not None:
+            judge.precompute(timelim_high)
+        results_high = judge.judge(timelim_high)
+        if not results_high:
+            self.fatal('check_submission called, but found no test cases to run on.')
+        result_high = results_high[-1]
+
+        results = judge.judge(timelim)
+        result = results[-1]
+
+        # Check whether any group's score fell outside its expected range
+        if self.problem.is_scoring():
+            for r in results:
+                if r.score is not None and isinstance(r.test_node, TestCaseGroup):
+                    r.test_node.check_score_in_bounds(sub, r.score)
+
+        # Warn if an AC (but not PAC) submission fails on samples. It's not uncommon for sample cases to be
+        # ignored, so failing on them could otherwise go unnoticed. Skip the warning if the result isn't AC -
+        # then something worse has gone wrong, and we'll error later.
+        if expected_verdict == 'AC' and result.verdict == 'AC':
+            if sample_failure := self._find_sample_failure(results):
+                self.warning(f'{desc} got {sample_failure.verdict} on sample: {sample_failure}')
+
+        # Warn if a PAC submission would have affected the time limit, had it been used to compute it.
+        # Only do this if it gets AC at the computed time limit; otherwise we emit other warnings below.
+        if partial and result.verdict == 'AC':
+            self._warn_pac_too_slow(judge, results, timelim, desc)
+
+        if result.verdict != result_high.verdict or result.score != result_high.score:
             self.warning(
-                f'{desc} sensitive to time limit: limit of {timelim_low} secs -> {r1}, limit of {timelim_high} secs -> {r2}'
+                f'{desc} sensitive to time limit: limit of {timelim} secs -> {result}, limit of {timelim_high} secs -> {result_high}'
             )
 
+        required_verdict: Verdict = 'AC' if partial else expected_verdict
         if partial and self.fully_accepted(result):
-            self.warning(f'{desc} got {result}')
-        elif result.verdict == expected_verdict:
+            self.warning(f'{desc} was fully accepted: {result}')
+        elif result.verdict == required_verdict:
             self.msg(f'   {desc} OK: {result}')
-            if expected_verdict == 'AC' and not partial and not self.fully_accepted(result) and self.full_score_finite():
+            if not partial and required_verdict == 'AC' and not self.fully_accepted(result) and self.full_score_finite():
                 # For some heuristic problems, this is expected. Thus, only warn.
                self.warning(f'{desc} did not attain full score (consider moving it to partially_accepted)')
-        elif result_high.verdict == expected_verdict and not (partial and self.fully_accepted(result_high)):
+        elif result_high.verdict == required_verdict and not (partial and self.fully_accepted(result_high)):
             self.msg(f'   {desc} OK with extra time: {result_high}')
         else:
             self.error(f'{desc} got {result}', result_high.additional_info)
         return result
 
+    def _find_sample_failure(self, results: list[SubmissionResult]) -> SubmissionResult | None:
+        for r in results:
+            if r.verdict != 'AC' and isinstance(r.test_node, TestCase) and r.test_node.is_in_sample_group():
+                return r
+        return None
+
+    def _warn_pac_too_slow(self, judge: SubmissionJudge, results: list[SubmissionResult], timelim: float, desc: str) -> None:
+        """Warn if a PAC submission is slow enough that it would have affected the time limit."""
+        runtime_without_affecting_tl = timelim / self.problem.metadata.limits.time_multipliers.ac_to_time_limit
+        if judge.judge(runtime_without_affecting_tl)[-1].verdict == 'AC':
+            return
+        for t in sorted(r.runtime for r in results if r.runtime > runtime_without_affecting_tl):
+            if judge.judge(t)[-1].verdict == 'AC':
+                self.warning(f'{desc} is slower than all AC submissions. It needs {t:.2f}s to get AC')
+                return  # Warn once, at the smallest time limit that yields AC.
+
     def full_score_finite(self) -> bool:
         min_score, max_score = self.problem.testdata.get_score_range()
         if self.problem.metadata.legacy_grading.objective == 'min':
@@ -1515,6 +1211,10 @@ def check(self, context: Context) -> bool:
         if limits.time_limit is not None and context.fixed_timelim is not None:
             self.warning('There is a fixed time limit in problem.yaml, and you provided one on command line. Using command line.')
 
+        has_testcases = any(tc.matches_filter(context.data_filter) for tc in self.problem.testdata.get_all_testcases())
+        if not has_testcases:
+            self.warning('Found no test cases to run on. Did you filter them all out?')
+
         for verdict in Submissions._VERDICTS:
             acr = verdict[0]
             if verdict[2] and not self._submissions[acr]:
@@ -1538,11 +1238,12 @@ def check(self, context: Context) -> bool:
                 self.error(f'Compile error for {acr} submission {sub}', additional_info=msg)
                 continue
 
-            timelim, timelim_high = self._compute_time_limit(fixed_limit, lower_bound_runtime)
-            res = self.check_submission(sub, context, acr, timelim, timelim_high)
-            runtimes.append(res.runtime)
+            if has_testcases:
+                timelim, timelim_high = self._compute_time_limit(fixed_limit, lower_bound_runtime)
+                res = self.check_submission(sub, context, acr, timelim, timelim_high)
+                runtimes.append(res.runtime)
 
-            if acr == 'AC':
+            if acr == 'AC' and has_testcases:
                 if len(runtimes) > 0:
                     lower_bound_runtime = max(runtimes)