diff --git a/__tests__/workload.grpc.mapper.test.ts b/__tests__/workload.grpc.mapper.test.ts index f764315..7a3b924 100644 --- a/__tests__/workload.grpc.mapper.test.ts +++ b/__tests__/workload.grpc.mapper.test.ts @@ -55,4 +55,16 @@ describe('workload gRPC mapping', () => { expect(rebuilt.binds).toEqual(['data:/data:ro,z']); }); + + it('applies label.* additionalProperties to labels', () => { + const request = containerOptsToStartWorkloadRequest({ image: 'alpine:3' }); + request.additionalProperties = { + 'label.workload_key': 'workload-1', + 'label.team': 'core', + }; + + const rebuilt = startWorkloadRequestToContainerOpts(request); + + expect(rebuilt.labels).toEqual({ workload_key: 'workload-1', team: 'core' }); + }); }); diff --git a/package.json b/package.json index aa96d10..c93d5b7 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "lint": "eslint .", "test": "vitest run", "test:e2e": "vitest run --config vitest.config.e2e.ts", - "proto:generate": "buf generate buf.build/agynio/api" + "proto:generate": "rm -rf src/proto/gen && buf generate buf.build/agynio/api --path agynio/api/runner/v1/runner.proto" }, "dependencies": { "@nestjs/common": "^11.1.7", diff --git a/src/contracts/workload.grpc.ts b/src/contracts/workload.grpc.ts index 0d721de..9f56339 100644 --- a/src/contracts/workload.grpc.ts +++ b/src/contracts/workload.grpc.ts @@ -23,6 +23,7 @@ const PROP_CREATE_EXTRAS_JSON = 'create_extras_json'; const PROP_BIND_OPTIONS = 'bind_options'; const PROP_TTL_SECONDS = 'ttl_seconds'; const PROP_PLATFORM = 'platform'; +const LABEL_PREFIX = 'label.'; const BOOL_TRUE = new Set(['1', 'true', 'yes', 'on']); @@ -104,6 +105,44 @@ const ensureVolumeSpecName = (prefix: string, index: number): string => `${prefi const cloneAdditionalProperties = (input?: Record): Record => ({ ...(input ?? {}) }); +const sanitizeLabels = (input?: Record): Record => { + if (!input) return {}; + const labels: Record = {}; + for (const [key, value] of Object.entries(input)) { + const trimmed = key.trim(); + if (!trimmed) continue; + labels[trimmed] = value; + } + return labels; +}; + +const extractLabelProperties = (input?: Record): Record => { + if (!input) return {}; + const labels: Record = {}; + for (const [key, value] of Object.entries(input)) { + if (!key.startsWith(LABEL_PREFIX)) continue; + const labelKey = key.slice(LABEL_PREFIX.length).trim(); + if (!labelKey) continue; + labels[labelKey] = value; + } + return labels; +}; + +const mergeLabels = (...sources: Array | undefined>): Record => { + const labels: Record = {}; + for (const source of sources) { + if (!source) continue; + for (const [key, value] of Object.entries(source)) { + if (!key) continue; + labels[key] = value; + } + } + return labels; +}; + +export const extractRequestLabels = (request: StartWorkloadRequest): Record => + mergeLabels(extractLabelProperties(request.additionalProperties), sanitizeLabels(request.labels)); + export const workloadContainerPropKeys = { autoRemove: PROP_AUTO_REMOVE, networkMode: PROP_NETWORK_MODE, @@ -264,6 +303,9 @@ export const startWorkloadRequestToContainerOpts = (request: StartWorkloadReques const opts: ContainerOpts = {}; + const requestLabels = extractRequestLabels(request); + if (Object.keys(requestLabels).length > 0) opts.labels = requestLabels; + if (isNonEmptyString(main.image)) opts.image = main.image; if (isNonEmptyString(main.name)) opts.name = main.name; if (Array.isArray(main.cmd) && main.cmd.length > 0) opts.cmd = [...main.cmd]; @@ -291,7 +333,10 @@ export const startWorkloadRequestToContainerOpts = (request: StartWorkloadReques if (isNonEmptyString(containerProps[PROP_LABELS_JSON])) { try { const parsed = JSON.parse(containerProps[PROP_LABELS_JSON]) as Record; - if (parsed && typeof parsed === 'object') opts.labels = parsed; + if (parsed && typeof parsed === 'object') { + const mergedLabels = mergeLabels(requestLabels, parsed); + if (Object.keys(mergedLabels).length > 0) opts.labels = mergedLabels; + } } catch { // ignore malformed labels payloads } diff --git a/src/lib/container.service.ts b/src/lib/container.service.ts index c043f21..225b922 100644 --- a/src/lib/container.service.ts +++ b/src/lib/container.service.ts @@ -17,6 +17,8 @@ import { LogsStreamSession, Platform, PLATFORM_LABEL, + ContainerListEntry, + VolumeListEntry, type ContainerInspectInfo, } from './types.ts'; @@ -1010,12 +1012,59 @@ export class ContainerService implements DockerClientPort { return sorted.map((c) => new ContainerHandle(this, c.Id)); } + async listContainersByLabelKey(labelKey: string, options?: { all?: boolean }): Promise { + if (!labelKey) return []; + this.log(`Listing containers by label key all=${options?.all ?? true} label=${labelKey}`); + const list: Docker.ContainerInfo[] = await this.docker.listContainers({ + all: options?.all ?? true, + filters: { label: [labelKey] }, + }); + const sorted = [...list].sort((a: Docker.ContainerInfo, b: Docker.ContainerInfo) => { + const ac = typeof a.Created === 'number' ? a.Created : 0; + const bc = typeof b.Created === 'number' ? b.Created : 0; + if (ac !== bc) return ac - bc; + const aid = String(a.Id ?? ''); + const bid = String(b.Id ?? ''); + return aid.localeCompare(bid); + }); + return sorted + .filter((container) => typeof container.Id === 'string' && container.Id.length > 0) + .map((container) => ({ id: container.Id, labels: container.Labels ?? {} })); + } + async listContainersByVolume(volumeName: string): Promise { if (!volumeName) return []; const result = await this.docker.listContainers({ all: true, filters: { volume: [volumeName] } }); return Array.isArray(result) ? result.map((it) => it.Id) : []; } + async listVolumesByLabelKey(labelKey: string): Promise { + if (!labelKey) return []; + this.log(`Listing volumes by label key label=${labelKey}`); + const result = await this.docker.listVolumes({ filters: { label: [labelKey] } }); + const volumes = Array.isArray(result?.Volumes) ? result.Volumes : []; + const sorted = [...volumes].sort((a, b) => String(a.Name ?? '').localeCompare(String(b.Name ?? ''))); + return sorted + .filter((volume) => typeof volume.Name === 'string' && volume.Name.length > 0) + .map((volume) => ({ name: volume.Name, labels: volume.Labels ?? {} })); + } + + async ensureVolume(volumeName: string, labels?: Record): Promise { + const name = volumeName.trim(); + if (!name) return; + this.log(`Ensuring volume name=${name}`); + try { + await this.docker.createVolume({ Name: name, Labels: labels ?? {} }); + } catch (e: unknown) { + const sc = typeof e === 'object' && e && 'statusCode' in e ? (e as { statusCode?: number }).statusCode : undefined; + if (sc === 409) { + this.debug(`Volume already exists name=${name}`); + return; + } + throw e; + } + } + async removeVolume(volumeName: string, options?: { force?: boolean }): Promise { const force = options?.force ?? false; this.log(`Removing volume name=${volumeName} force=${force}`); diff --git a/src/lib/dockerClient.port.ts b/src/lib/dockerClient.port.ts index be05dd6..67b7d36 100644 --- a/src/lib/dockerClient.port.ts +++ b/src/lib/dockerClient.port.ts @@ -9,6 +9,8 @@ import type { LogsStreamOptions, LogsStreamSession, Platform, + ContainerListEntry, + VolumeListEntry, } from './types.ts'; export type DockerEventFilters = Record>; @@ -33,6 +35,9 @@ export interface DockerClientPort { getContainerLabels(containerId: string): Promise | undefined>; getContainerNetworks(containerId: string): Promise; findContainersByLabels(labels: Record, options?: { all?: boolean }): Promise; + listContainersByLabelKey(labelKey: string, options?: { all?: boolean }): Promise; + listVolumesByLabelKey(labelKey: string): Promise; + ensureVolume(volumeName: string, labels?: Record): Promise; listContainersByVolume(volumeName: string): Promise; removeVolume(volumeName: string, options?: { force?: boolean }): Promise; findContainerByLabels(labels: Record, options?: { all?: boolean }): Promise; diff --git a/src/lib/types.ts b/src/lib/types.ts index e212289..c908559 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -115,3 +115,13 @@ export type ContainerInspectInfo = { State?: ContainerInspectState; NetworkSettings?: ContainerInspectNetworkSettings; }; + +export type ContainerListEntry = { + id: string; + labels: Record; +}; + +export type VolumeListEntry = { + name: string; + labels: Record; +}; diff --git a/src/proto/grpc.ts b/src/proto/grpc.ts index ca2e422..c2d82ad 100644 --- a/src/proto/grpc.ts +++ b/src/proto/grpc.ts @@ -13,8 +13,12 @@ import { GetWorkloadLabelsResponseSchema, InspectWorkloadRequestSchema, InspectWorkloadResponseSchema, + ListWorkloadsRequestSchema, + ListWorkloadsResponseSchema, ListWorkloadsByVolumeRequestSchema, ListWorkloadsByVolumeResponseSchema, + ListVolumesRequestSchema, + ListVolumesResponseSchema, PutArchiveRequestSchema, PutArchiveResponseSchema, ReadyRequestSchema, @@ -87,7 +91,9 @@ export const RUNNER_SERVICE_REMOVE_WORKLOAD_PATH = '/agynio.api.runner.v1.Runner export const RUNNER_SERVICE_INSPECT_WORKLOAD_PATH = '/agynio.api.runner.v1.RunnerService/InspectWorkload'; export const RUNNER_SERVICE_GET_WORKLOAD_LABELS_PATH = '/agynio.api.runner.v1.RunnerService/GetWorkloadLabels'; export const RUNNER_SERVICE_FIND_WORKLOADS_BY_LABELS_PATH = '/agynio.api.runner.v1.RunnerService/FindWorkloadsByLabels'; +export const RUNNER_SERVICE_LIST_WORKLOADS_PATH = '/agynio.api.runner.v1.RunnerService/ListWorkloads'; export const RUNNER_SERVICE_LIST_WORKLOADS_BY_VOLUME_PATH = '/agynio.api.runner.v1.RunnerService/ListWorkloadsByVolume'; +export const RUNNER_SERVICE_LIST_VOLUMES_PATH = '/agynio.api.runner.v1.RunnerService/ListVolumes'; export const RUNNER_SERVICE_REMOVE_VOLUME_PATH = '/agynio.api.runner.v1.RunnerService/RemoveVolume'; export const RUNNER_SERVICE_TOUCH_WORKLOAD_PATH = '/agynio.api.runner.v1.RunnerService/TouchWorkload'; export const RUNNER_SERVICE_PUT_ARCHIVE_PATH = '/agynio.api.runner.v1.RunnerService/PutArchive'; @@ -132,11 +138,21 @@ export const runnerServiceGrpcDefinition: ServiceDefinition = { FindWorkloadsByLabelsRequestSchema, FindWorkloadsByLabelsResponseSchema, ), + listWorkloads: unaryDefinition( + RUNNER_SERVICE_LIST_WORKLOADS_PATH, + ListWorkloadsRequestSchema, + ListWorkloadsResponseSchema, + ), listWorkloadsByVolume: unaryDefinition( RUNNER_SERVICE_LIST_WORKLOADS_BY_VOLUME_PATH, ListWorkloadsByVolumeRequestSchema, ListWorkloadsByVolumeResponseSchema, ), + listVolumes: unaryDefinition( + RUNNER_SERVICE_LIST_VOLUMES_PATH, + ListVolumesRequestSchema, + ListVolumesResponseSchema, + ), removeVolume: unaryDefinition( RUNNER_SERVICE_REMOVE_VOLUME_PATH, RemoveVolumeRequestSchema, diff --git a/src/service/grpc/server.ts b/src/service/grpc/server.ts index 53d5b23..135219a 100644 --- a/src/service/grpc/server.ts +++ b/src/service/grpc/server.ts @@ -28,9 +28,15 @@ import { InspectWorkloadRequest, InspectWorkloadResponse, InspectWorkloadResponseSchema, + ListWorkloadsRequest, + ListWorkloadsResponse, + ListWorkloadsResponseSchema, ListWorkloadsByVolumeRequest, ListWorkloadsByVolumeResponse, ListWorkloadsByVolumeResponseSchema, + ListVolumesRequest, + ListVolumesResponse, + ListVolumesResponseSchema, LogChunkSchema, LogEndSchema, PutArchiveRequest, @@ -66,7 +72,13 @@ import { TouchWorkloadResponseSchema, SidecarInstance, SidecarInstanceSchema, + VolumeKind, + VolumeListItem, + VolumeListItemSchema, + VolumeSpec, WorkloadContainersSchema, + WorkloadListItem, + WorkloadListItemSchema, WorkloadStatus, } from '../../proto/gen/agynio/api/runner/v1/runner_pb.js'; import { runnerServiceGrpcDefinition } from '../../proto/grpc.js'; @@ -76,7 +88,7 @@ import type { ContainerService, InteractiveExecSession, LogsStreamSession } from import type { ContainerHandle } from '../../lib/container.handle.ts'; import type { RunnerConfig } from '../config.ts'; import { createDockerEventsParser } from '../dockerEvents.parser.ts'; -import { startWorkloadRequestToContainerOpts } from '../../contracts/workload.grpc.ts'; +import { extractRequestLabels, startWorkloadRequestToContainerOpts } from '../../contracts/workload.grpc.ts'; type ExecStream = ServerDuplexStream; @@ -130,6 +142,8 @@ const CONTAINER_STOP_TIMEOUT_SEC = 10; const SIDECAR_ROLE_LABEL = 'hautech.ai/role'; const SIDECAR_ROLE_VALUE = 'sidecar'; const PARENT_CONTAINER_LABEL = 'hautech.ai/parent_cid'; +const WORKLOAD_KEY_LABEL = 'workload_key'; +const VOLUME_KEY_LABEL = 'volume_key'; async function findSidecarHandles(containers: ContainerService, workloadId: string): Promise { try { @@ -171,6 +185,34 @@ async function removeSidecars( } } +const resolveVolumeName = (spec: VolumeSpec): string | null => { + const persistentName = spec.persistentName?.trim() ?? ''; + const fallbackName = spec.name?.trim() ?? ''; + const resolved = persistentName || fallbackName; + if (!resolved) return null; + if (resolved.includes('/')) return null; + return resolved; +}; + +async function ensureWorkloadVolumes( + containers: ContainerService, + request: StartWorkloadRequest, + baseLabels: Record, +): Promise { + const volumeSpecs = request.volumes ?? []; + if (volumeSpecs.length === 0) return; + const ensured = new Set(); + for (const spec of volumeSpecs) { + if (!spec || spec.kind !== VolumeKind.NAMED) continue; + const volumeName = resolveVolumeName(spec); + if (!volumeName || ensured.has(volumeName)) continue; + const volumeLabels = { ...baseLabels, ...(spec.labels ?? {}) }; + if (Object.keys(volumeLabels).length === 0) continue; + ensured.add(volumeName); + await containers.ensureVolume(volumeName, volumeLabels); + } +} + type DockerErrorDetails = { statusCode?: number; status?: number; @@ -366,6 +408,8 @@ export function createRunnerGrpcServer(opts: RunnerGrpcOptions): Server { return callback(toServiceError(status.INVALID_ARGUMENT, 'main_container_required')); } try { + const requestLabels = extractRequestLabels(call.request); + await ensureWorkloadVolumes(opts.containers, call.request, requestLabels); const containerOpts = startWorkloadRequestToContainerOpts(call.request); const sidecarOpts = Array.isArray(containerOpts.sidecars) ? containerOpts.sidecars : []; const stopAndRemove = async (containerId: string) => { @@ -562,6 +606,29 @@ export function createRunnerGrpcServer(opts: RunnerGrpcOptions): Server { callback(toDockerServiceError(error, status.NOT_FOUND)); } }, + listWorkloads: async ( + call: ServerUnaryCall, + callback: (error: ServiceError | null, value?: ListWorkloadsResponse) => void, + ) => { + try { + const containers = await opts.containers.listContainersByLabelKey(WORKLOAD_KEY_LABEL, { all: true }); + const workloads: WorkloadListItem[] = []; + for (const container of containers) { + if (container.labels[SIDECAR_ROLE_LABEL] === SIDECAR_ROLE_VALUE) continue; + const workloadKey = container.labels[WORKLOAD_KEY_LABEL]; + if (!workloadKey) continue; + workloads.push( + create(WorkloadListItemSchema, { + instanceId: container.id, + workloadKey, + }), + ); + } + callback(null, create(ListWorkloadsResponseSchema, { workloads })); + } catch (error) { + callback(toDockerServiceError(error, status.UNKNOWN)); + } + }, findWorkloadsByLabels: async ( call: ServerUnaryCall, callback: (error: ServiceError | null, value?: FindWorkloadsByLabelsResponse) => void, @@ -597,6 +664,28 @@ export function createRunnerGrpcServer(opts: RunnerGrpcOptions): Server { callback(toDockerServiceError(error, status.UNKNOWN)); } }, + listVolumes: async ( + call: ServerUnaryCall, + callback: (error: ServiceError | null, value?: ListVolumesResponse) => void, + ) => { + try { + const volumes = await opts.containers.listVolumesByLabelKey(VOLUME_KEY_LABEL); + const items: VolumeListItem[] = []; + for (const volume of volumes) { + const volumeKey = volume.labels[VOLUME_KEY_LABEL]; + if (!volumeKey) continue; + items.push( + create(VolumeListItemSchema, { + instanceId: volume.name, + volumeKey, + }), + ); + } + callback(null, create(ListVolumesResponseSchema, { volumes: items })); + } catch (error) { + callback(toDockerServiceError(error, status.UNKNOWN)); + } + }, removeVolume: async ( call: ServerUnaryCall, callback: (error: ServiceError | null, value?: RemoveVolumeResponse) => void,