From 70b91180157c12faddc9f931ec653e7f499f3f05 Mon Sep 17 00:00:00 2001
From: Mirko Bez
Date: Wed, 18 Feb 2026 15:23:14 +0100
Subject: [PATCH 1/5] WIP #4: add first naive support for pattern definitions

---
 internal/app/transpile/transpile.go | 77 +++++++++++++++++++++++++++--
 1 file changed, 74 insertions(+), 3 deletions(-)

diff --git a/internal/app/transpile/transpile.go b/internal/app/transpile/transpile.go
index 5c82e3c..6a3a0db 100644
--- a/internal/app/transpile/transpile.go
+++ b/internal/app/transpile/transpile.go
@@ -1,9 +1,11 @@
 package transpile

 import (
+	"bufio"
 	"encoding/json"
 	"os"
 	"path"
+	"path/filepath"
 	"regexp"
 	"strings"
 	"time"
@@ -876,6 +878,27 @@ func DealWithDrop(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
 	return ingestProcessors, onFailureProcessors
 }

+func TranslatePatterns(patterns []string) ([]string, bool) {
+	translated := []string{}
+	unknownPatterns := false
+	for _, pattern := range patterns {
+		switch pattern {
+		case "ISO8601":
+			translated = append(translated, "ISO8601")
+		case "UNIX":
+			translated = append(translated, "UNIX")
+		case "UNIX_MS":
+			translated = append(translated, "UNIX_MS")
+		case "TAI64N":
+			translated = append(translated, "TAI64N")
+		default:
+			translated = append(translated, pattern)
+			unknownPatterns = true
+		}
+	}
+	return translated, unknownPatterns
+}
+
 func DealWithDate(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor, []IngestProcessor) {
 	ingestProcessors := []IngestProcessor{}
 	onFailureProcessors := []IngestProcessor{}
@@ -902,9 +925,12 @@ func DealWithDate(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
 		case "match":
 			matchArray := getArrayStringAttributes(attr)
 			proc.Field = toElasticPipelineSelector(matchArray[0])
-			proc.Formats = matchArray[1:]
-			log.Warn().Msgf("Date filter match patterns %v may have different semantics in Elasticsearch Ingest Pipeline. Refer to the respective documentations: https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-date#plugins-filters-date-match and https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/mapping-date-format", proc.Formats)
-
+			formats := matchArray[1:]
+			translatedFormats, unknownPatterns := TranslatePatterns(formats)
+			proc.Formats = translatedFormats
+			if unknownPatterns {
+				log.Warn().Msgf("Date filter match patterns '%v' may have different semantics in Elasticsearch Ingest Pipeline. Refer to the respective documentations: https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-date#plugins-filters-date-match and https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/mapping-date-format", proc.Formats)
+			}
 		default:
 			log.Warn().Msgf("Attribute '%s' is currently not supported", attr.Name())
@@ -1162,6 +1188,51 @@ func DealWithGrok(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
 			onFailurePorcessors = DealWithTagOnFailure(attr, id, t)
 		case "break_on_match":
 			break_on_match = getBoolValue(attr)
+		case "patterns_dir":
+			pattenrsDir := getArrayStringAttributeOrStringAttrubute(attr)
+			for _, d := range pattenrsDir {
+				files, err := os.ReadDir(d)
+				if err != nil {
+					log.Warn().Err(err).Str("dir", d).Msg("Error while reading patterns_dir")
+					continue // Use continue so one bad dir doesn't stop the whole loop
+				}
+
+				for _, file := range files {
+					if file.IsDir() {
+						continue
+					}
+
+					fullPath := filepath.Join(d, file.Name())
+					f, err := os.Open(fullPath)
+					if err != nil {
+						log.Warn().Err(err).Str("file", file.Name()).Msg("Error opening pattern file")
+						continue
+					}
+
+					scanner := bufio.NewScanner(f)
+					for scanner.Scan() {
+						line := strings.TrimSpace(scanner.Text())
+
+						// Skip empty lines or comments
+						if line == "" || strings.HasPrefix(line, "#") {
+							continue
+						}
+
+						// Split by first whitespace (handles tabs or multiple spaces)
+						parts := regexp.MustCompile(`\s+`).Split(line, 2)
+						if len(parts) < 2 {
+							log.Warn().Str("file", file.Name()).Str("line", line).Msg("Invalid pattern definition")
+							continue
+						}
+
+						if gp.PatternDefinitions == nil {
+							gp.PatternDefinitions = make(map[string]string)
+						}
+						gp.PatternDefinitions[parts[0]] = parts[1]
+					}
+					f.Close() // Don't forget to close within the loop!
+				}
+			}
 		default:
 			log.Warn().Msgf("Attribute '%s' in Plugin '%s' is currently not supported", attr.Name(), plugin.Name())
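For context, the `patterns_dir` loop above accepts Logstash-style pattern files: one `NAME definition` pair per line, with `#` comments and blank lines skipped. Below is a minimal stand-alone sketch of that parsing step; the helper name and the sample definitions are mine for illustration, not the exact whyscream/postfix patterns.

```go
package main

import (
	"bufio"
	"fmt"
	"regexp"
	"strings"
)

// parseLine mirrors the loop in the patch: skip blanks and comments, then
// split the pattern name from its definition on the first run of
// whitespace (tabs and multiple spaces both work).
func parseLine(line string) (name, def string, ok bool) {
	line = strings.TrimSpace(line)
	if line == "" || strings.HasPrefix(line, "#") {
		return "", "", false
	}
	parts := regexp.MustCompile(`\s+`).Split(line, 2)
	if len(parts) < 2 {
		return "", "", false
	}
	return parts[0], parts[1], true
}

func main() {
	// Shape of a typical pattern file (illustrative definitions).
	sample := `# postfix-like patterns
POSTFIX_QUEUEID ([0-9A-F]{6,}|[0-9a-zA-Z]{12,})
GREEDY .*`
	defs := map[string]string{}
	scanner := bufio.NewScanner(strings.NewReader(sample))
	for scanner.Scan() {
		if name, def, ok := parseLine(scanner.Text()); ok {
			defs[name] = def
		}
	}
	fmt.Println(defs) // map[GREEDY:.* POSTFIX_QUEUEID:([0-9A-F]{6,}|[0-9a-zA-Z]{12,})]
}
```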
From 69d8be535140dfd6e3b5a149b142a33f9bfd80b0 Mon Sep 17 00:00:00 2001
From: Mirko Bez
Date: Wed, 18 Feb 2026 15:49:30 +0100
Subject: [PATCH 2/5] Add flag to override pattern location, make sure we can deal with whyscream/postfix syntax

---
 internal/app/transpile.go                 |  4 ++-
 internal/app/transpile/logstash_filter.go |  2 ++
 internal/app/transpile/transpile.go       | 36 ++++++++++++++++-------
 3 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/internal/app/transpile.go b/internal/app/transpile.go
index ae01d19..3dd6a3a 100644
--- a/internal/app/transpile.go
+++ b/internal/app/transpile.go
@@ -17,6 +17,7 @@ var (
 	fidelity              bool
 	add_cleanup_processor bool
 	inline                bool
+	patterns_dir_path     string
 )

 func makeTranspileCmd() *cobra.Command {
@@ -65,10 +66,11 @@ func makeTranspileCmd() *cobra.Command {
 	cmd.Flags().BoolVar(&fidelity, "fidelity", true, "try to keep correct if-else semantic")
 	cmd.Flags().BoolVar(&add_cleanup_processor, "add_cleanup_processor", true, "add a cleanup processor to remove temporary fields created by the transpiler")
 	cmd.Flags().BoolVar(&inline, "inline", false, "whether the input is provided inline or via file paths(default false)")
+	cmd.Flags().StringVar(&patterns_dir_path, "patterns_dir_path", "", "directory containing pattern files")
 	return cmd
 }

 func runTranspile(cmd *cobra.Command, args []string) error {
-	check := transpile.New(pipeline_threshold, log_level, deal_with_error_locally, add_default_global_on_failure, fidelity, add_cleanup_processor, inline)
+	check := transpile.New(pipeline_threshold, log_level, deal_with_error_locally, add_default_global_on_failure, fidelity, add_cleanup_processor, inline, patterns_dir_path)
 	return check.Run(args)
 }
diff --git a/internal/app/transpile/logstash_filter.go b/internal/app/transpile/logstash_filter.go
index e450fac..2683682 100644
--- a/internal/app/transpile/logstash_filter.go
+++ b/internal/app/transpile/logstash_filter.go
@@ -35,6 +35,8 @@ func hashAttributeToMapArray(attr ast.Attribute) map[string][]string {
 			m[keyString] = values
 		}
+	default: // Unexpected Case --> PANIC
+		log.Panic().Msgf("Unexpected Case %s", attr.String())
 	}
 	return m
 }
diff --git a/internal/app/transpile/transpile.go b/internal/app/transpile/transpile.go
index 6a3a0db..9f754a8 100644
--- a/internal/app/transpile/transpile.go
+++ b/internal/app/transpile/transpile.go
@@ -40,9 +40,10 @@ type Transpile struct {
 	fidelity            bool
 	addCleanUpProcessor bool
 	inline              bool
+	patterns_dir_path   string
 }

-func New(threshold int, log_level string, deal_with_error_locally bool, addDefaultGlobalOnFailure bool, fidelity bool, addCleanupProcessor bool, inline bool) Transpile {
+func New(threshold int, log_level string, deal_with_error_locally bool, addDefaultGlobalOnFailure bool, fidelity bool, addCleanupProcessor bool, inline bool, patterns_dir_path string) Transpile {
 	return Transpile{
 		threshold:           threshold,
 		log_level:           level[strings.ToLower(log_level)],
@@ -51,6 +52,7 @@ func New(threshold int, log_level string, deal_with_error_locally bool, addDefau
 		fidelity:            fidelity,
 		addCleanUpProcessor: addCleanupProcessor,
 		inline:              inline,
+		patterns_dir_path:   patterns_dir_path,
 	}
 }
@@ -1165,13 +1167,23 @@ func DealWithGrok(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
 	}
 	switch attr.Name() {
 	case "match":
-		helpPatterns := hashAttributeToMapArray(attr)
-		// TODO: Deal with multiple keys, currently only the last is used
-		for key := range helpPatterns {
-			gp.Field = key
-			gp.Patterns = helpPatterns[key]
-			for i := range gp.Patterns {
-				gp.Patterns[i], _ = toElasticPipelineSelectorExpression(gp.Patterns[i], GrokContext)
+		// helpPatterns := hashAttributeToMapArray(attr)
+
+		// // TODO: Deal with multiple keys, currently only the last is used
+		// for key := range helpPatterns {
+		// 	gp.Field = key
+		// 	gp.Patterns = helpPatterns[key]
+		// 	for i := range gp.Patterns {
+		// 		gp.Patterns[i], _ = toElasticPipelineSelectorExpression(gp.Patterns[i], GrokContext)
+		// 	}
+		// }
+
+		keys, patterns := getHashAttributeKeyValue(attr)
+		for i := range keys {
+			gp.Field = keys[i]
+			gp.Patterns = []string{patterns[i]}
+			for j := range gp.Patterns {
+				gp.Patterns[j], _ = toElasticPipelineSelectorExpression(gp.Patterns[j], GrokContext)
 			}
 		}
@@ -1189,8 +1201,12 @@ func DealWithGrok(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
 		case "break_on_match":
 			break_on_match = getBoolValue(attr)
 		case "patterns_dir":
-			pattenrsDir := getArrayStringAttributeOrStringAttrubute(attr)
-			for _, d := range pattenrsDir {
+			patternsDir := getArrayStringAttributeOrStringAttrubute(attr)
+			if t.patterns_dir_path != "" {
+				patternsDir = []string{t.patterns_dir_path}
+				log.Info().Msgf("Using patterns_dir path from Transpile struct: %s", patternsDir)
+			}
+			for _, d := range patternsDir {
 				files, err := os.ReadDir(d)
 				if err != nil {
 					log.Warn().Err(err).Str("dir", d).Msg("Error while reading patterns_dir")
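The new flag gives the CLI the last word over whatever the Logstash config lists in `patterns_dir`. A compact sketch of that precedence rule as implemented in the hunk above (the function name is mine, for illustration):

```go
package main

import "fmt"

// resolvePatternsDirs mirrors the override in DealWithGrok: a non-empty
// patterns_dir_path from the CLI replaces every directory listed in the
// Logstash configuration; otherwise the configured list is used as-is.
func resolvePatternsDirs(fromConfig []string, cliOverride string) []string {
	if cliOverride != "" {
		return []string{cliOverride}
	}
	return fromConfig
}

func main() {
	conf := []string{"/etc/logstash/patterns", "./extra"}
	fmt.Println(resolvePatternsDirs(conf, ""))          // [/etc/logstash/patterns ./extra]
	fmt.Println(resolvePatternsDirs(conf, "./patched")) // [./patched]
}
```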
From 9481ec316b3486b251f9c81b8243224ea19d92ad Mon Sep 17 00:00:00 2001
From: Mirko Bez
Date: Wed, 18 Feb 2026 17:22:00 +0100
Subject: [PATCH 3/5] Make sure we strip unnecessary patterns

---
 internal/app/transpile/transpile.go      | 49 +++++++++++++++++++++++
 internal/app/transpile/transpile_test.go | 50 +++++++++++++++++++++++-
 2 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/internal/app/transpile/transpile.go b/internal/app/transpile/transpile.go
index 9f754a8..2dac463 100644
--- a/internal/app/transpile/transpile.go
+++ b/internal/app/transpile/transpile.go
@@ -582,6 +582,31 @@ const (
 	CidrContext
 )

+func listUsedGrokPatterns(pattern string) []string {
+	// avoid consuming '}' in the captures, so that multiple patterns in the same string are matched separately
+	grokPartsFinder := regexp.MustCompile(`\%\{([^:}]+)(:[^}]+)?(:[^}]+)?\}`)
+
+	matches := grokPartsFinder.FindAllStringSubmatch(pattern, -1)
+
+	usedPatterns := []string{}
+
+	for _, match := range matches {
+		usedPatterns = append(usedPatterns, match[1])
+	}
+
+	return usedPatterns
+}
+
+func getAllUsedPatterns(patterns map[string]string, pattern string) []string {
+	usedPatterns := listUsedGrokPatterns(pattern)
+	for _, p := range usedPatterns {
+		if _, ok := patterns[p]; ok {
+			usedPatterns = append(usedPatterns, getAllUsedPatterns(patterns, patterns[p])...)
+		}
+	}
+	return usedPatterns
+}
+
 // Function that given an expression like "foo_%{[selector]}" returns the equivalent Elastic expression
 // "foo_{{selector}}" and boolean to indicate whether the string depends on input or not
 func toElasticPipelineSelectorExpression(s string, context int) (string, bool) {
@@ -1254,6 +1279,30 @@ func DealWithGrok(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
 			}
 		}
 	}
+
+	usedPatterns := getAllUsedPatterns(gp.PatternDefinitions, gp.Patterns[0])
+	log.Debug().Msgf("Used patterns: %v", usedPatterns)
+
+	// Filter gp.PatternDefinitions to only keep patterns that are actually used.
+	if gp.PatternDefinitions != nil {
+		keep := make(map[string]string)
+		seen := make(map[string]struct{})
+		for _, p := range usedPatterns {
+			if _, s := seen[p]; s {
+				continue
+			}
+			seen[p] = struct{}{}
+			if val, ok := gp.PatternDefinitions[p]; ok {
+				keep[p] = val
+			}
+		}
+		if len(keep) > 0 {
+			gp.PatternDefinitions = keep
+		} else {
+			gp.PatternDefinitions = nil
+		}
+	}
+
 	// Add _grok_parse_failure
 	if len(gp.OnFailure) == 0 {
 		onFailurePorcessors = DealWithTagOnFailure(ast.NewArrayAttribute("tag_on_failure", ast.NewStringAttribute("", "_grok_parse_failure", ast.DoubleQuoted)), id, t)
diff --git a/internal/app/transpile/transpile_test.go b/internal/app/transpile/transpile_test.go
index 840db24..f871a65 100644
--- a/internal/app/transpile/transpile_test.go
+++ b/internal/app/transpile/transpile_test.go
@@ -2,6 +2,7 @@ package transpile

 import (
 	"fmt"
+	"reflect"
 	"testing"

 	config "github.com/herrBez/baffo"
@@ -246,7 +247,8 @@ func TestToElasticPipelineSelectorExpression(t *testing.T) {
 		expectMatch bool
 	}{
 		{
-			name: "ProcessorContext simple field",
+			name: "ProcessorContext simple field",
+
 			input:    "foo_%{[bar]}",
 			context:  ProcessorContext,
 			expected: "foo_{{{bar}}}",
@@ -294,3 +296,49 @@ func TestToElasticPipelineSelectorExpression(t *testing.T) {
 		})
 	}
 }
+
+func TestListUsedGrokPatterns(t *testing.T) {
+	tests := []struct {
+		name string
+		in   string
+		want []string
+	}{
+		{"no patterns", "plain text", []string{}},
+		{"single pattern", "%{SYSLOG}", []string{"SYSLOG"}},
+		{"pattern with field", "%{WORD:field}", []string{"WORD"}},
+		{"multiple patterns", "%{WORD} %{NUM_BER}", []string{"WORD", "NUM_BER"}},
+		{"pattern with convert", "%{IP:client:class}", []string{"IP"}},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := listUsedGrokPatterns(tt.in)
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Fatalf("want %v, got %v", tt.want, got)
+			}
+		})
+	}
+}
+
+func TestGetAllUsedPatterns(t *testing.T) {
+	patterns := map[string]string{
+		"A": "%{B} %{C}",
+		"B": "%{D}",
+		"C": "literal",
+		"D": "%{E}",
+		"E": "",
+	}
+
+	got := getAllUsedPatterns(patterns, "%{A}")
+	want := []string{"A", "B", "C", "D", "E"}
+	if !reflect.DeepEqual(got, want) {
+		t.Fatalf("want %v, got %v", want, got)
+	}
+
+	// when the patterns map is empty, we still return the top-level used names
+	got2 := getAllUsedPatterns(map[string]string{}, "%{X} %{Y}")
+	want2 := []string{"X", "Y"}
+	if !reflect.DeepEqual(got2, want2) {
+		t.Fatalf("want %v, got %v", want2, got2)
+	}
+}
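One property worth noting about `getAllUsedPatterns`: it recurses through pattern definitions without a visited set, so duplicates are only filtered afterwards (via `seen` in `DealWithGrok`), and a definition cycle in a hand-written pattern file, admittedly pathological, would recurse forever. A sketch of a cycle-safe variant under the same regex assumptions (helper names are mine):

```go
package main

import (
	"fmt"
	"regexp"
)

// Same shape as listUsedGrokPatterns in the patch: capture the pattern
// name before an optional ":field" or ":field:type" suffix.
var grokRef = regexp.MustCompile(`%\{([^:}]+)(:[^}]+)?\}`)

func listRefs(pattern string) []string {
	refs := []string{}
	for _, m := range grokRef.FindAllStringSubmatch(pattern, -1) {
		refs = append(refs, m[1])
	}
	return refs
}

// allUsed walks pattern references transitively; the visited set both
// deduplicates and guarantees termination on cyclic definitions.
func allUsed(defs map[string]string, pattern string, seen map[string]bool) []string {
	out := []string{}
	for _, name := range listRefs(pattern) {
		if seen[name] {
			continue
		}
		seen[name] = true
		out = append(out, name)
		if def, ok := defs[name]; ok {
			out = append(out, allUsed(defs, def, seen)...)
		}
	}
	return out
}

func main() {
	defs := map[string]string{"A": "%{B}", "B": "%{A}"}   // a definition cycle
	fmt.Println(allUsed(defs, "%{A}", map[string]bool{})) // [A B], terminates
}
```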
From 9c6129d39f372539b36cba0b2eb045cb0e078a83 Mon Sep 17 00:00:00 2001
From: Mirko Bez
Date: Thu, 19 Feb 2026 09:06:34 +0100
Subject: [PATCH 4/5] Make sure we can interpret match both as Hash and List

---
 internal/app/transpile/logstash_filter.go | 18 +++++++++++
 internal/app/transpile/transpile.go       | 38 +++++++++++++----------
 2 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/internal/app/transpile/logstash_filter.go b/internal/app/transpile/logstash_filter.go
index 2683682..f76d5d4 100644
--- a/internal/app/transpile/logstash_filter.go
+++ b/internal/app/transpile/logstash_filter.go
@@ -211,3 +211,21 @@ func hashAttributeToMap(attr ast.Attribute) map[string]string {
 	}
 	return m
 }
+
+func isList(attr ast.Attribute) bool {
+	switch attr.(type) {
+	case ast.ArrayAttribute:
+		return true
+	default:
+		return false
+	}
+}
+
+func isHash(attr ast.Attribute) bool {
+	switch attr.(type) {
+	case ast.HashAttribute:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/internal/app/transpile/transpile.go b/internal/app/transpile/transpile.go
index 2dac463..4386780 100644
--- a/internal/app/transpile/transpile.go
+++ b/internal/app/transpile/transpile.go
@@ -1192,24 +1192,28 @@ func DealWithGrok(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
 	}
 	switch attr.Name() {
 	case "match":
-		// helpPatterns := hashAttributeToMapArray(attr)
-
-		// // TODO: Deal with multiple keys, currently only the last is used
-		// for key := range helpPatterns {
-		// 	gp.Field = key
-		// 	gp.Patterns = helpPatterns[key]
-		// 	for i := range gp.Patterns {
-		// 		gp.Patterns[i], _ = toElasticPipelineSelectorExpression(gp.Patterns[i], GrokContext)
-		// 	}
-		// }
-
-		keys, patterns := getHashAttributeKeyValue(attr)
-		for i := range keys {
-			gp.Field = keys[i]
-			gp.Patterns = []string{patterns[i]}
-			for j := range gp.Patterns {
-				gp.Patterns[j], _ = toElasticPipelineSelectorExpression(gp.Patterns[j], GrokContext)
+		if isHash(attr) {
+			helpPatterns := hashAttributeToMapArray(attr)
+			// TODO: Deal with multiple keys, currently only the last is used
+			for key := range helpPatterns {
+				gp.Field = key
+				gp.Patterns = helpPatterns[key]
+				for i := range gp.Patterns {
+					gp.Patterns[i], _ = toElasticPipelineSelectorExpression(gp.Patterns[i], GrokContext)
+				}
 			}
+		} else if isList(attr) {
+			// Note: this list form is less common, since it is undocumented, but Logstash accepts it
+			keys, patterns := getHashAttributeKeyValue(attr)
+			for i := range keys {
+				gp.Field = keys[i]
+				gp.Patterns = []string{patterns[i]}
+				for j := range gp.Patterns {
+					gp.Patterns[j], _ = toElasticPipelineSelectorExpression(gp.Patterns[j], GrokContext)
+				}
+			}
+		} else {
+			log.Panic().Msgf("Unexpected format for match attribute in Grok plugin: %s", attr)
 		}

 	case "ecs_compatibility":
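`isList` and `isHash` boil down to type switches over the attribute AST: the documented hash form is `match => { "message" => "%{PAT}" }`, while the list form `match => ["message", "%{PAT}"]` also parses. A stand-alone illustration of the dispatch follows; the attribute types here are stand-ins, not the project's real `ast` types:

```go
package main

import "fmt"

// Stand-ins for the attribute AST node types (illustrative only).
type Attribute interface{ isAttribute() }
type ArrayAttribute struct{}
type HashAttribute struct{}

func (ArrayAttribute) isAttribute() {}
func (HashAttribute) isAttribute()  {}

// classify folds isList/isHash into a single type switch with an explicit
// branch for anything unexpected, matching the log.Panic fallback in
// DealWithGrok.
func classify(a Attribute) string {
	switch a.(type) {
	case HashAttribute:
		return "hash"
	case ArrayAttribute:
		return "list"
	default:
		return "unexpected"
	}
}

func main() {
	fmt.Println(classify(HashAttribute{}))  // hash
	fmt.Println(classify(ArrayAttribute{})) // list
}
```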
From 476e9b6c0722326891eb8904773b27243a18536b Mon Sep 17 00:00:00 2001
From: Mirko Bez
Date: Fri, 20 Feb 2026 11:47:16 +0100
Subject: [PATCH 5/5] Update README

---
 README.md                        | 1 +
 internal/app/transpile/README.md | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 6a31bc3..535ebe4 100644
--- a/README.md
+++ b/README.md
@@ -52,6 +52,7 @@ For the transpilation we have different flags at disposal:
 - `pipeline_threshold`: determine how many processors will cause the creation of a new pipeline when converting if-else statements
 - `add_cleanup_processor`: whether we add a final remove processor to remove temporary fields created by the transpiler (and the `@metadata` field)
 - `inline`: whether the positional arguments are the script or a path to the script (default)
+- `patterns_dir_path`: override the grok `patterns_dir`; empty by default

 By default, we try to keep the semantics as close as possible with the original Logstash Pipeline. To obtain idiomatic pipelines, consider using the following settings:
diff --git a/internal/app/transpile/README.md b/internal/app/transpile/README.md
index 2c89e90..e636d3e 100644
--- a/internal/app/transpile/README.md
+++ b/internal/app/transpile/README.md
@@ -12,7 +12,7 @@ This document lists the components the transpiler currently recognizes and the G
 | filter | [dissect](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-dissect) | `DealWithDissect` | ✅ |
 | filter | [drop](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-drop) | `DealWithDrop` | ✅ |
 | filter | [geoip](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-geoip) | `DealWithGeoIP` | Limited configuration options available |
-| filter | [grok](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-grok) | `DealWithGrok` | Limited configuration options available (notably no `patterns_dir`) |
+| filter | [grok](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-grok) | `DealWithGrok` | Limited configuration options available |
 | filter | [json](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-json) | `DealWithJSON` | Limited configuration options available: source and target. TODO: Add ecs_compatibility support |
 | filter | [kv](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-kv) | `DealWithKV` | Limited configuration options supported |
 | filter | [mutate](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-mutate) | `DealWithMutate` | ✅ (merge not supported yet) |
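Taken together, the series lets a grok filter with custom patterns be transpiled end to end. For orientation, a sketch of the ingest-processor shape the transpiler is building toward (field values are illustrative); the `pattern_definitions` map carries only the custom patterns that the pruning step from PATCH 3 kept:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Shape of an Elasticsearch ingest grok processor with inline pattern
	// definitions; only patterns actually referenced by "patterns" remain
	// after pruning.
	proc := map[string]any{
		"grok": map[string]any{
			"field":    "message",
			"patterns": []string{"%{POSTFIX_QUEUEID:queue_id}"},
			"pattern_definitions": map[string]string{
				"POSTFIX_QUEUEID": "([0-9A-F]{6,}|[0-9a-zA-Z]{12,})",
			},
		},
	}
	out, _ := json.MarshalIndent(proc, "", "  ")
	fmt.Println(string(out))
}
```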