Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.37.1
1.37.2
19 changes: 2 additions & 17 deletions lib/logflare/backends/cache_warmer.ex
Original file line number Diff line number Diff line change
@@ -1,31 +1,16 @@
defmodule Logflare.Backends.CacheWarmer do
alias Logflare.Backends
alias Logflare.Repo

use Cachex.Warmer
@impl true
def execute(_state) do
backends =
Backends.list_backends(ingesting: true, limit: 1_000)
|> Repo.preload(:sources)
backends = Backends.list_backends(ingesting: true, limit: 1_000)

get_kv =
for b <- backends do
{{:get_backend, [b.id]}, {:cached, b}}
end

# Group backends by source_id to warm the `{:list_backends, [[source_id: id]]}` keys
# used by the ingestion hot path (dispatch_to_backends, SourceSup.init, SourceSupWorker)
list_by_source_kv =
backends
|> Enum.flat_map(fn b -> Enum.map(b.sources, &{&1.id, b}) end)
|> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
|> Enum.map(fn {source_id, source_backends} ->
# Strip preloaded sources to match what `list_backends(source_id:)` returns
stripped = Enum.map(source_backends, &%{&1 | sources: %Ecto.Association.NotLoaded{}})
{{:list_backends, [[source_id: source_id]]}, {:cached, stripped}}
end)

{:ok, get_kv ++ list_by_source_kv}
{:ok, get_kv}
end
end
10 changes: 3 additions & 7 deletions lib/logflare/backends/source_sup_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ defmodule Logflare.Backends.SourceSupWorker do
alias Logflare.Sources
alias Logflare.Backends
alias Logflare.Rules
alias Logflare.Sources.Source
alias Logflare.Backends.SourceSup

@default_interval 30_000
Expand All @@ -31,12 +30,9 @@ defmodule Logflare.Backends.SourceSupWorker do

defp do_check(nil), do: :noop

defp do_check(%Source{} = source) do
backends = Backends.Cache.list_backends(source_id: source.id)

rules =
Rules.Cache.list_by_source_id(source.id)
|> Enum.filter(& &1.backend_id)
defp do_check(source) do
backends = Backends.list_backends(source_id: source.id)
rules = Rules.list_rules_with_backend(source)

# start rules source-backends
rules_backend_ids =
Expand Down
10 changes: 1 addition & 9 deletions lib/logflare/rules/cache_warmer.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
defmodule Logflare.Rules.CacheWarmer do
alias Logflare.Repo
alias Logflare.Sources.Source
alias Logflare.Sources.SourceRouter.RulesTree

import Ecto.Query

Expand All @@ -23,13 +22,6 @@ defmodule Logflare.Rules.CacheWarmer do
{{:list_by_source_id, [s.id]}, {:cached, s.rules}}
end

# Also warm `{:rules_tree_by_source_id, [id]}` keys used by the ingestion hot path
tree_entries =
for s <- sources do
tree = RulesTree.build(s.rules)
{{:rules_tree_by_source_id, [s.id]}, {:cached, tree}}
end

{:ok, entries ++ tree_entries}
{:ok, entries}
end
end
9 changes: 0 additions & 9 deletions test/logflare/backends_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -649,11 +649,6 @@ defmodule Logflare.BackendsTest do
# one source-backend from rules
insert_pair(:rule, source: source, backend: rule_backend)

# Bust caches so the worker sees newly inserted data
# (in production, WAL-driven cache busting handles this automatically)
Cachex.clear(Backends.Cache)
Cachex.clear(Rules.Cache)

# start an out-of-tree SourceSupWorker
start_supervised({SourceSupWorker, [source: source, interval: 100]})
:timer.sleep(200)
Expand All @@ -665,10 +660,6 @@ defmodule Logflare.BackendsTest do
Logflare.Repo.delete_all(Logflare.Backends.SourcesBackend)
Logflare.Repo.delete_all(Logflare.Backends.Backend)

# Bust caches after deletions
Cachex.clear(Backends.Cache)
Cachex.clear(Rules.Cache)

:timer.sleep(200)
# removal
new_length = Supervisor.which_children(via) |> length()
Expand Down
2 changes: 1 addition & 1 deletion test/logflare/rules/rule_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ defmodule Logflare.Rules.CacheTest do

test "cache warming" do
assert Cachex.warm!(@subject, wait: true) == [Logflare.Rules.CacheWarmer]
assert Cachex.size!(@subject) == 2
assert Cachex.size!(@subject) == 1
end
end
end
Loading