Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions tests/free_threading/test_real_world.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import concurrent.futures
import random
import threading

import yaml as yaml

try:
from yaml import CDumper as Dumper
except ImportError:
from yaml import Dumper

try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader



# Number of YAML files created (and later re-parsed) by the test.
NUM_FILES = 200
# Worker threads per pool.  NOTE: every task waits on the barrier below,
# so NUM_FILES must be a multiple of NUM_THREADS or the final partial
# batch blocks forever.
NUM_THREADS = 8
# Approximate fraction of files written deliberately malformed.
ERROR_RATE = 0.1

# Rendezvous point: each batch of NUM_THREADS tasks starts its work at
# the same instant to maximize contention on the YAML dump/load paths.
barrier = threading.Barrier(NUM_THREADS)


def _generate_random_github_workflow():
"""Generate a random GitHub Actions workflow-like YAML structure."""
workflow = {
"name": f"Test Workflow {random.randint(1, 1000)}",
"on": ["push", "pull_request"],
"jobs": {
"build": {
"runs-on": "ubuntu-latest",
"steps": [
{
"name": "Checkout",
"uses": "actions/checkout@v3"
},
{
"name": "Setup Python",
"uses": "actions/setup-python@v4",
"with": {
"python-version": random.choice(["3.8", "3.9", "3.10", "3.11"])
}
},
{
"name": "Run tests",
"run": "pytest"
}
]
}
}
}

# Add some randomness to the structure
if random.random() > 0.5:
workflow["jobs"]["deploy"] = {
"runs-on": "ubuntu-latest",
"needs": ["build"],
"steps": [
{
"name": "Deploy step",
"run": "echo Deploying..."
}
]
}

return workflow


def _create_yaml_file(directory, index, valid=True):
    """Write one workflow YAML file; return (path, data, valid).

    Every worker rendezvouses on the shared barrier first, so the file
    writes (and the yaml.dump calls) happen under maximal contention.
    """
    barrier.wait()

    target = directory / f"workflow_{index}.yml"
    document = _generate_random_github_workflow()
    with open(target, "w") as handle:
        if not valid:
            # Deliberately malformed: unterminated flow sequence.
            handle.write("name: Invalid Workflow\non: [push\njobs:")
        else:
            yaml.dump(document, handle, Dumper=Dumper)
    return target, document, valid


def _parse_yaml_file(filepath, original_data, expected_valid):
    """Load one workflow file and check the outcome matches expectations.

    Workers synchronize on the shared barrier so parsing is concurrent.
    A YAML parse error is acceptable only for files written as invalid;
    a clean parse must reproduce the original document.
    """
    barrier.wait()

    try:
        with open(filepath, "r") as handle:
            loaded = yaml.load(handle, Loader=Loader)
    except yaml.YAMLError:
        assert not expected_valid
    else:
        assert expected_valid and loaded == original_data


def test_multithreaded_yaml_parsing(tmp_path):
    """End-to-end stress test: dump and re-load YAML from many threads."""
    # Phase 1: write NUM_FILES workflow files concurrently; about
    # ERROR_RATE of them are written deliberately malformed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as pool:
        pending = [
            pool.submit(_create_yaml_file, tmp_path, i, random.random() > ERROR_RATE)
            for i in range(NUM_FILES)
        ]
        file_info = [
            done.result()
            for done in concurrent.futures.as_completed(pending)
        ]

    # Phase 2: parse every file concurrently; result() re-raises any
    # assertion failure from the worker threads.
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as pool:
        parse_jobs = [
            pool.submit(_parse_yaml_file, path, data, ok)
            for path, data, ok in file_info
        ]
        for done in concurrent.futures.as_completed(parse_jobs):
            done.result()
176 changes: 176 additions & 0 deletions tests/free_threading/test_stress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import concurrent
import concurrent.futures
import random
import re
import threading

import yaml as yaml

try:
from yaml import CDumper as Dumper
except ImportError:
from yaml import Dumper

try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader


# Worker threads; the barrier below expects exactly this many waiters.
NUM_THREADS = 8
# YAML load iterations performed by every worker thread.
NUM_ITERATIONS = 200
# All workers rendezvous here each iteration so loads run in lockstep.
barrier = threading.Barrier(NUM_THREADS)


class Dice(tuple):
    """An immutable (count, sides) pair used as a custom YAML scalar type."""

    def __new__(cls, a, b):
        # tuple is immutable, so construction must happen in __new__.
        return super().__new__(cls, (a, b))

    def __repr__(self):
        return f"Dice({self[0]},{self[1]})"


def dice_constructor(loader, node):
    """YAML constructor turning an "NdM" scalar (e.g. "2d6") into a Dice."""
    text = loader.construct_scalar(node)
    count, sides = (int(part) for part in text.split('d'))
    return Dice(count, sides)


# Different YAML content types for testing.  Each entry pairs a YAML
# document with the Python object it must load into.  NOTE(review): the
# block indentation inside the YAML literals was reconstructed from the
# paired expected dicts — without it the nested samples are not valid
# YAML mappings.
YAML_LOAD_SAMPLES = [
    # Simple key-value pairs
    ("""\
key1: value1
key2: value2
key3: 123
key4: true
""", {
        "key1": "value1",
        "key2": "value2",
        "key3": 123,
        "key4": True
    }),

    # Nested structures
    ("""\
config:
  database:
    host: localhost
    port: 5432
    credentials:
      username: admin
      password: secret
  logging:
    level: INFO
    file: /var/log/app.log
""", {
        "config": {
            "database": {
                "host": "localhost",
                "port": 5432,
                "credentials": {
                    "username": "admin",
                    "password": "secret"
                }
            },
            "logging": {
                "level": "INFO",
                "file": "/var/log/app.log"
            }
        }
    }),

    # Lists (block style, flow style, and a list of mappings)
    ("""\
fruits:
  - apple
  - banana
  - orange
numbers: [1, 2, 3, 4, 5]
mixed:
  - name: John
    age: 30
  - name: Alice
    age: 25
""", {
        "fruits": ["apple", "banana", "orange"],
        "numbers": [1, 2, 3, 4, 5],
        "mixed": [
            {"name": "John", "age": 30},
            {"name": "Alice", "age": 25}
        ]
    }),

    # Complex with references: anchor/alias plus "<<" merge keys
    ("""\
defaults: &defaults
  adapter: postgresql
  host: localhost

development:
  database: myapp_development
  <<: *defaults

test:
  database: myapp_test
  <<: *defaults
""", {
        "defaults": {
            "adapter": "postgresql",
            "host": "localhost"
        },
        "development": {
            "database": "myapp_development",
            "adapter": "postgresql",
            "host": "localhost"
        },
        "test": {
            "database": "myapp_test",
            "adapter": "postgresql",
            "host": "localhost"
        }
    }),

    # Scalars matched by the implicit "!dice" resolver (see the test setup)
    ("""\
rolls_resolver:
  - 1d6
  - 2d4
  - 3d1
""", {
        "rolls_resolver": [
            Dice(1, 6),
            Dice(2, 4),
            Dice(3, 1)
        ]
    }),
]


class MyLoader(Loader):
    """Loader subclass so custom constructors/resolvers stay test-local."""
    pass


class MyDumper(Dumper):
    """Dumper subclass paired with MyLoader for test-local registrations."""
    pass


def yaml_load_stress_thread():
    """Worker body: repeatedly parse a random sample under contention.

    Every iteration starts at the shared barrier so all NUM_THREADS
    workers hit the loader at the same moment.  A thread-unique key is
    appended to the document to verify each thread gets its own result.
    """
    me = threading.current_thread().name
    for _ in range(NUM_ITERATIONS):
        barrier.wait()
        source, expected = random.choice(YAML_LOAD_SAMPLES)
        marker = random.randint(1, 1000)
        document = source + f"\nrandom_value_{me}: {marker}"
        parsed = yaml.load(document, Loader=MyLoader)
        assert parsed == {**expected, f"random_value_{me}": marker}


def test_yaml_load_stress():
    """Register the custom dice tag, then hammer the loader from 8 threads."""
    # Register on the subclasses so the stock Loader/Dumper used by other
    # tests remain untouched.
    yaml.add_constructor("!dice", dice_constructor, Loader=MyLoader)
    yaml.add_implicit_resolver('!dice', re.compile(r'^\d+d\d+$'),
                               Loader=MyLoader, Dumper=MyDumper)

    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as pool:
        workers = [
            pool.submit(yaml_load_stress_thread) for _ in range(NUM_THREADS)
        ]
        for done in concurrent.futures.as_completed(workers):
            done.result()  # surfaces assertion failures from the workers