diff --git a/README.md b/README.md index 6a31bc3..535ebe4 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ For the transpilation we have different flags at disposal: - `pipeline_threshold`: determine how many processors will cause the creation of a new pipeline when converting if-else statements - `add_cleanup_processor`: whether we add a final remove processor to remove temporary fields created by the transpiler (and the `@metadata` field) - `inline`: whether the positional arguments are the script or a path to the script (default) +- `patterns_dir_path`: override the grok `patterns_dir` (empty by default, meaning no override) By default, we try to keep the semantics as close as possible with the original Logstash Pipeline. To obtain idiomatic pipelines, consider using the following settings: diff --git a/internal/app/transpile.go b/internal/app/transpile.go index ae01d19..3dd6a3a 100644 --- a/internal/app/transpile.go +++ b/internal/app/transpile.go @@ -17,6 +17,7 @@ var ( fidelity bool add_cleanup_processor bool inline bool + patterns_dir_path string ) func makeTranspileCmd() *cobra.Command { @@ -65,10 +66,11 @@ func makeTranspileCmd() *cobra.Command { cmd.Flags().BoolVar(&fidelity, "fidelity", true, "try to keep correct if-else semantic") cmd.Flags().BoolVar(&add_cleanup_processor, "add_cleanup_processor", true, "add a cleanup processor to remove temporary fields created by the transpiler") cmd.Flags().BoolVar(&inline, "inline", false, "whether the input is provided inline or via file paths(default false)") + cmd.Flags().StringVar(&patterns_dir_path, "patterns_dir_path", "", "directory containing pattern files") return cmd } func runTranspile(cmd *cobra.Command, args []string) error { - check := transpile.New(pipeline_threshold, log_level, deal_with_error_locally, add_default_global_on_failure, fidelity, 
add_cleanup_processor, inline, patterns_dir_path) return check.Run(args) } diff --git a/internal/app/transpile/README.md b/internal/app/transpile/README.md index 2c89e90..e636d3e 100644 --- a/internal/app/transpile/README.md +++ b/internal/app/transpile/README.md @@ -12,7 +12,7 @@ This document lists the components the transpiler currently recognizes and the G | filter | [dissect](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-dissect) | `DealWithDissect` | ✅ | | filter | [drop](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-drop) | `DealWithDrop` | ✅ | | filter | [geoip](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-geoip) | `DealWithGeoIP` | Limited configuration options available | -| filter | [grok](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-grok) | `DealWithGrok` | Limited configuration options available (notably no `patterns_dir`) | +| filter | [grok](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-grok) | `DealWithGrok` | Limited configuration options available | | filter | [json](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-json) | `DealWithJSON` | Limited configuration options availale source and target. 
TODO: Add ecs_compatibility support| | filter | [kv](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-kv) | `DealWithKV` | Limited configuration options supported | | filter | [mutate](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-mutate) | `DealWithMutate` | ✅ (merge not supported yet) | diff --git a/internal/app/transpile/logstash_filter.go b/internal/app/transpile/logstash_filter.go index e450fac..f76d5d4 100644 --- a/internal/app/transpile/logstash_filter.go +++ b/internal/app/transpile/logstash_filter.go @@ -35,6 +35,8 @@ func hashAttributeToMapArray(attr ast.Attribute) map[string][]string { m[keyString] = values } + default: // Unexpected Case --> PANIC + log.Panic().Msgf("Unexpected Case %s", attr.String()) } return m } @@ -209,3 +211,21 @@ func hashAttributeToMap(attr ast.Attribute) map[string]string { } return m } + +func isList(attr ast.Attribute) bool { + switch attr.(type) { + case ast.ArrayAttribute: + return true + default: + return false + } +} + +func isHash(attr ast.Attribute) bool { + switch attr.(type) { + case ast.HashAttribute: + return true + default: + return false + } +} diff --git a/internal/app/transpile/transpile.go b/internal/app/transpile/transpile.go index 5c82e3c..4386780 100644 --- a/internal/app/transpile/transpile.go +++ b/internal/app/transpile/transpile.go @@ -1,9 +1,11 @@ package transpile import ( + "bufio" "encoding/json" "os" "path" + "path/filepath" "regexp" "strings" "time" @@ -38,9 +40,10 @@ type Transpile struct { fidelity bool addCleanUpProcessor bool inline bool + patterns_dir_path string } -func New(threshold int, log_level string, deal_with_error_locally bool, addDefaultGlobalOnFailure bool, fidelity bool, addCleanupProcessor bool, inline bool) Transpile { +func New(threshold int, log_level string, deal_with_error_locally bool, addDefaultGlobalOnFailure bool, fidelity bool, addCleanupProcessor bool, inline bool, patterns_dir_path string) Transpile { return Transpile{ 
threshold: threshold, log_level: level[strings.ToLower(log_level)], @@ -49,6 +52,7 @@ func New(threshold int, log_level string, deal_with_error_locally bool, addDefau fidelity: fidelity, addCleanUpProcessor: addCleanupProcessor, inline: inline, + patterns_dir_path: patterns_dir_path, } } @@ -578,6 +582,31 @@ const ( CidrContext ) +func listUsedGrokPatterns(pattern string) []string { + // avoid consuming '}' in captures so multiple patterns in the same string + grokPartsFinder := regexp.MustCompile(`\%\{([^:}]+)(:[^}]+)?(:[^}]+)?\}`) + + matches := grokPartsFinder.FindAllStringSubmatch(pattern, -1) + + usedPatterns := []string{} + + for _, match := range matches { + usedPatterns = append(usedPatterns, match[1]) + } + + return usedPatterns +} + +func getAllUsedPatterns(patterns map[string]string, pattern string) []string { + usedPatterns := listUsedGrokPatterns(pattern) + for _, p := range usedPatterns { + if _, ok := patterns[p]; ok { + usedPatterns = append(usedPatterns, getAllUsedPatterns(patterns, patterns[p])...) 
+ } + } + return usedPatterns +} + // Function that given an expression like "foo_%{[selector]}" returns the equivalent Elastic expression // "foo_{{selector}}" and boolean to indicate whether the string depends on input or not func toElasticPipelineSelectorExpression(s string, context int) (string, bool) { @@ -876,6 +905,27 @@ func DealWithDrop(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor, return ingestProcessors, onFailureProcessors } +func TranslatePatterns(patterns []string) ([]string, bool) { + translated := []string{} + unknownPatterns := false + for _, pattern := range patterns { + switch pattern { + case "ISO8601": + translated = append(translated, "ISO8601") + case "UNIX": + translated = append(translated, "UNIX") + case "UNIX_MS": + translated = append(translated, "UNIX_MS") + case "TAI64N": + translated = append(translated, "TAI64N") + default: + translated = append(translated, pattern) + unknownPatterns = true + } + } + return translated, unknownPatterns +} + func DealWithDate(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor, []IngestProcessor) { ingestProcessors := []IngestProcessor{} onFailureProcessors := []IngestProcessor{} @@ -902,9 +952,12 @@ func DealWithDate(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor, case "match": matchArray := getArrayStringAttributes(attr) proc.Field = toElasticPipelineSelector(matchArray[0]) - proc.Formats = matchArray[1:] - log.Warn().Msgf("Date filter match patterns %v may have different semantics in Elasticsearch Ingest Pipeline. 
Refer to the respective documentations: https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-date#plugins-filters-date-match and https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/mapping-date-format", proc.Formats) - + formats := matchArray[1:] + translatedFormats, unknownPatterns := TranslatePatterns(formats) + proc.Formats = translatedFormats + if unknownPatterns { + log.Warn().Msgf("Date filter match patterns '%v' may have different semantics in Elasticsearch Ingest Pipeline. Refer to the respective documentations: https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-date#plugins-filters-date-match and https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/mapping-date-format", proc.Formats) + } default: log.Warn().Msgf("Attribute '%s' is currently not supported", attr.Name()) @@ -1139,14 +1192,28 @@ func DealWithGrok(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor, } switch attr.Name() { case "match": - helpPatterns := hashAttributeToMapArray(attr) - // TODO: Deal with multiple keys, currently only the last is used - for key := range helpPatterns { - gp.Field = key - gp.Patterns = helpPatterns[key] - for i := range gp.Patterns { - gp.Patterns[i], _ = toElasticPipelineSelectorExpression(gp.Patterns[i], GrokContext) + if isHash(attr) { + helpPatterns := hashAttributeToMapArray(attr) + // TODO: Deal with multiple keys, currently only the last is used + for key := range helpPatterns { + gp.Field = key + gp.Patterns = helpPatterns[key] + for i := range gp.Patterns { + gp.Patterns[i], _ = toElasticPipelineSelectorExpression(gp.Patterns[i], GrokContext) + } } + } else if isList(attr) { + // Note that this use-case should be less common given that it's not documented but it works + keys, patterns := getHashAttributeKeyValue(attr) + for i := range keys { + gp.Field = keys[i] + gp.Patterns = []string{patterns[i]} + for j := range gp.Patterns { + gp.Patterns[j], _ = 
toElasticPipelineSelectorExpression(gp.Patterns[j], GrokContext) + } + } + } else { + log.Panic().Msgf("Unexpected format for match attribute in Grok plugin: %s", attr) } case "ecs_compatibility": @@ -1162,11 +1229,84 @@ func DealWithGrok(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor, onFailurePorcessors = DealWithTagOnFailure(attr, id, t) case "break_on_match": break_on_match = getBoolValue(attr) + case "patterns_dir": + patternsDir := getArrayStringAttributeOrStringAttrubute(attr) + if t.patterns_dir_path != "" { + patternsDir = []string{t.patterns_dir_path} + log.Info().Msgf("Using patterns_dir path from Transpile struct: %s", patternsDir) + } + for _, d := range patternsDir { + files, err := os.ReadDir(d) + if err != nil { + log.Warn().Err(err).Str("dir", d).Msg("Error while reading patterns_dir") + continue // Use continue so one bad dir doesn't stop the whole loop + } + + for _, file := range files { + if file.IsDir() { + continue + } + + fullPath := filepath.Join(d, file.Name()) + f, err := os.Open(fullPath) + if err != nil { + log.Warn().Err(err).Str("file", file.Name()).Msg("Error opening pattern file") + continue + } + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + + // Skip empty lines or comments + if line == "" || strings.HasPrefix(line, "#") { + continue + } + + // Split by first whitespace (handles tabs or multiple spaces) + parts := regexp.MustCompile(`\s+`).Split(line, 2) + if len(parts) < 2 { + log.Warn().Str("file", file.Name()).Str("line", line).Msg("Invalid pattern definition") + continue + } + + if gp.PatternDefinitions == nil { + gp.PatternDefinitions = make(map[string]string) + } + gp.PatternDefinitions[parts[0]] = parts[1] + } + f.Close() // Don't forget to close within the loop! 
+ } + } default: log.Warn().Msgf("Attribute '%s' in Plugin '%s' is currently not supported", attr.Name(), plugin.Name()) } } + + usedPatterns := getAllUsedPatterns(gp.PatternDefinitions, gp.Patterns[0]) + log.Debug().Msgf("Used patterns: %v", usedPatterns) + + // Filter gp.PatternDefinitions to only keep patterns that are actually used. + if gp.PatternDefinitions != nil { + keep := make(map[string]string) + seen := make(map[string]struct{}) + for _, p := range usedPatterns { + if _, s := seen[p]; s { + continue + } + seen[p] = struct{}{} + if val, ok := gp.PatternDefinitions[p]; ok { + keep[p] = val + } + } + if len(keep) > 0 { + gp.PatternDefinitions = keep + } else { + gp.PatternDefinitions = nil + } + } + // Add _grok_parse_failure if len(gp.OnFailure) == 0 { onFailurePorcessors = DealWithTagOnFailure(ast.NewArrayAttribute("tag_on_failure", ast.NewStringAttribute("", "_grok_parse_failure", ast.DoubleQuoted)), id, t) diff --git a/internal/app/transpile/transpile_test.go b/internal/app/transpile/transpile_test.go index 840db24..f871a65 100644 --- a/internal/app/transpile/transpile_test.go +++ b/internal/app/transpile/transpile_test.go @@ -2,6 +2,7 @@ package transpile import ( "fmt" + "reflect" "testing" config "github.com/herrBez/baffo" @@ -246,7 +247,8 @@ func TestToElasticPipelineSelectorExpression(t *testing.T) { expectMatch bool }{ { - name: "ProcessorContext simple field", + name: "ProcessorContext simple field", + input: "foo_%{[bar]}", context: ProcessorContext, expected: "foo_{{{bar}}}", @@ -294,3 +296,49 @@ func TestToElasticPipelineSelectorExpression(t *testing.T) { }) } } + +func TestListUsedGrokPatterns(t *testing.T) { + tests := []struct { + name string + in string + want []string + }{ + {"no patterns", "plain text", []string{}}, + {"single pattern", "%{SYSLOG}", []string{"SYSLOG"}}, + {"pattern with field", "%{WORD:field}", []string{"WORD"}}, + {"multiple patterns", "%{WORD} %{NUM_BER}", []string{"WORD", "NUM_BER"}}, + {"pattern with convert", 
"%{IP:client:class}", []string{"IP"}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := listUsedGrokPatterns(tt.in) + if !reflect.DeepEqual(got, tt.want) { + t.Fatalf("want %v, got %v", tt.want, got) + } + }) + } +} + +func TestGetAllUsedPatterns(t *testing.T) { + patterns := map[string]string{ + "A": "%{B} %{C}", + "B": "%{D}", + "C": "literal", + "D": "%{E}", + "E": "", + } + + got := getAllUsedPatterns(patterns, "%{A}") + want := []string{"A", "B", "C", "D", "E"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("want %v, got %v", want, got) + } + + // when patterns map is empty we still return top-level used names + got2 := getAllUsedPatterns(map[string]string{}, "%{X} %{Y}") + want2 := []string{"X", "Y"} + if !reflect.DeepEqual(got2, want2) { + t.Fatalf("want %v, got %v", want2, got2) + } +}