diff --git a/changelog/fragments/1773776832-Add-file-delivery-sources.yaml b/changelog/fragments/1773776832-Add-file-delivery-sources.yaml new file mode 100644 index 0000000000..949754436e --- /dev/null +++ b/changelog/fragments/1773776832-Add-file-delivery-sources.yaml @@ -0,0 +1,35 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: enhancement + +# Change summary; an 80ish characters long description of the change. +summary: Adds file-delivery source routing to integration indices + +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + Adds an optional query parameter (?source) to the existing file delivery API. When + omitted, files are pulled from the fleet-owned file data streams as before. When used, + fleet server will pull from the calling integration's owned indices. + +# Affected component; a word indicating the component this changeset affects. +component: fleet-server + +# PR URL; optional; the PR number that added the changeset. +# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. 
+# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/fleet-server/pull/6599 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/api/error.go b/internal/pkg/api/error.go index 00337dc631..708f2268bc 100644 --- a/internal/pkg/api/error.go +++ b/internal/pkg/api/error.go @@ -487,6 +487,24 @@ func NewHTTPErrResp(err error) HTTPErrResp { zerolog.InfoLevel, }, }, + { + ErrClientFileForbidden, + HTTPErrResp{ + http.StatusForbidden, + "ErrClientFileForbidden", + "client forbidden", + zerolog.InfoLevel, + }, + }, + { + ErrFileForDeliveryNotFound, + HTTPErrResp{ + http.StatusNotFound, + "ErrFileForDeliveryNotFound", + "file not found", + zerolog.InfoLevel, + }, + }, { ErrPolicyNotFound, HTTPErrResp{ diff --git a/internal/pkg/api/handleFileDelivery.go b/internal/pkg/api/handleFileDelivery.go index b58337807e..5b4df01bb6 100644 --- a/internal/pkg/api/handleFileDelivery.go +++ b/internal/pkg/api/handleFileDelivery.go @@ -6,19 +6,35 @@ package api import ( "errors" + "fmt" "net/http" "strconv" + "strings" "github.com/rs/zerolog" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/config" + "github.com/elastic/fleet-server/v7/internal/pkg/file" "github.com/elastic/fleet-server/v7/internal/pkg/file/delivery" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/go-elasticsearch/v8" ) +var ( + ErrClientFileForbidden = errors.New("agent not authorized for library") + ErrFileForDeliveryNotFound = errors.New("unable to retrieve file") + ErrLibraryFileNotFound = fmt.Errorf("%w from library", ErrFileForDeliveryNotFound) + ErrTargetFileNotFound = fmt.Errorf("%w for agent", ErrFileForDeliveryNotFound) 
+ + // allowlist of clients to use file-library functionality + // prevents arbitrary index reading unless integration opts in + KnownProductOriginFileUsers = map[string]string{ + "endpoint-security": "endpoint", + } +) + type FileDeliveryT struct { bulker bulk.Bulk cache cache.Cache @@ -41,13 +57,33 @@ func (ft *FileDeliveryT) handleSendFile(zlog zerolog.Logger, w http.ResponseWrit return err } - // find file - info, err := ft.deliverer.FindFileForAgent(r.Context(), fileID, agent.Agent.ID) - if err != nil { - return err + // determine storage place for file lookup Can be either in integration libraries ( ?source=X ) OR agent-targeted, fleet-owned stream + var info file.MetaDoc + var idx string + libStorageSrc := strings.TrimSpace(r.URL.Query().Get("source")) + if libStorageSrc != "" { + // determine integration client for library file + clientSrc := strings.ToLower(strings.TrimSpace(r.Header.Get("x-elastic-product-origin"))) + if clientSrc == "" { + return fmt.Errorf("%w: Client not specified", ErrClientFileForbidden) + } + integration, ok := KnownProductOriginFileUsers[clientSrc] + if !ok { + return ErrClientFileForbidden + } + info, idx, err = ft.deliverer.FindLibraryFile(r.Context(), fileID, integration, libStorageSrc) + if err != nil { + return fmt.Errorf("%w: %w", ErrLibraryFileNotFound, err) + } + } else { + // file is not stored in wide-distribution, is limited to intended agents + info, idx, err = ft.deliverer.FindFileForAgent(r.Context(), fileID, agent.Agent.ID) + if err != nil { + return fmt.Errorf("%w: %w", ErrTargetFileNotFound, err) + } } - chunks, err := ft.deliverer.LocateChunks(r.Context(), zlog, fileID) + chunks, err := ft.deliverer.LocateChunks(r.Context(), zlog, fileID, idx) if errors.Is(err, delivery.ErrNoFile) { w.WriteHeader(http.StatusNotFound) return err diff --git a/internal/pkg/api/handleFileDelivery_test.go b/internal/pkg/api/handleFileDelivery_test.go index 1ffa1506b9..7626ad5128 100644 --- a/internal/pkg/api/handleFileDelivery_test.go 
+++ b/internal/pkg/api/handleFileDelivery_test.go @@ -37,6 +37,8 @@ var ( isFileChunkSearch = mock.MatchedBy(func(idx string) bool { return strings.HasPrefix(idx, fmt.Sprintf(delivery.FileDataIndexPattern, "")) }) + + sampleDocBody_ABCD = hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD") ) func TestFileDeliveryRouteDisallowedMethods(t *testing.T) { @@ -172,7 +174,7 @@ func TestFileDelivery(t *testing.T) { }, nil, ) - tx.Response = sendBodyBytes(hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD")) + tx.Response = sendBodyBytes(sampleDocBody_ABCD) hr.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/fleet/file/X", nil)) @@ -310,7 +312,7 @@ func TestFileDeliverySetsHeaders(t *testing.T) { }, }, nil, ) - tx.Response = sendBodyBytes(hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD")) + tx.Response = sendBodyBytes(sampleDocBody_ABCD) hr.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/fleet/file/X", nil)) @@ -371,7 +373,7 @@ func TestFileDeliverySetsHashWhenPresent(t *testing.T) { }, }, nil, ) - tx.Response = sendBodyBytes(hexDecode("A7665F696E64657878212E666C6565742D66696C6564656C69766572792D646174612D656E64706F696E74635F69646578797A2E30685F76657273696F6E01675F7365715F6E6F016D5F7072696D6172795F7465726D0165666F756E64F5666669656C6473A164646174618142ABCD")) + tx.Response = sendBodyBytes(sampleDocBody_ABCD) hr.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/api/fleet/file/X", nil)) @@ -379,6 +381,122 @@ func 
TestFileDeliverySetsHashWhenPresent(t *testing.T) { assert.Equal(t, "deadbeef", rec.Header().Get("X-File-SHA2")) } +func TestFileLibraryDeliveryStopsEmptyClients(t *testing.T) { + hr, _, _, _ := prepareFileDeliveryMock(t) + rec := httptest.NewRecorder() + + req := httptest.NewRequest(http.MethodGet, "/api/fleet/file/X?source=foo", nil) + req.Header.Del("X-elastic-product-origin") + hr.ServeHTTP(rec, req) + + assert.Equal(t, http.StatusForbidden, rec.Code) +} + +func TestFileLibraryDeliveryStopsInvalidClients(t *testing.T) { + hr, _, _, _ := prepareFileDeliveryMock(t) + rec := httptest.NewRecorder() + + req := httptest.NewRequest(http.MethodGet, "/api/fleet/file/X?source=foo", nil) + req.Header.Add("X-elastic-product-origin", "bar") + hr.ServeHTTP(rec, req) + + assert.Equal(t, http.StatusForbidden, rec.Code) +} + +func TestFileLibraryDeliveryAllowsValidClients(t *testing.T) { + hr, _, tx, bulk := prepareFileDeliveryMock(t) + rec := httptest.NewRecorder() + + libName := "foolib" + dataIndex := fmt.Sprintf(delivery.LibraryFileDataIndexPattern, "endpoint", libName) + + bulk.On("Read", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + []byte(`{}`), nil, + ) + bulk.On("Search", mock.Anything, dataIndex, mock.Anything, mock.Anything, mock.Anything).Return( + &es.ResultT{ + HitsT: es.HitsT{ + Hits: []es.HitT{ + { + ID: "X.0", + SeqNo: 1, + Version: 1, + Index: fmt.Sprintf(delivery.LibraryFileDataIndexPattern, "endpoint", libName), + Fields: map[string]interface{}{ + file.FieldBaseID: []interface{}{"X"}, + file.FieldLast: []interface{}{true}, + }, + }, + }, + }, + }, nil, + ) + tx.Response = sendBodyBytes(sampleDocBody_ABCD) + + req := httptest.NewRequest(http.MethodGet, "/api/fleet/file/X?source="+libName, nil) + req.Header.Set("X-elastic-product-origin", "endpoint-security") + hr.ServeHTTP(rec, req) + + bulk.AssertCalled(t, "Read", mock.Anything, mock.Anything, mock.Anything, mock.Anything) + + assert.Equal(t, http.StatusOK, rec.Code) +} + +func 
TestFileLibraryDelivery(t *testing.T) { + hr, _, tx, bulk := prepareFileDeliveryMock(t) + rec := httptest.NewRecorder() + + libName := "script" + dataIndex := fmt.Sprintf(delivery.LibraryFileDataIndexPattern, "endpoint", libName) + + bulk.On("Read", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + []byte(`{ + "file": { + "created": "2023-06-05T15:23:37.499Z", + "Status": "READY", + "Updated": "2023-06-05T15:23:37.499Z", + "name": "somefile.csv", + "mime_type": "text/csv", + "size": 4, + "hash": { + "sha256": "deadbeef" + } + } + }`), nil, + ) + bulk.On("Search", mock.Anything, dataIndex, mock.Anything, mock.Anything, mock.Anything).Return( + &es.ResultT{ + HitsT: es.HitsT{ + Hits: []es.HitT{ + { + ID: "X.0", + SeqNo: 1, + Version: 1, + Index: fmt.Sprintf(delivery.LibraryFileDataIndexPattern, "endpoint", libName), + Fields: map[string]interface{}{ + file.FieldBaseID: []interface{}{"X"}, + file.FieldLast: []interface{}{true}, + }, + }, + }, + }, + }, nil, + ) + tx.Response = sendBodyBytes(sampleDocBody_ABCD) + + req := httptest.NewRequest(http.MethodGet, "/api/fleet/file/X?source="+libName, nil) + req.Header.Set("X-elastic-product-origin", "endpoint-security") + hr.ServeHTTP(rec, req) + + bulk.AssertCalled(t, "Read", mock.Anything, mock.Anything, mock.Anything, mock.Anything) + + assert.Equal(t, http.StatusOK, rec.Code) + assert.Equal(t, []byte{0xAB, 0xCD}, rec.Body.Bytes()) + assert.Equal(t, "text/csv", rec.Header().Get("Content-Type")) + assert.Equal(t, "4", rec.Header().Get("Content-Length")) + assert.Equal(t, "deadbeef", rec.Header().Get("X-File-SHA2")) +} + /* Helpers and mocks */ diff --git a/internal/pkg/file/delivery/delivery.go b/internal/pkg/file/delivery/delivery.go index 6956f5d2c7..f9e909c765 100644 --- a/internal/pkg/file/delivery/delivery.go +++ b/internal/pkg/file/delivery/delivery.go @@ -39,28 +39,48 @@ func New(client *elasticsearch.Client, bulker bulk.Bulk, sizeLimit *uint64) *Del } } -func (d *Deliverer) FindFileForAgent(ctx 
context.Context, fileID string, agentID string) (file.MetaDoc, error) { +// returns file Metadata, the search index for the data chunks for the file, and err +func (d *Deliverer) FindFileForAgent(ctx context.Context, fileID string, agentID string) (file.MetaDoc, string, error) { span, ctx := apm.StartSpan(ctx, "findFile", "process") defer span.End() result, err := findFileForAgent(ctx, d.bulker, fileID, agentID) if err != nil { - return file.MetaDoc{}, err + return file.MetaDoc{}, "", err } if result == nil || len(result.Hits) == 0 { - return file.MetaDoc{}, ErrNoFile + return file.MetaDoc{}, "", ErrNoFile } var fi file.MetaDoc if err := json.Unmarshal(result.Hits[0].Source, &fi); err != nil { - return file.MetaDoc{}, fmt.Errorf("file meta doc parsing error: %w", err) + return file.MetaDoc{}, "", fmt.Errorf("file meta doc parsing error: %w", err) } - return fi, nil + return fi, fmt.Sprintf(FileDataIndexPattern, "*"), nil } -func (d *Deliverer) LocateChunks(ctx context.Context, zlog zerolog.Logger, fileID string) ([]file.ChunkInfo, error) { +func (d *Deliverer) FindLibraryFile(ctx context.Context, fileID string, integration string, libsrc string) (file.MetaDoc, string, error) { + span, ctx := apm.StartSpan(ctx, "findFile", "process") + defer span.End() + result, err := findFileInLibrary(ctx, d.bulker, fileID, fmt.Sprintf(LibraryFileHeaderIndexPattern, integration, libsrc)) + if err != nil { + return file.MetaDoc{}, "", err + } + if len(result) == 0 { + return file.MetaDoc{}, "", ErrNoFile + } + + var fi file.MetaDoc + if err := json.Unmarshal(result, &fi); err != nil { + return file.MetaDoc{}, "", fmt.Errorf("file meta doc parsing error: %w", err) + } + + return fi, fmt.Sprintf(LibraryFileDataIndexPattern, integration, libsrc), nil +} + +func (d *Deliverer) LocateChunks(ctx context.Context, zlog zerolog.Logger, fileID string, dataIndex string) ([]file.ChunkInfo, error) { // find chunk indices behind alias, doc IDs - infos, err := file.GetChunkInfos(ctx, d.bulker, 
FileDataIndexPattern, fileID, file.GetChunkInfoOpt{}) + infos, err := file.GetChunkInfos(ctx, d.bulker, dataIndex, fileID, file.GetChunkInfoOpt{}) if err != nil { zlog.Error().Err(err).Msg("problem getting infos") return nil, err diff --git a/internal/pkg/file/delivery/delivery_test.go b/internal/pkg/file/delivery/delivery_test.go index 62d544ab3f..9210b8415e 100644 --- a/internal/pkg/file/delivery/delivery_test.go +++ b/internal/pkg/file/delivery/delivery_test.go @@ -70,7 +70,7 @@ func TestFindFile(t *testing.T) { d := New(nil, fakeBulk, nil) - info, err := d.FindFileForAgent(context.Background(), fileID, agentID) + info, _, err := d.FindFileForAgent(context.Background(), fileID, agentID) require.NoError(t, err) assert.NotNil(t, info.File.Hash) @@ -94,7 +94,7 @@ func TestFindFileHandlesNoResults(t *testing.T) { d := New(nil, fakeBulk, nil) - _, err := d.FindFileForAgent(context.Background(), "somefile", "anyagent") + _, _, err := d.FindFileForAgent(context.Background(), "somefile", "anyagent") assert.ErrorIs(t, ErrNoFile, err) } @@ -134,7 +134,7 @@ func TestLocateChunks(t *testing.T) { d := New(nil, fakeBulk, nil) - chunks, err := d.LocateChunks(context.Background(), zerolog.Logger{}, baseID) + chunks, err := d.LocateChunks(context.Background(), zerolog.Logger{}, baseID, FileDataIndexPattern) require.NoError(t, err) assert.Len(t, chunks, 2) @@ -156,7 +156,7 @@ func TestLocateChunksEmpty(t *testing.T) { d := New(nil, fakeBulk, nil) - _, err := d.LocateChunks(context.Background(), zerolog.Logger{}, "afile") + _, err := d.LocateChunks(context.Background(), zerolog.Logger{}, "afile", FileDataIndexPattern) assert.Error(t, err) } diff --git a/internal/pkg/file/delivery/es.go b/internal/pkg/file/delivery/es.go index 174fec85f9..4e6b5e59cb 100644 --- a/internal/pkg/file/delivery/es.go +++ b/internal/pkg/file/delivery/es.go @@ -20,10 +20,14 @@ import ( ) const ( - // integration name is substituted in + // Agent-targeted ephemeral files. 
Integration name is substituted in FileHeaderIndexPattern = ".fleet-fileds-tohost-meta-%s" FileDataIndexPattern = ".fleet-fileds-tohost-data-%s" + // Long-lived library files, owned by integrations. Integration & target library substituted in + LibraryFileHeaderIndexPattern = "%s-fleetfiles-%s-meta" + LibraryFileDataIndexPattern = "%s-fleetfiles-%s-data" + FieldDocID = "_id" FieldTargetAgents = "file.Meta.target_agents" FieldStatus = "file.Status" @@ -63,6 +67,16 @@ func findFileForAgent(ctx context.Context, bulker bulk.Bulk, fileID string, agen return result, nil } +func findFileInLibrary(ctx context.Context, bulker bulk.Bulk, fileID string, index string) ([]byte, error) { + span, ctx := apm.StartSpan(ctx, "searchFile", "search") + defer span.End() + result, err := bulker.Read(ctx, index, fileID) + if err != nil { + return nil, err + } + return result, nil +} + func readChunkStream(ctx context.Context, client *elasticsearch.Client, idx string, docID string) (io.ReadCloser, error) { span, ctx := apm.StartSpan(ctx, "getChunk", "get") span.Context.SetLabel("index", idx) diff --git a/internal/pkg/file/es.go b/internal/pkg/file/es.go index ec8f0a0562..cbf8a3799c 100644 --- a/internal/pkg/file/es.go +++ b/internal/pkg/file/es.go @@ -76,7 +76,7 @@ func GetMetadata(ctx context.Context, bulker bulk.Bulk, indexPattern string, upl return nil, err } - return res.HitsT.Hits, nil + return res.Hits, nil } // Retrieves a file Metadata as an Info object @@ -137,7 +137,7 @@ type GetChunkInfoOpt struct { // the chunk's ordered index position (Pos) is also parsed from the document ID. // Optionally adding the calculated field "size", that is the length, in bytes, of the Data field. 
// and optionally validating that a hash field is present -func GetChunkInfos(ctx context.Context, bulker bulk.Bulk, indexPattern string, baseID string, opt GetChunkInfoOpt) ([]ChunkInfo, error) { +func GetChunkInfos(ctx context.Context, bulker bulk.Bulk, index string, baseID string, opt GetChunkInfoOpt) ([]ChunkInfo, error) { span, ctx := apm.StartSpan(ctx, "getChunksInfo", "process") defer span.End() tpl := QueryChunkInfo @@ -152,13 +152,13 @@ func GetChunkInfos(ctx context.Context, bulker bulk.Bulk, indexPattern string, b } bSpan, bCtx := apm.StartSpan(ctx, "searchChunksInfo", "search") - res, err := bulker.Search(bCtx, fmt.Sprintf(indexPattern, "*"), query) + res, err := bulker.Search(bCtx, index, query) bSpan.End() if err != nil { return nil, err } - chunks := make([]ChunkInfo, len(res.HitsT.Hits)) + chunks := make([]ChunkInfo, len(res.Hits)) var ( bid string @@ -170,7 +170,7 @@ func GetChunkInfos(ctx context.Context, bulker bulk.Bulk, indexPattern string, b vSpan, _ := apm.StartSpan(ctx, "validateChunksInfo", "validate") defer vSpan.End() - for i, h := range res.HitsT.Hits { + for i, h := range res.Hits { if bid, ok = getResultsFieldString(h.Fields, FieldBaseID); !ok { return nil, fmt.Errorf("unable to retrieve %s field from chunk document", FieldBaseID) } @@ -211,7 +211,7 @@ func getResultField(fields map[string]interface{}, key string) (interface{}, boo if !ok { return nil, false } - if array == nil || len(array) < 1 { + if len(array) < 1 { return nil, false } return array[0], true diff --git a/internal/pkg/file/uploader/finalize.go b/internal/pkg/file/uploader/finalize.go index b474674e84..8d3a0f64d2 100644 --- a/internal/pkg/file/uploader/finalize.go +++ b/internal/pkg/file/uploader/finalize.go @@ -49,7 +49,7 @@ func (u *Uploader) Complete(ctx context.Context, id string, transitHash string) return info, fmt.Errorf("unable to refresh chunk data index: %w", err) } - chunks, err := file.GetChunkInfos(ctx, u.bulker, UploadDataIndexPattern, info.DocID, 
file.GetChunkInfoOpt{IncludeSize: true, RequireHash: true}) + chunks, err := file.GetChunkInfos(ctx, u.bulker, fmt.Sprintf(UploadDataIndexPattern, info.Source), info.DocID, file.GetChunkInfoOpt{IncludeSize: true, RequireHash: true}) if err != nil { return info, err } diff --git a/internal/pkg/file/uploader/finalize_test.go b/internal/pkg/file/uploader/finalize_test.go index 051581b290..507cf51ee4 100644 --- a/internal/pkg/file/uploader/finalize_test.go +++ b/internal/pkg/file/uploader/finalize_test.go @@ -94,9 +94,9 @@ func TestUploadCompletePerformsRefreshBeforeChunkSearch(t *testing.T) { fakeBulk.On("Search", mock.MatchedBy(func(_ context.Context) bool { return true }), // match context.Context - ".fleet-fileds-fromhost-data-*", // *DATA* (chunk) search - mock.Anything, // query bytes - mock.Anything, // bulk opts + ".fleet-fileds-fromhost-data-"+fakeIntegrationSrc, // *DATA* (chunk) search + mock.Anything, // query bytes + mock.Anything, // bulk opts ).Run(func(args mock.Arguments) { // runs during execution, before return