Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dialyzer_ignore.exs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
{"lib/logflare_web/controllers/stripe_controller.ex", :pattern_match_cov},
{"lib/logflare_web/live/billingaccount_live/payment_method_component.ex", :call},
{"lib/logflare_web/live/monaco_editor_component.ex", :pattern_match},
{"lib/logflare_web/live/endpoints/endpoints_live.ex", :unused_fun},
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to get test typings to pass.

{"lib/logflare_web/live/query_live.ex", :pattern_match},
{"lib/logflare_web/live/query_live.ex", :no_return},
{"lib/logflare_web/live/search_live/logs_search_lv.ex", :pattern_match},
Expand Down
31 changes: 17 additions & 14 deletions lib/logflare/backends/adaptor/postgres_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,23 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do

def execute_query(%Backend{} = backend, {query_string, params}, _opts)
when is_non_empty_binary(query_string) and is_list(params) do
{:ok, result} =
SharedRepo.with_repo(backend, fn ->
SharedRepo.query(query_string, params)
end)

rows =
for row <- result.rows do
result.columns
|> Enum.zip(row)
|> Map.new()
|> nested_map_update()
end

{:ok, rows}
case SharedRepo.with_repo(backend, fn ->
SharedRepo.query(query_string, params)
end) do
{:ok, result} ->
rows =
for row <- result.rows do
result.columns
|> Enum.zip(row)
|> Map.new()
|> nested_map_update()
end

{:ok, rows}

{:error, reason} ->
{:error, reason}
end
end

def execute_query(%Backend{} = backend, {query_string, declared_params, input_params}, opts)
Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/endpoints.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule Logflare.Endpoints do
@valid_sql_languages ~w(bq_sql ch_sql pg_sql)a

@typep language :: :bq_sql | :ch_sql | :pg_sql | :lql
@typep run_query_return :: {:ok, %{rows: [map()]}} | {:error, String.t()}
@typep run_query_return :: {:ok, %{rows: [map()]}} | {:error, term()}

defguardp is_integer_or_string(value) when is_integer(value) or is_non_empty_binary(value)

Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/endpoints/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ defmodule Logflare.Endpoints.Query do

def update_source_mapping(changeset), do: changeset

@spec map_backend_to_language(Backend.t(), supabase_mode :: boolean()) ::
@spec map_backend_to_language(Backend.t() | nil, supabase_mode :: boolean()) ::
:bq_sql | :ch_sql | :pg_sql
def map_backend_to_language(%Backend{type: :clickhouse}, _supabase_mode), do: :ch_sql
def map_backend_to_language(%Backend{type: :postgres}, false), do: :pg_sql
Expand Down
47 changes: 47 additions & 0 deletions lib/logflare_web/components/query_components.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,56 @@
defmodule LogflareWeb.QueryComponents do
use Phoenix.Component

import Phoenix.HTML.Form
import LogflareWeb.ErrorHelpers

alias Logflare.Backends.Backend
alias LogflareWeb.Utils
alias Phoenix.LiveView.JS

attr :backends, :list, required: true
attr :form, :any, required: true
attr :field, :atom, default: :backend_id
attr :show_language, :boolean, default: true
attr :label, :string, default: nil

slot :help

def backend_select(assigns) do
assigns =
assign_new(assigns, :options, fn ->
[{"Default (BigQuery)", nil}] ++
Enum.map(assigns.backends, fn backend ->
{"#{backend.name} (#{backend.type})", backend.id}
end)
end)

~H"""
<div class="form-group">
<label :if={@label}>{@label}</label>
<small :if={render_slot(@help)} class="form-text text-muted">{render_slot(@help)}</small>
<select id={@form[@field].id} name={@form[@field].name} class="form-control">
{options_for_select(@options, @form[@field].value)}
</select>
{error_tag(@form, @field)}
<div :if={@show_language} class="tw-mt-2">
<strong>Query Language: <span id="query-language">{format_query_language(@form[@field].value, @backends)}</span></strong>
</div>
</div>
"""
end

defp format_query_language(nil, _backends), do: "BigQuery SQL"

defp format_query_language(backend_id, backends) do
backend = Enum.find(backends, &(to_string(&1.id) == to_string(backend_id)))
format_backend_language(backend)
end

defp format_backend_language(%Backend{type: :clickhouse}), do: "ClickHouse SQL"
defp format_backend_language(%Backend{type: :postgres}), do: "Postgres SQL"
defp format_backend_language(_), do: "BigQuery SQL"

attr :bytes, :integer, default: nil

def query_cost(assigns) do
Expand Down
74 changes: 58 additions & 16 deletions lib/logflare_web/controllers/api/query_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ defmodule LogflareWeb.Api.QueryController do
use OpenApiSpex.ControllerSpecs

