Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.28
0.1.29
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "skribe"
version = "0.1.28"
version = "0.1.29"
description = "Property testing for Stylus smart contracts"
readme = "README.md"
requires-python = "~=3.10"
Expand Down Expand Up @@ -47,6 +47,9 @@ dev = [
[tool.hatch.metadata]
allow-direct-references = true

[tool.hatch.build.targets.sdist]
exclude = ["src/tests/"]

[tool.isort]
profile = "black"
line_length = 120
Expand Down
71 changes: 43 additions & 28 deletions src/skribe/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from functools import cached_property, partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, TypeAlias
from typing import TYPE_CHECKING, Any, NamedTuple, TypeAlias

from eth_abi.grammar import TupleType, parse
from eth_abi.tools._strategies import get_abi_strategy
Expand All @@ -14,10 +14,9 @@
from pyk.kast.inner import KSort
from pyk.utils import run_process, single

from skribe.simulation import call_data
from .simulation import call_data

if TYPE_CHECKING:

from hypothesis.strategies import SearchStrategy


Expand Down Expand Up @@ -120,30 +119,46 @@ def is_foundry_test(ctr: EVMContract) -> bool:
return False


def get_arg_types(m: Method) -> tuple[str, ...]:
"""
Return the argument type strings of a method.

This function exists because `Method.arg_types` flattens tuple arguments,
into their component types. That flattening loses information about the
original ABI shape (for example, whether an argument was a tuple or an
array of tuples).
"""

sig = m.signature
arg_types_from_sig = sig[sig.index('(') :]

if arg_types_from_sig == '()':
return ()
else:
parsed_arg_types = parse(arg_types_from_sig)
assert isinstance(parsed_arg_types, TupleType)
return tuple(c.to_type_str() for c in parsed_arg_types.components)
class Signature(NamedTuple):
contract_name: str
name: str
arg_types: tuple[str, ...]

@staticmethod
def from_method(method: Method) -> Signature:
return Signature(
contract_name=method.contract_name,
name=method.name,
arg_types=Signature._extract_arg_types(method),
)

def argument_strategy(m: Method) -> SearchStrategy[bytes]:
arg_types = get_arg_types(m)
input_strategies = (get_abi_strategy(arg) for arg in arg_types)
tuple_strategy = strategies.tuples(*input_strategies)
encoder = partial(call_data, m.name, arg_types)
return tuple_strategy.map(encoder)
@staticmethod
def _extract_arg_types(method: Method) -> tuple[str, ...]:
"""
Return the argument type strings of a method.

This function exists because `Method.arg_types` flattens tuple arguments,
into their component types. That flattening loses information about the
original ABI shape (for example, whether an argument was a tuple or an
array of tuples).
"""

sig = method.signature
arg_types_from_sig = sig[sig.index('(') :]

if arg_types_from_sig == '()':
return ()
else:
parsed_arg_types = parse(arg_types_from_sig)
assert isinstance(parsed_arg_types, TupleType)
return tuple(c.to_type_str() for c in parsed_arg_types.components)

@property
def qualified_name(self) -> str:
return f'{self.contract_name}.{self.name}'

def argument_strategy(self) -> SearchStrategy[bytes]:
input_strategies = (get_abi_strategy(arg) for arg in self.arg_types)
tuple_strategy = strategies.tuples(*input_strategies)
encoder = partial(call_data, self.name, self.arg_types)
return tuple_strategy.map(encoder)
16 changes: 8 additions & 8 deletions src/skribe/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@

from rich.progress import TaskID

from .contract import Method
from .contract import Signature


class FuzzProgress(Progress):
fuzz_tasks: list[FuzzTask]

def __init__(self, bindings: Iterable[Method], max_examples: int):
def __init__(self, signatures: Iterable[Signature], max_examples: int):
super().__init__(
TextColumn('[progress.description]{task.description}'),
BarColumn(),
Expand All @@ -27,19 +27,19 @@ def __init__(self, bindings: Iterable[Method], max_examples: int):
self.fuzz_tasks = []

# Add all tests to the progress display before running them
for binding in bindings:
description = f'{binding.contract_name}.{binding.name}'
for signature in signatures:
description = signature.qualified_name
task_id = self.add_task(description, total=max_examples, start=False, status='Waiting')
self.fuzz_tasks.append(FuzzTask(binding, task_id, self))
self.fuzz_tasks.append(FuzzTask(signature, task_id, self))


class FuzzTask:
binding: Method
signature: Signature
task_id: TaskID
progress: FuzzProgress

def __init__(self, binding: Method, task_id: TaskID, progress: FuzzProgress):
self.binding = binding
def __init__(self, signature: Signature, task_id: TaskID, progress: FuzzProgress):
self.signature = signature
self.task_id = task_id
self.progress = progress

Expand Down
123 changes: 68 additions & 55 deletions src/skribe/skribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from functools import cached_property
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, NamedTuple

from eth_abi import decode, encode
from kontrol.foundry import Foundry
Expand All @@ -21,8 +21,7 @@
from pyk.utils import run_process
from pykwasm.wasm2kast import wasm2kast

from skribe.contract import StylusContract, argument_strategy, get_arg_types, is_foundry_test, setup_method

from .contract import Signature, StylusContract, is_foundry_test, setup_method
from .kast.syntax import (
call_stylus,
check_foundry_success,
Expand All @@ -37,14 +36,13 @@
from .utils import RECURSION_LIMIT, PykHooks, SkribeError, subst_on_k_cell

if TYPE_CHECKING:
from collections.abc import Mapping
from collections.abc import Iterable, Mapping
from typing import Any

from pyk.kast.inner import KInner
from pyk.kore.syntax import Pattern

from skribe.contract import ArbitrumContract, Method

from .contract import ArbitrumContract
from .progress import FuzzTask
from .utils import SkribeDefinition

Expand All @@ -60,8 +58,26 @@
TEST_CONTRACT_ID = 0x7FA9385BE102AC3EAC297483DD6233D62B3E1496


class Skribe:
class FuzzSpec(NamedTuple):
template: Pattern
signatures: tuple[Signature, ...]

@property
def dict(self) -> dict[str, Any]:
return {
'template': self.template.text,
'signatures': [
{
'contract_name': signature.contract_name,
'name': signature.name,
'arg_types': list(signature.arg_types),
}
for signature in self.signatures
],
}


class Skribe:
definition: SkribeDefinition
contract_dir: Path

Expand Down Expand Up @@ -155,19 +171,16 @@ def call_setup(setup: bool) -> tuple[KInner, ...]:

def run_test(
self,
conf: KInner,
subst: dict[str, KInner],
binding: Method,
template_pattern: Pattern,
signature: Signature,
max_examples: int,
task: FuzzTask,
) -> None:
"""Given a configuration with a deployed test contract, fuzz over the tests for the supplied binding.
"""Given a configuration with a deployed test contract, fuzz over the tests for the supplied signature.

Args:
conf: The template configuration.
subst: A substitution mapping such that 'Subst(subst).apply(conf)' gives the initial configuration with the
deployed contract.
binding: The contract binding that specifies the test name and parameters.
template_pattern: The template KORE configuration.
signature: The signature of the test to fuzz over.
max_examples: The maximum number of fuzzing test cases to generate and execute.

Raises:
Expand All @@ -177,22 +190,12 @@ def run_test(
def calldata_to_kore(data: bytes) -> Pattern:
return kast_to_kore(self.definition.kdefinition, bytesToken(data), BYTES)

k_steps = [
set_exit_code(1),
call_stylus(TEST_CALLER_ID, TEST_CONTRACT_ID, CALLDATA, 0),
check_foundry_success(),
set_exit_code(0),
]
subst['K_CELL'] = steps_of(k_steps)

template_config = Subst(subst).apply(conf)
template_config_kore = kast_to_kore(self.definition.kdefinition, template_config, GENERATED_TOP_CELL)
template_subst = {CALLDATA_EVAR: argument_strategy(binding).map(calldata_to_kore)}
template_subst = {CALLDATA_EVAR: signature.argument_strategy().map(calldata_to_kore)}

task.start()
fuzz(
self.definition.path,
template_config_kore,
template_pattern,
template_subst,
check_exit_code=True,
max_examples=max_examples,
Expand All @@ -202,21 +205,6 @@ def calldata_to_kore(data: bytes) -> Pattern:
)
task.end()

def select_tests(self, contract: ArbitrumContract, id: str | None) -> list[Method]:
test_methods = []
for m in contract.methods:
if m.is_test:
test_methods.append(m)

if id is None:
tests = test_methods
else:
tests = [b for b in test_methods if b.name == id]
if not tests:
raise KeyError(f'Test function {id!r} not found.')

return tests

def deploy_and_run(self, max_examples: int, id: str | None = None) -> list[FuzzError]:

test_contracts: list[ArbitrumContract]
Expand All @@ -236,7 +224,26 @@ def deploy_and_run(self, max_examples: int, id: str | None = None) -> list[FuzzE
def deploy_and_run_contract(
self, contract: ArbitrumContract, max_examples: int, id: str | None = None
) -> list[FuzzError]:
spec = self.create_spec(contract)
signatures = _filter_signatures(spec.signatures, id=id)

errors: list[FuzzError] = []
with FuzzProgress(signatures, max_examples) as progress:
for task in progress.fuzz_tasks:
try:
self.run_test(spec.template, task.signature, max_examples, task)
except FuzzError as e:
task.fail()
errors.append(e)

return errors

def create_spec(self, contract: ArbitrumContract) -> FuzzSpec:
template = self.create_template_pattern(contract)
signatures = tuple(Signature.from_method(method) for method in contract.methods if method.is_test)
return FuzzSpec(template=template, signatures=signatures)

def create_template_pattern(self, contract: ArbitrumContract) -> Pattern:
contract_kast: KInner
if isinstance(contract, StylusContract):
contract_kast = wasm2kast(BytesIO(contract.deployed_bytecode))
Expand All @@ -250,18 +257,17 @@ def deploy_and_run_contract(

init_config = self.deploy_test(contract_kast, setup is not None)
template_conf, init_subst = split_config_from(init_config)
k_steps = [
set_exit_code(1),
call_stylus(TEST_CALLER_ID, TEST_CONTRACT_ID, CALLDATA, 0),
check_foundry_success(),
set_exit_code(0),
]
init_subst['K_CELL'] = steps_of(k_steps)
template_conf = Subst(init_subst).apply(template_conf)
template_pattern = kast_to_kore(self.definition.kdefinition, template_conf, GENERATED_TOP_CELL)

tests = self.select_tests(contract, id)
errors: list[FuzzError] = []
with FuzzProgress(tests, max_examples) as progress:
for task in progress.fuzz_tasks:
try:
self.run_test(template_conf, init_subst, task.binding, max_examples, task)
except FuzzError as e:
task.fail()
errors.append(e)

return errors
return template_pattern


class KometFuzzHandler(KFuzzHandler):
Expand Down Expand Up @@ -294,8 +300,8 @@ def handle_failure(self, args: Mapping[EVar, Pattern]) -> None:
calldata_kast = self.definition.krun.kore_to_kast(args[CALLDATA_EVAR])
assert isinstance(calldata_kast, KToken)
calldata = pretty_bytes(calldata_kast)
decoded = decode(get_arg_types(self.task.binding), calldata[4:])
description = f'{self.task.binding.contract_name}.{self.task.binding.name}'
decoded = decode(self.task.signature.arg_types, calldata[4:])
description = self.task.signature.qualified_name
raise FuzzError(description, decoded)


Expand All @@ -313,3 +319,10 @@ def __init__(self, description: str, counterexample: tuple[KInner, ...]):


class InitializationError(SkribeError): ...


def _filter_signatures(signatures: Iterable[Signature], id: str | None) -> list[Signature]:
if id is None:
return list(signatures)

return [sig for sig in signatures if sig.name == id]
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading