1+ """Tests perf trampoline in a multi-threaded environment to verify thread safety in free-threaded builds."""
2+
3+ import gc
4+ import sys
5+ import threading
6+ import unittest
7+ from test .support import threading_helper
8+
9+ NTHREADS = 10
10+ ITERATIONS_PER_THREAD = 50
11+
12+
13+ @threading_helper .requires_working_threading ()
14+ class TestPerfTrampolineThreadSafety (unittest .TestCase ):
15+
16+ def test_concurrent_code_compilation (self ):
17+ """Stress test simultaneous allocation of code arenas and tracking hooks."""
18+ if not hasattr (sys , "_perf_trampoline_init" ) or not hasattr (sys , "_perf_trampoline_fini" ):
19+ self .skipTest ("perf trampoline APIs are not supported on this platform" )
20+
21+ try :
22+ sys ._perf_trampoline_init (1 )
23+ except ValueError as exc :
24+ self .skipTest (f"perf trampoline activation failed: { exc } " )
25+ self .addCleanup (sys ._perf_trampoline_fini )
26+
27+ barrier = threading .Barrier (NTHREADS )
28+
29+ def worker ():
30+ barrier .wait ()
31+ for i in range (ITERATIONS_PER_THREAD ):
32+ ns = {}
33+ tid = threading .get_ident ()
34+ exec (
35+ f"def func_{ tid } _{ i } (): return { i } \n "
36+ f"result = func_{ tid } _{ i } ()" ,
37+ ns ,
38+ )
39+ self .assertEqual (ns ["result" ], i )
40+ del ns # Immediate drop to prevent delaying cleanup, no internal gc.collect()
41+
42+ threading_helper .run_concurrently (
43+ nthreads = NTHREADS , worker_func = worker
44+ )
45+ # Single final sweep to ensure code objects and extra tracking clear cleanly
46+ gc .collect ()
47+
48+ def test_concurrent_shared_code_execution (self ):
49+ """Verify multiple threads safely handle entering evaluation loops for the same code object."""
50+ if not hasattr (sys , "_perf_trampoline_init" ) or not hasattr (sys , "_perf_trampoline_fini" ):
51+ self .skipTest ("perf trampoline APIs are not supported on this platform" )
52+
53+ try :
54+ sys ._perf_trampoline_init (1 )
55+ except ValueError as exc :
56+ self .skipTest (f"perf trampoline activation failed: { exc } " )
57+ self .addCleanup (sys ._perf_trampoline_fini )
58+
59+ barrier = threading .Barrier (NTHREADS )
60+
61+ # Pre-compile a single function instance to share across all threads
62+ shared_ns = {}
63+ exec ("def shared_func(): return 42" , shared_ns )
64+ shared_func = shared_ns ["shared_func" ]
65+
66+ def worker ():
67+ barrier .wait ()
68+ for _ in range (ITERATIONS_PER_THREAD ):
69+ self .assertEqual (shared_func (), 42 )
70+
71+ threading_helper .run_concurrently (
72+ nthreads = NTHREADS , worker_func = worker
73+ )
74+
75+ del shared_func
76+ del shared_ns
77+ # Single final sweep to clean up shared resources safely
78+ gc .collect ()
79+
80+
81+ if __name__ == "__main__" :
82+ unittest .main ()
0 commit comments