@@ -71,6 +71,7 @@ type openAPIOperation struct {
7171 Parameters []openAPIParameter `yaml:"parameters" json:"parameters"`
7272 RequestBody * openAPIRequestBody `yaml:"requestBody" json:"requestBody"`
7373 Responses map [string ]openAPIResponse `yaml:"responses" json:"responses"`
74+ XPipeline string `yaml:"x-pipeline" json:"x-pipeline"`
7475}
7576
7677// openAPIParameter describes a path, query, header, or cookie parameter.
@@ -116,13 +117,14 @@ type openAPISchema struct {
116117// OpenAPIModule parses an OpenAPI v3 spec and registers HTTP routes that
117118// validate incoming requests against the spec schemas.
118119type OpenAPIModule struct {
119- name string
120- cfg OpenAPIConfig
121- spec * openAPISpec
122- specBytes []byte // raw spec bytes for serving (original file content)
123- specJSON []byte // cached JSON-serialised spec for /openapi.json endpoint
124- routerName string
125- logger * slog.Logger
120+ name string
121+ cfg OpenAPIConfig
122+ spec * openAPISpec
123+ specBytes []byte // raw spec bytes for serving (original file content)
124+ specJSON []byte // cached JSON-serialised spec for /openapi.json endpoint
125+ routerName string
126+ logger * slog.Logger
127+ pipelineLookup PipelineLookupFn
126128}
127129
128130// NewOpenAPIModule creates a new OpenAPIModule with the given name and config.
@@ -199,6 +201,12 @@ func (m *OpenAPIModule) RequiresServices() []modular.ServiceDependency { return
199201// RouterName returns the optional explicit router module name to attach routes to.
200202func (m * OpenAPIModule ) RouterName () string { return m .routerName }
201203
204+ // SetPipelineLookup sets the function used to resolve pipeline names found in
205+ // x-pipeline operation extensions. This must be called before RegisterRoutes.
206+ func (m * OpenAPIModule ) SetPipelineLookup (fn PipelineLookupFn ) {
207+ m .pipelineLookup = fn
208+ }
209+
202210// RegisterRoutes attaches all spec paths (and optional Swagger UI / spec endpoints)
203211// to the given HTTPRouter.
204212func (m * OpenAPIModule ) RegisterRoutes (router HTTPRouter ) {
@@ -276,17 +284,22 @@ func (m *OpenAPIModule) RegisterRoutes(router HTTPRouter) {
276284// ---- Handler builders ----
277285
278286// buildRouteHandler creates an HTTPHandler that validates the request (if enabled)
279- // and returns a 501 Not Implemented stub response. In a full integration the
280- // caller would wrap this handler or replace the stub with real business logic .
287+ // and either executes the linked pipeline (if x-pipeline is set) or returns a 501
288+ // Not Implemented stub response .
281289func (m * OpenAPIModule ) buildRouteHandler (specPath , method string , op * openAPIOperation ) HTTPHandler {
282290 validateReq := m .cfg .Validation .Request
283- return & openAPIRouteHandler {
291+ h := & openAPIRouteHandler {
284292 module : m ,
285293 specPath : specPath ,
286294 method : method ,
287295 op : op ,
288296 validateReq : validateReq ,
289297 }
298+ if op .XPipeline != "" {
299+ h .pipelineName = op .XPipeline
300+ h .pipelineLookup = m .pipelineLookup
301+ }
302+ return h
290303}
291304
292305// buildSwaggerUIHandler returns an inline Swagger UI page that loads the spec
@@ -299,11 +312,13 @@ func (m *OpenAPIModule) buildSwaggerUIHandler(specURL string) HTTPHandler {
299312// ---- openAPIRouteHandler ----
300313
301314type openAPIRouteHandler struct {
302- module * OpenAPIModule
303- specPath string
304- method string
305- op * openAPIOperation
306- validateReq bool
315+ module * OpenAPIModule
316+ specPath string
317+ method string
318+ op * openAPIOperation
319+ validateReq bool
320+ pipelineName string
321+ pipelineLookup PipelineLookupFn
307322}
308323
309324func (h * openAPIRouteHandler ) Handle (w http.ResponseWriter , r * http.Request ) {
@@ -319,6 +334,58 @@ func (h *openAPIRouteHandler) Handle(w http.ResponseWriter, r *http.Request) {
319334 }
320335 }
321336
337+ // If x-pipeline is configured, execute the named pipeline.
338+ if h .pipelineName != "" {
339+ if h .pipelineLookup == nil {
340+ w .Header ().Set ("Content-Type" , "application/json" )
341+ w .WriteHeader (http .StatusInternalServerError )
342+ _ = json .NewEncoder (w ).Encode (map [string ]string {
343+ "error" : fmt .Sprintf ("pipeline lookup not configured for pipeline %q" , h .pipelineName ),
344+ })
345+ return
346+ }
347+
348+ pipeline , ok := h .pipelineLookup (h .pipelineName )
349+ if ! ok {
350+ w .Header ().Set ("Content-Type" , "application/json" )
351+ w .WriteHeader (http .StatusBadGateway )
352+ _ = json .NewEncoder (w ).Encode (map [string ]string {
353+ "error" : fmt .Sprintf ("pipeline %q not found" , h .pipelineName ),
354+ })
355+ return
356+ }
357+
358+ data := openAPIExtractRequestData (r )
359+
360+ rw := & trackedResponseWriter {ResponseWriter : w }
361+ ctx := context .WithValue (r .Context (), HTTPResponseWriterContextKey , rw )
362+ ctx = context .WithValue (ctx , HTTPRequestContextKey , r )
363+
364+ // Use a per-request shallow copy of the pipeline to avoid concurrent
365+ // mutations of shared pipeline state (e.g. sequence/event counters).
366+ pipelineCopy := * pipeline
367+ result , err := pipelineCopy .Execute (ctx , data )
368+ if err != nil {
369+ if ! rw .written {
370+ w .Header ().Set ("Content-Type" , "application/json" )
371+ w .WriteHeader (http .StatusInternalServerError )
372+ _ = json .NewEncoder (w ).Encode (map [string ]string {
373+ "error" : fmt .Sprintf ("pipeline execution failed: %v" , err ),
374+ })
375+ }
376+ return
377+ }
378+
379+ if rw .written {
380+ return
381+ }
382+
383+ w .Header ().Set ("Content-Type" , "application/json" )
384+ w .WriteHeader (http .StatusOK )
385+ _ = json .NewEncoder (w ).Encode (result .Current )
386+ return
387+ }
388+
322389 // Default stub: 501 Not Implemented
323390 // In a full integration callers wire their own handler on top of this module.
324391 w .Header ().Set ("Content-Type" , "application/json" )
@@ -758,3 +825,50 @@ func supportedContentTypes(content map[string]openAPIMediaType) string {
758825 sort .Strings (types )
759826 return strings .Join (types , ", " )
760827}
828+
829+ // openAPIExtractRequestData builds a trigger data map from an HTTP request,
830+ // extracting query parameters (first value per key) and, for JSON bodies,
831+ // the decoded top-level body fields (without overwriting query param values).
832+ // The request body is restored after reading so downstream handlers can still
833+ // consume it.
834+ func openAPIExtractRequestData (r * http.Request ) map [string ]any {
835+ const maxBodySize = 1 << 20 // 1 MiB limit for JSON body parsing
836+
837+ data := make (map [string ]any )
838+
839+ // Extract query parameters (first value per key).
840+ for k , v := range r .URL .Query () {
841+ if len (v ) > 0 {
842+ data [k ] = v [0 ]
843+ }
844+ }
845+
846+ // Extract JSON body fields if Content-Type is application/json.
847+ if r .Body != nil {
848+ ct := r .Header .Get ("Content-Type" )
849+ if idx := strings .Index (ct , ";" ); idx != - 1 {
850+ ct = strings .TrimSpace (ct [:idx ])
851+ } else {
852+ ct = strings .TrimSpace (ct )
853+ }
854+
855+ if strings .EqualFold (ct , "application/json" ) {
856+ bodyBytes , err := io .ReadAll (io .LimitReader (r .Body , maxBodySize ))
857+ if err == nil && len (bodyBytes ) > 0 {
858+ var bodyData map [string ]any
859+ if err := json .Unmarshal (bodyBytes , & bodyData ); err == nil {
860+ for k , v := range bodyData {
861+ // Do not overwrite query parameters with body fields.
862+ if _ , exists := data [k ]; ! exists {
863+ data [k ] = v
864+ }
865+ }
866+ }
867+ // Restore r.Body so downstream handlers can still read it.
868+ r .Body = io .NopCloser (bytes .NewReader (bodyBytes ))
869+ }
870+ }
871+ }
872+
873+ return data
874+ }
0 commit comments