diff --git a/tests/free_threading/test_real_world.py b/tests/free_threading/test_real_world.py new file mode 100644 index 000000000..9badf8fa9 --- /dev/null +++ b/tests/free_threading/test_real_world.py @@ -0,0 +1,116 @@ +import concurrent.futures +import random +import threading + +import yaml as yaml + +try: + from yaml import CDumper as Dumper +except ImportError: + from yaml import Dumper + +try: + from yaml import CLoader as Loader +except ImportError: + from yaml import Loader + + + +NUM_FILES = 200 +NUM_THREADS = 8 +ERROR_RATE = 0.1 + +barrier = threading.Barrier(NUM_THREADS) + + +def _generate_random_github_workflow(): + """Generate a random GitHub Actions workflow-like YAML structure.""" + workflow = { + "name": f"Test Workflow {random.randint(1, 1000)}", + "on": ["push", "pull_request"], + "jobs": { + "build": { + "runs-on": "ubuntu-latest", + "steps": [ + { + "name": "Checkout", + "uses": "actions/checkout@v3" + }, + { + "name": "Setup Python", + "uses": "actions/setup-python@v4", + "with": { + "python-version": random.choice(["3.8", "3.9", "3.10", "3.11"]) + } + }, + { + "name": "Run tests", + "run": "pytest" + } + ] + } + } + } + + # Add some randomness to the structure + if random.random() > 0.5: + workflow["jobs"]["deploy"] = { + "runs-on": "ubuntu-latest", + "needs": ["build"], + "steps": [ + { + "name": "Deploy step", + "run": "echo Deploying..." + } + ] + } + + return workflow + + +def _create_yaml_file(directory, index, valid=True): + barrier.wait() + + filepath = directory / f"workflow_{index}.yml" + workflow = _generate_random_github_workflow() + with open(filepath, 'w') as f: + if valid: + yaml.dump(workflow, f, Dumper=Dumper) + else: + # Introduce an error in the YAML structure + f.write("name: Invalid Workflow\non: [push\njobs:") + return filepath, workflow, valid + + +def _parse_yaml_file(filepath, original_data, expected_valid): + barrier.wait() + + try: + with open(filepath, 'r') as f: + data = yaml.load(f, Loader=Loader) + assert expected_valid and data == original_data + except yaml.YAMLError: + assert not expected_valid + + +def test_multithreaded_yaml_parsing(tmp_path): + # Create test files in threads + with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: + futures = [ + executor.submit(_create_yaml_file, tmp_path, i, random.random() > ERROR_RATE) + for i in range(NUM_FILES) + ] + + file_info = [] + for future in concurrent.futures.as_completed(futures): + filepath, workflow, valid = future.result() + file_info.append((filepath, workflow, valid)) + + # Parse files using thread pool + with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: + future_to_file = [ + executor.submit(_parse_yaml_file, filepath, workflow, valid) + for filepath, workflow, valid in file_info + ] + for future in concurrent.futures.as_completed(future_to_file): + future.result() # Asserts that this does not raise any assertion errors diff --git a/tests/free_threading/test_stress.py b/tests/free_threading/test_stress.py new file mode 100644 index 000000000..64d9093d8 --- /dev/null +++ b/tests/free_threading/test_stress.py @@ -0,0 +1,176 @@ +import concurrent +import random +import re +import threading + +import yaml as yaml + +try: + from yaml import CDumper as Dumper +except ImportError: + from yaml import Dumper + +try: + from yaml import CLoader as Loader +except ImportError: + from yaml import Loader + + +NUM_THREADS = 8 +NUM_ITERATIONS = 200 +barrier = threading.Barrier(NUM_THREADS) + + +class Dice(tuple): + def __new__(cls, a, b): + return tuple.__new__(cls, (a, b)) + + def __repr__(self): + return "Dice(%s,%s)" % self + + +def dice_constructor(loader, node): + value = loader.construct_scalar(node) + a, b = map(int, value.split('d')) + return Dice(a, b) + + +# Different YAML content types for testing +YAML_LOAD_SAMPLES = [ + # Simple key-value pairs + ("""\ +key1: value1 +key2: value2 +key3: 123 +key4: true +""", { + "key1": "value1", + "key2": "value2", + "key3": 123, + "key4": True + }), + + # Nested structures + ("""\ +config: + database: + host: localhost + port: 5432 + credentials: + username: admin + password: secret + logging: + level: INFO + file: /var/log/app.log +""", { + "config": { + "database": { + "host": "localhost", + "port": 5432, + "credentials": { + "username": "admin", + "password": "secret" + } + }, + "logging": { + "level": "INFO", + "file": "/var/log/app.log" + } + } + }), + + # Lists + ("""\ +fruits: + - apple + - banana + - orange +numbers: [1, 2, 3, 4, 5] +mixed: + - name: John + age: 30 + - name: Alice + age: 25 +""", { + "fruits": ["apple", "banana", "orange"], + "numbers": [1, 2, 3, 4, 5], + "mixed": [ + {"name": "John", "age": 30}, + {"name": "Alice", "age": 25} + ] + }), + + # Complex with references + ("""\ +defaults: &defaults + adapter: postgresql + host: localhost + +development: + database: myapp_development + <<: *defaults + +test: + database: myapp_test + <<: *defaults +""", { + "defaults": { + "adapter": "postgresql", + "host": "localhost" + }, + "development": { + "database": "myapp_development", + "adapter": "postgresql", + "host": "localhost" + }, + "test": { + "database": "myapp_test", + "adapter": "postgresql", + "host": "localhost" + } + }), + + # Dice with resolver + ("""\ +rolls_resolver: + - 1d6 + - 2d4 + - 3d1 +""", { + "rolls_resolver": [ + Dice(1, 6), + Dice(2, 4), + Dice(3, 1) + ] + }), +] + + +class MyLoader(Loader): + pass + + +class MyDumper(Dumper): + pass + + +def yaml_load_stress_thread(): + for _ in range(NUM_ITERATIONS): + barrier.wait() + yamlcode, result = random.choice(YAML_LOAD_SAMPLES) + thread_id = threading.current_thread().name + randint = random.randint(1, 1000) + yamlcode += f"\nrandom_value_{thread_id}: {randint}" + obj = yaml.load(yamlcode, Loader=MyLoader) + assert obj == {**result, f"random_value_{thread_id}": randint} + + +def test_yaml_load_stress(): + yaml.add_constructor("!dice", dice_constructor, Loader=MyLoader) + yaml.add_implicit_resolver('!dice', re.compile(r'^\d+d\d+$'), + Loader=MyLoader, Dumper=MyDumper) + + with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: + futures = [executor.submit(yaml_load_stress_thread) for _ in range(NUM_THREADS)] + for future in concurrent.futures.as_completed(futures): + future.result()