Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions changelog/fragments/1773776832-Add-file-delivery-sources.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Adds file-delivery source routing to integration indices

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Adds an optional query parameter (?source) to the existing file delivery API. When
omitted, files are pulled from the fleet-owned file data streams as before. When used,
fleet server will pull from the calling integration's owned indices.

# Affected component; a word indicating the component this changeset affects.
component: fleet-server

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/fleet-server/pull/6599

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
18 changes: 18 additions & 0 deletions internal/pkg/api/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,24 @@ func NewHTTPErrResp(err error) HTTPErrResp {
zerolog.InfoLevel,
},
},
{
ErrClientFileForbidden,
HTTPErrResp{
http.StatusForbidden,
"ErrClientFileForbidden",
"client forbidden",
zerolog.InfoLevel,
},
},
{
ErrFileForDeliveryNotFound,
HTTPErrResp{
http.StatusNotFound,
"ErrFileForDeliveryNotFound",
"file not found",
zerolog.InfoLevel,
},
},
{
ErrPolicyNotFound,
HTTPErrResp{
Expand Down
46 changes: 41 additions & 5 deletions internal/pkg/api/handleFileDelivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,35 @@ package api

import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"

"github.com/rs/zerolog"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/file"
"github.com/elastic/fleet-server/v7/internal/pkg/file/delivery"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/go-elasticsearch/v8"
)

var (
// ErrClientFileForbidden is returned when a caller requests a library file
// (?source= query parameter) without a recognized x-elastic-product-origin
// header; mapped to a 403 response in NewHTTPErrResp.
ErrClientFileForbidden = errors.New("agent not authorized for library")
// ErrFileForDeliveryNotFound is the base error for any failed file lookup;
// mapped to a 404 response in NewHTTPErrResp.
ErrFileForDeliveryNotFound = errors.New("unable to retrieve file")
// ErrLibraryFileNotFound wraps ErrFileForDeliveryNotFound for lookups that
// miss in an integration's library indices.
ErrLibraryFileNotFound = fmt.Errorf("%w from library", ErrFileForDeliveryNotFound)
// ErrTargetFileNotFound wraps ErrFileForDeliveryNotFound for lookups that
// miss in the fleet-owned, agent-targeted file streams.
ErrTargetFileNotFound = fmt.Errorf("%w for agent", ErrFileForDeliveryNotFound)

// allowlist of clients to use file-library functionality
// prevents arbitrary index reading unless integration opts in
// Keys are x-elastic-product-origin header values; values are the
// integration names used when building library index names.
KnownProductOriginFileUsers = map[string]string{
"endpoint-security": "endpoint",
}
)

type FileDeliveryT struct {
bulker bulk.Bulk
cache cache.Cache
Expand All @@ -41,13 +57,33 @@ func (ft *FileDeliveryT) handleSendFile(zlog zerolog.Logger, w http.ResponseWrit
return err
}

// find file
info, err := ft.deliverer.FindFileForAgent(r.Context(), fileID, agent.Agent.ID)
if err != nil {
return err
// determine storage place for file lookup. Can be either in integration libraries (?source=X) OR an agent-targeted, fleet-owned stream
var info file.MetaDoc
var idx string
libStorageSrc := strings.TrimSpace(r.URL.Query().Get("source"))
if libStorageSrc != "" {
// determine integration client for library file
clientSrc := strings.ToLower(strings.TrimSpace(r.Header.Get("x-elastic-product-origin")))
if clientSrc == "" {
return fmt.Errorf("%w: Client not specified", ErrClientFileForbidden)
}
integration, ok := KnownProductOriginFileUsers[clientSrc]
if !ok {
return ErrClientFileForbidden
}
info, idx, err = ft.deliverer.FindLibraryFile(r.Context(), fileID, integration, libStorageSrc)
if err != nil {
return fmt.Errorf("%w: %w", ErrLibraryFileNotFound, err)
}
} else {
// file is not stored in wide-distribution, is limited to intended agents
info, idx, err = ft.deliverer.FindFileForAgent(r.Context(), fileID, agent.Agent.ID)
if err != nil {
return fmt.Errorf("%w: %w", ErrTargetFileNotFound, err)
}
}

chunks, err := ft.deliverer.LocateChunks(r.Context(), zlog, fileID)
chunks, err := ft.deliverer.LocateChunks(r.Context(), zlog, fileID, idx)
if errors.Is(err, delivery.ErrNoFile) {
w.WriteHeader(http.StatusNotFound)
return err
Expand Down
124 changes: 121 additions & 3 deletions internal/pkg/api/handleFileDelivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ var (
isFileChunkSearch = mock.MatchedBy(func(idx string) bool {
return strings.HasPrefix(idx, fmt.Sprintf(delivery.FileDataIndexPattern, ""))
})

sampleDocBody_ABCD = hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD")
)

func TestFileDeliveryRouteDisallowedMethods(t *testing.T) {
Expand Down Expand Up @@ -172,7 +174,7 @@ func TestFileDelivery(t *testing.T) {
}, nil,
)

tx.Response = sendBodyBytes(hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD"))
tx.Response = sendBodyBytes(sampleDocBody_ABCD)

hr.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/fleet/file/X", nil))

Expand Down Expand Up @@ -310,7 +312,7 @@ func TestFileDeliverySetsHeaders(t *testing.T) {
},
}, nil,
)
tx.Response = sendBodyBytes(hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD"))
tx.Response = sendBodyBytes(sampleDocBody_ABCD)

hr.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/fleet/file/X", nil))

Expand Down Expand Up @@ -371,14 +373,130 @@ func TestFileDeliverySetsHashWhenPresent(t *testing.T) {
},
}, nil,
)
tx.Response = sendBodyBytes(hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD"))
tx.Response = sendBodyBytes(sampleDocBody_ABCD)

hr.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/fleet/file/X", nil))

require.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "deadbeef", rec.Header().Get("X-File-SHA2"))
}

func TestFileLibraryDeliveryStopsEmptyClients(t *testing.T) {
	// A ?source= request that carries no x-elastic-product-origin header
	// must be rejected with 403 before any index lookup happens.
	router, _, _, _ := prepareFileDeliveryMock(t)
	recorder := httptest.NewRecorder()

	request := httptest.NewRequest(http.MethodGet, "/api/fleet/file/X?source=foo", nil)
	request.Header.Del("X-elastic-product-origin")
	router.ServeHTTP(recorder, request)

	assert.Equal(t, http.StatusForbidden, recorder.Code)
}

func TestFileLibraryDeliveryStopsInvalidClients(t *testing.T) {
	// A ?source= request from a product origin that is not present in
	// KnownProductOriginFileUsers must be rejected with 403.
	router, _, _, _ := prepareFileDeliveryMock(t)
	recorder := httptest.NewRecorder()

	request := httptest.NewRequest(http.MethodGet, "/api/fleet/file/X?source=foo", nil)
	request.Header.Add("X-elastic-product-origin", "bar")
	router.ServeHTTP(recorder, request)

	assert.Equal(t, http.StatusForbidden, recorder.Code)
}

// TestFileLibraryDeliveryAllowsValidClients verifies that a known product
// origin ("endpoint-security") with a ?source= parameter is routed to the
// integration-owned library indices and served successfully.
func TestFileLibraryDeliveryAllowsValidClients(t *testing.T) {
	hr, _, tx, bulk := prepareFileDeliveryMock(t)
	rec := httptest.NewRecorder()

	libName := "foolib"
	// Library data index for the "endpoint" integration, matching the
	// endpoint-security entry in KnownProductOriginFileUsers.
	dataIndex := fmt.Sprintf(delivery.LibraryFileDataIndexPattern, "endpoint", libName)

	// Metadata read can return an empty doc; only index routing is under test.
	bulk.On("Read", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
		[]byte(`{}`), nil,
	)
	bulk.On("Search", mock.Anything, dataIndex, mock.Anything, mock.Anything, mock.Anything).Return(
		&es.ResultT{
			HitsT: es.HitsT{
				Hits: []es.HitT{
					{
						ID:      "X.0",
						SeqNo:   1,
						Version: 1,
						Index:   dataIndex, // reuse the precomputed index instead of re-formatting it
						Fields: map[string]interface{}{
							file.FieldBaseID: []interface{}{"X"},
							file.FieldLast:   []interface{}{true},
						},
					},
				},
			},
		}, nil,
	)
	tx.Response = sendBodyBytes(sampleDocBody_ABCD)

	req := httptest.NewRequest(http.MethodGet, "/api/fleet/file/X?source="+libName, nil)
	req.Header.Set("X-elastic-product-origin", "endpoint-security")
	hr.ServeHTTP(rec, req)

	// The metadata document must have been fetched from the library index.
	bulk.AssertCalled(t, "Read", mock.Anything, mock.Anything, mock.Anything, mock.Anything)

	assert.Equal(t, http.StatusOK, rec.Code)
}

// TestFileLibraryDelivery verifies the full library-delivery happy path:
// metadata is read from the integration library, chunks are located in the
// library data index, and body, Content-Type, Content-Length, and
// X-File-SHA2 headers reflect the stored file.
func TestFileLibraryDelivery(t *testing.T) {
	hr, _, tx, bulk := prepareFileDeliveryMock(t)
	rec := httptest.NewRecorder()

	libName := "script"
	// Library data index for the "endpoint" integration, matching the
	// endpoint-security entry in KnownProductOriginFileUsers.
	dataIndex := fmt.Sprintf(delivery.LibraryFileDataIndexPattern, "endpoint", libName)

	// Metadata document: a 4-byte READY csv file with a known hash.
	bulk.On("Read", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
		[]byte(`{
			"file": {
				"created": "2023-06-05T15:23:37.499Z",
				"Status": "READY",
				"Updated": "2023-06-05T15:23:37.499Z",
				"name": "somefile.csv",
				"mime_type": "text/csv",
				"size": 4,
				"hash": {
					"sha256": "deadbeef"
				}
			}
		}`), nil,
	)
	bulk.On("Search", mock.Anything, dataIndex, mock.Anything, mock.Anything, mock.Anything).Return(
		&es.ResultT{
			HitsT: es.HitsT{
				Hits: []es.HitT{
					{
						ID:      "X.0",
						SeqNo:   1,
						Version: 1,
						Index:   dataIndex, // reuse the precomputed index instead of re-formatting it
						Fields: map[string]interface{}{
							file.FieldBaseID: []interface{}{"X"},
							file.FieldLast:   []interface{}{true},
						},
					},
				},
			},
		}, nil,
	)
	tx.Response = sendBodyBytes(sampleDocBody_ABCD)

	req := httptest.NewRequest(http.MethodGet, "/api/fleet/file/X?source="+libName, nil)
	req.Header.Set("X-elastic-product-origin", "endpoint-security")
	hr.ServeHTTP(rec, req)

	// The metadata document must have been fetched from the library index.
	bulk.AssertCalled(t, "Read", mock.Anything, mock.Anything, mock.Anything, mock.Anything)

	assert.Equal(t, http.StatusOK, rec.Code)
	assert.Equal(t, []byte{0xAB, 0xCD}, rec.Body.Bytes())
	assert.Equal(t, "text/csv", rec.Header().Get("Content-Type"))
	assert.Equal(t, "4", rec.Header().Get("Content-Length"))
	assert.Equal(t, "deadbeef", rec.Header().Get("X-File-SHA2"))
}

/*
Helpers and mocks
*/
Expand Down
34 changes: 27 additions & 7 deletions internal/pkg/file/delivery/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,48 @@ func New(client *elasticsearch.Client, bulker bulk.Bulk, sizeLimit *uint64) *Del
}
}

