Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ For the transpilation we have different flags at disposal:
- `pipeline_threshold`: determine how many processors will cause the creation of a new pipeline when converting if-else statements
- `add_cleanup_processor`: whether we add a final remove processor to remove temporary fields created by the transpiler (and the `@metadata` field)
- `inline`: whether the positional arguments are the script or a path to the script (default)
- `patterns_dir_path`: override the grok `patterns_dir` option; empty by default (no override)

By default, we try to keep the semantics as close as possible with the original Logstash Pipeline. To obtain idiomatic pipelines, consider using the following settings:

Expand Down
4 changes: 3 additions & 1 deletion internal/app/transpile.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
fidelity bool
add_cleanup_processor bool
inline bool
patterns_dir_path string
)

func makeTranspileCmd() *cobra.Command {
Expand Down Expand Up @@ -65,10 +66,11 @@ func makeTranspileCmd() *cobra.Command {
cmd.Flags().BoolVar(&fidelity, "fidelity", true, "try to keep correct if-else semantic")
cmd.Flags().BoolVar(&add_cleanup_processor, "add_cleanup_processor", true, "add a cleanup processor to remove temporary fields created by the transpiler")
cmd.Flags().BoolVar(&inline, "inline", false, "whether the input is provided inline or via file paths(default false)")
cmd.Flags().StringVar(&patterns_dir_path, "patterns_dir_path", "", "directory containing pattern files")
return cmd
}

func runTranspile(cmd *cobra.Command, args []string) error {
check := transpile.New(pipeline_threshold, log_level, deal_with_error_locally, add_default_global_on_failure, fidelity, add_cleanup_processor, inline)
check := transpile.New(pipeline_threshold, log_level, deal_with_error_locally, add_default_global_on_failure, fidelity, add_cleanup_processor, inline, patterns_dir_path)
return check.Run(args)
}
2 changes: 1 addition & 1 deletion internal/app/transpile/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This document lists the components the transpiler currently recognizes and the G
| filter | [dissect](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-dissect) | `DealWithDissect` | ✅ |
| filter | [drop](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-drop) | `DealWithDrop` | ✅ |
| filter | [geoip](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-geoip) | `DealWithGeoIP` | Limited configuration options available |
| filter | [grok](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-grok) | `DealWithGrok` | Limited configuration options available (notably no `patterns_dir`) |
| filter | [grok](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-grok) | `DealWithGrok` | Limited configuration options available |
| filter | [json](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-json) | `DealWithJSON` | Limited configuration options available: `source` and `target`. TODO: Add ecs_compatibility support|
| filter | [kv](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-kv) | `DealWithKV` | Limited configuration options supported |
| filter | [mutate](https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-mutate) | `DealWithMutate` | ✅ (merge not supported yet) |
Expand Down
20 changes: 20 additions & 0 deletions internal/app/transpile/logstash_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func hashAttributeToMapArray(attr ast.Attribute) map[string][]string {

m[keyString] = values
}
default: // Unexpected Case --> PANIC
log.Panic().Msgf("Unexpected Case %s", attr.String())
}
return m
}
Expand Down Expand Up @@ -209,3 +211,21 @@ func hashAttributeToMap(attr ast.Attribute) map[string]string {
}
return m
}

// isList reports whether attr is a Logstash array attribute
// (e.g. `match => [ "field", "pattern" ]`).
func isList(attr ast.Attribute) bool {
	// A single type assertion is the idiomatic form of a
	// one-case type switch that only returns true/false.
	_, ok := attr.(ast.ArrayAttribute)
	return ok
}

// isHash reports whether attr is a Logstash hash attribute
// (e.g. `match => { "field" => "pattern" }`).
func isHash(attr ast.Attribute) bool {
	// A single type assertion is the idiomatic form of a
	// one-case type switch that only returns true/false.
	_, ok := attr.(ast.HashAttribute)
	return ok
}
162 changes: 151 additions & 11 deletions internal/app/transpile/transpile.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package transpile

