POC: add telemetry to localprocess backend #444
sujalshah-bit wants to merge 1 commit into kubeflow:main
Conversation
Signed-off-by: Sujal Shah <sujalshah28092004@gmail.com>
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: (none yet). The full list of commands accepted by this bot can be found here. Needs approval from an approver in each of these files; approvers can indicate their approval by writing the appropriate command in a comment.
🎉 Welcome to the Kubeflow SDK! 🎉 Thanks for opening your first PR! We're happy to have you as part of our community 🚀 Here's what happens next:
Join the community:
Feel free to ask questions in the comments if you need any help or clarification!
Pull request overview
This PR adds OpenTelemetry telemetry instrumentation to the Kubeflow Trainer SDK's local process backend. The changes include adding OpenTelemetry dependencies, instrumenting key functions with spans, and propagating tracer providers through the call stack while handling thread boundaries correctly. A new example file demonstrates how to configure and use the telemetry system with different backends (Jaeger, OTel Collector, console).
Changes:
- Added OpenTelemetry dependencies (api, sdk, exporter-otlp-proto-grpc) to pyproject.toml and uv.lock
- Instrumented LocalProcessBackend and its utility functions with distributed tracing spans
- Added TrainerClient-level spans and proper exception handling
- Implemented span context propagation across thread boundaries in LocalJob
- Added test_client.py demonstrating how to build and use a TracerProvider
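The span-context propagation across thread boundaries mentioned above is the subtle part of this change: OpenTelemetry's context is built on `contextvars`, and a freshly started `threading.Thread` does not inherit the caller's context automatically. A minimal stdlib-only sketch of the capture-and-rerun pattern (names like `trace_ctx` are illustrative, not the PR's actual code):

```python
import contextvars
import threading

# Stand-in for the OpenTelemetry context slot that holds the current span.
trace_ctx = contextvars.ContextVar("trace_ctx", default=None)
results = []

def worker():
    # Without explicit propagation, a new thread starts with an empty
    # context and this would read the default value None.
    results.append(trace_ctx.get())

trace_ctx.set("parent-span-context")

# Capture the caller's context and run the thread body inside it, which is
# what LocalJob must do so child spans attach to the parent trace.
ctx = contextvars.copy_context()
t = threading.Thread(target=lambda: ctx.run(worker))
t.start()
t.join()

assert results == ["parent-span-context"]
```

The same shape applies with the real API: capture the active span context before `Thread.start()`, then re-attach it inside the thread body.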
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| pyproject.toml | Added opentelemetry-api, opentelemetry-sdk, and opentelemetry-exporter-otlp-proto-grpc to dev dependencies |
| uv.lock | Updated lock file with new OpenTelemetry package versions and transitive dependencies |
| kubeflow/trainer/backends/localprocess/utils.py | Added span instrumentation to get_dependencies_command, get_command_using_train_func, and get_local_train_job_script with tracer_provider parameter passing |
| kubeflow/trainer/backends/localprocess/job.py | Added parent span context and tracer_provider parameters; implemented LocalJob.run() with telemetry spans and proper context propagation across thread boundary |
| kubeflow/trainer/backends/localprocess/backend.py | Added tracer_provider parameter to LocalProcessBackend; wrapped all public methods with instrumentation spans |
| kubeflow/trainer/api/trainer_client.py | Added tracer_provider parameter to TrainerClient; wrapped all public methods with telemetry spans |
| kubeflow/test_client.py | New example demonstrating telemetry configuration with build_tracer_provider() and MNIST training example |
```diff
     self.backend = KubernetesBackend(backend_config, tracer_provider=_provider)
 elif isinstance(backend_config, LocalProcessBackendConfig):
-    self.backend = LocalProcessBackend(backend_config)
+    self.backend = LocalProcessBackend(backend_config, tracer_provider=_provider)
 elif isinstance(backend_config, ContainerBackendConfig):
-    self.backend = ContainerBackend(backend_config)
+    self.backend = ContainerBackend(backend_config, tracer_provider=_provider)
```
KubernetesBackend and ContainerBackend do not have a tracer_provider parameter, but the code attempts to pass it. This will raise a TypeError when TrainerClient is initialized with these backends. The other backends need to be updated to accept this parameter.
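A minimal sketch of the fix this comment suggests: give every backend an optional `tracer_provider` keyword defaulting to `None`, so `TrainerClient` can pass it uniformly without raising `TypeError`. Class names mirror the PR; the constructor bodies are illustrative stubs, not the SDK's real implementation.

```python
class KubernetesBackend:
    def __init__(self, backend_config, tracer_provider=None):
        self.backend_config = backend_config
        # None means "telemetry disabled"; real code would fall back to
        # OpenTelemetry's no-op tracer in that case.
        self.tracer_provider = tracer_provider

class ContainerBackend:
    def __init__(self, backend_config, tracer_provider=None):
        self.backend_config = backend_config
        self.tracer_provider = tracer_provider

# Both call styles now work, so TrainerClient can pass the provider
# unconditionally to whichever backend the config selects.
b1 = KubernetesBackend({"namespace": "default"})
b2 = ContainerBackend({}, tracer_provider="provider-stub")
assert b1.tracer_provider is None
assert b2.tracer_provider == "provider-stub"
```

Defaulting to `None` also keeps existing callers that never pass a provider backward compatible.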
```python
span.set_status(StatusCode.ERROR, "Job cancelled")
span.set_attribute("job.cancelled", True)
return
```
The context.detach() at line 183 will not be called when an early return occurs at line 141 inside the try block. This leaves the context stack corrupted for future use on that OS thread. The context.attach/detach pattern should be outside the inner try-except-finally block to ensure detach always executes.
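The invariant the reviewer is asking for can be sketched with stdlib `contextvars`, whose `set`/`reset` tokens behave analogously to `opentelemetry.context.attach`/`detach`. The key point: the attach sits before an outer `try` whose `finally` performs the detach, so cleanup runs even when the body returns early. Function and variable names here are illustrative, not the PR's code.

```python
import contextvars

# Stand-in for the OS-thread context stack that OpenTelemetry manages.
current_span = contextvars.ContextVar("current_span", default=None)

def run_job(parent_span, cancelled=False):
    token = current_span.set(parent_span)   # analogous to context.attach(ctx)
    try:
        if cancelled:
            return "cancelled"              # early return no longer skips cleanup
        return "done"
    finally:
        current_span.reset(token)           # analogous to context.detach(token)

run_job("span-a", cancelled=True)
assert current_span.get() is None           # context restored despite early return
```

With the original layout (detach after the inner try block instead of in a `finally` that dominates every exit path), the early `return` would leave the stale context attached to the thread.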
```python
result.append(
    types.TrainJob(
        name=_job.name,
        creation_timestamp=_job.created,
        runtime=runtime,
        num_nodes=1,
        steps=[
            types.Step(name=s.step_name, pod_name=s.step_name, status=s.job.status)
            for s in _job.steps
        ],
    )
)
```
In list_jobs, the TrainJob constructor is passed runtime=runtime (the filter parameter), but it should be runtime=_job.runtime (the actual job's runtime) to be consistent with get_job and to correctly return the runtime for each job.
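The bug is easy to see in miniature: when a function both filters by a parameter and echoes a field of the same name, the per-item field must come from the item, not the filter. A stdlib sketch with hypothetical stand-in types (not the SDK's real `TrainJob`/`LocalJob` classes):

```python
from dataclasses import dataclass

@dataclass
class LocalJob:
    name: str
    runtime: str

def list_jobs(jobs, runtime=None):
    selected = [j for j in jobs if runtime is None or j.runtime == runtime]
    # Correct: report _job.runtime (each job's own runtime), not the
    # `runtime` filter argument, which is None for unfiltered calls.
    return [{"name": _job.name, "runtime": _job.runtime} for _job in selected]

jobs = [LocalJob("a", "torch"), LocalJob("b", "mpi")]
assert list_jobs(jobs)[1]["runtime"] == "mpi"
```

With the buggy version (`"runtime": runtime`), an unfiltered `list_jobs` would report `None` for every job, diverging from `get_job`.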
What this PR does / why we need it:
Which issue(s) this PR fixes (optional, in `Fixes #<issue number>, #<issue number>, ...` format, will close the issue(s) when PR gets merged):
Fixes #
Checklist: