From bb6ddf47912ae88b92bd53bc930f894ac28e7d42 Mon Sep 17 00:00:00 2001 From: llogen Date: Thu, 22 Jan 2026 13:53:58 +0100 Subject: [PATCH 1/2] feat: implement chunked file transfer protocol This change replaces the simple full-file transfer mechanism with a robust chunked protocol that supports: - Concurrent uploads/downloads with unique transfer IDs - 1MB chunk streaming to support large files - Transfer state management with acknowledgments - Round-robin scheduling for fair resource usage during concurrent transfers Signed-off-by: llogen --- cmds/dutagent/states.go | 31 +- cmds/dutctl/file_transfer.go | 543 ++++++++++++++++++++ cmds/dutctl/rpc.go | 81 ++- go.mod | 2 +- internal/dutagent/broker.go | 28 +- internal/dutagent/broker_test.go | 11 +- internal/dutagent/sendserialize_test.go | 75 +++ internal/dutagent/session.go | 385 ++++++++++++-- internal/dutagent/worker.go | 484 ++++++++++++++---- internal/test/mock/session.go | 2 +- pkg/module/dummy/dummy_file_transfer.go | 4 +- pkg/module/file/file.go | 9 +- pkg/module/flash/flash.go | 11 +- pkg/module/module.go | 3 +- protobuf/buf.gen.yaml | 6 +- protobuf/buf.yaml | 2 +- protobuf/dutctl/v1/dutctl.proto | 63 ++- protobuf/gen/dutctl/v1/dutctl.pb.go | 643 +++++++++++++++++++----- 18 files changed, 2041 insertions(+), 342 deletions(-) create mode 100644 cmds/dutctl/file_transfer.go create mode 100644 internal/dutagent/sendserialize_test.go diff --git a/cmds/dutagent/states.go b/cmds/dutagent/states.go index 5169a25a..933d7dd6 100644 --- a/cmds/dutagent/states.go +++ b/cmds/dutagent/states.go @@ -31,6 +31,7 @@ type runCmdArgs struct { cmdMsg *pb.Command dev dut.Device cmd dut.Command + broker *dutagent.Broker session module.Session moduleErrCh chan error brokerErrCh <-chan error @@ -151,8 +152,10 @@ func releaseAutoLock(_ context.Context, args runCmdArgs) (runCmdArgs, fsm.State[ // in a separate goroutine, this state will not wait for the execution to finish. // Further, worker goroutines will be started to serve the module-to-client communication // during the module execution. +// +//nolint:funlen func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State[runCmdArgs], error) { - broker := &dutagent.Broker{} + args.broker = &dutagent.Broker{} // Deferred initialization of the moduleErr channel: only create if not already provided // (tests may still pass a custom channel). @@ -163,7 +166,7 @@ func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State rpcCtx := ctx modCtx, modCtxCancel := context.WithCancel(rpcCtx) - moduleSession, brokerErrCh := broker.Start(modCtx, args.stream) + moduleSession, brokerErrCh := args.broker.Start(modCtx, args.stream) args.brokerErrCh = brokerErrCh args.session = moduleSession @@ -178,12 +181,19 @@ func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State // Run the modules in a goroutine. // Termination of the module execution is signaled by closing the moduleErrCh channel. go func() { + defer modCtxCancel() // Ensure workers exit even if stream doesn't close + cnt := len(args.cmd.Modules) for idx, module := range args.cmd.Modules { if ctx.Err() != nil { log.Printf("Execution aborted, %d of %d modules done: %v", idx, cnt, ctx.Err()) - modCtxCancel() + args.broker.Shutdown() + + // Wait for file transfers to complete (workers will exit gracefully) + log.Print("Waiting for file transfers to complete...") + args.broker.WaitForTransfersToComplete() + log.Print("All file transfers completed") return } @@ -195,14 +205,25 @@ func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State args.moduleErrCh <- err log.Printf("Module %q failed: %v", module.Config.Name, err) - modCtxCancel() + args.broker.Shutdown() + + // Wait for file transfers to complete (workers will exit gracefully) + log.Print("Waiting for file transfers to complete...") + args.broker.WaitForTransfersToComplete() + log.Print("All file transfers completed") return } } log.Print("All modules finished successfully") - modCtxCancel() + args.broker.Shutdown() + + // Wait for file transfers to complete (workers will exit gracefully) + log.Print("Waiting for file transfers to complete...") + args.broker.WaitForTransfersToComplete() + log.Print("All file transfers completed") + close(args.moduleErrCh) }() diff --git a/cmds/dutctl/file_transfer.go b/cmds/dutctl/file_transfer.go new file mode 100644 index 00000000..67f206a7 --- /dev/null +++ b/cmds/dutctl/file_transfer.go @@ -0,0 +1,543 @@ +// Copyright 2025 Blindspot Software +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "fmt" + "io" + "log" + "os" + "path/filepath" + "sync" + + "connectrpc.com/connect" + + pb "github.com/BlindspotSoftware/dutctl/protobuf/gen/dutctl/v1" +) + +const ( + clientChunkSize = 1024 * 1024 // 1MB chunks + downloadFilePerms = 0o600 // Downloaded file permissions (user read/write only) +) + +// StreamForClient is a type alias for the stream connection to reduce line length. +type StreamForClient = *connect.BidiStreamForClient[pb.RunRequest, pb.RunResponse] + +// clientFileTransferState represents an active file transfer on the client. +type clientFileTransferState struct { + transferID string + path string + file *os.File + direction string // "upload" or "download" + expectedChunkNum int32 // For validating chunk sequence on download + mu sync.Mutex +} + +// clientFileTransferManager manages file transfers on the client side. +type clientFileTransferManager struct { + transfers map[string]*clientFileTransferState + mu sync.RWMutex + cmdArgs []string // Command arguments for file path validation +} + +func newClientFileTransferManager(cmdArgs []string) *clientFileTransferManager { + return &clientFileTransferManager{ + transfers: make(map[string]*clientFileTransferState), + cmdArgs: cmdArgs, + } +} + +func (m *clientFileTransferManager) registerTransfer(transferID, path, direction string) *clientFileTransferState { + m.mu.Lock() + defer m.mu.Unlock() + + state := &clientFileTransferState{ + transferID: transferID, + path: path, + direction: direction, + } + + m.transfers[transferID] = state + + return state +} + +func (m *clientFileTransferManager) getTransfer(transferID string) *clientFileTransferState { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.transfers[transferID] +} + +func (m *clientFileTransferManager) removeTransfer(transferID string) { + m.mu.Lock() + defer m.mu.Unlock() + + if state, exists := m.transfers[transferID]; exists { + if state.file != nil { + state.file.Close() + } + + delete(m.transfers, transferID) + } +} + +// normalizePath expands ~ and converts to absolute path for consistent comparison. +// Returns the normalized path or logs error and returns original path. +func normalizePath(path string) string { + // Expand ~ to home directory + expanded := path + if path != "" && path[0] == '~' { + home, err := os.UserHomeDir() + if err != nil { + log.Printf("Warning: could not expand ~: %v, using path as-is", err) + + return path + } + + expanded = filepath.Join(home, path[1:]) + } + + // Convert to absolute path + abs, err := filepath.Abs(expanded) + if err != nil { + log.Printf("Warning: could not convert to absolute path %q: %v, using expanded path", expanded, err) + + return expanded + } + + return abs +} + +// isValidPath checks if a file path is explicitly mentioned in the command arguments. +// Normalizes both paths (expands ~ and converts to absolute) before comparison. +func (m *clientFileTransferManager) isValidPath(path string) bool { + normalizedPath := normalizePath(path) + + for _, arg := range m.cmdArgs { + normalizedArg := normalizePath(arg) + + if normalizedArg == normalizedPath { + return true + } + } + + return false +} + +// sendChunkToAgent sends a file chunk to the agent. +func (m *clientFileTransferManager) sendChunkToAgent( + transferID string, + chunkNum int32, + data []byte, + isFinal bool, + stream StreamForClient, +) error { + chunk := &pb.RunRequest{ + Msg: &pb.RunRequest_FileChunk{ + FileChunk: &pb.FileChunk{ + TransferId: transferID, + ChunkNumber: chunkNum, + ChunkData: data, + ChunkOffset: int64(chunkNum) * int64(clientChunkSize), + IsFinal: isFinal, + }, + }, + } + + return stream.Send(chunk) +} + +// handleUploadRequest processes a request to upload a file to the agent. +func (m *clientFileTransferManager) handleUploadRequest(transferID, path string, stream StreamForClient) error { + // Validate that the requested file is in the command arguments + if !m.isValidPath(path) { + errMsg := fmt.Sprintf("file %q not specified in command arguments - security violation prevented", path) + log.Printf("Error: %s", errMsg) + + rejectErr := m.sendTransferError(transferID, errMsg, stream) + if rejectErr != nil { + return fmt.Errorf("sending transfer rejection: %w", rejectErr) + } + + return nil + } + + _, statErr := os.Stat(path) + if statErr != nil { + log.Printf("Error accessing file %q: %v", path, statErr) + + rejectErr := m.sendTransferError(transferID, fmt.Sprintf("cannot access file: %v", statErr), stream) + if rejectErr != nil { + return fmt.Errorf("sending transfer rejection: %w", rejectErr) + } + + return nil + } + + file, err := os.Open(path) + if err != nil { + log.Printf("Error opening file %q: %v", path, err) + + rejectErr := m.sendTransferError(transferID, fmt.Sprintf("cannot open file: %v", err), stream) + if rejectErr != nil { + return fmt.Errorf("sending transfer rejection: %w", rejectErr) + } + + return nil + } + + state := m.registerTransfer(transferID, path, "upload") + state.file = file + + acceptErr := m.sendTransferAcceptance(transferID, stream) + if acceptErr != nil { + file.Close() + m.removeTransfer(transferID) + + return fmt.Errorf("sending transfer acceptance: %w", acceptErr) + } + + log.Printf("Uploading %q to device...", filepath.Base(path)) + + m.sendUploadInChunks(transferID, path, file, stream) + + return nil +} + +// handleDownloadRequest processes a request to download a file from the agent. +// The agent specifies what file it will send, and the destination path from +// command arguments is where we should save it. +func (m *clientFileTransferManager) handleDownloadRequest(transferID, destinationPath string, stream StreamForClient) error { + // Validate that the destination file path is in the command arguments + if !m.isValidPath(destinationPath) { + errMsg := fmt.Sprintf("file %q not specified in command arguments - security violation prevented", destinationPath) + log.Printf("Error: %s", errMsg) + + rejectErr := m.sendTransferError(transferID, errMsg, stream) + if rejectErr != nil { + return fmt.Errorf("sending transfer rejection: %w", rejectErr) + } + + return nil + } + + // Register the download transfer + m.registerTransfer(transferID, destinationPath, "download") + + log.Printf("Downloading file to %q...", filepath.Base(destinationPath)) + + // Send acceptance to agent + acceptErr := m.sendTransferAcceptance(transferID, stream) + if acceptErr != nil { + m.removeTransfer(transferID) + + return fmt.Errorf("sending transfer acceptance: %w", acceptErr) + } + + return nil +} + +// sendTransferAcceptance sends a transfer acceptance response. +func (m *clientFileTransferManager) sendTransferAcceptance(transferID string, stream StreamForClient) error { + res := &pb.RunRequest{ + Msg: &pb.RunRequest_FileTransferResponse{ + FileTransferResponse: &pb.FileTransferResponse{ + TransferId: transferID, + Status: pb.FileTransferResponse_ACCEPTED, + }, + }, + } + + return stream.Send(res) +} + +// sendUploadInChunks reads and sends a file in chunks to the agent. +func (m *clientFileTransferManager) sendUploadInChunks(transferID, path string, file *os.File, stream StreamForClient) { + go func() { + defer file.Close() + defer m.removeTransfer(transferID) + + chunkNum := int32(0) + + for { + chunkData := make([]byte, clientChunkSize) + bytesRead, readErr := file.Read(chunkData) + + if readErr != nil && readErr != io.EOF { + log.Printf("Error reading file %q: %v", path, readErr) + + return + } + + isFinal := readErr == io.EOF + + if bytesRead > 0 { + chunkData = chunkData[:bytesRead] + } else { + // Final empty chunk to signal EOF + chunkData = []byte{} + } + + chunkErr := m.sendChunkToAgent(transferID, chunkNum, chunkData, isFinal, stream) + if chunkErr != nil { + log.Printf("Error sending file chunk: %v", chunkErr) + + return + } + + chunkNum++ + + if isFinal { + break + } + } + }() +} + +// handleFileTransferRequest handles a FileTransferRequest from the agent. +// This can be either: +// 1. A request for the client to upload a file to the agent (agent requesting from client) +// 2. A notification that the agent will send a file download +// The direction is explicitly specified in the FileTransferRequest message. +func (m *clientFileTransferManager) handleFileTransferRequest(ftReq *pb.FileTransferRequest, stream StreamForClient) error { + transferID := ftReq.GetTransferId() + metadata := ftReq.GetMetadata() + path := metadata.GetPath() + direction := ftReq.GetDirection() + + switch direction { + case pb.FileTransferRequest_UPLOAD: + // Agent is requesting a file from the client (client uploads to agent) + return m.handleUploadRequest(transferID, path, stream) + + case pb.FileTransferRequest_DOWNLOAD: + // Agent is sending a file to the client (client downloads from agent) + return m.handleDownloadRequest(transferID, path, stream) + + default: + // Unspecified or unknown direction + errMsg := fmt.Sprintf("unknown transfer direction %v (agent/client version mismatch?)", direction) + log.Printf("Error: %s", errMsg) + + return m.sendTransferError(transferID, errMsg, stream) + } +} + +// sendTransferError sends an error response for a failed transfer. +func (m *clientFileTransferManager) sendTransferError(transferID, message string, stream StreamForClient) error { + res := &pb.RunRequest{ + Msg: &pb.RunRequest_FileTransferResponse{ + FileTransferResponse: &pb.FileTransferResponse{ + TransferId: transferID, + Status: pb.FileTransferResponse_TRANSFER_REJECTED, + ErrorMessage: message, + }, + }, + } + + return stream.Send(res) +} + +// sendChunkAcknowledgment sends an acknowledgment for a received chunk. +func (m *clientFileTransferManager) sendChunkAcknowledgment(transferID string, nextChunk int32, stream StreamForClient) error { + res := &pb.RunRequest{ + Msg: &pb.RunRequest_FileTransferResponse{ + FileTransferResponse: &pb.FileTransferResponse{ + TransferId: transferID, + Status: pb.FileTransferResponse_CHUNK_RECEIVED, + NextChunkExpected: nextChunk, + }, + }, + } + + return stream.Send(res) +} + +// sendTransferComplete sends a transfer completion response. +func (m *clientFileTransferManager) sendTransferComplete(transferID string, stream StreamForClient) error { + res := &pb.RunRequest{ + Msg: &pb.RunRequest_FileTransferResponse{ + FileTransferResponse: &pb.FileTransferResponse{ + TransferId: transferID, + Status: pb.FileTransferResponse_TRANSFER_COMPLETE, + }, + }, + } + + return stream.Send(res) +} + +// validateChunkTransfer validates transfer existence, path authorization, and chunk sequence. +// Returns error if validation fails (error already sent to stream), nil otherwise. +func (m *clientFileTransferManager) validateChunkTransfer( + chunk *pb.FileChunk, + stream StreamForClient, +) (*clientFileTransferState, error) { + transferID := chunk.GetTransferId() + + // Validate transfer exists. + state := m.getTransfer(transferID) + if state == nil { + log.Printf("Error: received chunk for unknown transfer %s", transferID) + + sendErr := m.sendTransferError(transferID, "unknown transfer", stream) + if sendErr != nil { + return nil, fmt.Errorf("sending error response: %w", sendErr) + } + + return nil, fmt.Errorf("unknown transfer") + } + + // Validate path is authorized. + if !m.isValidPath(state.path) { + errMsg := fmt.Sprintf("file %q not in command arguments", state.path) + log.Printf("Error: %s", errMsg) + + m.removeTransfer(transferID) + + sendErr := m.sendTransferError(transferID, errMsg, stream) + if sendErr != nil { + return nil, fmt.Errorf("sending error response: %w", sendErr) + } + + return nil, fmt.Errorf("path not authorized") + } + + // Validate chunk sequence. + state.mu.Lock() + + if chunk.GetChunkNumber() != state.expectedChunkNum { + expected := state.expectedChunkNum + state.mu.Unlock() + + log.Printf("Error: chunk order violation for %s: expected %d, got %d", + transferID, expected, chunk.GetChunkNumber()) + + m.removeTransfer(transferID) + + sendErr := m.sendTransferError(transferID, "chunk sequence error", stream) + if sendErr != nil { + return nil, fmt.Errorf("sending error response: %w", sendErr) + } + + return nil, fmt.Errorf("chunk sequence error") + } + + state.mu.Unlock() + + return state, nil +} + +// processChunkData creates file if needed and writes chunk data. +func (m *clientFileTransferManager) processChunkData( + chunk *pb.FileChunk, + state *clientFileTransferState, + stream StreamForClient, +) error { + transferID := chunk.GetTransferId() + + // Create file on first chunk. + if chunk.GetChunkNumber() == 0 { + file, err := os.OpenFile(state.path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, downloadFilePerms) + if err != nil { + log.Printf("Error creating download file: %v", err) + + m.removeTransfer(transferID) + + sendErr := m.sendTransferError(transferID, fmt.Sprintf("cannot create file: %v", err), stream) + if sendErr != nil { + return fmt.Errorf("sending error response: %w", sendErr) + } + + return fmt.Errorf("cannot create file") + } + + state.mu.Lock() + state.file = file + state.mu.Unlock() + } + + // Write chunk data. + state.mu.Lock() + file := state.file + state.mu.Unlock() + + if file != nil && len(chunk.GetChunkData()) > 0 { + _, writeErr := file.Write(chunk.GetChunkData()) + if writeErr != nil { + log.Printf("Error writing to file: %v", writeErr) + + m.removeTransfer(transferID) + + sendErr := m.sendTransferError(transferID, fmt.Sprintf("write error: %v", writeErr), stream) + if sendErr != nil { + return fmt.Errorf("sending error response: %w", sendErr) + } + + return fmt.Errorf("write error") + } + } + + return nil +} + +// handleFileChunk handles a FileChunk from the agent (file download). +// +//nolint:nilerr // errors are already sent to stream, returning nil to continue processing +func (m *clientFileTransferManager) handleFileChunk(chunk *pb.FileChunk, stream StreamForClient) error { + transferID := chunk.GetTransferId() + + state, err := m.validateChunkTransfer(chunk, stream) + if err != nil { + // Validation failed, error already sent to stream + return nil + } + + err = m.processChunkData(chunk, state, stream) + if err != nil { + // Error occurred, error already sent to stream + return nil + } + + // Update expected chunk number. + state.mu.Lock() + state.expectedChunkNum++ + state.mu.Unlock() + + // Send acknowledgment. + ackErr := m.sendChunkAcknowledgment(transferID, chunk.GetChunkNumber()+1, stream) + if ackErr != nil { + return fmt.Errorf("sending chunk ack: %w", ackErr) + } + + // Final chunk: close file and send completion. + if chunk.GetIsFinal() { + completeErr := m.sendTransferComplete(transferID, stream) + if completeErr != nil { + return fmt.Errorf("sending completion: %w", completeErr) + } + + m.removeTransfer(transferID) + } + + return nil +} + +// handleFileTransferResponse handles a FileTransferResponse from the agent (acknowledgments). +// Silently processes responses; only logs errors. +func (m *clientFileTransferManager) handleFileTransferResponse(ftRes *pb.FileTransferResponse) { + transferID := ftRes.GetTransferId() + status := ftRes.GetStatus() + + switch status { + case pb.FileTransferResponse_ERROR: + log.Printf("File transfer error for %s: %s", transferID, ftRes.GetErrorMessage()) + m.removeTransfer(transferID) + case pb.FileTransferResponse_TRANSFER_COMPLETE: + m.removeTransfer(transferID) + } + // Other statuses (CHUNK_RECEIVED, ACCEPTED) are silently processed +} diff --git a/cmds/dutctl/rpc.go b/cmds/dutctl/rpc.go index 13e3075c..f877641f 100644 --- a/cmds/dutctl/rpc.go +++ b/cmds/dutctl/rpc.go @@ -10,9 +10,7 @@ import ( "errors" "fmt" "io" - "io/fs" "log" - "os" "strings" "time" @@ -197,6 +195,8 @@ func (app *application) runRPC(device, command string, cmdArgs []string) error { errChan := make(chan error, numWorkers) + ftManager := newClientFileTransferManager(cmdArgs) + stream := app.rpcClient.Run(runCtx) stream.RequestHeader().Set(lock.UserHeader, app.user) @@ -225,21 +225,24 @@ func (app *application) runRPC(device, command string, cmdArgs []string) error { // Receive routine go func() { - defer cancel() + defer func() { + if r := recover(); r != nil { + log.Printf("Receive routine panic: %v", r) + } + + cancel() + }() for { select { case <-runCtx.Done(): - log.Println("Receive routine terminating: Run-Context cancelled") - return - default: // Unblock select, continue with the forwarding logic. + default: } res, err := stream.Receive() - if errors.Is(err, io.EOF) { - log.Println("Receive routine terminating: Stream closed by agent") + if errors.Is(err, io.EOF) { return } else if err != nil { errChan <- fmt.Errorf("receiving RPC message: %w", err) @@ -273,51 +276,30 @@ func (app *application) runRPC(device, command string, cmdArgs []string) error { case *pb.Console_Stdin: log.Printf("Unexpected Console Stdin %q", string(consoleData.Stdin)) } - case *pb.RunResponse_FileRequest: - path := msg.FileRequest.GetPath() - log.Printf("File request for: %q\n", path) - - content, err := os.ReadFile(path) - if err != nil { - errChan <- fmt.Errorf("reading requested file %q: %w", path, err) - - return - } + case *pb.RunResponse_FileTransferRequest: + ftReq := msg.FileTransferRequest - err = stream.Send(&pb.RunRequest{ - Msg: &pb.RunRequest_File{ - File: &pb.File{ - Path: path, - Content: content, - }, - }, - }) - if err != nil { - errChan <- fmt.Errorf("sending requested file %q: %w", path, err) + ftErr := ftManager.handleFileTransferRequest(ftReq, stream) + if ftErr != nil { + errChan <- ftErr return } - log.Printf("Sent file: %q\n", path) - case *pb.RunResponse_File: - path := msg.File.GetPath() - content := msg.File.GetContent() - - log.Printf("Received file: %q\n", path) - - if len(content) == 0 { - log.Println("Received empty file content") - } - - perm := 0600 + case *pb.RunResponse_FileChunk: + chunk := msg.FileChunk - err = os.WriteFile(path, content, fs.FileMode(perm)) - if err != nil { - errChan <- fmt.Errorf("saving received file %q: %w", path, err) + chunkErr := ftManager.handleFileChunk(chunk, stream) + if chunkErr != nil { + errChan <- chunkErr return } + case *pb.RunResponse_FileTransferResponse: + ftRes := msg.FileTransferResponse + ftManager.handleFileTransferResponse(ftRes) + default: log.Printf("Unexpected message type %T", msg) } @@ -348,14 +330,17 @@ func (app *application) runRPC(device, command string, cmdArgs []string) error { text, err := reader.ReadString('\n') if err != nil { - if !errors.Is(err, io.EOF) { - errChan <- fmt.Errorf("reading stdin: %w", err) + if errors.Is(err, io.EOF) { + // EOF on stdin is normal when there's no interactive input + log.Println("Send routine: stdin closed (EOF), stopping stdin forwarding") + } else { + log.Printf("Send routine: error reading stdin: %v", err) } return } - err = stream.Send(&pb.RunRequest{ + sendErr := stream.Send(&pb.RunRequest{ Msg: &pb.RunRequest_Console{ Console: &pb.Console{ Data: &pb.Console_Stdin{ @@ -364,8 +349,8 @@ func (app *application) runRPC(device, command string, cmdArgs []string) error { }, }, }) - if err != nil { - errChan <- fmt.Errorf("sending RPC message: %w", err) + if sendErr != nil { + log.Printf("Send routine: error sending to stream: %v", sendErr) return } diff --git a/go.mod b/go.mod index ba1890b8..7be5823b 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/bougou/go-ipmi v0.7.8 github.com/go-playground/validator/v10 v10.30.2 github.com/google/go-cmp v0.7.0 + github.com/google/uuid v1.1.2 github.com/stianeikeland/go-rpio/v4 v4.6.0 github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07 golang.org/x/crypto v0.51.0 @@ -20,7 +21,6 @@ require ( github.com/gabriel-vasile/mimetype v1.4.13 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/google/uuid v1.1.2 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect diff --git a/internal/dutagent/broker.go b/internal/dutagent/broker.go index 333deb93..44ab9f8d 100644 --- a/internal/dutagent/broker.go +++ b/internal/dutagent/broker.go @@ -30,12 +30,14 @@ type Broker struct { func (b *Broker) init() { log.Print("Broker: Initializing") - b.session.printCh = make(chan string) - b.session.stdinCh = make(chan []byte) - b.session.stdoutCh = make(chan []byte) - b.session.stderrCh = make(chan []byte) - b.session.fileReqCh = make(chan string) - b.session.fileCh = make(chan chan []byte) + b.session.printCh = make(chan string, channelBufferSize) + b.session.stdinCh = make(chan []byte, channelBufferSize) + b.session.stdoutCh = make(chan []byte, channelBufferSize) + b.session.stderrCh = make(chan []byte, channelBufferSize) + b.session.shutdownCh = make(chan struct{}) + b.session.fileTransferNotifyCh = make(chan struct{}, 1) + b.session.activeUploads = make(map[string]*uploadState) + b.session.activeDownloads = make(map[string]*downloadState) // Buffer equals number of workers so error sends never block. b.errCh = make(chan error, numWorkers) @@ -59,6 +61,10 @@ func (b *Broker) Start(ctx context.Context, s Stream) (module.Session, <-chan er go func() { b.wg.Wait() + // Both workers have exited. Release any transfer still in flight so + // WaitForTransfers (and thus graceful shutdown) cannot block forever + // when the client disconnected mid-transfer. + b.session.abortTransfers() close(b.errCh) }() }) @@ -102,3 +108,13 @@ func (b *Broker) fromClient(ctx context.Context, cancel context.CancelFunc) { cancel() }() } + +// Shutdown signals workers to stop accepting new sends and begin graceful shutdown. +func (b *Broker) Shutdown() { + b.session.Shutdown() +} + +// WaitForTransfersToComplete blocks until all active file transfers complete. +func (b *Broker) WaitForTransfersToComplete() { + b.session.WaitForTransfers() +} diff --git a/internal/dutagent/broker_test.go b/internal/dutagent/broker_test.go index d8db2c67..fa743402 100644 --- a/internal/dutagent/broker_test.go +++ b/internal/dutagent/broker_test.go @@ -32,9 +32,8 @@ func (s *testStream) Send(_ *pb.RunResponse) error { func (s *testStream) Receive() (*pb.RunRequest, error) { if s.recvBlock { - if s.unblockCh == nil { - s.unblockCh = make(chan struct{}) - } + // unblockCh must be created by the test before Start so this goroutine + // only reads it, never writes it (a write here would race the test). <-s.unblockCh // will block until closed; simulates a long receive } @@ -156,15 +155,13 @@ func TestBroker_StdinForwarding(t *testing.T) { // Cancellation during a blocked receive should terminate fromClientWorker without producing errors. func TestBroker_CancelDuringBlockedReceive(t *testing.T) { b := &Broker{} - stream := &testStream{recvBlock: true} + stream := &testStream{recvBlock: true, unblockCh: make(chan struct{})} ctx, cancel := context.WithCancel(context.Background()) _, errCh := b.Start(ctx, stream) // Cancel promptly, then unblock the fake receive so worker goroutine does not leak. cancel() - if stream.unblockCh != nil { - close(stream.unblockCh) - } + close(stream.unblockCh) errs := collectErrors(t, errCh, 200*time.Millisecond) if len(errs) != 0 { diff --git a/internal/dutagent/sendserialize_test.go b/internal/dutagent/sendserialize_test.go new file mode 100644 index 00000000..0fb63209 --- /dev/null +++ b/internal/dutagent/sendserialize_test.go @@ -0,0 +1,75 @@ +// Copyright 2025 Blindspot Software +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package dutagent + +import ( + "io" + "sync" + "sync/atomic" + "testing" + "time" + + pb "github.com/BlindspotSoftware/dutctl/protobuf/gen/dutctl/v1" +) + +// concurrencyStream records the maximum number of Send calls that overlap. +// The connect BidiStream forbids concurrent Send, so sendToClient must keep +// this at 1 no matter how many goroutines call it. +type concurrencyStream struct { + inSend atomic.Int32 + maxSeen atomic.Int32 + calls atomic.Int32 +} + +func (c *concurrencyStream) Send(_ *pb.RunResponse) error { + now := c.inSend.Add(1) + + for { + prev := c.maxSeen.Load() + if now <= prev || c.maxSeen.CompareAndSwap(prev, now) { + break + } + } + + time.Sleep(time.Millisecond) // widen the window so overlaps are observable + + c.calls.Add(1) + c.inSend.Add(-1) + + return nil +} + +func (c *concurrencyStream) Receive() (*pb.RunRequest, error) { return nil, io.EOF } + +// TestSendToClientSerializes verifies that sendToClient serialises stream sends, +// which is required because both workers send on the same connect BidiStream. +func TestSendToClientSerializes(t *testing.T) { + s := &session{} + stream := &concurrencyStream{} + + const goroutines = 50 + + var wg sync.WaitGroup + + wg.Add(goroutines) + + for range goroutines { + go func() { + defer wg.Done() + + _ = s.sendToClient(stream, &pb.RunResponse{}) + }() + } + + wg.Wait() + + if got := stream.maxSeen.Load(); got > 1 { + t.Errorf("observed %d concurrent Send calls, want at most 1", got) + } + + if got := stream.calls.Load(); got != goroutines { + t.Errorf("Send called %d times, want %d", got, goroutines) + } +} diff --git a/internal/dutagent/session.go b/internal/dutagent/session.go index d8ea6222..0e7f89cc 100644 --- a/internal/dutagent/session.go +++ b/internal/dutagent/session.go @@ -8,23 +8,71 @@ import ( "fmt" "io" "log" + "sync" "github.com/BlindspotSoftware/dutctl/internal/chanio" + "github.com/google/uuid" + + pb "github.com/BlindspotSoftware/dutctl/protobuf/gen/dutctl/v1" +) + +const ( + // chunkSize is the maximum size of a single file chunk (1MB). + chunkSize = 1024 * 1024 + + // channelBufferSize is the buffer size for internal channels. + channelBufferSize = 128 ) +// uploadState represents an active upload from client to agent. +type uploadState struct { + transferID string + metadata *pb.FileMetadata + lastChunk int32 // Track last received chunk number for sequence validation + complete bool + file *io.PipeWriter + reader *io.PipeReader + requestSent bool // Track if initial FileTransferRequest has been sent + mu sync.Mutex +} + +// downloadState represents an active download from agent to client. +type downloadState struct { + transferID string + metadata *pb.FileMetadata + reader io.Reader + closer io.Closer // Optional closer for the reader (e.g. *os.File) + chunkNumber int32 // Chunk being sent + awaitingFinalAck bool // Waiting for client TRANSFER_COMPLETE +} + // session implements the module.Session interface. type session struct { - printCh chan string - stdinCh chan []byte - stdoutCh chan []byte - stderrCh chan []byte - fileReqCh chan string - fileCh chan chan []byte // a single file is represented by a channel of bytes + printCh chan string + stdinCh chan []byte + stdoutCh chan []byte + stderrCh chan []byte + shutdownCh chan struct{} // Graceful shutdown signal + + // File transfer tracking + activeUploads map[string]*uploadState // transferID -> upload state + activeDownloads map[string]*downloadState // transferID -> download state + uploadMutex sync.RWMutex + downloadMutex sync.RWMutex + + // Notification channel for file transfer activity. + // Signaled when a new transfer is registered so toClientWorker wakes up. + fileTransferNotifyCh chan struct{} + + // sendMu serializes all sends on the bidirectional stream. The connect + // BidiStream forbids concurrent Send calls, and both workers (toClientWorker + // and fromClientWorker) send responses, so every send must hold this lock. + sendMu sync.Mutex - // currentFile holds the name of the file currently being transferred. - // It is used for both, to indicate the file that was requested by the module - // and the file that is being sent back to the client. - currentFile string + // Shutdown state tracking + shutdownMutex sync.Mutex + isShuttingDown bool + transferWg sync.WaitGroup // Tracks active transfers for graceful shutdown } func (s *session) Print(a ...any) { @@ -39,6 +87,8 @@ func (s *session) Println(a ...any) { s.printCh <- fmt.Sprintln(a...) } +// Console returns readers and writers for interactive console I/O. +// //nolint:nonamedreturns func (s *session) Console() (stdin io.Reader, stdout, stderr io.Writer) { var ( @@ -65,44 +115,319 @@ func (s *session) Console() (stdin io.Reader, stdout, stderr io.Writer) { return stdinReader, stdoutWriter, stderrWriter } +// RequestFile requests a file from the client. +// It sends a FileTransferRequest and returns a reader that streams the file chunks. func (s *session) RequestFile(name string) (io.Reader, error) { - if s.fileReqCh == nil { - log.Fatal("session.RequestFile() called but session.fileReq is nil") + transferID := uuid.New().String() + + log.Printf("Module issued file request for: %q (transfer_id=%s)", name, transferID) + + // Create upload state with pipe for streaming + reader, writer := io.Pipe() + state := &uploadState{ + transferID: transferID, + metadata: &pb.FileMetadata{ + Path: name, + Name: name, + Size: 0, // Size unknown; client has the file + }, + file: writer, + reader: reader, + lastChunk: -1, } - log.Printf("Module issued file request for: %q", name) + s.uploadMutex.Lock() + s.activeUploads[transferID] = state + s.uploadMutex.Unlock() - s.fileReqCh <- name // Send the file request to the client. + s.transferWg.Add(1) + s.notifyFileTransfer() - file := <-s.fileCh // This will block until the client sends the file. + return state.reader, nil +} - r, err := chanio.NewChanReader(file) - if err != nil { - log.Fatalf("session.RequestFile() failed to create reader: %v", err) +// SendFile sends a file to the client. +// It chunks the file and manages the transfer state. +// The size parameter should be the total file size in bytes. +// If the reader implements io.Closer, the session takes ownership and closes it +// when the transfer completes. +func (s *session) SendFile(name string, size int64, r io.Reader) error { + transferID := uuid.New().String() + + log.Printf("Module issued file send for: %q, size: %d bytes (transfer_id=%s)", name, size, transferID) + + state := &downloadState{ + transferID: transferID, + metadata: &pb.FileMetadata{ + Path: name, + Name: name, + Size: size, + }, + reader: r, + chunkNumber: 0, } - return r, nil + // If the reader is also a Closer, take ownership of its lifecycle. + if c, ok := r.(io.Closer); ok { + state.closer = c + } + + s.downloadMutex.Lock() + s.activeDownloads[transferID] = state + s.downloadMutex.Unlock() + + s.transferWg.Add(1) + s.notifyFileTransfer() + + return nil +} + +// notifyFileTransfer signals the toClientWorker that file transfer work is available. +func (s *session) notifyFileTransfer() { + select { + case s.fileTransferNotifyCh <- struct{}{}: + default: + } } -func (s *session) SendFile(name string, r io.Reader) error { - if s.currentFile != "" { - log.Fatal("session.SendFile() called during a ongoing file request") +// getNextChunk returns the next chunk for a download transfer. +// Returns the chunk, a flag indicating if this is the final chunk, and any error. +func (s *session) getNextChunk(transferID string) (*pb.FileChunk, bool, error) { + s.downloadMutex.RLock() + state, exists := s.activeDownloads[transferID] + s.downloadMutex.RUnlock() + + if !exists { + return nil, false, fmt.Errorf("download not found: %s", transferID) } - content, err := io.ReadAll(r) + // Read next chunk from reader + chunkData := make([]byte, chunkSize) + n, err := state.reader.Read(chunkData) + + chunkData = chunkData[:n] + + isFinal := err == io.EOF + if err != nil && err != io.EOF { + return nil, false, err + } + + // Calculate offset and chunk number + chunkOffset := int64(state.chunkNumber) * int64(chunkSize) + + chunk := &pb.FileChunk{ + TransferId: transferID, + ChunkNumber: state.chunkNumber, + ChunkData: chunkData, + ChunkOffset: chunkOffset, + IsFinal: isFinal, + } + + // Increment chunk number for next call + state.chunkNumber++ + + return chunk, isFinal, nil +} + +// registerUploadChunk registers a received chunk for an upload transfer. +func (s *session) registerUploadChunk(transferID string, chunk *pb.FileChunk) error { + s.uploadMutex.Lock() + state, exists := s.activeUploads[transferID] + s.uploadMutex.Unlock() + + if !exists { + return fmt.Errorf("upload not found: %s", transferID) + } + + state.mu.Lock() + defer state.mu.Unlock() + + // Validate chunk sequence + expectedChunk := state.lastChunk + 1 + if chunk.GetChunkNumber() != expectedChunk { + return fmt.Errorf("chunk order violation: expected %d, got %d", expectedChunk, chunk.GetChunkNumber()) + } + + state.lastChunk = chunk.GetChunkNumber() + + // Write chunk to pipe (data is streamed directly, not stored in memory) + _, err := state.file.Write(chunk.GetChunkData()) if err != nil { return err } - log.Printf("Module issued file transfer of : %q, with %d bytes", name, len(content)) + // If this is the final chunk, close the pipe + if chunk.GetIsFinal() { + state.complete = true + state.file.Close() + } - s.currentFile = name + return nil +} - file := make(chan []byte, 1) - s.fileCh <- file - file <- content +// getActiveUploads returns a list of active upload transfer IDs. +func (s *session) getActiveUploads() []string { + s.uploadMutex.RLock() + defer s.uploadMutex.RUnlock() - close(file) // indicate EOF. + transferIDs := make([]string, 0, len(s.activeUploads)) + for id := range s.activeUploads { + transferIDs = append(transferIDs, id) + } - return nil + return transferIDs +} + +// getActiveDownloads returns a list of active download transfer IDs. +func (s *session) getActiveDownloads() []string { + s.downloadMutex.RLock() + defer s.downloadMutex.RUnlock() + + transferIDs := make([]string, 0, len(s.activeDownloads)) + for id := range s.activeDownloads { + transferIDs = append(transferIDs, id) + } + + return transferIDs +} + +// removeDownload removes a completed download from tracking. +// Idempotent: safe to call multiple times for the same transfer ID. +func (s *session) removeDownload(transferID string) { + s.downloadMutex.Lock() + defer s.downloadMutex.Unlock() + + state, exists := s.activeDownloads[transferID] + if !exists { + return + } + + // Close the reader if it implements io.Closer (e.g. *os.File). + if state.closer != nil { + state.closer.Close() + } + + delete(s.activeDownloads, transferID) + + s.transferWg.Done() +} + +// getUpload retrieves upload state for a given transfer ID. +func (s *session) getUpload(transferID string) *uploadState { + s.uploadMutex.RLock() + defer s.uploadMutex.RUnlock() + + return s.activeUploads[transferID] +} + +// removeUpload removes a completed upload from tracking. +// Idempotent: safe to call multiple times for the same transfer ID. +func (s *session) removeUpload(transferID string) { + s.uploadMutex.Lock() + defer s.uploadMutex.Unlock() + + state, exists := s.activeUploads[transferID] + if !exists { + return + } + + if state.file != nil { + state.file.CloseWithError(fmt.Errorf("transfer removed")) + } + + delete(s.activeUploads, transferID) + + s.transferWg.Done() +} + +// getDownload retrieves download state for a given transfer ID. +func (s *session) getDownload(transferID string) *downloadState { + s.downloadMutex.RLock() + defer s.downloadMutex.RUnlock() + + return s.activeDownloads[transferID] +} + +// isDownloadAwaitingAck checks if a download is waiting for client TRANSFER_COMPLETE. +func (s *session) isDownloadAwaitingAck(transferID string) bool { + s.downloadMutex.RLock() + defer s.downloadMutex.RUnlock() + + if state, exists := s.activeDownloads[transferID]; exists { + return state.awaitingFinalAck + } + + return false +} + +// markDownloadAwaitingAck marks a download as waiting for client TRANSFER_COMPLETE. +func (s *session) markDownloadAwaitingAck(transferID string) { + s.downloadMutex.Lock() + defer s.downloadMutex.Unlock() + + if state, exists := s.activeDownloads[transferID]; exists { + state.awaitingFinalAck = true + } +} + +// IsShuttingDown checks if the session is in graceful shutdown mode. +func (s *session) IsShuttingDown() bool { + s.shutdownMutex.Lock() + defer s.shutdownMutex.Unlock() + + return s.isShuttingDown +} + +// Shutdown initiates graceful shutdown - signals that module execution is complete +// and workers should finish any pending file transfers before exiting. +// Workers will stop accepting new module requests but continue processing file transfers. +func (s *session) Shutdown() { + s.shutdownMutex.Lock() + defer s.shutdownMutex.Unlock() + + if s.isShuttingDown { + return // Already shutting down + } + + log.Print("Session: Initiating graceful shutdown") + + s.isShuttingDown = true + close(s.shutdownCh) // Signal workers that module is done +} + +// WaitForTransfers blocks until all active transfers complete. +func (s *session) WaitForTransfers() { + s.transferWg.Wait() +} + +// abortTransfers tears down every still-active transfer and balances the +// transfer wait group. It is called once the stream workers have exited (e.g. +// the client disconnected mid-transfer) so a half-finished transfer cannot +// wedge WaitForTransfers — and therefore graceful shutdown — forever. +func (s *session) abortTransfers() { + s.uploadMutex.Lock() + + for id, state := range s.activeUploads { + if state.file != nil { + state.file.CloseWithError(fmt.Errorf("transfer aborted: stream closed")) + } + + delete(s.activeUploads, id) + s.transferWg.Done() + } + + s.uploadMutex.Unlock() + + s.downloadMutex.Lock() + + for id, state := range s.activeDownloads { + if state.closer != nil { + state.closer.Close() + } + + delete(s.activeDownloads, id) + s.transferWg.Done() + } + + s.downloadMutex.Unlock() } diff --git a/internal/dutagent/worker.go b/internal/dutagent/worker.go index 62484fe9..a6f448ee 100644 --- a/internal/dutagent/worker.go +++ b/internal/dutagent/worker.go @@ -11,94 +11,299 @@ import ( "io" "log" - "github.com/BlindspotSoftware/dutctl/internal/chanio" - pb "github.com/BlindspotSoftware/dutctl/protobuf/gen/dutctl/v1" ) +// sendToClient serializes all sends on the bidirectional stream and recovers +// from a panic that can occur when the stream is already closed during graceful +// shutdown. The connect BidiStream is not safe for concurrent Send calls, and +// both workers send responses, so every send goes through this single lock. +func (s *session) sendToClient(stream Stream, res *pb.RunResponse) (err error) { + s.sendMu.Lock() + defer s.sendMu.Unlock() + + defer func() { + if r := recover(); r != nil { + log.Printf("Recovered from panic in stream.Send: %v", r) + + err = nil + } + }() + + return stream.Send(res) +} + +// sendDownloadError sends an error response for a download transfer and cleans up. +func sendDownloadError(stream Stream, s *session, transferID string, downloadMetadataSent map[string]bool, err error) { + log.Printf("Error getting chunk for transfer %s: %v", transferID, err) + + res := &pb.RunResponse{ + Msg: &pb.RunResponse_FileTransferResponse{ + FileTransferResponse: &pb.FileTransferResponse{ + TransferId: transferID, + Status: pb.FileTransferResponse_ERROR, + ErrorMessage: fmt.Sprintf("error reading file: %v", err), + NextChunkExpected: 0, + }, + }, + } + + sendErr := s.sendToClient(stream, res) + if sendErr != nil { + log.Printf("handleDownloadFileTransfer: error sending error response: %v", sendErr) + } + + s.removeDownload(transferID) + delete(downloadMetadataSent, transferID) +} + +// sendDownloadMetadata sends the file metadata to the client. +// Returns true if metadata was sent, false otherwise. +func sendDownloadMetadata(stream Stream, s *session, transferID string, downloadMetadataSent map[string]bool) bool { + if downloadMetadataSent[transferID] { + return false + } + + download := s.getDownload(transferID) + if download == nil { + return false + } + + res := &pb.RunResponse{ + Msg: &pb.RunResponse_FileTransferRequest{ + FileTransferRequest: &pb.FileTransferRequest{ + TransferId: transferID, + Metadata: download.metadata, + Direction: pb.FileTransferRequest_DOWNLOAD, + }, + }, + } + + sendErr := s.sendToClient(stream, res) + if sendErr != nil { + log.Printf("handleDownloadFileTransfer: error sending metadata: %v", sendErr) + + return false + } + + downloadMetadataSent[transferID] = true + + return true +} + +// handleDownloadFileTransfer processes a single download transfer for sending to the client. +// Returns true if work was done (a message was sent), false if nothing to do. +func handleDownloadFileTransfer(stream Stream, s *session, transferID string, downloadMetadataSent map[string]bool) bool { + // Skip if waiting for client acknowledgment + if s.isDownloadAwaitingAck(transferID) { + return false + } + + // Send metadata first + if !downloadMetadataSent[transferID] { + return sendDownloadMetadata(stream, s, transferID, downloadMetadataSent) + } + + // Get next chunk + chunk, isFinal, err := s.getNextChunk(transferID) + if err != nil { + sendDownloadError(stream, s, transferID, downloadMetadataSent, err) + + return true + } + + if chunk == nil { + return false + } + + res := &pb.RunResponse{ + Msg: &pb.RunResponse_FileChunk{FileChunk: chunk}, + } + + sendErr := s.sendToClient(stream, res) + if sendErr != nil { + log.Printf("handleDownloadFileTransfer: error sending chunk: %v", sendErr) + + return false + } + + if isFinal { + s.markDownloadAwaitingAck(transferID) + } + + return true +} + +// processFileTransfers handles pending upload requests and download chunks. +// Returns true if any work was done. +func processFileTransfers(stream Stream, s *session, downloadMetadataSent map[string]bool) bool { + sent := false + + // Send FileTransferRequest for new uploads that haven't been announced yet. + if !s.IsShuttingDown() { + for _, transferID := range s.getActiveUploads() { + upload := s.getUpload(transferID) + if upload == nil { + continue + } + + // metadata is written by fromClientWorker under upload.mu, so read + // it (and requestSent) under the same lock. + upload.mu.Lock() + metadata := upload.metadata + alreadySent := upload.requestSent + upload.mu.Unlock() + + if metadata == nil || alreadySent { + continue + } + + res := &pb.RunResponse{ + Msg: &pb.RunResponse_FileTransferRequest{ + FileTransferRequest: &pb.FileTransferRequest{ + TransferId: transferID, + Metadata: metadata, + Direction: pb.FileTransferRequest_UPLOAD, + }, + }, + } + + sendErr := s.sendToClient(stream, res) + if sendErr != nil { + log.Printf("toClientWorker: error sending upload request: %v", sendErr) + + return sent + } + + upload.mu.Lock() + upload.requestSent = true + upload.mu.Unlock() + + sent = true + + break // One at a time + } + } + + // Send download metadata or chunks. Iterate active downloads and process + // the first one that has work available. + for _, transferID := range s.getActiveDownloads() { + if handleDownloadFileTransfer(stream, s, transferID, downloadMetadataSent) { + sent = true + + break // One at a time for fairness + } + } + + return sent +} + +// handleConsoleMessage handles stdout/stderr console messages. +func handleConsoleMessage(stream Stream, s *session, bytes []byte, isStderr bool) error { + // During shutdown, discard messages but don't send + if s.IsShuttingDown() { + return nil + } + + var res *pb.RunResponse + if isStderr { + res = &pb.RunResponse{ + Msg: &pb.RunResponse_Console{ + Console: &pb.Console{ + Data: &pb.Console_Stderr{Stderr: bytes}, + }, + }, + } + } else { + res = &pb.RunResponse{ + Msg: &pb.RunResponse_Console{ + Console: &pb.Console{ + Data: &pb.Console_Stdout{Stdout: bytes}, + }, + }, + } + } + + err := s.sendToClient(stream, res) + if err != nil { + if isStderr { + log.Printf("toClientWorker: error sending stderr: %v", err) + } else { + log.Printf("toClientWorker: error sending stdout: %v", err) + } + + return err + } + + return nil +} + // toClientWorker sends messages from the module session to the client. -// This function is an infinite loop. It terminates when the session's done channel is closed. +// This function is an infinite loop. It terminates when the context is cancelled. +// +// It handles: +// - Print/Console messages from modules. +// - FileTransferRequest messages with metadata to initiate downloads. +// - FileChunk messages for downloads (agent -> client). // -//nolint:cyclop, funlen +//nolint:cyclop // main select loop inherently has multiple cases func toClientWorker(ctx context.Context, stream Stream, s *session) error { + // Track which downloads have had their metadata sent + downloadMetadataSent := make(map[string]bool) + for { select { case <-ctx.Done(): return nil - case str := <-s.printCh: - res := &pb.RunResponse{ - Msg: &pb.RunResponse_Print{Print: &pb.Print{Text: []byte(str)}}, - } - err := stream.Send(res) - if err != nil { - return err - } - case bytes := <-s.stdoutCh: - res := &pb.RunResponse{ - Msg: &pb.RunResponse_Console{Console: &pb.Console{Data: &pb.Console_Stdout{Stdout: bytes}}}, + case str := <-s.printCh: + // During shutdown, discard messages but don't send + if s.IsShuttingDown() { + continue } - err := stream.Send(res) - if err != nil { - return err - } - case bytes := <-s.stderrCh: res := &pb.RunResponse{ - Msg: &pb.RunResponse_Console{Console: &pb.Console{Data: &pb.Console_Stderr{Stderr: bytes}}}, + Msg: &pb.RunResponse_Print{Print: &pb.Print{Text: []byte(str)}}, } - err := stream.Send(res) + err := s.sendToClient(stream, res) if err != nil { - return err - } - case name := <-s.fileReqCh: - res := &pb.RunResponse{ - Msg: &pb.RunResponse_FileRequest{FileRequest: &pb.FileRequest{Path: name}}, - } + log.Printf("toClientWorker: error sending print: %v", err) - err := stream.Send(res) - if err != nil { return err } - s.currentFile = name - case file := <-s.fileCh: - r, err := chanio.NewChanReader(file) + case bytes := <-s.stdoutCh: + err := handleConsoleMessage(stream, s, bytes, false) if err != nil { return err } - content, err := io.ReadAll(r) + case bytes := <-s.stderrCh: + err := handleConsoleMessage(stream, s, bytes, true) if err != nil { return err } - log.Printf("Received file from module, sending to client. Name: %q, Size %d", s.currentFile, len(content)) - - res := &pb.RunResponse{ - Msg: &pb.RunResponse_File{ - File: &pb.File{ - Path: s.currentFile, - Content: content, - }, - }, - } - - err = stream.Send(res) - if err != nil { - return err + case <-s.fileTransferNotifyCh: + if processFileTransfers(stream, s, downloadMetadataSent) { + // Re-signal: more work may be pending. + s.notifyFileTransfer() } - - s.currentFile = "" } } } -// fromClientWorker reads messages from the client and passes them to the module session. -// This function is an infinite loop. It terminates when the session's done channel is closed. +// fromClientWorker reads messages from the client and routes them appropriately. +// This function is an infinite loop. It terminates when an error (including io.EOF) occurs. // -//nolint:cyclop,funlen,gocognit +// It handles: +// - Console messages for interactive input +// - FileChunk messages for uploads (client -> agent) +// - FileTransferRequest messages to initiate downloads +// - FileTransferResponse messages to acknowledge transfers +// +//nolint:cyclop,funlen,gocognit,gocyclo,maintidx func fromClientWorker(ctx context.Context, stream Stream, s *session) error { type recvResult struct { req *pb.RunRequest @@ -107,6 +312,7 @@ func fromClientWorker(ctx context.Context, stream Stream, s *session) error { // Single goroutine performing blocking Receive calls and forwarding results. resCh := make(chan recvResult) + // Receive loop goroutine rationale: // // We offload blocking stream.Receive calls to this goroutine so the main select @@ -143,6 +349,7 @@ func fromClientWorker(ctx context.Context, stream Stream, s *session) error { default: return nil } + case r := <-resCh: if r.err != nil { if errors.Is(r.err, io.EOF) { @@ -153,8 +360,6 @@ func fromClientWorker(ctx context.Context, stream Stream, s *session) error { } if r.req == nil { // Defensive: shouldn't happen unless stream.Receive misbehaves - log.Println("Received nil request without error; ignoring") - continue } @@ -166,65 +371,174 @@ func fromClientWorker(ctx context.Context, stream Stream, s *session) error { case *pb.Console_Stdin: stdin := consoleMsg.Stdin if stdin == nil { - log.Println("Received nil stdin message") - continue } - log.Printf("Server received stdin from client: %q", string(stdin)) - select { case <-ctx.Done(): return nil case s.stdinCh <- stdin: } - log.Println("Passed stdin to module") - default: log.Printf("Unexpected Console message %T", consoleMsg) } - case *pb.RunRequest_File: - fileMsg := msg.File - if fileMsg == nil { - log.Println("Received empty file message") - return fmt.Errorf("bad file transfer: received empty file-message") + case *pb.RunRequest_FileChunk: + chunk := msg.FileChunk + if chunk == nil { + continue + } + + transferID := chunk.GetTransferId() + + // Register or update the upload with this chunk. + registerErr := s.registerUploadChunk(transferID, chunk) + if registerErr != nil { + log.Printf("Error registering upload chunk: %v", registerErr) + + // Send error response + res := &pb.RunResponse{ + Msg: &pb.RunResponse_FileTransferResponse{ + FileTransferResponse: &pb.FileTransferResponse{ + TransferId: transferID, + Status: pb.FileTransferResponse_ERROR, + ErrorMessage: fmt.Sprintf("error processing chunk: %v", registerErr), + }, + }, + } + + sendErr := s.sendToClient(stream, res) + if sendErr != nil { + return sendErr + } + + // Cleanup upload state - close pipe and remove from tracking + s.removeUpload(transferID) + + continue + } + + // Send acknowledgment. + res := &pb.RunResponse{ + Msg: &pb.RunResponse_FileTransferResponse{ + FileTransferResponse: &pb.FileTransferResponse{ + TransferId: transferID, + Status: pb.FileTransferResponse_CHUNK_RECEIVED, + NextChunkExpected: chunk.GetChunkNumber() + 1, + }, + }, } - if s.currentFile == "" { - log.Println("Received file without a request") + sendErr := s.sendToClient(stream, res) + if sendErr != nil { + log.Printf("fromClientWorker: error sending chunk acknowledgment: %v", sendErr) - return fmt.Errorf("bad file transfer: received file-message without a former request") + return sendErr } - path := fileMsg.GetPath() - content := fileMsg.GetContent() + // If this was the final chunk, send completion response + if chunk.GetIsFinal() { + res := &pb.RunResponse{ + Msg: &pb.RunResponse_FileTransferResponse{ + FileTransferResponse: &pb.FileTransferResponse{ + TransferId: transferID, + Status: pb.FileTransferResponse_TRANSFER_COMPLETE, + }, + }, + } + + sendErr := s.sendToClient(stream, res) + if sendErr != nil { + log.Printf("fromClientWorker: error sending transfer complete: %v", sendErr) - if content == nil { - log.Println("Received file message with empty content") + return sendErr + } - return fmt.Errorf("bad file transfer: received file-message without content") + s.removeUpload(transferID) + } + case *pb.RunRequest_FileTransferRequest: + ftReq := msg.FileTransferRequest + if ftReq == nil { + continue } - if path != s.currentFile { - log.Printf("Received unexpected file %q - ignoring!", path) + transferID := ftReq.GetTransferId() + metadata := ftReq.GetMetadata() + // Check if this is a known transfer (module called RequestFile) + upload := s.getUpload(transferID) + if upload == nil { + // Send rejection + res := &pb.RunResponse{ + Msg: &pb.RunResponse_FileTransferResponse{ + FileTransferResponse: &pb.FileTransferResponse{ + TransferId: transferID, + Status: pb.FileTransferResponse_TRANSFER_REJECTED, + ErrorMessage: "no matching request from module", + }, + }, + } + + sendErr := s.sendToClient(stream, res) + if sendErr != nil { + return sendErr + } - return fmt.Errorf("bad file transfer: received file-message %q but requested %q", path, s.currentFile) + continue } - log.Printf("Server received file %q from client", path) + // Update metadata with client's info (protected by mutex) + upload.mu.Lock() + upload.metadata = metadata + upload.mu.Unlock() + + // Send acceptance + res := &pb.RunResponse{ + Msg: &pb.RunResponse_FileTransferResponse{ + FileTransferResponse: &pb.FileTransferResponse{ + TransferId: transferID, + Status: pb.FileTransferResponse_ACCEPTED, + }, + }, + } + + sendErr := s.sendToClient(stream, res) + if sendErr != nil { + log.Printf("fromClientWorker: error sending acceptance: %v", sendErr) + + return sendErr + } - file := make(chan []byte, 1) - s.fileCh <- file - file <- content + case *pb.RunRequest_FileTransferResponse: + ftRes := msg.FileTransferResponse + if ftRes == nil { + continue + } + + transferID := ftRes.GetTransferId() + status := ftRes.GetStatus() + + switch status { + case pb.FileTransferResponse_ERROR: + s.removeDownload(transferID) + s.removeUpload(transferID) + case pb.FileTransferResponse_TRANSFER_COMPLETE: + // Remove download on client confirmation + if s.isDownloadAwaitingAck(transferID) { + s.removeDownload(transferID) + } + case pb.FileTransferResponse_TRANSFER_REJECTED: + s.removeUpload(transferID) + case pb.FileTransferResponse_CHUNK_RECEIVED: + // Used for upload flows, ignore for downloads + } - close(file) - log.Println("Passed file to module (buffered in the session)") + case *pb.RunRequest_Command: + // Command is handled by the broker, not here + // This shouldn't arrive during an active session - s.currentFile = "" default: - log.Printf("Unexpected message type %T", msg) + // Unexpected message type } } } diff --git a/internal/test/mock/session.go b/internal/test/mock/session.go index c76d716c..fac5d899 100644 --- a/internal/test/mock/session.go +++ b/internal/test/mock/session.go @@ -80,7 +80,7 @@ func (m *Session) RequestFile(name string) (io.Reader, error) { return m.RequestedFileResponse, nil } -func (m *Session) SendFile(name string, r io.Reader) error { +func (m *Session) SendFile(name string, _ int64, r io.Reader) error { m.SendFileCalled = true m.SentFileName = name diff --git a/pkg/module/dummy/dummy_file_transfer.go b/pkg/module/dummy/dummy_file_transfer.go index 3e275497..3a94880f 100644 --- a/pkg/module/dummy/dummy_file_transfer.go +++ b/pkg/module/dummy/dummy_file_transfer.go @@ -86,9 +86,9 @@ func (d *FT) Run(_ context.Context, s module.Session, args ...string) error { } outFile := args[1] - log.Printf("dummy.FT module: Sending back processed file %q", outFile) + log.Printf("dummy.FT module: Sending back processed file %q (%d bytes)", outFile, len(result)) - err = s.SendFile(outFile, bytes.NewBuffer(result)) + err = s.SendFile(outFile, int64(len(result)), bytes.NewBuffer(result)) if err != nil { return fmt.Errorf("failed to send file: %v", err) } diff --git a/pkg/module/file/file.go b/pkg/module/file/file.go index c7eb30e4..971b7201 100644 --- a/pkg/module/file/file.go +++ b/pkg/module/file/file.go @@ -271,15 +271,16 @@ func (f *File) downloadFile(sesh module.Session) error { return fmt.Errorf("failed to open source file %q: %w", f.sourcePath, err) } - // Open source file + // Open source file. + // Note: the session takes ownership of the file via SendFile and closes it + // when the transfer completes, so we must NOT defer Close here. srcFile, err := os.Open(f.sourcePath) if err != nil { return fmt.Errorf("failed to open source file %q: %w", f.sourcePath, err) } - defer srcFile.Close() - // Send file to client - err = sesh.SendFile(f.destPath, srcFile) + // Send file to client with size information + err = sesh.SendFile(f.destPath, fileInfo.Size(), srcFile) if err != nil { return fmt.Errorf("failed to send file to client: %w", err) } diff --git a/pkg/module/flash/flash.go b/pkg/module/flash/flash.go index 449f30fb..9fdf8dd7 100644 --- a/pkg/module/flash/flash.go +++ b/pkg/module/flash/flash.go @@ -256,7 +256,7 @@ func (f *Flash) cmdline() []string { return args } -// uploadImage receives the flash image file from sesh and saves is locally. +// uploadImage receives and saves the flash image from client. func uploadImage(sesh module.Session, remote, local string) error { img, err := sesh.RequestFile(remote) if err != nil { @@ -276,14 +276,19 @@ func uploadImage(sesh module.Session, remote, local string) error { return nil } -// downloadImage sends the local flash image file to sesh. +// downloadImage sends the flash image to client. func downloadImage(sesh module.Session, local, remote string) error { file, err := os.Open(local) if err != nil { return fmt.Errorf("open flash image on dutagent after read operation: %w", err) } - err = sesh.SendFile(remote, file) + fileInfo, err := file.Stat() + if err != nil { + return fmt.Errorf("stat flash image on dutagent after read operation: %w", err) + } + + err = sesh.SendFile(remote, fileInfo.Size(), file) if err != nil { return fmt.Errorf("send flash image to client after read operation: %w", err) } diff --git a/pkg/module/module.go b/pkg/module/module.go index 948252dc..c6b887bb 100644 --- a/pkg/module/module.go +++ b/pkg/module/module.go @@ -71,7 +71,8 @@ type Session interface { // The file is identified by its name and is made available to the module via the returned io.Reader. RequestFile(name string) (io.Reader, error) // SendFile sends a file to the client. - SendFile(name string, r io.Reader) error + // The size parameter specifies the total file size in bytes, which helps the client track progress. + SendFile(name string, size int64, r io.Reader) error } // Record holds the information required to register a module. diff --git a/protobuf/buf.gen.yaml b/protobuf/buf.gen.yaml index fd94ac64..8b78f9cb 100644 --- a/protobuf/buf.gen.yaml +++ b/protobuf/buf.gen.yaml @@ -1,8 +1,8 @@ -version: v2 +version: v1 plugins: - - local: protoc-gen-go + - plugin: go out: gen opt: paths=source_relative - - local: protoc-gen-connect-go + - plugin: connect-go out: gen opt: paths=source_relative \ No newline at end of file diff --git a/protobuf/buf.yaml b/protobuf/buf.yaml index 4cb6a23e..5fc4486b 100644 --- a/protobuf/buf.yaml +++ b/protobuf/buf.yaml @@ -1,4 +1,4 @@ -version: v2 +version: v1 lint: use: - DEFAULT diff --git a/protobuf/dutctl/v1/dutctl.proto b/protobuf/dutctl/v1/dutctl.proto index 8f04d11f..3ba86b91 100644 --- a/protobuf/dutctl/v1/dutctl.proto +++ b/protobuf/dutctl/v1/dutctl.proto @@ -68,7 +68,9 @@ message RunRequest { oneof msg { Command command = 1; Console console = 2; - File file = 3; + FileTransferRequest file_transfer_request = 3; + FileChunk file_chunk = 4; + FileTransferResponse file_transfer_response = 5; } } @@ -78,8 +80,9 @@ message RunResponse { oneof msg { Print print = 1; Console console = 2; - FileRequest file_request = 3; - File file = 4; + FileTransferRequest file_transfer_request = 3; + FileChunk file_chunk = 4; + FileTransferResponse file_transfer_response = 5; } } @@ -105,15 +108,51 @@ message Console { } } -// FileRequest is used by the agent to request a file from the client. -message FileRequest { - string path = 1; -} - -// File is used by the client and the agent to transfer a file. -message File { - string path = 1; - bytes content = 2; +// FileMetadata describes the properties of a file being transferred. +message FileMetadata { + string path = 1; // Path of the file + int64 size = 2; // Total file size in bytes + string name = 3; // Filename for reference +} + +// FileChunk represents a single chunk of a file being transferred. +// Maximum chunk size is 1MB. +message FileChunk { + string transfer_id = 1; // Unique ID for this transfer session + int32 chunk_number = 2; // Sequential chunk number (0-indexed) + bytes chunk_data = 3; // File data chunk (max 1MB) + int64 chunk_offset = 4; // Byte offset in the file + bool is_final = 5; // Whether this is the last chunk +} + +// FileTransferRequest initiates a file transfer. +// Sent by the side requesting to receive a file. +message FileTransferRequest { + string transfer_id = 1; // Unique ID for this transfer + FileMetadata metadata = 2; // File metadata + + enum Direction { + DIRECTION_UNSPECIFIED = 0; + UPLOAD = 1; // Client sending file to agent (agent requesting from client) + DOWNLOAD = 2; // Agent sending file to client + } + Direction direction = 3; // Direction of file transfer +} + +// FileTransferResponse acknowledges file transfer requests and chunk receipts. +message FileTransferResponse { + string transfer_id = 1; // Matches request ID + enum Status { + STATUS_UNSPECIFIED = 0; + ACCEPTED = 1; // Ready to receive chunks + CHUNK_RECEIVED = 2; // Chunk received successfully + TRANSFER_COMPLETE = 3; // All chunks received + TRANSFER_REJECTED = 4; // Transfer cannot proceed + ERROR = 5; // Transfer error occurred + } + Status status = 2; + int32 next_chunk_expected = 3; // Next chunk number expected (for recovery) + string error_message = 4; // Error details if status is ERROR } // LockRequest is sent by the client to acquire or extend a lock on a device. diff --git a/protobuf/gen/dutctl/v1/dutctl.pb.go b/protobuf/gen/dutctl/v1/dutctl.pb.go index 8bf030bb..93839524 100644 --- a/protobuf/gen/dutctl/v1/dutctl.pb.go +++ b/protobuf/gen/dutctl/v1/dutctl.pb.go @@ -21,6 +21,113 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type FileTransferRequest_Direction int32 + +const ( + FileTransferRequest_DIRECTION_UNSPECIFIED FileTransferRequest_Direction = 0 + FileTransferRequest_UPLOAD FileTransferRequest_Direction = 1 // Client sending file to agent (agent requesting from client) + FileTransferRequest_DOWNLOAD FileTransferRequest_Direction = 2 // Agent sending file to client +) + +// Enum value maps for FileTransferRequest_Direction. +var ( + FileTransferRequest_Direction_name = map[int32]string{ + 0: "DIRECTION_UNSPECIFIED", + 1: "UPLOAD", + 2: "DOWNLOAD", + } + FileTransferRequest_Direction_value = map[string]int32{ + "DIRECTION_UNSPECIFIED": 0, + "UPLOAD": 1, + "DOWNLOAD": 2, + } +) + +func (x FileTransferRequest_Direction) Enum() *FileTransferRequest_Direction { + p := new(FileTransferRequest_Direction) + *p = x + return p +} + +func (x FileTransferRequest_Direction) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (FileTransferRequest_Direction) Descriptor() protoreflect.EnumDescriptor { + return file_dutctl_v1_dutctl_proto_enumTypes[0].Descriptor() +} + +func (FileTransferRequest_Direction) Type() protoreflect.EnumType { + return &file_dutctl_v1_dutctl_proto_enumTypes[0] +} + +func (x FileTransferRequest_Direction) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use FileTransferRequest_Direction.Descriptor instead. +func (FileTransferRequest_Direction) EnumDescriptor() ([]byte, []int) { + return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{15, 0} +} + +type FileTransferResponse_Status int32 + +const ( + FileTransferResponse_STATUS_UNSPECIFIED FileTransferResponse_Status = 0 + FileTransferResponse_ACCEPTED FileTransferResponse_Status = 1 // Ready to receive chunks + FileTransferResponse_CHUNK_RECEIVED FileTransferResponse_Status = 2 // Chunk received successfully + FileTransferResponse_TRANSFER_COMPLETE FileTransferResponse_Status = 3 // All chunks received + FileTransferResponse_TRANSFER_REJECTED FileTransferResponse_Status = 4 // Transfer cannot proceed + FileTransferResponse_ERROR FileTransferResponse_Status = 5 // Transfer error occurred +) + +// Enum value maps for FileTransferResponse_Status. +var ( + FileTransferResponse_Status_name = map[int32]string{ + 0: "STATUS_UNSPECIFIED", + 1: "ACCEPTED", + 2: "CHUNK_RECEIVED", + 3: "TRANSFER_COMPLETE", + 4: "TRANSFER_REJECTED", + 5: "ERROR", + } + FileTransferResponse_Status_value = map[string]int32{ + "STATUS_UNSPECIFIED": 0, + "ACCEPTED": 1, + "CHUNK_RECEIVED": 2, + "TRANSFER_COMPLETE": 3, + "TRANSFER_REJECTED": 4, + "ERROR": 5, + } +) + +func (x FileTransferResponse_Status) Enum() *FileTransferResponse_Status { + p := new(FileTransferResponse_Status) + *p = x + return p +} + +func (x FileTransferResponse_Status) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (FileTransferResponse_Status) Descriptor() protoreflect.EnumDescriptor { + return file_dutctl_v1_dutctl_proto_enumTypes[1].Descriptor() +} + +func (FileTransferResponse_Status) Type() protoreflect.EnumType { + return &file_dutctl_v1_dutctl_proto_enumTypes[1] +} + +func (x FileTransferResponse_Status) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use FileTransferResponse_Status.Descriptor instead. +func (FileTransferResponse_Status) EnumDescriptor() ([]byte, []int) { + return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{16, 0} +} + // ListRequest is sent by the client to request a list of devices connected to the agent. type ListRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -426,7 +533,9 @@ type RunRequest struct { // // *RunRequest_Command // *RunRequest_Console - // *RunRequest_File + // *RunRequest_FileTransferRequest + // *RunRequest_FileChunk + // *RunRequest_FileTransferResponse Msg isRunRequest_Msg `protobuf_oneof:"msg"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -487,10 +596,28 @@ func (x *RunRequest) GetConsole() *Console { return nil } -func (x *RunRequest) GetFile() *File { +func (x *RunRequest) GetFileTransferRequest() *FileTransferRequest { if x != nil { - if x, ok := x.Msg.(*RunRequest_File); ok { - return x.File + if x, ok := x.Msg.(*RunRequest_FileTransferRequest); ok { + return x.FileTransferRequest + } + } + return nil +} + +func (x *RunRequest) GetFileChunk() *FileChunk { + if x != nil { + if x, ok := x.Msg.(*RunRequest_FileChunk); ok { + return x.FileChunk + } + } + return nil +} + +func (x *RunRequest) GetFileTransferResponse() *FileTransferResponse { + if x != nil { + if x, ok := x.Msg.(*RunRequest_FileTransferResponse); ok { + return x.FileTransferResponse } } return nil @@ -508,15 +635,27 @@ type RunRequest_Console struct { Console *Console `protobuf:"bytes,2,opt,name=console,proto3,oneof"` } -type RunRequest_File struct { - File *File `protobuf:"bytes,3,opt,name=file,proto3,oneof"` +type RunRequest_FileTransferRequest struct { + FileTransferRequest *FileTransferRequest `protobuf:"bytes,3,opt,name=file_transfer_request,json=fileTransferRequest,proto3,oneof"` +} + +type RunRequest_FileChunk struct { + FileChunk *FileChunk `protobuf:"bytes,4,opt,name=file_chunk,json=fileChunk,proto3,oneof"` +} + +type RunRequest_FileTransferResponse struct { + FileTransferResponse *FileTransferResponse `protobuf:"bytes,5,opt,name=file_transfer_response,json=fileTransferResponse,proto3,oneof"` } func (*RunRequest_Command) isRunRequest_Msg() {} func (*RunRequest_Console) isRunRequest_Msg() {} -func (*RunRequest_File) isRunRequest_Msg() {} +func (*RunRequest_FileTransferRequest) isRunRequest_Msg() {} + +func (*RunRequest_FileChunk) isRunRequest_Msg() {} + +func (*RunRequest_FileTransferResponse) isRunRequest_Msg() {} // RunResponse is sent by the agent in response to a RunRequest and can either contain // just the output of the command (Print), or trigger further interaction with the client. @@ -526,8 +665,9 @@ type RunResponse struct { // // *RunResponse_Print // *RunResponse_Console - // *RunResponse_FileRequest - // *RunResponse_File + // *RunResponse_FileTransferRequest + // *RunResponse_FileChunk + // *RunResponse_FileTransferResponse Msg isRunResponse_Msg `protobuf_oneof:"msg"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -588,19 +728,28 @@ func (x *RunResponse) GetConsole() *Console { return nil } -func (x *RunResponse) GetFileRequest() *FileRequest { +func (x *RunResponse) GetFileTransferRequest() *FileTransferRequest { + if x != nil { + if x, ok := x.Msg.(*RunResponse_FileTransferRequest); ok { + return x.FileTransferRequest + } + } + return nil +} + +func (x *RunResponse) GetFileChunk() *FileChunk { if x != nil { - if x, ok := x.Msg.(*RunResponse_FileRequest); ok { - return x.FileRequest + if x, ok := x.Msg.(*RunResponse_FileChunk); ok { + return x.FileChunk } } return nil } -func (x *RunResponse) GetFile() *File { +func (x *RunResponse) GetFileTransferResponse() *FileTransferResponse { if x != nil { - if x, ok := x.Msg.(*RunResponse_File); ok { - return x.File + if x, ok := x.Msg.(*RunResponse_FileTransferResponse); ok { + return x.FileTransferResponse } } return nil @@ -618,21 +767,27 @@ type RunResponse_Console struct { Console *Console `protobuf:"bytes,2,opt,name=console,proto3,oneof"` } -type RunResponse_FileRequest struct { - FileRequest *FileRequest `protobuf:"bytes,3,opt,name=file_request,json=fileRequest,proto3,oneof"` +type RunResponse_FileTransferRequest struct { + FileTransferRequest *FileTransferRequest `protobuf:"bytes,3,opt,name=file_transfer_request,json=fileTransferRequest,proto3,oneof"` } -type RunResponse_File struct { - File *File `protobuf:"bytes,4,opt,name=file,proto3,oneof"` +type RunResponse_FileChunk struct { + FileChunk *FileChunk `protobuf:"bytes,4,opt,name=file_chunk,json=fileChunk,proto3,oneof"` +} + +type RunResponse_FileTransferResponse struct { + FileTransferResponse *FileTransferResponse `protobuf:"bytes,5,opt,name=file_transfer_response,json=fileTransferResponse,proto3,oneof"` } func (*RunResponse_Print) isRunResponse_Msg() {} func (*RunResponse_Console) isRunResponse_Msg() {} -func (*RunResponse_FileRequest) isRunResponse_Msg() {} +func (*RunResponse_FileTransferRequest) isRunResponse_Msg() {} + +func (*RunResponse_FileChunk) isRunResponse_Msg() {} -func (*RunResponse_File) isRunResponse_Msg() {} +func (*RunResponse_FileTransferResponse) isRunResponse_Msg() {} // Command is used by the client to start a command execution on a device. type Command struct { @@ -840,28 +995,30 @@ func (*Console_Stdout) isConsole_Data() {} func (*Console_Stderr) isConsole_Data() {} -// FileRequest is used by the agent to request a file from the client. -type FileRequest struct { +// FileMetadata describes the properties of a file being transferred. +type FileMetadata struct { state protoimpl.MessageState `protogen:"open.v1"` - Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` // Path of the file + Size int64 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"` // Total file size in bytes + Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` // Filename for reference unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } -func (x *FileRequest) Reset() { - *x = FileRequest{} +func (x *FileMetadata) Reset() { + *x = FileMetadata{} mi := &file_dutctl_v1_dutctl_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *FileRequest) String() string { +func (x *FileMetadata) String() string { return protoimpl.X.MessageStringOf(x) } -func (*FileRequest) ProtoMessage() {} +func (*FileMetadata) ProtoMessage() {} -func (x *FileRequest) ProtoReflect() protoreflect.Message { +func (x *FileMetadata) ProtoReflect() protoreflect.Message { mi := &file_dutctl_v1_dutctl_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -873,41 +1030,59 @@ func (x *FileRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use FileRequest.ProtoReflect.Descriptor instead. -func (*FileRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use FileMetadata.ProtoReflect.Descriptor instead. +func (*FileMetadata) Descriptor() ([]byte, []int) { return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{13} } -func (x *FileRequest) GetPath() string { +func (x *FileMetadata) GetPath() string { if x != nil { return x.Path } return "" } -// File is used by the client and the agent to transfer a file. -type File struct { +func (x *FileMetadata) GetSize() int64 { + if x != nil { + return x.Size + } + return 0 +} + +func (x *FileMetadata) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +// FileChunk represents a single chunk of a file being transferred. +// Maximum chunk size is 1MB. +type FileChunk struct { state protoimpl.MessageState `protogen:"open.v1"` - Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` - Content []byte `protobuf:"bytes,2,opt,name=content,proto3" json:"content,omitempty"` + TransferId string `protobuf:"bytes,1,opt,name=transfer_id,json=transferId,proto3" json:"transfer_id,omitempty"` // Unique ID for this transfer session + ChunkNumber int32 `protobuf:"varint,2,opt,name=chunk_number,json=chunkNumber,proto3" json:"chunk_number,omitempty"` // Sequential chunk number (0-indexed) + ChunkData []byte `protobuf:"bytes,3,opt,name=chunk_data,json=chunkData,proto3" json:"chunk_data,omitempty"` // File data chunk (max 1MB) + ChunkOffset int64 `protobuf:"varint,4,opt,name=chunk_offset,json=chunkOffset,proto3" json:"chunk_offset,omitempty"` // Byte offset in the file + IsFinal bool `protobuf:"varint,5,opt,name=is_final,json=isFinal,proto3" json:"is_final,omitempty"` // Whether this is the last chunk unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } -func (x *File) Reset() { - *x = File{} +func (x *FileChunk) Reset() { + *x = FileChunk{} mi := &file_dutctl_v1_dutctl_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *File) String() string { +func (x *FileChunk) String() string { return protoimpl.X.MessageStringOf(x) } -func (*File) ProtoMessage() {} +func (*FileChunk) ProtoMessage() {} -func (x *File) ProtoReflect() protoreflect.Message { +func (x *FileChunk) ProtoReflect() protoreflect.Message { mi := &file_dutctl_v1_dutctl_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -919,25 +1094,177 @@ func (x *File) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use File.ProtoReflect.Descriptor instead. -func (*File) Descriptor() ([]byte, []int) { +// Deprecated: Use FileChunk.ProtoReflect.Descriptor instead. +func (*FileChunk) Descriptor() ([]byte, []int) { return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{14} } -func (x *File) GetPath() string { +func (x *FileChunk) GetTransferId() string { if x != nil { - return x.Path + return x.TransferId + } + return "" +} + +func (x *FileChunk) GetChunkNumber() int32 { + if x != nil { + return x.ChunkNumber + } + return 0 +} + +func (x *FileChunk) GetChunkData() []byte { + if x != nil { + return x.ChunkData + } + return nil +} + +func (x *FileChunk) GetChunkOffset() int64 { + if x != nil { + return x.ChunkOffset + } + return 0 +} + +func (x *FileChunk) GetIsFinal() bool { + if x != nil { + return x.IsFinal + } + return false +} + +// FileTransferRequest initiates a file transfer. +// Sent by the side requesting to receive a file. +type FileTransferRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + TransferId string `protobuf:"bytes,1,opt,name=transfer_id,json=transferId,proto3" json:"transfer_id,omitempty"` // Unique ID for this transfer + Metadata *FileMetadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"` // File metadata + Direction FileTransferRequest_Direction `protobuf:"varint,3,opt,name=direction,proto3,enum=dutctl.v1.FileTransferRequest_Direction" json:"direction,omitempty"` // Direction of file transfer + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FileTransferRequest) Reset() { + *x = FileTransferRequest{} + mi := &file_dutctl_v1_dutctl_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FileTransferRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FileTransferRequest) ProtoMessage() {} + +func (x *FileTransferRequest) ProtoReflect() protoreflect.Message { + mi := &file_dutctl_v1_dutctl_proto_msgTypes[15] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FileTransferRequest.ProtoReflect.Descriptor instead. +func (*FileTransferRequest) Descriptor() ([]byte, []int) { + return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{15} +} + +func (x *FileTransferRequest) GetTransferId() string { + if x != nil { + return x.TransferId } return "" } -func (x *File) GetContent() []byte { +func (x *FileTransferRequest) GetMetadata() *FileMetadata { if x != nil { - return x.Content + return x.Metadata } return nil } +func (x *FileTransferRequest) GetDirection() FileTransferRequest_Direction { + if x != nil { + return x.Direction + } + return FileTransferRequest_DIRECTION_UNSPECIFIED +} + +// FileTransferResponse acknowledges file transfer requests and chunk receipts. +type FileTransferResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + TransferId string `protobuf:"bytes,1,opt,name=transfer_id,json=transferId,proto3" json:"transfer_id,omitempty"` // Matches request ID + Status FileTransferResponse_Status `protobuf:"varint,2,opt,name=status,proto3,enum=dutctl.v1.FileTransferResponse_Status" json:"status,omitempty"` + NextChunkExpected int32 `protobuf:"varint,3,opt,name=next_chunk_expected,json=nextChunkExpected,proto3" json:"next_chunk_expected,omitempty"` // Next chunk number expected (for recovery) + ErrorMessage string `protobuf:"bytes,4,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` // Error details if status is ERROR + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FileTransferResponse) Reset() { + *x = FileTransferResponse{} + mi := &file_dutctl_v1_dutctl_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FileTransferResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FileTransferResponse) ProtoMessage() {} + +func (x *FileTransferResponse) ProtoReflect() protoreflect.Message { + mi := &file_dutctl_v1_dutctl_proto_msgTypes[16] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FileTransferResponse.ProtoReflect.Descriptor instead. +func (*FileTransferResponse) Descriptor() ([]byte, []int) { + return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{16} +} + +func (x *FileTransferResponse) GetTransferId() string { + if x != nil { + return x.TransferId + } + return "" +} + +func (x *FileTransferResponse) GetStatus() FileTransferResponse_Status { + if x != nil { + return x.Status + } + return FileTransferResponse_STATUS_UNSPECIFIED +} + +func (x *FileTransferResponse) GetNextChunkExpected() int32 { + if x != nil { + return x.NextChunkExpected + } + return 0 +} + +func (x *FileTransferResponse) GetErrorMessage() string { + if x != nil { + return x.ErrorMessage + } + return "" +} + // LockRequest is sent by the client to acquire or extend a lock on a device. // The lock owner identity is carried in an HTTP header, not in this message. type LockRequest struct { @@ -950,7 +1277,7 @@ type LockRequest struct { func (x *LockRequest) Reset() { *x = LockRequest{} - mi := &file_dutctl_v1_dutctl_proto_msgTypes[15] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -962,7 +1289,7 @@ func (x *LockRequest) String() string { func (*LockRequest) ProtoMessage() {} func (x *LockRequest) ProtoReflect() protoreflect.Message { - mi := &file_dutctl_v1_dutctl_proto_msgTypes[15] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[17] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -975,7 +1302,7 @@ func (x *LockRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use LockRequest.ProtoReflect.Descriptor instead. func (*LockRequest) Descriptor() ([]byte, []int) { - return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{15} + return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{17} } func (x *LockRequest) GetDevice() string { @@ -1005,7 +1332,7 @@ type LockResponse struct { func (x *LockResponse) Reset() { *x = LockResponse{} - mi := &file_dutctl_v1_dutctl_proto_msgTypes[16] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1017,7 +1344,7 @@ func (x *LockResponse) String() string { func (*LockResponse) ProtoMessage() {} func (x *LockResponse) ProtoReflect() protoreflect.Message { - mi := &file_dutctl_v1_dutctl_proto_msgTypes[16] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[18] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1030,7 +1357,7 @@ func (x *LockResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use LockResponse.ProtoReflect.Descriptor instead. func (*LockResponse) Descriptor() ([]byte, []int) { - return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{16} + return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{18} } func (x *LockResponse) GetDevice() string { @@ -1073,7 +1400,7 @@ type UnlockRequest struct { func (x *UnlockRequest) Reset() { *x = UnlockRequest{} - mi := &file_dutctl_v1_dutctl_proto_msgTypes[17] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[19] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1085,7 +1412,7 @@ func (x *UnlockRequest) String() string { func (*UnlockRequest) ProtoMessage() {} func (x *UnlockRequest) ProtoReflect() protoreflect.Message { - mi := &file_dutctl_v1_dutctl_proto_msgTypes[17] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[19] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1098,7 +1425,7 @@ func (x *UnlockRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use UnlockRequest.ProtoReflect.Descriptor instead. func (*UnlockRequest) Descriptor() ([]byte, []int) { - return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{17} + return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{19} } func (x *UnlockRequest) GetDevice() string { @@ -1124,7 +1451,7 @@ type UnlockResponse struct { func (x *UnlockResponse) Reset() { *x = UnlockResponse{} - mi := &file_dutctl_v1_dutctl_proto_msgTypes[18] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1136,7 +1463,7 @@ func (x *UnlockResponse) String() string { func (*UnlockResponse) ProtoMessage() {} func (x *UnlockResponse) ProtoReflect() protoreflect.Message { - mi := &file_dutctl_v1_dutctl_proto_msgTypes[18] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[20] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1149,7 +1476,7 @@ func (x *UnlockResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use UnlockResponse.ProtoReflect.Descriptor instead. func (*UnlockResponse) Descriptor() ([]byte, []int) { - return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{18} + return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{20} } // RegisterRequest is sent by a device agent to register with the relay server. @@ -1164,7 +1491,7 @@ type RegisterRequest struct { func (x *RegisterRequest) Reset() { *x = RegisterRequest{} - mi := &file_dutctl_v1_dutctl_proto_msgTypes[19] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1176,7 +1503,7 @@ func (x *RegisterRequest) String() string { func (*RegisterRequest) ProtoMessage() {} func (x *RegisterRequest) ProtoReflect() protoreflect.Message { - mi := &file_dutctl_v1_dutctl_proto_msgTypes[19] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[21] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1189,7 +1516,7 @@ func (x *RegisterRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterRequest.ProtoReflect.Descriptor instead. func (*RegisterRequest) Descriptor() ([]byte, []int) { - return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{19} + return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{21} } func (x *RegisterRequest) GetDevices() []string { @@ -1216,7 +1543,7 @@ type RegisterResponse struct { func (x *RegisterResponse) Reset() { *x = RegisterResponse{} - mi := &file_dutctl_v1_dutctl_proto_msgTypes[20] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[22] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1228,7 +1555,7 @@ func (x *RegisterResponse) String() string { func (*RegisterResponse) ProtoMessage() {} func (x *RegisterResponse) ProtoReflect() protoreflect.Message { - mi := &file_dutctl_v1_dutctl_proto_msgTypes[20] + mi := &file_dutctl_v1_dutctl_proto_msgTypes[22] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1241,7 +1568,7 @@ func (x *RegisterResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterResponse.ProtoReflect.Descriptor instead. func (*RegisterResponse) Descriptor() ([]byte, []int) { - return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{20} + return file_dutctl_v1_dutctl_proto_rawDescGZIP(), []int{22} } var File_dutctl_v1_dutctl_proto protoreflect.FileDescriptor @@ -1270,18 +1597,23 @@ const file_dutctl_v1_dutctl_proto_rawDesc = "" + "\x03cmd\x18\x02 \x01(\tR\x03cmd\x12\x18\n" + "\akeyword\x18\x03 \x01(\tR\akeyword\"+\n" + "\x0fDetailsResponse\x12\x18\n" + - "\adetails\x18\x01 \x01(\tR\adetails\"\x9a\x01\n" + + "\adetails\x18\x01 \x01(\tR\adetails\"\xd9\x02\n" + "\n" + "RunRequest\x12.\n" + "\acommand\x18\x01 \x01(\v2\x12.dutctl.v1.CommandH\x00R\acommand\x12.\n" + - "\aconsole\x18\x02 \x01(\v2\x12.dutctl.v1.ConsoleH\x00R\aconsole\x12%\n" + - "\x04file\x18\x03 \x01(\v2\x0f.dutctl.v1.FileH\x00R\x04fileB\x05\n" + - "\x03msg\"\xd2\x01\n" + + "\aconsole\x18\x02 \x01(\v2\x12.dutctl.v1.ConsoleH\x00R\aconsole\x12T\n" + + "\x15file_transfer_request\x18\x03 \x01(\v2\x1e.dutctl.v1.FileTransferRequestH\x00R\x13fileTransferRequest\x125\n" + + "\n" + + "file_chunk\x18\x04 \x01(\v2\x14.dutctl.v1.FileChunkH\x00R\tfileChunk\x12W\n" + + "\x16file_transfer_response\x18\x05 \x01(\v2\x1f.dutctl.v1.FileTransferResponseH\x00R\x14fileTransferResponseB\x05\n" + + "\x03msg\"\xd4\x02\n" + "\vRunResponse\x12(\n" + "\x05print\x18\x01 \x01(\v2\x10.dutctl.v1.PrintH\x00R\x05print\x12.\n" + - "\aconsole\x18\x02 \x01(\v2\x12.dutctl.v1.ConsoleH\x00R\aconsole\x12;\n" + - "\ffile_request\x18\x03 \x01(\v2\x16.dutctl.v1.FileRequestH\x00R\vfileRequest\x12%\n" + - "\x04file\x18\x04 \x01(\v2\x0f.dutctl.v1.FileH\x00R\x04fileB\x05\n" + + "\aconsole\x18\x02 \x01(\v2\x12.dutctl.v1.ConsoleH\x00R\aconsole\x12T\n" + + "\x15file_transfer_request\x18\x03 \x01(\v2\x1e.dutctl.v1.FileTransferRequestH\x00R\x13fileTransferRequest\x125\n" + + "\n" + + "file_chunk\x18\x04 \x01(\v2\x14.dutctl.v1.FileChunkH\x00R\tfileChunk\x12W\n" + + "\x16file_transfer_response\x18\x05 \x01(\v2\x1f.dutctl.v1.FileTransferResponseH\x00R\x14fileTransferResponseB\x05\n" + "\x03msg\"O\n" + "\aCommand\x12\x16\n" + "\x06device\x18\x01 \x01(\tR\x06device\x12\x18\n" + @@ -1293,12 +1625,42 @@ const file_dutctl_v1_dutctl_proto_rawDesc = "" + "\x05stdin\x18\x01 \x01(\fH\x00R\x05stdin\x12\x18\n" + "\x06stdout\x18\x02 \x01(\fH\x00R\x06stdout\x12\x18\n" + "\x06stderr\x18\x03 \x01(\fH\x00R\x06stderrB\x06\n" + - "\x04data\"!\n" + - "\vFileRequest\x12\x12\n" + - "\x04path\x18\x01 \x01(\tR\x04path\"4\n" + - "\x04File\x12\x12\n" + - "\x04path\x18\x01 \x01(\tR\x04path\x12\x18\n" + - "\acontent\x18\x02 \x01(\fR\acontent\"P\n" + + "\x04data\"J\n" + + "\fFileMetadata\x12\x12\n" + + "\x04path\x18\x01 \x01(\tR\x04path\x12\x12\n" + + "\x04size\x18\x02 \x01(\x03R\x04size\x12\x12\n" + + "\x04name\x18\x03 \x01(\tR\x04name\"\xac\x01\n" + + "\tFileChunk\x12\x1f\n" + + "\vtransfer_id\x18\x01 \x01(\tR\n" + + "transferId\x12!\n" + + "\fchunk_number\x18\x02 \x01(\x05R\vchunkNumber\x12\x1d\n" + + "\n" + + "chunk_data\x18\x03 \x01(\fR\tchunkData\x12!\n" + + "\fchunk_offset\x18\x04 \x01(\x03R\vchunkOffset\x12\x19\n" + + "\bis_final\x18\x05 \x01(\bR\aisFinal\"\xf5\x01\n" + + "\x13FileTransferRequest\x12\x1f\n" + + "\vtransfer_id\x18\x01 \x01(\tR\n" + + "transferId\x123\n" + + "\bmetadata\x18\x02 \x01(\v2\x17.dutctl.v1.FileMetadataR\bmetadata\x12F\n" + + "\tdirection\x18\x03 \x01(\x0e2(.dutctl.v1.FileTransferRequest.DirectionR\tdirection\"@\n" + + "\tDirection\x12\x19\n" + + "\x15DIRECTION_UNSPECIFIED\x10\x00\x12\n" + + "\n" + + "\x06UPLOAD\x10\x01\x12\f\n" + + "\bDOWNLOAD\x10\x02\"\xc9\x02\n" + + "\x14FileTransferResponse\x12\x1f\n" + + "\vtransfer_id\x18\x01 \x01(\tR\n" + + "transferId\x12>\n" + + "\x06status\x18\x02 \x01(\x0e2&.dutctl.v1.FileTransferResponse.StatusR\x06status\x12.\n" + + "\x13next_chunk_expected\x18\x03 \x01(\x05R\x11nextChunkExpected\x12#\n" + + "\rerror_message\x18\x04 \x01(\tR\ferrorMessage\"{\n" + + "\x06Status\x12\x16\n" + + "\x12STATUS_UNSPECIFIED\x10\x00\x12\f\n" + + "\bACCEPTED\x10\x01\x12\x12\n" + + "\x0eCHUNK_RECEIVED\x10\x02\x12\x15\n" + + "\x11TRANSFER_COMPLETE\x10\x03\x12\x15\n" + + "\x11TRANSFER_REJECTED\x10\x04\x12\t\n" + + "\x05ERROR\x10\x05\"P\n" + "\vLockRequest\x12\x16\n" + "\x06device\x18\x01 \x01(\tR\x06device\x12)\n" + "\x10duration_seconds\x18\x02 \x01(\x03R\x0fdurationSeconds\"x\n" + @@ -1338,59 +1700,70 @@ func file_dutctl_v1_dutctl_proto_rawDescGZIP() []byte { return file_dutctl_v1_dutctl_proto_rawDescData } -var file_dutctl_v1_dutctl_proto_msgTypes = make([]protoimpl.MessageInfo, 21) +var file_dutctl_v1_dutctl_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_dutctl_v1_dutctl_proto_msgTypes = make([]protoimpl.MessageInfo, 23) var file_dutctl_v1_dutctl_proto_goTypes = []any{ - (*ListRequest)(nil), // 0: dutctl.v1.ListRequest - (*ListResponse)(nil), // 1: dutctl.v1.ListResponse - (*DeviceInfo)(nil), // 2: dutctl.v1.DeviceInfo - (*LockInfo)(nil), // 3: dutctl.v1.LockInfo - (*CommandsRequest)(nil), // 4: dutctl.v1.CommandsRequest - (*CommandsResponse)(nil), // 5: dutctl.v1.CommandsResponse - (*DetailsRequest)(nil), // 6: dutctl.v1.DetailsRequest - (*DetailsResponse)(nil), // 7: dutctl.v1.DetailsResponse - (*RunRequest)(nil), // 8: dutctl.v1.RunRequest - (*RunResponse)(nil), // 9: dutctl.v1.RunResponse - (*Command)(nil), // 10: dutctl.v1.Command - (*Print)(nil), // 11: dutctl.v1.Print - (*Console)(nil), // 12: dutctl.v1.Console - (*FileRequest)(nil), // 13: dutctl.v1.FileRequest - (*File)(nil), // 14: dutctl.v1.File - (*LockRequest)(nil), // 15: dutctl.v1.LockRequest - (*LockResponse)(nil), // 16: dutctl.v1.LockResponse - (*UnlockRequest)(nil), // 17: dutctl.v1.UnlockRequest - (*UnlockResponse)(nil), // 18: dutctl.v1.UnlockResponse - (*RegisterRequest)(nil), // 19: dutctl.v1.RegisterRequest - (*RegisterResponse)(nil), // 20: dutctl.v1.RegisterResponse + (FileTransferRequest_Direction)(0), // 0: dutctl.v1.FileTransferRequest.Direction + (FileTransferResponse_Status)(0), // 1: dutctl.v1.FileTransferResponse.Status + (*ListRequest)(nil), // 2: dutctl.v1.ListRequest + (*ListResponse)(nil), // 3: dutctl.v1.ListResponse + (*DeviceInfo)(nil), // 4: dutctl.v1.DeviceInfo + (*LockInfo)(nil), // 5: dutctl.v1.LockInfo + (*CommandsRequest)(nil), // 6: dutctl.v1.CommandsRequest + (*CommandsResponse)(nil), // 7: dutctl.v1.CommandsResponse + (*DetailsRequest)(nil), // 8: dutctl.v1.DetailsRequest + (*DetailsResponse)(nil), // 9: dutctl.v1.DetailsResponse + (*RunRequest)(nil), // 10: dutctl.v1.RunRequest + (*RunResponse)(nil), // 11: dutctl.v1.RunResponse + (*Command)(nil), // 12: dutctl.v1.Command + (*Print)(nil), // 13: dutctl.v1.Print + (*Console)(nil), // 14: dutctl.v1.Console + (*FileMetadata)(nil), // 15: dutctl.v1.FileMetadata + (*FileChunk)(nil), // 16: dutctl.v1.FileChunk + (*FileTransferRequest)(nil), // 17: dutctl.v1.FileTransferRequest + (*FileTransferResponse)(nil), // 18: dutctl.v1.FileTransferResponse + (*LockRequest)(nil), // 19: dutctl.v1.LockRequest + (*LockResponse)(nil), // 20: dutctl.v1.LockResponse + (*UnlockRequest)(nil), // 21: dutctl.v1.UnlockRequest + (*UnlockResponse)(nil), // 22: dutctl.v1.UnlockResponse + (*RegisterRequest)(nil), // 23: dutctl.v1.RegisterRequest + (*RegisterResponse)(nil), // 24: dutctl.v1.RegisterResponse } var file_dutctl_v1_dutctl_proto_depIdxs = []int32{ - 2, // 0: dutctl.v1.ListResponse.devices:type_name -> dutctl.v1.DeviceInfo - 3, // 1: dutctl.v1.DeviceInfo.lock:type_name -> dutctl.v1.LockInfo - 10, // 2: dutctl.v1.RunRequest.command:type_name -> dutctl.v1.Command - 12, // 3: dutctl.v1.RunRequest.console:type_name -> dutctl.v1.Console - 14, // 4: dutctl.v1.RunRequest.file:type_name -> dutctl.v1.File - 11, // 5: dutctl.v1.RunResponse.print:type_name -> dutctl.v1.Print - 12, // 6: dutctl.v1.RunResponse.console:type_name -> dutctl.v1.Console - 13, // 7: dutctl.v1.RunResponse.file_request:type_name -> dutctl.v1.FileRequest - 14, // 8: dutctl.v1.RunResponse.file:type_name -> dutctl.v1.File - 0, // 9: dutctl.v1.DeviceService.List:input_type -> dutctl.v1.ListRequest - 4, // 10: dutctl.v1.DeviceService.Commands:input_type -> dutctl.v1.CommandsRequest - 6, // 11: dutctl.v1.DeviceService.Details:input_type -> dutctl.v1.DetailsRequest - 8, // 12: dutctl.v1.DeviceService.Run:input_type -> dutctl.v1.RunRequest - 15, // 13: dutctl.v1.DeviceService.Lock:input_type -> dutctl.v1.LockRequest - 17, // 14: dutctl.v1.DeviceService.Unlock:input_type -> dutctl.v1.UnlockRequest - 19, // 15: dutctl.v1.RelayService.Register:input_type -> dutctl.v1.RegisterRequest - 1, // 16: dutctl.v1.DeviceService.List:output_type -> dutctl.v1.ListResponse - 5, // 17: dutctl.v1.DeviceService.Commands:output_type -> dutctl.v1.CommandsResponse - 7, // 18: dutctl.v1.DeviceService.Details:output_type -> dutctl.v1.DetailsResponse - 9, // 19: dutctl.v1.DeviceService.Run:output_type -> dutctl.v1.RunResponse - 16, // 20: dutctl.v1.DeviceService.Lock:output_type -> dutctl.v1.LockResponse - 18, // 21: dutctl.v1.DeviceService.Unlock:output_type -> dutctl.v1.UnlockResponse - 20, // 22: dutctl.v1.RelayService.Register:output_type -> dutctl.v1.RegisterResponse - 16, // [16:23] is the sub-list for method output_type - 9, // [9:16] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 4, // 0: dutctl.v1.ListResponse.devices:type_name -> dutctl.v1.DeviceInfo + 5, // 1: dutctl.v1.DeviceInfo.lock:type_name -> dutctl.v1.LockInfo + 12, // 2: dutctl.v1.RunRequest.command:type_name -> dutctl.v1.Command + 14, // 3: dutctl.v1.RunRequest.console:type_name -> dutctl.v1.Console + 17, // 4: dutctl.v1.RunRequest.file_transfer_request:type_name -> dutctl.v1.FileTransferRequest + 16, // 5: dutctl.v1.RunRequest.file_chunk:type_name -> dutctl.v1.FileChunk + 18, // 6: dutctl.v1.RunRequest.file_transfer_response:type_name -> dutctl.v1.FileTransferResponse + 13, // 7: dutctl.v1.RunResponse.print:type_name -> dutctl.v1.Print + 14, // 8: dutctl.v1.RunResponse.console:type_name -> dutctl.v1.Console + 17, // 9: dutctl.v1.RunResponse.file_transfer_request:type_name -> dutctl.v1.FileTransferRequest + 16, // 10: dutctl.v1.RunResponse.file_chunk:type_name -> dutctl.v1.FileChunk + 18, // 11: dutctl.v1.RunResponse.file_transfer_response:type_name -> dutctl.v1.FileTransferResponse + 15, // 12: dutctl.v1.FileTransferRequest.metadata:type_name -> dutctl.v1.FileMetadata + 0, // 13: dutctl.v1.FileTransferRequest.direction:type_name -> dutctl.v1.FileTransferRequest.Direction + 1, // 14: dutctl.v1.FileTransferResponse.status:type_name -> dutctl.v1.FileTransferResponse.Status + 2, // 15: dutctl.v1.DeviceService.List:input_type -> dutctl.v1.ListRequest + 6, // 16: dutctl.v1.DeviceService.Commands:input_type -> dutctl.v1.CommandsRequest + 8, // 17: dutctl.v1.DeviceService.Details:input_type -> dutctl.v1.DetailsRequest + 10, // 18: dutctl.v1.DeviceService.Run:input_type -> dutctl.v1.RunRequest + 19, // 19: dutctl.v1.DeviceService.Lock:input_type -> dutctl.v1.LockRequest + 21, // 20: dutctl.v1.DeviceService.Unlock:input_type -> dutctl.v1.UnlockRequest + 23, // 21: dutctl.v1.RelayService.Register:input_type -> dutctl.v1.RegisterRequest + 3, // 22: dutctl.v1.DeviceService.List:output_type -> dutctl.v1.ListResponse + 7, // 23: dutctl.v1.DeviceService.Commands:output_type -> dutctl.v1.CommandsResponse + 9, // 24: dutctl.v1.DeviceService.Details:output_type -> dutctl.v1.DetailsResponse + 11, // 25: dutctl.v1.DeviceService.Run:output_type -> dutctl.v1.RunResponse + 20, // 26: dutctl.v1.DeviceService.Lock:output_type -> dutctl.v1.LockResponse + 22, // 27: dutctl.v1.DeviceService.Unlock:output_type -> dutctl.v1.UnlockResponse + 24, // 28: dutctl.v1.RelayService.Register:output_type -> dutctl.v1.RegisterResponse + 22, // [22:29] is the sub-list for method output_type + 15, // [15:22] is the sub-list for method input_type + 15, // [15:15] is the sub-list for extension type_name + 15, // [15:15] is the sub-list for extension extendee + 0, // [0:15] is the sub-list for field type_name } func init() { file_dutctl_v1_dutctl_proto_init() } @@ -1401,13 +1774,16 @@ func file_dutctl_v1_dutctl_proto_init() { file_dutctl_v1_dutctl_proto_msgTypes[8].OneofWrappers = []any{ (*RunRequest_Command)(nil), (*RunRequest_Console)(nil), - (*RunRequest_File)(nil), + (*RunRequest_FileTransferRequest)(nil), + (*RunRequest_FileChunk)(nil), + (*RunRequest_FileTransferResponse)(nil), } file_dutctl_v1_dutctl_proto_msgTypes[9].OneofWrappers = []any{ (*RunResponse_Print)(nil), (*RunResponse_Console)(nil), - (*RunResponse_FileRequest)(nil), - (*RunResponse_File)(nil), + (*RunResponse_FileTransferRequest)(nil), + (*RunResponse_FileChunk)(nil), + (*RunResponse_FileTransferResponse)(nil), } file_dutctl_v1_dutctl_proto_msgTypes[12].OneofWrappers = []any{ (*Console_Stdin)(nil), @@ -1419,13 +1795,14 @@ func file_dutctl_v1_dutctl_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_dutctl_v1_dutctl_proto_rawDesc), len(file_dutctl_v1_dutctl_proto_rawDesc)), - NumEnums: 0, - NumMessages: 21, + NumEnums: 2, + NumMessages: 23, NumExtensions: 0, NumServices: 2, }, GoTypes: file_dutctl_v1_dutctl_proto_goTypes, DependencyIndexes: file_dutctl_v1_dutctl_proto_depIdxs, + EnumInfos: file_dutctl_v1_dutctl_proto_enumTypes, MessageInfos: file_dutctl_v1_dutctl_proto_msgTypes, }.Build() File_dutctl_v1_dutctl_proto = out.File From a70443c6872152cedace8bd30bee3e43d3606a93 Mon Sep 17 00:00:00 2001 From: llogen Date: Mon, 26 Jan 2026 11:00:24 +0100 Subject: [PATCH 2/2] fix: add newline to logs if not existing Signed-off-by: llogen --- internal/output/text.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/output/text.go b/internal/output/text.go index a49bb94a..f2204990 100644 --- a/internal/output/text.go +++ b/internal/output/text.go @@ -278,7 +278,12 @@ func (f *TextFormatter) writeModuleOutputTo(content Content, writer io.Writer) { // Print metadata before content f.writeMetadata(content, writer) - fmt.Fprint(writer, output) + // If output doesn't end with a newline, add one to separate from next output + if len(output) > 0 && output[len(output)-1] != '\n' { + fmt.Fprintln(writer, output) + } else { + fmt.Fprint(writer, output) + } } else { f.writeGeneralTo(content, writer) }