Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions docs/source/train/options.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,205 @@
Options Reference
=================

Options let you customize how a TrainJob is created and executed. Pass them as a list to the ``options`` parameter of the
:py:meth:`kubeflow.trainer.TrainerClient.train` method.

.. code-block:: python

from kubeflow.trainer import TrainerClient, CustomTrainer
from kubeflow.trainer.options import Name, Labels, Annotations

trainer_client = TrainerClient()
job_id = trainer_client.train(
trainer=CustomTrainer(func=train_function),
options=[
Name("my-train-job"),
Labels({"team": "ml", "env": "prod"}),
Annotations({"note": "experiment-42"}),
],
)

.. note::
Not all options work with every backend. Each option documents
which backends it supports. An unsupported option will raise a
``ValueError`` at runtime.

----

Usage Guide
-----------

Name
----

Set a custom name for the TrainJob resource. Works with all backends.

.. code-block:: python

from kubeflow.trainer import TrainerClient, CustomTrainer
from kubeflow.trainer.options import Name

trainer_client = TrainerClient()

job_id = trainer_client.train(
trainer=CustomTrainer(func=train_function),
options=[Name("my-custom-job")],
)

Labels
------

Add labels to the TrainJob resource metadata (``metadata.labels``). Only supported on the **Kubernetes backend**.

.. code-block:: python

from kubeflow.trainer import TrainerClient, CustomTrainer
from kubeflow.trainer.options import Labels

trainer_client = TrainerClient()

job_id = trainer_client.train(
trainer=CustomTrainer(func=train_function),
options=[Labels({"team": "ml-platform", "version": "v2"})],
)

Annotations
-----------

Add annotations to the TrainJob resource metadata (``metadata.annotations``). Only supported on the **Kubernetes backend**.

.. code-block:: python

from kubeflow.trainer import TrainerClient, CustomTrainer
from kubeflow.trainer.options import Annotations

trainer_client = TrainerClient()

job_id = trainer_client.train(
trainer=CustomTrainer(func=train_function),
options=[Annotations({"owner": "alice", "ticket": "JIRA-42"})],
)

TrainerCommand
--------------

Override the trainer container command (``spec.trainer.command``).
Can only be used with ``CustomTrainerContainer``, not with ``CustomTrainer`` or ``BuiltinTrainer``.

.. code-block:: python

from kubeflow.trainer import TrainerClient, CustomTrainerContainer
from kubeflow.trainer.options import TrainerCommand

trainer_client = TrainerClient()

job_id = trainer_client.train(
trainer=CustomTrainerContainer(image="my-image:latest"),
options=[TrainerCommand(["python", "train.py", "--epochs", "10"])],
)

TrainerArgs
-----------

Append extra arguments to the trainer container command.

.. code-block:: python

from kubeflow.trainer import TrainerClient, CustomTrainer
from kubeflow.trainer.options import TrainerArgs

trainer_client = TrainerClient()

job_id = trainer_client.train(
trainer=CustomTrainer(func=train_function),
options=[TrainerArgs(["--lr", "0.001", "--batch-size", "32"])],
)

RuntimePatch
--------------

Apply structured patches to the TrainJob (``spec.runtimePatches``). Use this for advanced Kubernetes-level customization such as adding init containers, volumes, or tolerations. Only supported on the **Kubernetes backend**.

.. code-block:: python

from kubeflow.trainer import TrainerClient, CustomTrainer
from kubeflow.trainer.options import (
RuntimePatch,
TrainingRuntimeSpecPatch,
JobSetTemplatePatch,
JobSetSpecPatch,
ReplicatedJobPatch,
JobTemplatePatch,
JobSpecPatch,
PodTemplatePatch,
PodSpecPatch,
ContainerPatch,
)

trainer_client = TrainerClient()

patch = RuntimePatch(
training_runtime_spec=TrainingRuntimeSpecPatch(
template=JobSetTemplatePatch(
spec=JobSetSpecPatch(
replicated_jobs=[
ReplicatedJobPatch(
name="node",
template=JobTemplatePatch(
spec=JobSpecPatch(
template=PodTemplatePatch(
spec=PodSpecPatch(
containers=[
ContainerPatch(
name="trainer",
env=[{
"name": "MY_VAR",
"value": "hello",
}],
)
]
)
)
)
),
)
]
)
)
)
)

job_id = trainer_client.train(
trainer=CustomTrainer(func=train_function),
options=[patch],
)

----

Combining Multiple Options
--------------------------

You can pass multiple options together in a single list:

.. code-block:: python

from kubeflow.trainer import TrainerClient, CustomTrainer
from kubeflow.trainer.options import Name, Labels, Annotations

trainer_client = TrainerClient()

job_id = trainer_client.train(
trainer=CustomTrainer(func=train_function),
options=[
Name("experiment-001"),
Labels({"project": "llm-finetune"}),
Annotations({"author": "alice"}),
],
)

