diff --git a/module/api_v1_store.go b/module/api_v1_store.go index 8a471fd4..e68012d8 100644 --- a/module/api_v1_store.go +++ b/module/api_v1_store.go @@ -192,6 +192,17 @@ func (s *V1Store) initSchema() error { created_at TEXT NOT NULL, FOREIGN KEY (provider_id) REFERENCES iam_provider_configs(id) ON DELETE CASCADE ); + + CREATE TABLE IF NOT EXISTS workflow_permissions ( + id TEXT PRIMARY KEY, + workflow_id TEXT NOT NULL, + subject TEXT NOT NULL, + action TEXT NOT NULL DEFAULT 'read', + granted INTEGER NOT NULL DEFAULT 1, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE + ); ` _, err := s.db.Exec(schema) if err != nil { diff --git a/module/app_container.go b/module/app_container.go index df3ee79d..64613387 100644 --- a/module/app_container.go +++ b/module/app_container.go @@ -2,6 +2,8 @@ package module import ( "fmt" + "os" + "path/filepath" "github.com/CrisisTextLine/modular" ) @@ -27,6 +29,7 @@ type AppContainerModule struct { current *AppDeployResult // current deployment state previous *AppDeployResult // last known-good deployment for rollback backend appContainerBackend + logger modular.Logger } // AppContainerSpec describes the desired state of an application container. @@ -111,6 +114,8 @@ func (m *AppContainerModule) Name() string { return m.name } // Init resolves the environment module and initialises the platform backend. func (m *AppContainerModule) Init(app modular.Application) error { + m.logger = app.Logger() + envName, _ := m.config["environment"].(string) if envName != "" { svc, ok := app.SvcRegistry()[envName] @@ -129,9 +134,20 @@ func (m *AppContainerModule) Init(app modular.Application) error { return fmt.Errorf("app.container %q: environment %q is not a platform.kubernetes or platform.ecs module (got %T)", m.name, envName, svc) } } else { - // Default to kubernetes mock when no environment is specified. - m.backend = &k8sAppBackend{} - m.platformType = "kubernetes" + // No environment configured: choose backend based on whether a kubeconfig is available. + kubeconfigPath := kubeconfigPath() + if kubeconfigPath != "" { + m.logger.Info("app.container: no environment configured, using kubernetes backend from kubeconfig", + "module", m.name, "kubeconfig", kubeconfigPath) + m.backend = &k8sAppBackend{} + m.platformType = "kubernetes" + } else { + m.logger.Warn("app.container: no environment configured and no kubeconfig found; defaulting to mock kubernetes backend", + "module", m.name, + "hint", "set 'environment' to a platform.kubernetes or platform.ecs module, or ensure KUBECONFIG / ~/.kube/config is present") + m.backend = &k8sAppBackend{} + m.platformType = "kubernetes" + } } m.spec = m.parseSpec() @@ -413,6 +429,24 @@ type K8sServicePortRef struct { Number int `json:"number"` } +// kubeconfigPath returns the path to the kubeconfig file if it exists, or an +// empty string. It checks KUBECONFIG env var first, then ~/.kube/config. +func kubeconfigPath() string { + if kc := os.Getenv("KUBECONFIG"); kc != "" { + kc = filepath.Clean(kc) + if _, err := os.Stat(kc); err == nil { //nolint:gosec // KUBECONFIG is a trusted env var + return kc + } + } + if home, err := os.UserHomeDir(); err == nil { + candidate := filepath.Join(home, ".kube", "config") + if _, err := os.Stat(candidate); err == nil { + return candidate + } + } + return "" +} + // ─── Kubernetes backend ─────────────────────────────────────────────────────── // k8sAppBackend implements appContainerBackend for platform.kubernetes environments. diff --git a/module/pipeline_step_build_binary.go b/module/pipeline_step_build_binary.go index 3bd1cd93..144b84f8 100644 --- a/module/pipeline_step_build_binary.go +++ b/module/pipeline_step_build_binary.go @@ -165,13 +165,26 @@ func (s *BuildBinaryStep) generateGoMod() string { } // generateMainGo returns the contents of the generated main.go file. +// The generated binary loads a workflow config (either embedded at compile-time +// or read from disk at runtime), builds the workflow engine, and runs it until +// SIGINT/SIGTERM is received. func (s *BuildBinaryStep) generateMainGo() string { var sb strings.Builder sb.WriteString("package main\n\n") sb.WriteString("import (\n") - sb.WriteString("\t_ \"embed\"\n") + if s.embedConfig { + sb.WriteString("\t_ \"embed\"\n") + } + sb.WriteString("\t\"context\"\n") sb.WriteString("\t\"fmt\"\n") + sb.WriteString("\t\"log/slog\"\n") sb.WriteString("\t\"os\"\n") + sb.WriteString("\t\"os/signal\"\n") + sb.WriteString("\t\"syscall\"\n") + sb.WriteString("\n") + sb.WriteString("\t\"github.com/CrisisTextLine/modular\"\n") + sb.WriteString("\tworkflow \"github.com/GoCodeAlone/workflow\"\n") + sb.WriteString("\t\"github.com/GoCodeAlone/workflow/config\"\n") sb.WriteString(")\n\n") if s.embedConfig { @@ -180,17 +193,51 @@ func (s *BuildBinaryStep) generateMainGo() string { } sb.WriteString("func main() {\n") + sb.WriteString("\tlogger := slog.New(slog.NewTextHandler(os.Stdout, nil))\n\n") + if s.embedConfig { sb.WriteString("\tif len(configYAML) == 0 {\n") sb.WriteString("\t\tfmt.Fprintln(os.Stderr, \"embedded config is empty\")\n") sb.WriteString("\t\tos.Exit(1)\n") - sb.WriteString("\t}\n") - sb.WriteString("\tfmt.Printf(\"Starting workflow app (config: %d bytes)\\n\", len(configYAML))\n") + sb.WriteString("\t}\n\n") + sb.WriteString("\tcfg, err := config.LoadFromString(string(configYAML))\n") } else { - sb.WriteString("\tfmt.Println(\"Starting workflow app\")\n") + sb.WriteString("\tcfgFile := \"app.yaml\"\n") + sb.WriteString("\tif len(os.Args) > 1 {\n") + sb.WriteString("\t\tcfgFile = os.Args[1]\n") + sb.WriteString("\t}\n\n") + sb.WriteString("\tcfg, err := config.LoadFromFile(cfgFile)\n") } - sb.WriteString("\t// TODO: wire up modular.NewStdApplication with embedded config\n") - sb.WriteString("\t_ = os.Args\n") + sb.WriteString("\tif err != nil {\n") + sb.WriteString("\t\tfmt.Fprintf(os.Stderr, \"parse config: %v\\n\", err)\n") + sb.WriteString("\t\tos.Exit(1)\n") + sb.WriteString("\t}\n\n") + + sb.WriteString("\tapp := modular.NewStdApplication(nil, logger)\n") + sb.WriteString("\tengine := workflow.NewStdEngine(app, logger)\n\n") + + sb.WriteString("\tif err := engine.BuildFromConfig(cfg); err != nil {\n") + sb.WriteString("\t\tfmt.Fprintf(os.Stderr, \"build engine: %v\\n\", err)\n") + sb.WriteString("\t\tos.Exit(1)\n") + sb.WriteString("\t}\n\n") + + sb.WriteString("\tctx, cancel := context.WithCancel(context.Background())\n") + sb.WriteString("\tdefer cancel()\n\n") + + sb.WriteString("\tif err := engine.Start(ctx); err != nil {\n") + sb.WriteString("\t\tfmt.Fprintf(os.Stderr, \"start engine: %v\\n\", err)\n") + sb.WriteString("\t\tos.Exit(1)\n") + sb.WriteString("\t}\n\n") + + sb.WriteString("\tsigCh := make(chan os.Signal, 1)\n") + sb.WriteString("\tsignal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)\n") + sb.WriteString("\t<-sigCh\n\n") + + sb.WriteString("\tcancel()\n") + sb.WriteString("\tif err := engine.Stop(context.Background()); err != nil {\n") + sb.WriteString("\t\tfmt.Fprintf(os.Stderr, \"stop engine: %v\\n\", err)\n") + sb.WriteString("\t\tos.Exit(1)\n") + sb.WriteString("\t}\n") sb.WriteString("}\n") return sb.String() } diff --git a/plugin/external/sdk/grpc_server.go b/plugin/external/sdk/grpc_server.go index 5c2aa7c7..7f460d3a 100644 --- a/plugin/external/sdk/grpc_server.go +++ b/plugin/external/sdk/grpc_server.go @@ -9,8 +9,6 @@ import ( goplugin "github.com/GoCodeAlone/go-plugin" pb "github.com/GoCodeAlone/workflow/plugin/external/proto" "github.com/google/uuid" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/structpb" ) @@ -364,8 +362,29 @@ func (s *grpcServer) GetConfigFragment(_ context.Context, _ *emptypb.Empty) (*pb // --- Service RPCs --- -func (s *grpcServer) InvokeService(_ context.Context, _ *pb.InvokeServiceRequest) (*pb.InvokeServiceResponse, error) { - return nil, status.Error(codes.Unimplemented, "InvokeService not implemented") +// InvokeService routes a service method call to the registered module identified +// by handle_id. The module must implement ServiceInvoker; if it does not, an +// error is returned to the host. +func (s *grpcServer) InvokeService(_ context.Context, req *pb.InvokeServiceRequest) (*pb.InvokeServiceResponse, error) { + s.mu.RLock() + inst, ok := s.modules[req.HandleId] + s.mu.RUnlock() + if !ok { + return &pb.InvokeServiceResponse{Error: fmt.Sprintf("unknown module handle: %s", req.HandleId)}, nil + } + + invoker, ok := inst.(ServiceInvoker) + if !ok { + return &pb.InvokeServiceResponse{ + Error: fmt.Sprintf("module handle %s does not implement ServiceInvoker", req.HandleId), + }, nil + } + + result, err := invoker.InvokeMethod(req.Method, structToMap(req.Args)) + if err != nil { + return &pb.InvokeServiceResponse{Error: err.Error()}, nil //nolint:nilerr // app error in response field + } + return &pb.InvokeServiceResponse{Result: mapToStruct(result)}, nil } // --- Message delivery RPC --- diff --git a/plugin/external/sdk/interfaces.go b/plugin/external/sdk/interfaces.go index 9b45d839..28407c45 100644 --- a/plugin/external/sdk/interfaces.go +++ b/plugin/external/sdk/interfaces.go @@ -124,3 +124,11 @@ type ConfigProvider interface { // ConfigFragment returns YAML config to merge into the host config. ConfigFragment() ([]byte, error) } + +// ServiceInvoker is optionally implemented by ModuleInstance to handle +// service method invocations from the host. The host calls InvokeService +// with a method name and a map of arguments; the implementation dispatches +// to the appropriate logic and returns a result map. +type ServiceInvoker interface { + InvokeMethod(method string, args map[string]any) (map[string]any, error) +}