func (d *Deliverer) FindFileForAgent(ctx context.Context, fileID string, agentID string) (file.MetaDoc, error) {
// returns file Metadata, the search index for the data chunks for the file, and err
func (d *Deliverer) FindFileForAgent(ctx context.Context, fileID string, agentID string) (file.MetaDoc, string, error) {
span, ctx := apm.StartSpan(ctx, "findFile", "process")
defer span.End()
result, err := findFileForAgent(ctx, d.bulker, fileID, agentID)
if err != nil {
return file.MetaDoc{}, err
return file.MetaDoc{}, "", err
}
if result == nil || len(result.Hits) == 0 {
return file.MetaDoc{}, ErrNoFile
return file.MetaDoc{}, "", ErrNoFile
}

var fi file.MetaDoc
if err := json.Unmarshal(result.Hits[0].Source, &fi); err != nil {
return file.MetaDoc{}, fmt.Errorf("file meta doc parsing error: %w", err)
return file.MetaDoc{}, "", fmt.Errorf("file meta doc parsing error: %w", err)
}

return fi, nil
return fi, fmt.Sprintf(FileDataIndexPattern, "*"), nil
}

func (d *Deliverer) LocateChunks(ctx context.Context, zlog zerolog.Logger, fileID string) ([]file.ChunkInfo, error) {
// FindLibraryFile resolves the metadata document for fileID from an
// integration-owned library (selected by the ?source= query parameter).
// It returns the file metadata, the index to search for the file's data
// chunks, and an error. ErrNoFile is returned when no metadata document
// exists for fileID in the library header index.
func (d *Deliverer) FindLibraryFile(ctx context.Context, fileID string, integration string, libsrc string) (file.MetaDoc, string, error) {
	// Use a distinct span name so library lookups are distinguishable from
	// FindFileForAgent's "findFile" spans in APM traces.
	span, ctx := apm.StartSpan(ctx, "findLibraryFile", "process")
	defer span.End()
	result, err := findFileInLibrary(ctx, d.bulker, fileID, fmt.Sprintf(LibraryFileHeaderIndexPattern, integration, libsrc))
	if err != nil {
		return file.MetaDoc{}, "", err
	}
	if len(result) == 0 {
		return file.MetaDoc{}, "", ErrNoFile
	}

	var fi file.MetaDoc
	if err := json.Unmarshal(result, &fi); err != nil {
		return file.MetaDoc{}, "", fmt.Errorf("file meta doc parsing error: %w", err)
	}

	return fi, fmt.Sprintf(LibraryFileDataIndexPattern, integration, libsrc), nil
}

func (d *Deliverer) LocateChunks(ctx context.Context, zlog zerolog.Logger, fileID string, dataIndex string) ([]file.ChunkInfo, error) {
// find chunk indices behind alias, doc IDs
infos, err := file.GetChunkInfos(ctx, d.bulker, FileDataIndexPattern, fileID, file.GetChunkInfoOpt{})
infos, err := file.GetChunkInfos(ctx, d.bulker, dataIndex, fileID, file.GetChunkInfoOpt{})
if err != nil {
zlog.Error().Err(err).Msg("problem getting infos")
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/file/delivery/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestFindFile(t *testing.T) {

d := New(nil, fakeBulk, nil)

info, err := d.FindFileForAgent(context.Background(), fileID, agentID)
info, _, err := d.FindFileForAgent(context.Background(), fileID, agentID)
require.NoError(t, err)

assert.NotNil(t, info.File.Hash)
Expand All @@ -94,7 +94,7 @@ func TestFindFileHandlesNoResults(t *testing.T) {

d := New(nil, fakeBulk, nil)

_, err := d.FindFileForAgent(context.Background(), "somefile", "anyagent")
_, _, err := d.FindFileForAgent(context.Background(), "somefile", "anyagent")
assert.ErrorIs(t, ErrNoFile, err)
}

Expand Down Expand Up @@ -134,7 +134,7 @@ func TestLocateChunks(t *testing.T) {

d := New(nil, fakeBulk, nil)

chunks, err := d.LocateChunks(context.Background(), zerolog.Logger{}, baseID)
chunks, err := d.LocateChunks(context.Background(), zerolog.Logger{}, baseID, FileDataIndexPattern)
require.NoError(t, err)

assert.Len(t, chunks, 2)
Expand All @@ -156,7 +156,7 @@ func TestLocateChunksEmpty(t *testing.T) {

d := New(nil, fakeBulk, nil)

_, err := d.LocateChunks(context.Background(), zerolog.Logger{}, "afile")
_, err := d.LocateChunks(context.Background(), zerolog.Logger{}, "afile", FileDataIndexPattern)
assert.Error(t, err)
}

Expand Down
Loading
Loading