import (
"bufio"
"encoding/json"
"os"
"path"
"path/filepath"
"regexp"
"strings"
"time"
Expand Down Expand Up @@ -38,9 +40,10 @@ type Transpile struct {
fidelity bool
addCleanUpProcessor bool
inline bool
patterns_dir_path string
}

func New(threshold int, log_level string, deal_with_error_locally bool, addDefaultGlobalOnFailure bool, fidelity bool, addCleanupProcessor bool, inline bool) Transpile {
func New(threshold int, log_level string, deal_with_error_locally bool, addDefaultGlobalOnFailure bool, fidelity bool, addCleanupProcessor bool, inline bool, patterns_dir_path string) Transpile {
return Transpile{
threshold: threshold,
log_level: level[strings.ToLower(log_level)],
Expand All @@ -49,6 +52,7 @@ func New(threshold int, log_level string, deal_with_error_locally bool, addDefau
fidelity: fidelity,
addCleanUpProcessor: addCleanupProcessor,
inline: inline,
patterns_dir_path: patterns_dir_path,
}
}

Expand Down Expand Up @@ -578,6 +582,31 @@ const (
CidrContext
)

// grokPatternRefRe matches one grok pattern reference such as %{SYSLOG},
// %{WORD:field} or %{IP:client:type}. The pattern name is captured in
// group 1; the optional ":field" / ":type" suffixes are matched but unused.
// '}' is excluded from the captures so multiple references in the same
// string match separately. Compiled once at package level rather than on
// every call.
var grokPatternRefRe = regexp.MustCompile(`\%\{([^:}]+)(:[^}]+)?(:[^}]+)?\}`)

// listUsedGrokPatterns returns the names of the grok patterns referenced
// directly by the given pattern string, in order of appearance (duplicates
// included). It returns an empty, non-nil slice when there are none.
func listUsedGrokPatterns(pattern string) []string {
	matches := grokPatternRefRe.FindAllStringSubmatch(pattern, -1)

	usedPatterns := []string{}
	for _, match := range matches {
		usedPatterns = append(usedPatterns, match[1])
	}
	return usedPatterns
}

// getAllUsedPatterns returns the names of all grok patterns used directly
// or transitively by pattern. Names whose definition appears in patterns
// are expanded recursively; unknown names are still reported but not
// expanded. Each level lists its direct references first, followed by
// their expansions. Unlike a naive recursion, this terminates on cyclic
// or self-referential pattern definitions: each name is expanded at most
// once.
func getAllUsedPatterns(patterns map[string]string, pattern string) []string {
	return collectUsedPatterns(patterns, pattern, map[string]struct{}{})
}

// collectUsedPatterns is the recursive worker for getAllUsedPatterns.
// expanded tracks pattern names whose definitions were already expanded,
// guarding against infinite recursion on cyclic definitions.
func collectUsedPatterns(patterns map[string]string, pattern string, expanded map[string]struct{}) []string {
	usedPatterns := listUsedGrokPatterns(pattern)
	// range iterates only over the direct references captured above;
	// the appends below do not extend the iteration.
	for _, p := range usedPatterns {
		if _, done := expanded[p]; done {
			continue
		}
		def, ok := patterns[p]
		if !ok {
			continue
		}
		expanded[p] = struct{}{}
		usedPatterns = append(usedPatterns, collectUsedPatterns(patterns, def, expanded)...)
	}
	return usedPatterns
}

// Function that given an expression like "foo_%{[selector]}" returns the equivalent Elastic expression
// "foo_{{selector}}" and boolean to indicate whether the string depends on input or not
func toElasticPipelineSelectorExpression(s string, context int) (string, bool) {
Expand Down Expand Up @@ -876,6 +905,27 @@ func DealWithDrop(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
return ingestProcessors, onFailureProcessors
}

// TranslatePatterns maps Logstash date filter match patterns to their
// Elasticsearch ingest pipeline equivalents. The named Logstash formats
// ISO8601, UNIX, UNIX_MS and TAI64N are understood by the ingest date
// processor under the same names and pass through unchanged. Any other
// pattern is also copied verbatim, but flagged: the second return value
// is true when at least one unrecognized pattern was seen, so the caller
// can warn that its semantics may differ between the two engines.
func TranslatePatterns(patterns []string) ([]string, bool) {
	translated := make([]string, 0, len(patterns))
	unknownPatterns := false
	for _, pattern := range patterns {
		// Every pattern is passed through as-is; the switch only decides
		// whether it is one of the formats both engines share by name.
		switch pattern {
		case "ISO8601", "UNIX", "UNIX_MS", "TAI64N":
			// recognized — no translation needed
		default:
			unknownPatterns = true
		}
		translated = append(translated, pattern)
	}
	return translated, unknownPatterns
}

func DealWithDate(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor, []IngestProcessor) {
ingestProcessors := []IngestProcessor{}
onFailureProcessors := []IngestProcessor{}
Expand All @@ -902,9 +952,12 @@ func DealWithDate(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
case "match":
matchArray := getArrayStringAttributes(attr)
proc.Field = toElasticPipelineSelector(matchArray[0])
proc.Formats = matchArray[1:]
log.Warn().Msgf("Date filter match patterns %v may have different semantics in Elasticsearch Ingest Pipeline. Refer to the respective documentations: https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-date#plugins-filters-date-match and https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/mapping-date-format", proc.Formats)

formats := matchArray[1:]
translatedFormats, unknownPatterns := TranslatePatterns(formats)
proc.Formats = translatedFormats
if unknownPatterns {
log.Warn().Msgf("Date filter match patterns '%v' may have different semantics in Elasticsearch Ingest Pipeline. Refer to the respective documentations: https://www.elastic.co/docs/reference/logstash/plugins/plugins-filters-date#plugins-filters-date-match and https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/mapping-date-format", proc.Formats)
}
default:
log.Warn().Msgf("Attribute '%s' is currently not supported", attr.Name())

Expand Down Expand Up @@ -1139,14 +1192,28 @@ func DealWithGrok(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
}
switch attr.Name() {
case "match":
helpPatterns := hashAttributeToMapArray(attr)
// TODO: Deal with multiple keys, currently only the last is used
for key := range helpPatterns {
gp.Field = key
gp.Patterns = helpPatterns[key]
for i := range gp.Patterns {
gp.Patterns[i], _ = toElasticPipelineSelectorExpression(gp.Patterns[i], GrokContext)
if isHash(attr) {
helpPatterns := hashAttributeToMapArray(attr)
// TODO: Deal with multiple keys, currently only the last is used
for key := range helpPatterns {
gp.Field = key
gp.Patterns = helpPatterns[key]
for i := range gp.Patterns {
gp.Patterns[i], _ = toElasticPipelineSelectorExpression(gp.Patterns[i], GrokContext)
}
}
} else if isList(attr) {
// Note that this use-case should be less common given that it's not documented but it works
keys, patterns := getHashAttributeKeyValue(attr)
for i := range keys {
gp.Field = keys[i]
gp.Patterns = []string{patterns[i]}
for j := range gp.Patterns {
gp.Patterns[j], _ = toElasticPipelineSelectorExpression(gp.Patterns[j], GrokContext)
}
}
} else {
log.Panic().Msgf("Unexpected format for match attribute in Grok plugin: %s", attr)
}

case "ecs_compatibility":
Expand All @@ -1162,11 +1229,84 @@ func DealWithGrok(plugin ast.Plugin, id string, t Transpile) ([]IngestProcessor,
onFailurePorcessors = DealWithTagOnFailure(attr, id, t)
case "break_on_match":
break_on_match = getBoolValue(attr)
case "patterns_dir":
patternsDir := getArrayStringAttributeOrStringAttrubute(attr)
if t.patterns_dir_path != "" {
patternsDir = []string{t.patterns_dir_path}
log.Info().Msgf("Using patterns_dir path from Transpile struct: %s", patternsDir)
}
for _, d := range patternsDir {
files, err := os.ReadDir(d)
if err != nil {
log.Warn().Err(err).Str("dir", d).Msg("Error while reading patterns_dir")
continue // Use continue so one bad dir doesn't stop the whole loop
}

for _, file := range files {
if file.IsDir() {
continue
}

fullPath := filepath.Join(d, file.Name())
f, err := os.Open(fullPath)
if err != nil {
log.Warn().Err(err).Str("file", file.Name()).Msg("Error opening pattern file")
continue
}

scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())

// Skip empty lines or comments
if line == "" || strings.HasPrefix(line, "#") {
continue
}

// Split by first whitespace (handles tabs or multiple spaces)
parts := regexp.MustCompile(`\s+`).Split(line, 2)
if len(parts) < 2 {
log.Warn().Str("file", file.Name()).Str("line", line).Msg("Invalid pattern definition")
continue
}

if gp.PatternDefinitions == nil {
gp.PatternDefinitions = make(map[string]string)
}
gp.PatternDefinitions[parts[0]] = parts[1]
}
f.Close() // Don't forget to close within the loop!
}
}
default:
log.Warn().Msgf("Attribute '%s' in Plugin '%s' is currently not supported", attr.Name(), plugin.Name())

}
}

usedPatterns := getAllUsedPatterns(gp.PatternDefinitions, gp.Patterns[0])
log.Debug().Msgf("Used patterns: %v", usedPatterns)

// Filter gp.PatternDefinitions to only keep patterns that are actually used.
if gp.PatternDefinitions != nil {
keep := make(map[string]string)
seen := make(map[string]struct{})
for _, p := range usedPatterns {
if _, s := seen[p]; s {
continue
}
seen[p] = struct{}{}
if val, ok := gp.PatternDefinitions[p]; ok {
keep[p] = val
}
}
if len(keep) > 0 {
gp.PatternDefinitions = keep
} else {
gp.PatternDefinitions = nil
}
}

// Add _grok_parse_failure
if len(gp.OnFailure) == 0 {
onFailurePorcessors = DealWithTagOnFailure(ast.NewArrayAttribute("tag_on_failure", ast.NewStringAttribute("", "_grok_parse_failure", ast.DoubleQuoted)), id, t)
Expand Down
50 changes: 49 additions & 1 deletion internal/app/transpile/transpile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transpile

import (
"fmt"
"reflect"
"testing"

config "github.com/herrBez/baffo"
Expand Down Expand Up @@ -246,7 +247,8 @@ func TestToElasticPipelineSelectorExpression(t *testing.T) {
expectMatch bool
}{
{
name: "ProcessorContext simple field",
name: "ProcessorContext simple field",

input: "foo_%{[bar]}",
context: ProcessorContext,
expected: "foo_{{{bar}}}",
Expand Down Expand Up @@ -294,3 +296,49 @@ func TestToElasticPipelineSelectorExpression(t *testing.T) {
})
}
}

// TestListUsedGrokPatterns verifies that direct grok pattern references
// are extracted from a pattern string, ignoring field and type suffixes.
func TestListUsedGrokPatterns(t *testing.T) {
	cases := []struct {
		name  string
		input string
		want  []string
	}{
		{"no patterns", "plain text", []string{}},
		{"single pattern", "%{SYSLOG}", []string{"SYSLOG"}},
		{"pattern with field", "%{WORD:field}", []string{"WORD"}},
		{"multiple patterns", "%{WORD} %{NUM_BER}", []string{"WORD", "NUM_BER"}},
		{"pattern with convert", "%{IP:client:class}", []string{"IP"}},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := listUsedGrokPatterns(tc.input); !reflect.DeepEqual(got, tc.want) {
				t.Fatalf("want %v, got %v", tc.want, got)
			}
		})
	}
}

// TestGetAllUsedPatterns verifies transitive pattern resolution and that
// names missing from the definitions map are still reported at top level.
func TestGetAllUsedPatterns(t *testing.T) {
	defs := map[string]string{
		"A": "%{B} %{C}",
		"B": "%{D}",
		"C": "literal",
		"D": "%{E}",
		"E": "",
	}

	if got, want := getAllUsedPatterns(defs, "%{A}"), []string{"A", "B", "C", "D", "E"}; !reflect.DeepEqual(got, want) {
		t.Fatalf("want %v, got %v", want, got)
	}

	// with an empty definitions map we still return the top-level used names
	if got, want := getAllUsedPatterns(map[string]string{}, "%{X} %{Y}"), []string{"X", "Y"}; !reflect.DeepEqual(got, want) {
		t.Fatalf("want %v, got %v", want, got)
	}
}
Loading