----

API Reference
-------------

.. autoclass:: kubeflow.trainer.options.Name
:members:
:show-inheritance:
Expand Down
158 changes: 158 additions & 0 deletions kubeflow/common/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import os
import functools
from contextlib import contextmanager
from typing import Optional

_tracer = None
_meter = None
_ENABLED = False


def configure(
service_name: str = "kubeflow-sdk",
exporter: str = "console",
endpoint: Optional[str] = None,
sample_rate: float = 1.0,
) -> None:
global _tracer, _meter, _ENABLED

if os.environ.get("KUBEFLOW_TRACING_DISABLED", "0") == "1":
return

if exporter == "none":
return

try:
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
SimpleSpanProcessor,
ConsoleSpanExporter,
)
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
ConsoleMetricExporter,
PeriodicExportingMetricReader,
)
from opentelemetry.sdk.resources import Resource

resource = Resource.create({"service.name": service_name})
sampler = TraceIdRatioBased(sample_rate)

tracer_provider = TracerProvider(resource=resource, sampler=sampler)

if exporter == "console":
tracer_provider.add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)
elif exporter == "otlp":
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)

otlp_endpoint = endpoint or os.environ.get(
"KUBEFLOW_OTLP_ENDPOINT", "http://localhost:4317"
)
tracer_provider.add_span_processor(
SimpleSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
)

trace.set_tracer_provider(tracer_provider)
_tracer = trace.get_tracer("kubeflow.sdk", "0.1.0")

metric_reader = PeriodicExportingMetricReader(
ConsoleMetricExporter(), export_interval_millis=60000
)
meter_provider = MeterProvider(
resource=resource, metric_readers=[metric_reader]
)
metrics.set_meter_provider(meter_provider)
_meter = metrics.get_meter("kubeflow.sdk", "0.1.0")

_ENABLED = True

except ImportError:
pass

def get_tracer():
if _tracer is not None:
return _tracer
try:
from opentelemetry import trace
return trace.get_tracer("kubeflow.sdk")
except ImportError:
return _NoOpTracer()


def get_meter():
if _meter is not None:
return _meter
try:
from opentelemetry import metrics
return metrics.get_meter("kubeflow.sdk")
except ImportError:
return _NoOpMeter()


def is_enabled() -> bool:
return _ENABLED

class SpanNames:
TRAINER_TRAIN = "kubeflow.sdk.trainer.train"
TRAINER_GET_JOB = "kubeflow.sdk.trainer.get_job"
TRAINER_GET_LOGS = "kubeflow.sdk.trainer.get_job_logs"
TRAINER_WAIT = "kubeflow.sdk.trainer.wait_for_job_status"
TRAINER_CREATE_JOB = "kubeflow.sdk.trainer.create_trainjob"
TRAINER_POLL_STATUS = "kubeflow.sdk.trainer.poll_status"
PIPELINES_COMPILE = "kubeflow.sdk.pipelines.compile"
PIPELINES_SUBMIT = "kubeflow.sdk.pipelines.submit"
OPTIMIZER_OPTIMIZE = "kubeflow.sdk.optimizer.optimize"
SPARK_SUBMIT = "kubeflow.sdk.spark.submit"

class SpanAttributes:
JOB_NAME = "kubeflow.trainer.job_name"
JOB_ID = "kubeflow.trainer.job_id"
NAMESPACE = "kubeflow.trainer.namespace"
NUM_NODES = "kubeflow.trainer.num_nodes"
RUNTIME = "kubeflow.trainer.runtime"
STATUS = "kubeflow.trainer.status"
POLL_ITERATION = "kubeflow.trainer.poll.iteration"
PIPELINE_NAME = "kubeflow.pipelines.pipeline_name"
PIPELINE_PATH = "kubeflow.pipelines.pipeline_path"
RUN_ID = "kubeflow.pipelines.run_id"
EXPERIMENT_NAME = "kubeflow.pipelines.experiment_name"
GEN_AI_OPERATION = "gen_ai.operation.name"
GEN_AI_SYSTEM = "gen_ai.system"
GEN_AI_MODEL = "gen_ai.request.model"


class _NoOpSpan:

def set_attribute(self, key, value): pass
def add_event(self, name, attributes=None): pass
def record_exception(self, exception, attributes=None): pass
def set_status(self, status, description=None): pass
def __enter__(self): return self
def __exit__(self, *args): pass


class _NoOpTracer:

@contextmanager
def start_as_current_span(self, name, **kwargs):
yield _NoOpSpan()


class _NoOpMeter:

def create_counter(self, *a, **kw): return _NoOpInstrument()
def create_histogram(self, *a, **kw): return _NoOpInstrument()


class _NoOpInstrument:

def add(self, *a, **kw): pass
def record(self, *a, **kw): pass


Loading