Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -176,10 +177,108 @@ func indentString(input string, indentSpaces int, numbering bool) string {
return strings.Join(lines, "\n")
}

const (
// web-safe hex equivalents for the frontend
WebColorRed = "#FF5555" // for color.FgRed
WebColorYellow = "#FFFF55" // for color.FgYellow
WebColorCyan = "#8BE9FD" // for color.FgCyan
WebColorMagenta = "#FF79C6" // for color.FgMagenta
)

type CtxMsg struct {
Message string `json:"msg"`
Depth int `json:"level"`
FullPath string `json:"fullPath"`
}

type WebErrorPayload struct {
IsLeafError bool `json:"isLeafError"`
Context []CtxMsg `json:"context,omitempty"`

Error string `json:"error"`
ErrorColor string `json:"errorColor"`

Hint string `json:"hint,omitempty"`
HintColor string `json:"hintColor,omitempty"`

StackTrace []string `json:"stackTrace,omitempty"`
}

func (e *LeafError) Format(f fmt.State, c rune) {
switch c {
case 'v':

// Below is the JSON output formatting for the web editor console
if f.Flag('#') {
payload := WebErrorPayload{
IsLeafError: true,
Error: e.ErrorWithCauses(),
ErrorColor: "#FF5555",
}

if e.Context != nil && len(e.Context.Visited) > 0 {
var previousNode NodeBaseInterface
for _, item := range e.Context.Visited {
currentNode := item.Node

// TODO: (Seb) improve this
// In the logs, group nodes appear twice, once when entered, and once
// when the group-outputs node comes back and executes the group node again.
// While this is expected for the execution flow, we don't want to have that
// in the logs, it looks very confusing.
if previousNode != nil &&
strings.HasPrefix(previousNode.GetNodeTypeId(), "core/group-outputs@") &&
strings.HasPrefix(currentNode.GetNodeTypeId(), "core/group@") {

previousNode = currentNode
continue
}

nodeNameOrLabel := currentNode.GetLabel()
if nodeNameOrLabel == "" {
nodeNameOrLabel = currentNode.GetName()
}

var msg string
if item.Execute {
msg = fmt.Sprintf("execute '%s'", nodeNameOrLabel)
} else {
msg = fmt.Sprintf("request input from '%s'", nodeNameOrLabel)
}

payload.Context = append(payload.Context, CtxMsg{
Message: msg,
Depth: strings.Count(currentNode.GetFullPath(), "/") + 1,
FullPath: currentNode.GetFullPath(),
})

previousNode = item.Node
}
}

payload.Hint = getErrorHint(e)
if payload.Hint != "" {
payload.HintColor = "#FFFF55"
}

if f.Flag('+') {
rawStack := e.StackTrace()
payload.StackTrace = strings.Split(rawStack, "\n")
}

jsonBytes, err := json.Marshal(payload)
if err != nil {
// Fallback in case of JSON error
fmt.Fprint(f, e.Error())
return
}

fmt.Fprint(f, string(jsonBytes))
return
}

// This below is the regular terminal output formatting

var (
tmpErrEmoji string
tmpHintEmoji string
Expand Down Expand Up @@ -245,6 +344,7 @@ func (e *LeafError) Format(f fmt.State, c rune) {

fmt.Fprint(f, output)
return

case 's':
fmt.Fprint(f, e.Error())
}
Expand Down
7 changes: 5 additions & 2 deletions core/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,6 @@ func LoadNode(parent NodeBaseInterface, parentId string, nodeData any, validate
return nil, "", nil
}

// 1. Check ID
// We attempt to get the ID. If it fails, we record the error but CONTINUE
// processing (if validating) to check Type, Inputs, and Outputs.
id, idErr := utils.GetTypedPropertyByPath[string](nodeI, "id")
Expand All @@ -571,7 +570,6 @@ func LoadNode(parent NodeBaseInterface, parentId string, nodeData any, validate
}
}

// 2. Check Type
// If Type is missing, loading "makes no sense" as we cannot select a factory.
// We must early out here.
nodeType, typeErr := utils.GetTypedPropertyByPath[string](nodeI, "type")
Expand All @@ -582,6 +580,8 @@ func LoadNode(parent NodeBaseInterface, parentId string, nodeData any, validate
return nil, "", nil
}

nodeLabel, _ := utils.GetTypedPropertyByPath[string](nodeI, "label")

var (
n NodeBaseInterface
factoryErrs []error
Expand Down Expand Up @@ -615,6 +615,9 @@ func LoadNode(parent NodeBaseInterface, parentId string, nodeData any, validate
}

if idErr == nil {
if nodeLabel != "" {
n.SetLabel(nodeLabel)
}
n.SetId(id)
if parentId != "" {
n.SetFullPath(parentId + "/" + id)
Expand Down
13 changes: 8 additions & 5 deletions core/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ func (n *Inputs) GetInputValues() map[InputId]any {

func (n *Inputs) InputValueById(ec *ExecutionState, host NodeWithInputs, inputId InputId, inputArrayPortId *InputId) (value any, err error) {
var finalValue any
var valueFound bool
var inputDef InputDefinition
var inputDefExists bool

Expand All @@ -343,12 +344,11 @@ func (n *Inputs) InputValueById(ec *ExecutionState, host NodeWithInputs, inputId
dstIsGroupInputsNode := strings.HasPrefix(dataSource.DstNode.GetNodeTypeId(), "core/group-inputs@")
regardCache := !srcIsGroupNode && !srcIsGroupInputsNode && !srcIsGroupOutputsNode && !dstIsGroupInputsNode

var ok bool
if regardCache {
finalValue, ok = ec.GetDataFromOutputCache(dataSource.SrcNode.GetCacheId(), outputCacheIdForCache, cacheType)
finalValue, valueFound = ec.GetDataFromOutputCache(dataSource.SrcNode.GetCacheId(), outputCacheIdForCache, cacheType)
}

if ok {
if valueFound {
if utils.GetLogLevel() == utils.LogLevelDebug {
utils.LogOut.Debugf("PushNodeVisit: (cached) %s, execute: %t\n", dataSource.SrcNode.GetId(), false)
}
Expand Down Expand Up @@ -385,6 +385,7 @@ func (n *Inputs) InputValueById(ec *ExecutionState, host NodeWithInputs, inputId
}
}
finalValue = v
valueFound = true

if regardCache {
ec.CacheDataOutput(dataSource.SrcNode.GetCacheId(), outputCacheIdForCache, finalValue, cacheType)
Expand All @@ -400,10 +401,12 @@ func (n *Inputs) InputValueById(ec *ExecutionState, host NodeWithInputs, inputId
inputDef, inputDefExists = n.inputDefs[inputId]
}

if userExists && inputValue != nil {
if userExists {
finalValue = inputValue
valueFound = true
} else if inputDefExists && inputDef.Default != nil {
finalValue = inputDef.Default
valueFound = true
} else if inputDefExists && inputDef.Required {
return nil, CreateErr(ec, &ErrNoInputValue{}, "no value for input '%v' (%s)", inputDef.Name, inputId)
} else if inputDefExists {
Expand All @@ -414,7 +417,7 @@ func (n *Inputs) InputValueById(ec *ExecutionState, host NodeWithInputs, inputId
}
}

if finalValue == nil {
if !valueFound {
if inputDefExists {
return nil, CreateErr(ec, &ErrNoInputValue{}, "no value for input '%v' (%s)", inputDef.Name, inputId)
}
Expand Down
10 changes: 6 additions & 4 deletions core/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,19 @@ func (n *Outputs) SetOutputValue(ec *ExecutionState, outputId OutputId, value an
}

func isValueValidForOutput(value any, expectedType string) bool {

if expectedType == "any" || expectedType == "unknown" {
return true
}

// if its not unknown or any but nil then the value is not compatible
if value == nil {
return false
}

valueType := reflect.TypeOf(value)
kind := valueType.Kind()

if expectedType == "any" || expectedType == "unknown" {
return true
}

_, mappingExists := validKindsForExpectedType[expectedType]
if mappingExists {
_, valid := validKindsForExpectedType[expectedType][kind]
Expand Down
8 changes: 0 additions & 8 deletions nodes/http@v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,6 @@ func (n *HttpNode) ExecuteImpl(c *core.ExecutionState, inputId core.InputId, pre
defer resp.Body.Close()
}

// Ensure the input reader is closed in all cases.
// If closing the reader fails without a prior error,
// treat it as an error which is part of the connection op.
err = utils.SafeCloseReader(reader)
if err != nil && connErr == nil {
connErr = err
}

var dsf core.DataStreamFactory

var statusCode int
Expand Down
1 change: 1 addition & 0 deletions nodes/start@v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (n *StartNode) ExecuteEntry(c *core.ExecutionState, outputValues map[core.O

dsf := core.DataStreamFactory{
Reader: os.Stdin,
Length: -1,
}

err := n.Outputs.SetOutputValue(c, ni.Core_start_v1_Output_stdin, dsf, core.SetOutputValueOpts{})
Expand Down
5 changes: 4 additions & 1 deletion nodes/test@v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func Test_InputValueById_Casting(t *testing.T) {

SAME_AS_OUTPUT := core.OutputId("")

var constReader io.Reader = strings.NewReader("hello")

tests := []struct {
output any // the value for the output port

Expand Down Expand Up @@ -93,7 +95,8 @@ func Test_InputValueById_Casting(t *testing.T) {

// test stream ports
{core.DataStreamFactory{
Reader: strings.NewReader("hello"),
Reader: constReader,
Length: core.GetReaderLength(constReader),
}, ni.Core_test_v1_Output_output_stream_foo123, SAME_AS_OUTPUT, ni.Core_test_v1_Input_input_stream_foo123, core.DataStreamFactory{}, expectError{}},
{"hello", ni.Core_test_v1_Output_output_string_foo123, SAME_AS_OUTPUT, ni.Core_test_v1_Input_input_stream_foo123, strings.NewReader(""), expectError{}},
{"hello", ni.Core_test_v1_Output_output_string_foo123, SAME_AS_OUTPUT, ni.Core_test_v1_Input_input_stream_foo123, core.DataStreamFactory{}, expectError{}},
Expand Down
2 changes: 1 addition & 1 deletion sessions/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ func runGraphFromConn(ctx context.Context, graphData string, opts core.RunOpts,
// send final error, even if error lines were already streamed
sendEncryptedJSON(ws, map[string]string{
"type": MsgTypeJobError,
"error": fmt.Sprintf("Graph execution failed: %v", runErr),
"error": fmt.Sprintf("%#v", runErr),
}, sharedKey)
return // Exit, the deferred lock release will still run
}
Expand Down
2 changes: 1 addition & 1 deletion tests_e2e/references/reference_app.sh_l12
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ hint:

stack trace:
github.com/actionforge/actrun-cli/core.RunGraphFromFile
graph.go:1028
graph.go:1031
github.com/actionforge/actrun-cli/cmd.cmdRootRun
cmd_root.go:175
github.com/spf13/cobra.(*Command).execute
Expand Down
8 changes: 4 additions & 4 deletions tests_e2e/references/reference_dir-walk.sh_l56
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ github.com/actionforge/actrun-cli/nodes.(*WalkNode).ExecuteImpl
github.com/actionforge/actrun-cli/core.(*Executions).Execute
executions.go:56
github.com/actionforge/actrun-cli/nodes.(*StartNode).ExecuteImpl
start@v1.go:49
start@v1.go:50
github.com/actionforge/actrun-cli/nodes.(*StartNode).ExecuteEntry
start@v1.go:44
start@v1.go:45
github.com/actionforge/actrun-cli/core.RunGraph
graph.go:426
github.com/actionforge/actrun-cli/core.RunGraphFromString
graph.go:1013
graph.go:1016
github.com/actionforge/actrun-cli/core.RunGraphFromFile
graph.go:1031
graph.go:1034
github.com/actionforge/actrun-cli/cmd.cmdRootRun
cmd_root.go:175
github.com/spf13/cobra.(*Command).execute
Expand Down
14 changes: 7 additions & 7 deletions tests_e2e/references/reference_error_no_output.sh_l8
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,25 @@ github.com/actionforge/actrun-cli/core.(*Outputs).OutputValueById
github.com/actionforge/actrun-cli/core.(*Inputs).InputValueById
inputs.go:364
github.com/actionforge/actrun-cli/core.inputValueById[...]
inputs.go:472
inputs.go:475
github.com/actionforge/actrun-cli/core.InputValueFromSubInputs[...]
inputs.go:467
inputs.go:470
github.com/actionforge/actrun-cli/core.InputArrayValueById[...]
inputs.go:549
inputs.go:552
github.com/actionforge/actrun-cli/nodes.(*PrintNode).ExecuteImpl
print@v1.go:27
github.com/actionforge/actrun-cli/core.(*Executions).Execute
executions.go:56
github.com/actionforge/actrun-cli/nodes.(*StartNode).ExecuteImpl
start@v1.go:49
start@v1.go:50
github.com/actionforge/actrun-cli/nodes.(*StartNode).ExecuteEntry
start@v1.go:44
start@v1.go:45
github.com/actionforge/actrun-cli/core.RunGraph
graph.go:426
github.com/actionforge/actrun-cli/core.RunGraphFromString
graph.go:1013
graph.go:1016
github.com/actionforge/actrun-cli/core.RunGraphFromFile
graph.go:1031
graph.go:1034
github.com/actionforge/actrun-cli/cmd.cmdRootRun
cmd_root.go:175
github.com/spf13/cobra.(*Command).execute
Expand Down
6 changes: 3 additions & 3 deletions tests_e2e/references/reference_group-error.sh_l8
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ github.com/actionforge/actrun-cli/nodes.(*GroupNode).ExecuteImpl
github.com/actionforge/actrun-cli/core.(*Executions).Execute
executions.go:56
github.com/actionforge/actrun-cli/nodes.(*StartNode).ExecuteImpl
start@v1.go:49
start@v1.go:50
github.com/actionforge/actrun-cli/nodes.(*StartNode).ExecuteEntry
start@v1.go:44
start@v1.go:45
github.com/actionforge/actrun-cli/core.RunGraph
graph.go:426
github.com/actionforge/actrun-cli/core.RunGraphFromString
graph.go:1013
graph.go:1016

4 changes: 2 additions & 2 deletions tests_e2e/references/reference_group-port-collision.sh_l13
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ github.com/actionforge/actrun-cli/core.LoadGraph
github.com/actionforge/actrun-cli/core.RunGraph
graph.go:275
github.com/actionforge/actrun-cli/core.RunGraphFromString
graph.go:1013
graph.go:1016
github.com/actionforge/actrun-cli/core.RunGraphFromFile
graph.go:1031
graph.go:1034
github.com/actionforge/actrun-cli/cmd.cmdRootRun
cmd_root.go:175
github.com/spf13/cobra.(*Command).execute
Expand Down
Loading
Loading