Conversation
Code Review
This pull request introduces a major refactoring of the tracing capabilities, creating a new, more ergonomic Tracer object with better support for multi-project and multi-threaded/multi-tasking scenarios. The core of the change is a new ProxyTracerProvider that can dynamically switch between different tracer instances. The changes are extensive, adding many new modules for tracers, exporters, processors, and background task handling.
My review focuses on the new example files that demonstrate these features. I've found a couple of issues in the async example (src/foo.py) that could be misleading for users of the SDK: one related to an incorrect client configuration for a third-party API, and another related to an inefficient usage pattern of the new Tracer.create method. Addressing these would make the examples clearer and more robust.
src/foo.py
Outdated
```python
client = AsyncOpenAI(
    api_key=os.getenv("ANTHROPIC_API_KEY"),
    base_url="https://api.anthropic.com/v1",
)
```
The configuration for the AsyncOpenAI client appears incorrect for calling the Anthropic API. The openai library sends requests in a format specific to OpenAI's API, which is not compatible with Anthropic's native API at https://api.anthropic.com/v1. This will likely lead to runtime errors.
To call the Anthropic API, you should use the anthropic Python SDK.
Additionally, the model name claude-opus-4-1 used on line 47 does not seem to be a valid Anthropic model name. The latest Opus model is claude-3-opus-20240229.
If you are using a proxy that makes Anthropic's API OpenAI-compatible, it would be helpful to clarify this with a comment and ensure the base_url points to that proxy.
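A minimal sketch of that suggestion, assuming the `anthropic` package is installed; `build_messages_request` is a hypothetical helper and the prompt and token cap are illustrative:

```python
# from anthropic import AsyncAnthropic  # requires the `anthropic` package
# client = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

def build_messages_request(prompt: str) -> dict:
    """Build the request body Anthropic's native Messages API expects."""
    return {
        "model": "claude-3-opus-20240229",  # Opus model name cited in the review
        "max_tokens": 1024,  # the Messages API requires an explicit token cap
        "messages": [{"role": "user", "content": prompt}],
    }

# Inside an async handler:
# response = await client.messages.create(**build_messages_request("Hello"))
```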
src/foo.py
Outdated
```python
    tags: list[str],
    **kwargs,
):
    Tracer.create(project_name=name)
```
Calling Tracer.create() inside handle_request means a new Tracer instance (along with its own TracerProvider and other resources) is created for every single request. This is inefficient and not recommended for production code. Tracer instances are meant to be long-lived and created once per project.
A better approach would be to create the tracers for different projects once at application startup and then select the appropriate one to activate within the request handler. The commented-out code at the top of the file and the pattern in src/bar.py demonstrate this better pattern.
For example:

```python
# At application startup
tracers = {
    "fibonacci": Tracer.create(project_name="fibonacci", set_active=False),
    "fizzbuzz": Tracer.create(project_name="fizzbuzz", set_active=False),
    "chat": Tracer.create(project_name="chat", set_active=False),
    "long_running_task": Tracer.create(project_name="long_running_task", set_active=False),
}

async def handle_request(name: str, ...):
    tracers[name].set_active()
    # ... rest of the handler
```

This would be much more efficient and is likely the intended usage of the new multi-project support.
```python
@classmethod
def get_instance(cls) -> ProxyTracerProvider:
    if cls._instance is None:
        cls._instance = cls()
    return cls._instance
```
🟡 JudgmentTracerProvider singleton is not thread-safe, unlike BackgroundQueue
JudgmentTracerProvider.get_instance() (src/judgeval/v1/trace/judgment_tracer_provider.py:109-112) uses a simple check-then-act pattern without any locking. In contrast, BackgroundQueue.get_instance() (src/judgeval/v1/background_queue.py:28-36) correctly uses double-checked locking with cls._lock. In a multi-threaded web server, two threads calling get_instance() simultaneously before initialization completes could create two separate JudgmentTracerProvider instances, with the first thread's registered tracers and context being silently discarded when the second thread overwrites _instance.
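For reference, a sketch of the double-checked locking pattern the review credits to `BackgroundQueue`, applied to a generic provider singleton (the `Provider` class here is illustrative, not the SDK's actual code):

```python
import threading

class Provider:
    """Illustrative singleton guarded by double-checked locking."""

    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get_instance(cls) -> "Provider":
        # Fast path: once initialized, return without taking the lock.
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have
                # initialized while we were waiting.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance
```

Note that the lock is only contended before first initialization; after that, the outer check returns on the lock-free fast path.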
@devin-ai-integration you want to create a lock on every get_instance call?
Found a shutdown logic issue that prevents flushing queued jobs; changes suggested.
Status: Changes Suggested | Risk: Medium
Issues Identified & Suggestions
- Ensure shutdown flushes pending jobs before setting shutdown flag: src/judgeval/v1/background_queue.py
Review Details
📁 182 files reviewed | 💬 1 comments
```python
    return True

def shutdown(self, timeout_ms: int = 30000) -> None:
    if self._shutdown:
        return
    self._shutdown = True
```
[Logic] shutdown() sets _shutdown = True before calling force_flush(), but force_flush() immediately returns False when _shutdown is set. That means pending jobs are never flushed during shutdown. Move the flag assignment after force_flush() (or allow flushing when shutting down).
```python
def shutdown(self, timeout_ms: int = 30000) -> None:
    if self._shutdown:
        return
    self.force_flush(timeout_ms)
    self._shutdown = True
    self._executor.shutdown(wait=False)
```
```python
    return False
if not self._futures:
    return True
_, not_done = wait(self._futures, timeout=timeout_ms / 1000.0)
```
🔴 BackgroundQueue _futures set mutated concurrently without synchronization
The _futures set is a plain set accessed from the calling thread in enqueue() (line 45: self._futures.add(future)) and force_flush() (line 61: wait(self._futures, ...)), and from thread-pool worker threads in _on_done() (line 51: self._futures.discard(future)). concurrent.futures.wait() iterates the set to build an internal copy via set(fs). If _on_done calls discard() on a worker thread during that iteration, it raises RuntimeError: Set changed size during iteration. This race is triggered when force_flush() or shutdown() is called while background jobs are completing.
adivate2021
left a comment
some minor comments/clarifying questions
```diff
     self._project_id = project_id

-    def get(self, name: str) -> Optional[CustomScorer]:
+    def get(self, name: str) -> BaseJudge | None:
```
Are we still keeping get? What can you do with the scorer once you get it? Also, why does this seem to return prompt-scorer-specific fields when it's supposed to be general?
```diff
-class CustomScorerFactory:
+class JudgesFactory:
+    __slots__ = ("_client", "_project_id")
     _cache: Dict[Tuple[str, str, str, str], APIPromptScorer] = {}
```
This is a general judges factory, but it's tied to prompt scorers?
src/judgeval/v1/trace/base_tracer.py
Outdated
| """Abstract base for all Judgment tracers. | ||
|
|
||
| Provides the core tracing surface: span creation, attribute recording, | ||
| the ``@observe`` and ``@agent`` decorators, context propagation for |
| """ | ||
|
|
||
| __slots__ = ( | ||
| "project_name", |
why do we need project name, can we remove project name from the tracer?
it is scoped to a project name tho?
The underlying tracer instance is. The user only worries about the static Tracer, but internally Tracer.init does have a backing object.
```python
api_key: Optional[str],
organization_id: Optional[str],
api_url: Optional[str],
environment: Optional[str],
```
is this to support when we add environments in the future?
```diff
@@ -5,8 +5,8 @@
 from opentelemetry.context import Context, get_value
```
Let's remove this like we did for TypeScript.
updating to use baggage
Two important issues were noted around thread safety and potential memory growth that should be addressed.
Status: Changes Suggested | Risk: Medium
Issues Identified & Suggestions
- Guard shared futures set to avoid concurrent mutation errors: src/judgeval/v1/background_queue.py
- Clear accumulated async payloads to prevent memory leak over time: src/judgeval/v1/trace/base_tracer.py
Review Details
📁 182 files reviewed | 💬 2 comments
```python
self._semaphore.release()
self._futures.discard(future)
exc = future.exception()
if exc is not None:
```
[Logic]
```python
_, not_done = wait(self._futures, timeout=timeout_ms / 1000.0)
```
`_futures` is mutated by `_on_done` running in worker threads while `force_flush()` iterates it. This can raise `RuntimeError: Set changed size during iteration` or cause missed futures. Protect `_futures` with a lock or snapshot before waiting.
```python
self._futures_lock = threading.Lock()

# enqueue/_on_done: wrap add/discard with the lock
with self._futures_lock:
    self._futures.add(future)

# force_flush: snapshot under the lock, then wait on the copy
with self._futures_lock:
    futures = set(self._futures)
_, not_done = wait(futures, timeout=timeout_ms / 1000.0)
```
alanzhang25
left a comment
- Do you have a test coverage report? Might need to add some UTs for the background queue.
src/judgeval/v1/trace/base_tracer.py
Outdated
```python
# Static API: Async Evaluation #
# ------------------------------------------------------------------ #

_pending_evals: Dict[str, List[Dict[str, Any]]] = {}
```
Why do we need to store this? I don't think the eval_name actually matters in this case, and if it does, I think we can just store a count instead of the actual payload.
Removed. We do need the full array since it's serialized in the span attribute, but I will use internal_attributes since that gets cleaned up.
To confirm, this is only used by tags right now, right?
tags and async_evaluate
```python
tracer.set_span_attribute(
    agent_span, f"agent.{key}", value
)
if result_metadata:
```
not sure why the indent here changed
src/judgeval/v1/trace/base_tracer.py
Outdated
```python
# Static API: Async Evaluation #
# ------------------------------------------------------------------ #

_pending_evals: Dict[str, List[Dict[str, Any]]] = {}
```
Is this `_pending_evals` unbounded memory growth?
```diff
 def __init__(self):
-    self.resource_attributes = {}
+    pass
```
🔴 NoOpSpanProcessor does not override state_incr/state_append/state_get, will crash with AttributeError
NoOpSpanProcessor.__init__ does pass (line 15-16 of noop_span_processor.py), skipping the parent JudgmentSpanProcessor.__init__ which initializes self._state and self._lock. The NoOp overrides the old method names set_internal_attribute/get_internal_attribute, but the parent class was renamed to use state_set/state_get/state_incr/state_append. When async_evaluate in base_tracer.py:589 calls processor.state_incr() on a NoOpSpanProcessor, it hits the parent's state_incr which accesses self._lock — an attribute that doesn't exist because the parent __init__ was never called. This raises AttributeError. While the call is wrapped with @dont_throw so it's silently swallowed, the evaluation silently fails.
```diff
 def __init__(self):
-    self.resource_attributes = {}
-    pass
+    self._lock = __import__('threading').RLock()
+    self._state: dict = {}
+    self._span_finalizers: dict = {}
```
Changes suggested: fix semaphore release when executor submit fails.
Status: Changes Suggested | Risk: Medium
Issues Identified & Suggestions
- Release semaphore if submit raises to avoid capacity leak: src/judgeval/v1/background_queue.py
Review Details
📁 188 files reviewed | 💬 1 comments
```python
if self._shutdown:
    return False
if not self._semaphore.acquire(blocking=False):
    judgeval_logger.warning("[BackgroundQueue] Queue full, dropping job")
```
[Logic] If ThreadPoolExecutor.submit() raises (e.g., executor already shut down), the semaphore was acquired but never released, permanently reducing queue capacity. Wrap submit in try/except and release the semaphore on failure.
```python
if not self._semaphore.acquire(blocking=False):
    ...
try:
    future = self._executor.submit(fn)
except Exception:
    self._semaphore.release()
    raise
```
adivate2021
left a comment
LGTM other than the thing we talked about with JudgesFactory.
One important logic issue found around missing score_type values that could break API validation.
Status: Changes Suggested | Risk: Medium
Issues Identified & Suggestions
- Populate or omit score_type to match server contract: src/judgeval/v1/evaluation/hosted_evaluation.py
Review Details
📁 210 files reviewed | 💬 1 comments
| "judgment_scorers": [s.get_scorer_config() for s in scorers], | ||
| "judgment_scorers": [ | ||
| {"name": name, "score_type": "", "threshold": 0.5} for name in scorers | ||
| ], |
[Logic] Hosted evaluations now send {"score_type": ""} for each scorer. If the API validates score_type as required (as it was previously), this will be rejected or interpreted incorrectly.
Fix: populate the real score_type for each scorer name (if known), or omit the field and update the server contract accordingly.
Context for Agents
Hosted evaluations now send `{"score_type": ""}` for each scorer. If the API validates `score_type` as required (as it was previously), this will be rejected or interpreted incorrectly.
Fix: populate the real `score_type` for each scorer name (if known), or omit the field and update the server contract accordingly.
File: src/judgeval/v1/evaluation/hosted_evaluation.py
```python
from judgeval.v1.trace.baggage.propagator import JudgmentBaggagePropagator

_JUDGMENT_BAGGAGE_KEY = create_key("judgment.baggage")
```
✔️ Propel has finished reviewing this change.
Review suggests addressing an important initialization issue in the no-op span processor.
Status: Changes Suggested | Risk: Medium
Issues Identified & Suggestions
- Initialize NoOpSpanProcessor state/lock or override inherited methods: src/judgeval/v1/trace/processors/noop_span_processor.py
Review Details
📁 212 files reviewed | 💬 1 comments
```diff
-class NoOpJudgmentSpanProcessor(JudgmentSpanProcessor):
-    __slots__ = ("resource_attributes",)
+class NoOpSpanProcessor(JudgmentSpanProcessor):
```
[Logic] NoOpSpanProcessor.__init__ doesn't initialize `_state`/`_lock`, but it inherits `state_set`/`state_incr`/`state_append` from JudgmentSpanProcessor. Any caller using those will hit `AttributeError`. Either call `super().__init__(...)` with a noop exporter or override those methods to no-op.
Context for Agents
NoOpSpanProcessor.__init__ doesn’t initialize `_state`/`_lock`, but it inherits `state_set/state_incr/state_append` from JudgmentSpanProcessor. Any caller using those will hit `AttributeError`. Either call `super().__init__(...)` with a noop exporter or override those methods to no-op.
File: src/judgeval/v1/trace/processors/noop_span_processor.py
Line: 12
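A minimal sketch of the override option, following the review's description of the class hierarchy (the parent class here is a simplified stand-in, not the SDK's real `JudgmentSpanProcessor`):

```python
import threading
from typing import Any

class JudgmentSpanProcessor:
    """Simplified stand-in for the parent class the review describes."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._state: dict = {}

    def state_incr(self, key: str, amount: int = 1) -> None:
        with self._lock:
            self._state[key] = self._state.get(key, 0) + amount

    def state_append(self, key: str, value: Any) -> None:
        with self._lock:
            self._state.setdefault(key, []).append(value)

class NoOpSpanProcessor(JudgmentSpanProcessor):
    """Skips the parent __init__, so every state method must be overridden."""

    def __init__(self) -> None:
        pass  # deliberately no _state/_lock

    def state_incr(self, key: str, amount: int = 1) -> None:
        pass  # no-op: never touches self._lock, so no AttributeError

    def state_append(self, key: str, value: Any) -> None:
        pass  # no-op
```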
This is a WIP PR that will be a primary refactor separating Judgeval (an SDK around the JudgmentLabs REST API) from Judgment's observability product.
This will introduce a new `Tracer` top-level object that can be used to trace, as well as better multi-project support and ergonomics. Example of new syntax: