Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions internal/app/transpile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var (
add_cleanup_processor bool
inline bool
patterns_dir_path string
translate_dir_path string
)

func makeTranspileCmd() *cobra.Command {
Expand Down Expand Up @@ -66,11 +67,12 @@ func makeTranspileCmd() *cobra.Command {
cmd.Flags().BoolVar(&fidelity, "fidelity", true, "try to keep correct if-else semantic")
cmd.Flags().BoolVar(&add_cleanup_processor, "add_cleanup_processor", true, "add a cleanup processor to remove temporary fields created by the transpiler")
cmd.Flags().BoolVar(&inline, "inline", false, "whether the input is provided inline or via file paths(default false)")
cmd.Flags().StringVar(&patterns_dir_path, "patterns_dir_path", "", "directory containing pattern files")
cmd.Flags().StringVar(&patterns_dir_path, "patterns_dir_path", "", "directory containing the grok pattern_dir files. May be used to overwrite filter's definition")
cmd.Flags().StringVar(&translate_dir_path, "translate_dir_path", "", "directory containing translate filter file. May be used to overwrite filter's definition.")
return cmd
}

// runTranspile builds a Transpile instance from the parsed CLI flag values
// and runs it over args (file paths, or inline pipeline definitions when the
// --inline flag is set). It returns whatever error the transpiler reports.
func runTranspile(cmd *cobra.Command, args []string) error {
	check := transpile.New(pipeline_threshold, log_level, deal_with_error_locally, add_default_global_on_failure, fidelity, add_cleanup_processor, inline, patterns_dir_path, translate_dir_path)
	return check.Run(args)
}
4 changes: 2 additions & 2 deletions internal/app/transpile/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ This document lists the components the transpiler currently recognizes and the G
| filter | [drop](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-drop) | `DealWithDrop` | ✅ |
| filter | [geoip](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-geoip) | `DealWithGeoIP` | Limited configuration options available |
| filter | [grok](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-grok) | `DealWithGrok` | Limited configuration options available |
| filter | [json](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-json) | `DealWithJSON` | Limited configuration options availale source and target. TODO: Add ecs_compatibility support|
| filter | [json](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-json) | `DealWithJSON` | Limited configuration options available source and target. TODO: Add ecs_compatibility support|
| filter | [kv](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-kv) | `DealWithKV` | Limited configuration options supported |
| filter | [mutate](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-mutate) | `DealWithMutate` | ✅ (merge not supported yet) |
| filter | [prune](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-prune) | `DealWithPrune` | Limited support |
| filter | [split](http://elastic.co/docs/reference/logstash/plugins/plugins-filters-split) | `DealWithUnfeasiblePlugin` | ❌ Returns an error and returns an empty processor list. There is no way to replicate the split logic |
| filter | [syslog_pri](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-syslog_pri) | `DealWithSyslogPri` | ✅ (Converted as script) |
| filter | [translate](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-translate) | `DealWithTranslate` | ✅ (No external file dictionary available) |
| filter | [translate](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-translate) | `DealWithTranslate` | ✅ Limited configuration options available |
| filter | [truncate](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-truncate) | `DealWithTruncate` | ❌ Commented out in source (not active) |
| filter | [urldecode](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-urldecode) | `DealWithURLDecode` | ✅ (Charset not supported) |
| filter | [useragent](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-useragent) | `DealWithUserAgent` | ✅ (Caveats do apply with [ecs_compatibility: false](https://github.com/herrBez/baffo/issues/9)) |
Expand Down
73 changes: 71 additions & 2 deletions internal/app/transpile/transpile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transpile

import (
"bufio"
"encoding/csv"
"encoding/json"
"os"
"path"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.elastic.co/ecszerolog"
"go.yaml.in/yaml/v3"

"bytes"
"fmt"
Expand Down Expand Up @@ -41,9 +43,10 @@ type Transpile struct {
addCleanUpProcessor bool
inline bool
patterns_dir_path string
translate_dir_path string
}

func New(threshold int, log_level string, deal_with_error_locally bool, addDefaultGlobalOnFailure bool, fidelity bool, addCleanupProcessor bool, inline bool, patterns_dir_path string) Transpile {
func New(threshold int, log_level string, deal_with_error_locally bool, addDefaultGlobalOnFailure bool, fidelity bool, addCleanupProcessor bool, inline bool, patterns_dir_path string, translate_dir_path string) Transpile {
return Transpile{
threshold: threshold,
log_level: level[strings.ToLower(log_level)],
Expand All @@ -53,6 +56,7 @@ func New(threshold int, log_level string, deal_with_error_locally bool, addDefau
addCleanUpProcessor: addCleanupProcessor,
inline: inline,
patterns_dir_path: patterns_dir_path,
translate_dir_path: translate_dir_path,
}
}

Expand Down Expand Up @@ -1936,13 +1940,62 @@ func DealWithCSV(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
return ingestProcessors, onFailureProcessors
}

// loadDictionaryFromPath reads a Logstash translate-filter dictionary file
// and returns it as a flat key/value map. The format is selected by file
// extension: .json, .yaml/.yml (an empty extension is also treated as YAML),
// or .csv with exactly two columns per row (key,value). Any other extension
// yields an error. CSV keys and values are whitespace-trimmed.
func loadDictionaryFromPath(filePath string) (map[string]string, error) {
	// os.ReadFile already reports non-existent paths and directories through
	// its error, so a separate os.Stat pre-check would only add a TOCTOU
	// window and an extra syscall.
	content, err := os.ReadFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("read %q: %w", filePath, err)
	}

	dictionary := make(map[string]string)

	switch ext := strings.ToLower(filepath.Ext(filePath)); ext {
	case ".json":
		if err := json.Unmarshal(content, &dictionary); err != nil {
			return nil, fmt.Errorf("invalid JSON dictionary %q: %w", filePath, err)
		}
	case ".yaml", ".yml", "":
		// NOTE(review): extensionless files are assumed to be YAML — confirm
		// this matches how callers name their translate dictionaries.
		if err := yaml.Unmarshal(content, &dictionary); err != nil {
			return nil, fmt.Errorf("invalid YAML dictionary %q: %w", filePath, err)
		}
	case ".csv":
		// Decode straight from the raw bytes; converting to string first
		// would copy the whole file needlessly.
		r := csv.NewReader(bytes.NewReader(content))
		// Disable the csv package's consistent-field-count enforcement so the
		// explicit 2-column check below produces one uniform error message.
		r.FieldsPerRecord = -1
		records, err := r.ReadAll()
		if err != nil {
			return nil, fmt.Errorf("invalid CSV dictionary %q: %w", filePath, err)
		}
		for i, row := range records {
			if len(row) != 2 {
				return nil, fmt.Errorf("csv dictionary %q row %d: expected 2 columns (key,value), got %d", filePath, i+1, len(row))
			}
			dictionary[strings.TrimSpace(row[0])] = strings.TrimSpace(row[1])
		}
	default:
		return nil, fmt.Errorf("unsupported dictionary file type: %q", ext)
	}

	return dictionary, nil
}

func DealWithTranslate(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor, []IngestProcessor) {
ingestProcessors := []IngestProcessor{}
onFailureProcessors := []IngestProcessor{}

proc := ScriptProcessor{}.WithTag(id).(ScriptProcessor)

params := make(map[string]interface{})
params := make(map[string]any)

var target *string = nil
ECSCompatibility := "v8" // We assume ECS Compatibility
Expand All @@ -1965,6 +2018,22 @@ func DealWithTranslate(plugin ast.Plugin, id string, t Transpile) ([]IngestProce
}
params["dictionary"] = dictionary

case "dictionary_path":
dictionaryPath := getStringAttributeString(attr)

// Make sure the dir path can be overwritten from command line
if t.translate_dir_path != "" {
baseDictionaryFile := filepath.Base(dictionaryPath)
dictionaryPath = filepath.Join(t.translate_dir_path, baseDictionaryFile)
}

dict, err := loadDictionaryFromPath(dictionaryPath)
if err != nil {
log.Warn().Msgf("Could not load DictionaryFromPath %v", err)
}

params["dictionary"] = dict

case "ecs_compatibility":
ECSCompatibility = getStringAttributeString(attr)

Expand Down
Loading