alias Logflare.Alerting
alias Logflare.Backends
alias Logflare.Backends.Backend
alias Logflare.Endpoints
alias Logflare.Endpoints.Query
alias Logflare.Sql
alias Logflare.User
alias LogflareWeb.OpenApi.BadRequest
alias LogflareWeb.OpenApi.List
alias LogflareWeb.OpenApi.One
alias LogflareWeb.OpenApiSchemas.QueryParseResult
alias LogflareWeb.OpenApiSchemas.QueryResult
alias Logflare.Sql

action_fallback(LogflareWeb.Api.FallbackController)

Expand Down Expand Up @@ -75,7 +79,7 @@ defmodule LogflareWeb.Api.QueryController do
parameters: [
sql: [
in: :query,
description: "BigQuery SQL string, alias for bq_sql",
description: "SQL string",
type: :string,
required: false
],
Expand All @@ -99,33 +103,71 @@ defmodule LogflareWeb.Api.QueryController do
type: :string,
required: false,
example: "select current_date() as 'test'"
],
backend_id: [
in: :query,
description:
"Backend ID to execute the query against. When provided with sql=, the language is inferred from the backend type.",
type: :integer,
required: false
]
],
responses: %{200 => List.response(QueryResult)}
)

def query(conn, %{"sql" => sql}), do: query(conn, %{"bq_sql" => sql})

def query(%{assigns: %{user: user}} = conn, %{"bq_sql" => sql}) do
with {:ok, %{rows: rows}} <- Endpoints.run_query_string(user, {:bq_sql, sql}) do
def query(%{assigns: %{user: user}} = conn, params) do
with {:ok, language, sql} <- extract_query(params),
{:ok, backend} <- fetch_backend(user, params),
language = resolve_language(language, backend),
opts = build_query_opts(backend),
{:ok, %{rows: rows}} <- Endpoints.run_query_string(user, {language, sql}, opts) do
json(conn, %{result: rows})
end
end

def query(%{assigns: %{user: user}} = conn, %{"ch_sql" => sql}) do
with {:ok, %{rows: rows}} <- Endpoints.run_query_string(user, {:ch_sql, sql}) do
json(conn, %{result: rows})
end
@spec extract_query(map()) :: {:ok, atom(), String.t()} | {:error, String.t()}
defp extract_query(%{"sql" => sql}), do: {:ok, :infer, sql}
defp extract_query(%{"bq_sql" => sql}), do: {:ok, :bq_sql, sql}
defp extract_query(%{"ch_sql" => sql}), do: {:ok, :ch_sql, sql}
defp extract_query(%{"pg_sql" => sql}), do: {:ok, :pg_sql, sql}

defp extract_query(_) do
{:error,
"No query params provided. Supported query params are sql=, bq_sql=, ch_sql=, and pg_sql="}
end

def query(%{assigns: %{user: user}} = conn, %{"pg_sql" => sql}) do
with {:ok, %{rows: rows}} <- Endpoints.run_query_string(user, {:pg_sql, sql}) do
json(conn, %{result: rows})
@spec resolve_language(atom(), Backend.t() | nil) :: atom()
defp resolve_language(:infer, backend), do: Query.map_backend_to_language(backend, false)
defp resolve_language(language, _backend), do: language

@spec fetch_backend(User.t(), map()) :: {:ok, Backend.t() | nil} | {:error, String.t()}
defp fetch_backend(_user, %{"backend_id" => backend_id}) when backend_id in [nil, ""],
do: {:ok, nil}

defp fetch_backend(user, %{"backend_id" => backend_id}) when is_binary(backend_id) do
case Integer.parse(backend_id) do
{id, ""} -> fetch_backend(user, %{"backend_id" => id})
_ -> {:error, "Invalid backend_id: must be an integer"}
end
end

def query(_conn, _params) do
{:error,
"No query params provided. Supported query params are sql=, bq_sql=, ch_sql=, and pg_sql="}
defp fetch_backend(user, %{"backend_id" => backend_id}) when is_integer(backend_id) do
case Backends.get_backend(backend_id) do
%Backend{user_id: user_id} = backend when user_id == user.id ->
if Backends.Adaptor.can_query?(backend) do
{:ok, backend}
else
{:error, "Backend does not support querying"}
end

_ ->
{:error, "Backend not found"}
end
end

defp fetch_backend(_user, _params), do: {:ok, nil}

@spec build_query_opts(Backend.t() | nil) :: keyword()
defp build_query_opts(nil), do: []
defp build_query_opts(%Backend{id: id}), do: [backend_id: id]
end
15 changes: 3 additions & 12 deletions lib/logflare_web/live/endpoints/components/endpoint_form.html.heex
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,9 @@
</small>
</div>

<div :if={@show_backend_selection} class="form-group">
{label(f, :backend_id, "Backend (optional)")}
{select(f, :backend_id, [{"Default (BigQuery)", nil}] ++ Enum.map(@backends, fn backend -> {"#{backend.name} (#{backend.type})", backend.id} end),
class: "form-control",
"phx-update": "ignore"
)}
{error_tag(f, :backend_id)}
<small class="form-text text-muted">
Choose which backend to execute this endpoint query against. Defaults to BigQuery if no backend is selected. <br />
<strong>Query Language: <span id="query-language">{format_query_language(@determined_language)}</span></strong>
</small>
</div>
<QueryComponents.backend_select :if={@show_backend_selection} backends={@backends} form={f} label="Backend (optional)">
<:help>Choose which backend to execute this endpoint query against. Defaults to BigQuery if no backend is selected.</:help>
</QueryComponents.backend_select>

<.header_with_anchor text="Query" />
<section class="tw-flex tw-flex-col">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,10 @@
</.inputs_for>
{submit("Test query", class: "btn btn-secondary")}
</.form>
<div :if={@query_error_message} class="tw-mt-2">
<.alert variant="danger">
<strong>Query error!</strong>
<br />
<span>{@query_error_message}</span>
</.alert>
</div>
29 changes: 13 additions & 16 deletions lib/logflare_web/live/endpoints/endpoints_live.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ defmodule LogflareWeb.EndpointsLive do
|> refresh_endpoints()
|> assign(:query_result_rows, nil)
|> assign(:total_bytes_processed, nil)
|> assign(:query_error_message, nil)
|> assign(:show_endpoint, nil)
|> assign(:endpoint_changeset, Endpoints.change_query(%Endpoints.Query{}))
|> assign(:selected_backend_id, nil)
Expand Down Expand Up @@ -121,6 +122,7 @@ defmodule LogflareWeb.EndpointsLive do
socket
|> assign(:query_result_rows, nil)
|> assign(:total_bytes_processed, nil)
|> assign(:query_error_message, nil)
else
socket
end
Expand All @@ -131,6 +133,7 @@ defmodule LogflareWeb.EndpointsLive do
|> refresh_endpoints()
|> assign(:endpoint_changeset, nil)
|> assign(:query_result_rows, nil)
|> assign(:query_error_message, nil)

%{assigns: %{live_action: :new}} = socket ->
params =
Expand All @@ -157,6 +160,7 @@ defmodule LogflareWeb.EndpointsLive do
# reset test results
|> assign(:query_result_rows, nil)
|> assign(:redact_pii, false)
|> assign(:query_error_message, nil)
end)

{:noreply, socket}
Expand Down Expand Up @@ -238,27 +242,21 @@ defmodule LogflareWeb.EndpointsLive do
redact_pii: redact_pii,
backend_id: backend_id
) do
{:ok, %{rows: rows, total_bytes_processed: total_bytes_processed}} ->
{:noreply,
socket
|> put_flash(:info, "Ran query successfully")
|> assign(:prev_params, query_params)
|> assign(:query_result_rows, rows)
|> assign(:total_bytes_processed, total_bytes_processed)}
{:ok, %{rows: rows} = result} ->
total_bytes_or_nil = Map.get(result, :total_bytes_processed)

{:ok, %{rows: rows}} ->
# non-BQ results
{:noreply,
socket
|> put_flash(:info, "Ran query successfully")
|> assign(:prev_params, query_params)
|> assign(:query_result_rows, rows)
|> assign(:total_bytes_processed, nil)}
|> assign(:total_bytes_processed, total_bytes_or_nil)
|> assign(:query_error_message, nil)}

{:error, err} ->
{:noreply,
socket
|> put_flash(:error, "Error occured when running query: #{inspect(err)}")}
|> assign(:query_error_message, format_query_error(err))}
end
end

Expand Down Expand Up @@ -367,6 +365,10 @@ defmodule LogflareWeb.EndpointsLive do
{:noreply, socket}
end

defp format_query_error(error) do
if is_binary(error), do: error, else: inspect(error)
end

def handle_info({:query_string_updated, query_string}, socket) do
endpoint_language = get_current_endpoint_language(socket)

Expand Down Expand Up @@ -471,11 +473,6 @@ defmodule LogflareWeb.EndpointsLive do
select timestamp, event_message from YourApp.SourceName
"""

defp format_query_language(:bq_sql), do: "BigQuery SQL"
defp format_query_language(:ch_sql), do: "ClickHouse SQL"
defp format_query_language(:pg_sql), do: "Postgres SQL"
defp format_query_language(language), do: language |> to_string() |> String.upcase()

defp maybe_redact_query(query, redact_pii) when is_binary(query) do
if redact_pii do
PiiRedactor.redact_pii_from_value(query)
Expand Down
Loading
Loading