From e9f238b0a5507e3383cc77cc651a4f6274814135 Mon Sep 17 00:00:00 2001 From: Evan O'Brien Date: Thu, 11 Sep 2025 19:18:22 +0200 Subject: [PATCH 1/5] Use AWS Redshift Data API --- lib/redshift_ecto.ex | 47 ++--------- lib/redshift_ecto/connection.ex | 30 +++++-- lib/redshift_ecto/data_api.ex | 139 ++++++++++++++++++++++++++++++++ lib/redshift_ecto/query.ex | 21 +++++ mix.exs | 3 +- 5 files changed, 191 insertions(+), 49 deletions(-) create mode 100644 lib/redshift_ecto/data_api.ex create mode 100644 lib/redshift_ecto/query.ex diff --git a/lib/redshift_ecto.ex b/lib/redshift_ecto.ex index 231a092..eaf5262 100644 --- a/lib/redshift_ecto.ex +++ b/lib/redshift_ecto.ex @@ -2,8 +2,9 @@ defmodule RedshiftEcto do @moduledoc """ Ecto adapter for [AWS Redshift](https://aws.amazon.com/redshift/). - It uses `Postgrex` for communicating to the database and a connection pool, - such as `DBConnection.Poolboy`. + It communicates with Redshift through the AWS Redshift Data API via the + [`AWS.RedshiftData`](https://hexdocs.pm/aws/AWS.RedshiftData.html) module. + Connections are managed using `DBConnection`. This adapter is based on Ecto's builtin `Ecto.Adapters.Postgres` adapter. It delegates some functions to it but changes the implementation of most that @@ -203,43 +204,9 @@ defmodule RedshiftEcto do ## Helpers defp run_query(sql, opts) do - {:ok, _} = Application.ensure_all_started(:postgrex) - - opts = - opts - |> Keyword.drop([:name, :log]) - |> Keyword.put(:pool, DBConnection.Connection) - |> Keyword.put(:backoff_type, :stop) - - {:ok, pid} = Task.Supervisor.start_link() - - task = - Task.Supervisor.async_nolink(pid, fn -> - {:ok, conn} = Postgrex.start_link(opts) - - value = RedshiftEcto.Connection.execute(conn, sql, [], opts) - GenServer.stop(conn) - value - end) - - timeout = Keyword.get(opts, :timeout, 15_000) - - case Task.yield(task, timeout) || Task.shutdown(task) do - {:ok, {:ok, result}} -> - {:ok, result} - - {:ok, {:error, error}} -> - {:error, error} - - {:exit, {%{__struct__: struct} = error, _}} - when struct in [Postgrex.Error, DBConnection.Error] -> - {:error, error} - - {:exit, reason} -> - {:error, RuntimeError.exception(Exception.format_exit(reason))} - - nil -> - {:error, RuntimeError.exception("command timed out")} - end + {:ok, conn} = RedshiftEcto.DataAPI.start_link(opts) + result = RedshiftEcto.DataAPI.execute(conn, sql, []) + GenServer.stop(conn) + result end end diff --git a/lib/redshift_ecto/connection.ex b/lib/redshift_ecto/connection.ex index 8bf9784..8118352 100644 --- a/lib/redshift_ecto/connection.ex +++ b/lib/redshift_ecto/connection.ex @@ -3,6 +3,7 @@ if Code.ensure_loaded?(Postgrex) do @moduledoc false alias Ecto.Adapters.Postgres.Connection, as: Postgres + alias RedshiftEcto.DataAPI @default_port 5439 @behaviour Ecto.Adapters.SQL.Connection @@ -10,20 +11,33 @@ if Code.ensure_loaded?(Postgrex) do ## Module and Options def child_spec(opts) do - opts - |> Keyword.put_new(:port, @default_port) - |> Keyword.put_new(:types, Ecto.Adapters.Postgres.TypeModule) - |> Postgrex.child_spec() + opts = Keyword.put_new(opts, :port, @default_port) + DBConnection.child_spec(DataAPI, opts) end # constraints may be defined but are not enforced by Amazon Redshift - def to_constraints(%Postgrex.Error{}), do: [] + # The Data API does not provide detailed constraint errors + def to_constraints(_), do: [] ## Query - defdelegate prepare_execute(conn, name, sql, params, opts), to: Postgres - defdelegate execute(conn, sql_or_query, params, opts), to: Postgres - defdelegate stream(conn, sql, params, opts), to: Postgres + def prepare_execute(conn, _name, sql, params, opts) do + query = %RedshiftEcto.Query{statement: sql} + DBConnection.prepare_execute(conn, query, params, opts) + end + + def execute(conn, %RedshiftEcto.Query{} = query, params, opts) do + DBConnection.execute(conn, query, params, opts) + end + + def execute(conn, sql, params, opts) when is_binary(sql) do + query = %RedshiftEcto.Query{statement: sql} + DBConnection.execute(conn, query, params, opts) + end + + def stream(_conn, _sql, _params, _opts) do + raise "stream is not supported by Redshift Data API" + end alias Ecto.Query alias Ecto.Query.{BooleanExpr, JoinExpr, QueryExpr} diff --git a/lib/redshift_ecto/data_api.ex b/lib/redshift_ecto/data_api.ex new file mode 100644 index 0000000..e0140b9 --- /dev/null +++ b/lib/redshift_ecto/data_api.ex @@ -0,0 +1,139 @@ +defmodule RedshiftEcto.DataAPI do + @moduledoc """ + Thin wrapper around `AWS.RedshiftData` used by the adapter to run SQL + statements through the Redshift Data API. The module provides a synchronous + interface that executes a statement and waits for the result before + returning. + """ + + use DBConnection + + alias AWS.RedshiftData + alias RedshiftEcto.Query + + # Client API -------------------------------------------------------------- + + @doc """ + Start a DBConnection process for the Data API. + """ + def start_link(opts) do + DBConnection.start_link(__MODULE__, opts) + end + + @doc """ + Execute `sql` with the given `params` using the connection. + """ + def execute(conn, sql, params, opts \\ []) do + timeout = Keyword.get(opts, :timeout, 15_000) + query = %Query{statement: sql} + DBConnection.execute(conn, query, params, timeout: timeout) + end + + # DBConnection callbacks -------------------------------------------------- + + @impl true + def connect(opts) do + client_opts = + opts + |> Keyword.take([:access_key_id, :secret_access_key, :region, :endpoint]) + + {:ok, %{client: AWS.Client.create(client_opts), opts: opts}} + end + + @impl true + def handle_execute(%Query{statement: sql} = query, params, _opts, state) do + sql = interpolate(sql, params) + + request = + state.opts + |> Keyword.take([:cluster_identifier, :database, :db_user, :secret_arn, :workgroup_name]) + |> Map.new() + |> Map.put(:sql, sql) + + case RedshiftData.execute_statement(state.client, request) do + {:ok, %{body: %{"Id" => id}}} -> + case await_result(state.client, id) do + {:ok, result} -> {:ok, query, result, state} + {:error, reason} -> {:error, reason, state} + end + + {:error, reason} -> + {:error, reason, state} + end + end + + @impl true + def disconnect(_err, _state), do: :ok + + @impl true + def handle_prepare(%Query{} = query, _opts, state) do + {:ok, query, state} + end + + @impl true + def handle_close(_query, _opts, state) do + {:ok, nil, state} + end + + # Internal helpers ------------------------------------------------------- + + defp await_result(client, id) do + Stream.repeatedly(fn -> :ok end) + |> Enum.reduce_while(nil, fn _, _ -> + case RedshiftData.describe_statement(client, %{id: id}) do + {:ok, %{body: %{"Status" => "FINISHED"}}} -> + {:halt, fetch_result(client, id)} + + {:ok, %{body: %{"Status" => status}}} when status in ["STARTED", "SUBMITTED", "PICKED"] -> + Process.sleep(200) + {:cont, nil} + + {:ok, %{body: %{"Error" => error}}} -> + {:halt, {:error, error}} + + other -> + {:halt, {:error, other}} + end + end) + end + + defp fetch_result(client, id) do + case RedshiftData.get_statement_result(client, %{id: id}) do + {:ok, %{body: body}} -> + columns = Enum.map(body["ColumnMetadata"], & &1["name"]) + rows = Enum.map(body["Records"], &Enum.map(&1, &value_from_field/1)) + result = %DBConnection.Result{columns: columns, rows: rows, num_rows: length(rows)} + {:ok, result} + + other -> + {:error, other} + end + end + + defp value_from_field(%{"longValue" => v}), do: v + defp value_from_field(%{"doubleValue" => v}), do: v + defp value_from_field(%{"stringValue" => v}), do: v + defp value_from_field(%{"booleanValue" => v}), do: v + defp value_from_field(_), do: nil + + defp interpolate(sql, params) do + Enum.with_index(params, 1) + |> Enum.reduce(sql, fn {value, idx}, acc -> + placeholder = "$#{idx}" + String.replace(acc, placeholder, quote_value(value)) + end) + end + + defp quote_value(value) when is_binary(value) do + "'" <> String.replace(value, "'", "''") <> "'" + end + + defp quote_value(value) when is_integer(value) or is_float(value) do + to_string(value) + end + + defp quote_value(true), do: "true" + defp quote_value(false), do: "false" + defp quote_value(nil), do: "null" + defp quote_value(other), do: "'" <> to_string(other) <> "'" +end diff --git a/lib/redshift_ecto/query.ex b/lib/redshift_ecto/query.ex new file mode 100644 index 0000000..f0ed0e3 --- /dev/null +++ b/lib/redshift_ecto/query.ex @@ -0,0 +1,21 @@ +defmodule RedshiftEcto.Query do + @moduledoc false + @behaviour DBConnection.Query + + defstruct [:statement] + + @impl true + def parse(%__MODULE__{} = query, _opts), do: query + + @impl true + def describe(query, _opts), do: query + + @impl true + def encode(_query, params, _opts), do: {:ok, params} + + @impl true + def decode(_query, result, _opts), do: {:ok, result} + + @impl true + def close(_query, _opts), do: :ok +end diff --git a/mix.exs b/mix.exs index 27969a7..4936f87 100644 --- a/mix.exs +++ b/mix.exs @@ -38,7 +38,8 @@ defmodule RedshiftEcto.MixProject do {:postgrex, "~> 0.15.1"}, {:ecto_replay_sandbox, "~> 2.0"}, {:ex_doc, "~> 0.18", only: :dev, runtime: false}, - {:jason, "~> 1.1", optional: true} + {:jason, "~> 1.1", optional: true}, + {:aws, "~> 0.13"} ] end From 69f3ab2f405de4196de1fd08b553764f18d20da3 Mon Sep 17 00:00:00 2001 From: Evan O'Brien Date: Fri, 12 Sep 2025 11:08:45 +0200 Subject: [PATCH 2/5] Use aws 1.0 and drop postgrex --- config/config.exs | 4 +- lib/redshift_ecto.ex | 2 +- lib/redshift_ecto/connection.ex | 1595 +++++++++++++++---------------- mix.exs | 3 +- 4 files changed, 800 insertions(+), 804 deletions(-) diff --git a/config/config.exs b/config/config.exs index 165dc31..2b27ac2 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,3 +1,3 @@ -use Mix.Config +import Config -config :postgrex, :json_library, Jason +config :aws, :json_library, Jason diff --git a/lib/redshift_ecto.ex b/lib/redshift_ecto.ex index eaf5262..d135f00 100644 --- a/lib/redshift_ecto.ex +++ b/lib/redshift_ecto.ex @@ -96,7 +96,7 @@ defmodule RedshiftEcto do """ # Inherit all behaviour from Ecto.Adapters.SQL - use Ecto.Adapters.SQL, driver: :postgrex, migration_lock: nil + use Ecto.Adapters.SQL, driver: :aws, migration_lock: nil alias Ecto.Adapters.Postgres diff --git a/lib/redshift_ecto/connection.ex b/lib/redshift_ecto/connection.ex index 8118352..a2e55cb 100644 --- a/lib/redshift_ecto/connection.ex +++ b/lib/redshift_ecto/connection.ex @@ -1,1014 +1,1011 @@ -if Code.ensure_loaded?(Postgrex) do - defmodule RedshiftEcto.Connection do - @moduledoc false +defmodule RedshiftEcto.Connection do + @moduledoc false - alias Ecto.Adapters.Postgres.Connection, as: Postgres - alias RedshiftEcto.DataAPI + alias Ecto.Adapters.Postgres.Connection, as: Postgres + alias RedshiftEcto.DataAPI - @default_port 5439 - @behaviour Ecto.Adapters.SQL.Connection + @default_port 5439 + @behaviour Ecto.Adapters.SQL.Connection - ## Module and Options + ## Module and Options - def child_spec(opts) do - opts = Keyword.put_new(opts, :port, @default_port) - DBConnection.child_spec(DataAPI, opts) - end + def child_spec(opts) do + opts = Keyword.put_new(opts, :port, @default_port) + DBConnection.child_spec(DataAPI, opts) + end - # constraints may be defined but are not enforced by Amazon Redshift - # The Data API does not provide detailed constraint errors - def to_constraints(_), do: [] + # constraints may be defined but are not enforced by Amazon Redshift + # The Data API does not provide detailed constraint errors + def to_constraints(_), do: [] - ## Query + ## Query - def prepare_execute(conn, _name, sql, params, opts) do - query = %RedshiftEcto.Query{statement: sql} - DBConnection.prepare_execute(conn, query, params, opts) - end + def prepare_execute(conn, _name, sql, params, opts) do + query = %RedshiftEcto.Query{statement: sql} + DBConnection.prepare_execute(conn, query, params, opts) + end - def execute(conn, %RedshiftEcto.Query{} = query, params, opts) do - DBConnection.execute(conn, query, params, opts) - end + def execute(conn, %RedshiftEcto.Query{} = query, params, opts) do + DBConnection.execute(conn, query, params, opts) + end - def execute(conn, sql, params, opts) when is_binary(sql) do - query = %RedshiftEcto.Query{statement: sql} - DBConnection.execute(conn, query, params, opts) - end + def execute(conn, sql, params, opts) when is_binary(sql) do + query = %RedshiftEcto.Query{statement: sql} + DBConnection.execute(conn, query, params, opts) + end - def stream(_conn, _sql, _params, _opts) do - raise "stream is not supported by Redshift Data API" - end + def stream(_conn, _sql, _params, _opts) do + raise "stream is not supported by Redshift Data API" + end - alias Ecto.Query - alias Ecto.Query.{BooleanExpr, JoinExpr, QueryExpr} - - def all(query) do - sources = create_names(query) - {select_distinct, order_by_distinct} = distinct(query.distinct, sources, query) - - from = from(query, sources) - select = select(query, select_distinct, sources) - join = join(query, sources) - where = where(query, sources) - group_by = group_by(query, sources) - having = having(query, sources) - order_by = order_by(query, order_by_distinct, sources) - limit = limit(query, sources) - offset = offset(query, sources) - lock = lock(query.lock) - - [select, from, join, where, group_by, having, order_by, limit, offset | lock] - end + alias Ecto.Query + alias Ecto.Query.{BooleanExpr, JoinExpr, QueryExpr} + + def all(query) do + sources = create_names(query) + {select_distinct, order_by_distinct} = distinct(query.distinct, sources, query) + + from = from(query, sources) + select = select(query, select_distinct, sources) + join = join(query, sources) + where = where(query, sources) + group_by = group_by(query, sources) + having = having(query, sources) + order_by = order_by(query, order_by_distinct, sources) + limit = limit(query, sources) + offset = offset(query, sources) + lock = lock(query.lock) + + [select, from, join, where, group_by, having, order_by, limit, offset | lock] + end - def update_all(%{from: from, select: nil} = query) do - sources = sources_unaliased(query) - {from, _} = get_source(query, sources, 0, from) + def update_all(%{from: from, select: nil} = query) do + sources = sources_unaliased(query) + {from, _} = get_source(query, sources, 0, from) - fields = update_fields(query, sources) - {join, wheres} = using_join(query, :update_all, "FROM", sources) - where = where(%{query | wheres: wheres ++ query.wheres}, sources) + fields = update_fields(query, sources) + {join, wheres} = using_join(query, :update_all, "FROM", sources) + where = where(%{query | wheres: wheres ++ query.wheres}, sources) - ["UPDATE ", from, " SET ", fields, join, where] - end + ["UPDATE ", from, " SET ", fields, join, where] + end - def update_all(_query) do - error!(nil, "RETURNING is not supported by Redshift") - end + def update_all(_query) do + error!(nil, "RETURNING is not supported by Redshift") + end - def update(prefix, table, fields, filters, []) do - {fields, count} = - intersperse_reduce(fields, ", ", 1, fn field, acc -> - {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1} - end) + def update(prefix, table, fields, filters, []) do + {fields, count} = + intersperse_reduce(fields, ", ", 1, fn field, acc -> + {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1} + end) - {filters, _count} = - intersperse_reduce(filters, " AND ", count, fn field, acc -> - {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1} - end) + {filters, _count} = + intersperse_reduce(filters, " AND ", count, fn field, acc -> + {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1} + end) - ["UPDATE ", quote_table(prefix, table), " SET ", fields, " WHERE " | filters] - end + ["UPDATE ", quote_table(prefix, table), " SET ", fields, " WHERE " | filters] + end - def update(_prefix, _table, _fields, _filters, _returning) do - error!(nil, "RETURNING is not supported by Redshift") - end + def update(_prefix, _table, _fields, _filters, _returning) do + error!(nil, "RETURNING is not supported by Redshift") + end - def delete_all(%{from: from, select: nil} = query) do - sources = sources_unaliased(query) - {from, _} = get_source(query, sources, 0, from) + def delete_all(%{from: from, select: nil} = query) do + sources = sources_unaliased(query) + {from, _} = get_source(query, sources, 0, from) - {join, wheres} = using_join(query, :delete_all, "USING", sources) - where = where(%{query | wheres: wheres ++ query.wheres}, sources) + {join, wheres} = using_join(query, :delete_all, "USING", sources) + where = where(%{query | wheres: wheres ++ query.wheres}, sources) - ["DELETE FROM ", from, join, where] - end + ["DELETE FROM ", from, join, where] + end - def delete_all(_query) do - error!(nil, "RETURNING is not supported by Redshift") - end + def delete_all(_query) do + error!(nil, "RETURNING is not supported by Redshift") + end - def delete(prefix, table, filters, []) do - {filters, _} = - intersperse_reduce(filters, " AND ", 1, fn field, acc -> - {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1} - end) + def delete(prefix, table, filters, []) do + {filters, _} = + intersperse_reduce(filters, " AND ", 1, fn field, acc -> + {[quote_name(field), " = $" | Integer.to_string(acc)], acc + 1} + end) - ["DELETE FROM ", quote_table(prefix, table), " WHERE " | filters] - end + ["DELETE FROM ", quote_table(prefix, table), " WHERE " | filters] + end - def delete(_prefix, _table, _filters, _returning) do - error!(nil, "RETURNING is not supported by Redshift") - end + def delete(_prefix, _table, _filters, _returning) do + error!(nil, "RETURNING is not supported by Redshift") + end - def insert(prefix, table, header, rows, {:raise, _, []}, []) do - values = - if header == [] do - [" VALUES " | intersperse_map(rows, ?,, fn _ -> "(DEFAULT)" end)] - else - [?\s, ?(, intersperse_map(header, ?,, "e_name/1), ") VALUES " | insert_all(rows, 1)] - end + def insert(prefix, table, header, rows, {:raise, _, []}, []) do + values = + if header == [] do + [" VALUES " | intersperse_map(rows, ?,, fn _ -> "(DEFAULT)" end)] + else + [?\s, ?(, intersperse_map(header, ?,, "e_name/1), ") VALUES " | insert_all(rows, 1)] + end - ["INSERT INTO ", quote_table(prefix, table) | values] - end + ["INSERT INTO ", quote_table(prefix, table) | values] + end - def insert(_prefix, _table, _header, _rows, _on_conflict, []) do - error!(nil, "ON CONFLICT is not supported by Redshift") - end + def insert(_prefix, _table, _header, _rows, _on_conflict, []) do + error!(nil, "ON CONFLICT is not supported by Redshift") + end - def insert(_prefix, _table, _header, _rows, _on_conflict, _returning) do - error!(nil, "RETURNING is not supported by Redshift") - end + def insert(_prefix, _table, _header, _rows, _on_conflict, _returning) do + error!(nil, "RETURNING is not supported by Redshift") + end - defp insert_all(rows, counter) do - intersperse_reduce(rows, ?,, counter, fn row, counter -> - {row, counter} = insert_each(row, counter) - {[?(, row, ?)], counter} - end) - |> elem(0) - end + defp insert_all(rows, counter) do + intersperse_reduce(rows, ?,, counter, fn row, counter -> + {row, counter} = insert_each(row, counter) + {[?(, row, ?)], counter} + end) + |> elem(0) + end - defp insert_each(values, counter) do - intersperse_reduce(values, ?,, counter, fn - nil, counter -> - {"DEFAULT", counter} + defp insert_each(values, counter) do + intersperse_reduce(values, ?,, counter, fn + nil, counter -> + {"DEFAULT", counter} - _, counter -> - {[?$ | Integer.to_string(counter)], counter + 1} - end) - end + _, counter -> + {[?$ | Integer.to_string(counter)], counter + 1} + end) + end - ## Query generation - - binary_ops = [ - ==: " = ", - !=: " != ", - <=: " <= ", - >=: " >= ", - <: " < ", - >: " > ", - and: " AND ", - or: " OR ", - ilike: " ILIKE ", - like: " LIKE " - ] + ## Query generation - @binary_ops Keyword.keys(binary_ops) + binary_ops = [ + ==: " = ", + !=: " != ", + <=: " <= ", + >=: " >= ", + <: " < ", + >: " > ", + and: " AND ", + or: " OR ", + ilike: " ILIKE ", + like: " LIKE " + ] - Enum.map(binary_ops, fn {op, str} -> - defp handle_call(unquote(op), 2), do: {:binary_op, unquote(str)} - end) + @binary_ops Keyword.keys(binary_ops) - defp handle_call(fun, _arity), do: {:fun, Atom.to_string(fun)} + Enum.map(binary_ops, fn {op, str} -> + defp handle_call(unquote(op), 2), do: {:binary_op, unquote(str)} + end) - defp select(%Query{select: %{fields: fields}} = query, select_distinct, sources) do - ["SELECT", select_distinct, ?\s | select_fields(fields, sources, query)] - end + defp handle_call(fun, _arity), do: {:fun, Atom.to_string(fun)} - defp select_fields([], _sources, _query), do: "TRUE" + defp select(%Query{select: %{fields: fields}} = query, select_distinct, sources) do + ["SELECT", select_distinct, ?\s | select_fields(fields, sources, query)] + end - defp select_fields(fields, sources, query) do - intersperse_map(fields, ", ", fn - {key, value} -> - [expr(value, sources, query), " AS " | quote_name(key)] + defp select_fields([], _sources, _query), do: "TRUE" - value -> - expr(value, sources, query) - end) - end + defp select_fields(fields, sources, query) do + intersperse_map(fields, ", ", fn + {key, value} -> + [expr(value, sources, query), " AS " | quote_name(key)] - defp distinct(nil, _, _), do: {[], []} - defp distinct(%QueryExpr{expr: []}, _, _), do: {[], []} - defp distinct(%QueryExpr{expr: true}, _, _), do: {" DISTINCT", []} - defp distinct(%QueryExpr{expr: false}, _, _), do: {[], []} - - defp distinct(%QueryExpr{expr: exprs}, sources, query) do - {[ - " DISTINCT ON (", - intersperse_map(exprs, ", ", fn {_, expr} -> expr(expr, sources, query) end), - ?) - ], exprs} - end + value -> + expr(value, sources, query) + end) + end - defp from(%{from: from} = query, sources) do - {from, name} = get_source(query, sources, 0, from) - [" FROM ", from, " AS " | name] - end + defp distinct(nil, _, _), do: {[], []} + defp distinct(%QueryExpr{expr: []}, _, _), do: {[], []} + defp distinct(%QueryExpr{expr: true}, _, _), do: {" DISTINCT", []} + defp distinct(%QueryExpr{expr: false}, _, _), do: {[], []} + + defp distinct(%QueryExpr{expr: exprs}, sources, query) do + {[ + " DISTINCT ON (", + intersperse_map(exprs, ", ", fn {_, expr} -> expr(expr, sources, query) end), + ?) + ], exprs} + end - defp update_fields(%Query{updates: updates} = query, sources) do - for( - %{expr: expr} <- updates, - {op, kw} <- expr, - {key, value} <- kw, - do: update_op(op, key, value, sources, query) - ) - |> Enum.intersperse(", ") - end + defp from(%{from: from} = query, sources) do + {from, name} = get_source(query, sources, 0, from) + [" FROM ", from, " AS " | name] + end - defp update_op(:set, key, value, sources, query) do - [quote_name(key), " = " | expr(value, sources, query)] - end + defp update_fields(%Query{updates: updates} = query, sources) do + for( + %{expr: expr} <- updates, + {op, kw} <- expr, + {key, value} <- kw, + do: update_op(op, key, value, sources, query) + ) + |> Enum.intersperse(", ") + end - defp update_op(:inc, key, value, sources, query) do - [ - quote_name(key), - " = ", - quote_qualified_name(key, sources, 0), - " + " - | expr(value, sources, query) - ] - end + defp update_op(:set, key, value, sources, query) do + [quote_name(key), " = " | expr(value, sources, query)] + end - defp update_op(command, _key, _value, _sources, query) do - error!(query, "Unknown update operation #{inspect(command)} for Redshift") - end + defp update_op(:inc, key, value, sources, query) do + [ + quote_name(key), + " = ", + quote_qualified_name(key, sources, 0), + " + " + | expr(value, sources, query) + ] + end - defp using_join(%Query{joins: []}, _kind, _prefix, _sources), do: {[], []} + defp update_op(command, _key, _value, _sources, query) do + error!(query, "Unknown update operation #{inspect(command)} for Redshift") + end - defp using_join(%Query{joins: joins} = query, kind, prefix, sources) do - froms = - intersperse_map(joins, ", ", fn - %JoinExpr{qual: :inner, ix: ix, source: source} -> - {join, _} = get_source(query, sources, ix, source) - join + defp using_join(%Query{joins: []}, _kind, _prefix, _sources), do: {[], []} - %JoinExpr{qual: qual} -> - error!(query, "Redshift supports only inner joins on #{kind}, got: `#{qual}`") - end) + defp using_join(%Query{joins: joins} = query, kind, prefix, sources) do + froms = + intersperse_map(joins, ", ", fn + %JoinExpr{qual: :inner, ix: ix, source: source} -> + {join, _} = get_source(query, sources, ix, source) + join - wheres = - for %JoinExpr{on: %QueryExpr{expr: value} = expr} <- joins, - value != true, - do: expr |> Map.put(:__struct__, BooleanExpr) |> Map.put(:op, :and) + %JoinExpr{qual: qual} -> + error!(query, "Redshift supports only inner joins on #{kind}, got: `#{qual}`") + end) - {[?\s, prefix, ?\s | froms], wheres} - end + wheres = + for %JoinExpr{on: %QueryExpr{expr: value} = expr} <- joins, + value != true, + do: expr |> Map.put(:__struct__, BooleanExpr) |> Map.put(:op, :and) - defp join(%Query{joins: []}, _sources), do: [] + {[?\s, prefix, ?\s | froms], wheres} + end - defp join(%Query{joins: joins} = query, sources) do - [ - ?\s - | intersperse_map(joins, ?\s, fn %JoinExpr{ - on: %QueryExpr{expr: expr}, - qual: qual, - ix: ix, - source: source - } -> - {join, name} = get_source(query, sources, ix, source) - [join_qual(qual), join, " AS ", name, " ON " | expr(expr, sources, query)] - end) - ] - end + defp join(%Query{joins: []}, _sources), do: [] + + defp join(%Query{joins: joins} = query, sources) do + [ + ?\s + | intersperse_map(joins, ?\s, fn %JoinExpr{ + on: %QueryExpr{expr: expr}, + qual: qual, + ix: ix, + source: source + } -> + {join, name} = get_source(query, sources, ix, source) + [join_qual(qual), join, " AS ", name, " ON " | expr(expr, sources, query)] + end) + ] + end - defp join_qual(:inner), do: "INNER JOIN " - defp join_qual(:inner_lateral), do: "INNER JOIN LATERAL " - defp join_qual(:left), do: "LEFT OUTER JOIN " - defp join_qual(:left_lateral), do: "LEFT OUTER JOIN LATERAL " - defp join_qual(:right), do: "RIGHT OUTER JOIN " - defp join_qual(:full), do: "FULL OUTER JOIN " - defp join_qual(:cross), do: "CROSS JOIN " + defp join_qual(:inner), do: "INNER JOIN " + defp join_qual(:inner_lateral), do: "INNER JOIN LATERAL " + defp join_qual(:left), do: "LEFT OUTER JOIN " + defp join_qual(:left_lateral), do: "LEFT OUTER JOIN LATERAL " + defp join_qual(:right), do: "RIGHT OUTER JOIN " + defp join_qual(:full), do: "FULL OUTER JOIN " + defp join_qual(:cross), do: "CROSS JOIN " - defp where(%Query{wheres: wheres} = query, sources) do - boolean(" WHERE ", wheres, sources, query) - end + defp where(%Query{wheres: wheres} = query, sources) do + boolean(" WHERE ", wheres, sources, query) + end - defp having(%Query{havings: havings} = query, sources) do - boolean(" HAVING ", havings, sources, query) - end + defp having(%Query{havings: havings} = query, sources) do + boolean(" HAVING ", havings, sources, query) + end - defp group_by(%Query{group_bys: []}, _sources), do: [] + defp group_by(%Query{group_bys: []}, _sources), do: [] - defp group_by(%Query{group_bys: group_bys} = query, sources) do - [ - " GROUP BY " - | intersperse_map(group_bys, ", ", fn %QueryExpr{expr: expr} -> - intersperse_map(expr, ", ", &expr(&1, sources, query)) - end) - ] - end + defp group_by(%Query{group_bys: group_bys} = query, sources) do + [ + " GROUP BY " + | intersperse_map(group_bys, ", ", fn %QueryExpr{expr: expr} -> + intersperse_map(expr, ", ", &expr(&1, sources, query)) + end) + ] + end - defp order_by(%Query{order_bys: []}, _distinct, _sources), do: [] + defp order_by(%Query{order_bys: []}, _distinct, _sources), do: [] - defp order_by(%Query{order_bys: order_bys} = query, distinct, sources) do - order_bys = Enum.flat_map(order_bys, & &1.expr) + defp order_by(%Query{order_bys: order_bys} = query, distinct, sources) do + order_bys = Enum.flat_map(order_bys, & &1.expr) - [ - " ORDER BY " - | intersperse_map(distinct ++ order_bys, ", ", &order_by_expr(&1, sources, query)) - ] - end + [ + " ORDER BY " + | intersperse_map(distinct ++ order_bys, ", ", &order_by_expr(&1, sources, query)) + ] + end - defp order_by_expr({dir, expr}, sources, query) do - str = expr(expr, sources, query) + defp order_by_expr({dir, expr}, sources, query) do + str = expr(expr, sources, query) - case dir do - :asc -> str - :desc -> [str | " DESC"] - end + case dir do + :asc -> str + :desc -> [str | " DESC"] end + end - defp limit(%Query{limit: nil}, _sources), do: [] + defp limit(%Query{limit: nil}, _sources), do: [] - defp limit(%Query{limit: %QueryExpr{expr: expr}} = query, sources) do - [" LIMIT " | expr(expr, sources, query)] - end + defp limit(%Query{limit: %QueryExpr{expr: expr}} = query, sources) do + [" LIMIT " | expr(expr, sources, query)] + end - defp offset(%Query{offset: nil}, _sources), do: [] + defp offset(%Query{offset: nil}, _sources), do: [] - defp offset(%Query{offset: %QueryExpr{expr: expr}} = query, sources) do - [" OFFSET " | expr(expr, sources, query)] - end + defp offset(%Query{offset: %QueryExpr{expr: expr}} = query, sources) do + [" OFFSET " | expr(expr, sources, query)] + end - defp lock(nil), do: [] - defp lock(lock_clause), do: [?\s | lock_clause] + defp lock(nil), do: [] + defp lock(lock_clause), do: [?\s | lock_clause] - defp boolean(_name, [], _sources, _query), do: [] + defp boolean(_name, [], _sources, _query), do: [] - defp boolean(name, [%{expr: expr, op: op} | query_exprs], sources, query) do - [ - name - | Enum.reduce(query_exprs, {op, paren_expr(expr, sources, query)}, fn - %BooleanExpr{expr: expr, op: op}, {op, acc} -> - {op, [acc, operator_to_boolean(op), paren_expr(expr, sources, query)]} - - %BooleanExpr{expr: expr, op: op}, {_, acc} -> - {op, [?(, acc, ?), operator_to_boolean(op), paren_expr(expr, sources, query)]} - end) - |> elem(1) - ] - end + defp boolean(name, [%{expr: expr, op: op} | query_exprs], sources, query) do + [ + name + | Enum.reduce(query_exprs, {op, paren_expr(expr, sources, query)}, fn + %BooleanExpr{expr: expr, op: op}, {op, acc} -> + {op, [acc, operator_to_boolean(op), paren_expr(expr, sources, query)]} - defp operator_to_boolean(:and), do: " AND " - defp operator_to_boolean(:or), do: " OR " + %BooleanExpr{expr: expr, op: op}, {_, acc} -> + {op, [?(, acc, ?), operator_to_boolean(op), paren_expr(expr, sources, query)]} + end) + |> elem(1) + ] + end - defp paren_expr(expr, sources, query) do - [?(, expr(expr, sources, query), ?)] - end + defp operator_to_boolean(:and), do: " AND " + defp operator_to_boolean(:or), do: " OR " - defp expr({:^, [], [ix]}, _sources, _query) do - [?$ | Integer.to_string(ix + 1)] - end - - defp expr({{:., _, [{:&, _, [idx]}, field]}, _, []}, sources, _query) when is_atom(field) do - quote_qualified_name(field, sources, idx) - end + defp paren_expr(expr, sources, query) do + [?(, expr(expr, sources, query), ?)] + end - defp expr({:&, _, [idx]}, sources, query) do - {source, _name, _schema} = elem(sources, idx) + defp expr({:^, [], [ix]}, _sources, _query) do + [?$ | Integer.to_string(ix + 1)] + end - error!( - query, - "Redshift does not support selecting all fields from #{source} without a schema. " <> - "Please specify a schema or specify exactly which fields you want to select" - ) - end + defp expr({{:., _, [{:&, _, [idx]}, field]}, _, []}, sources, _query) when is_atom(field) do + quote_qualified_name(field, sources, idx) + end - defp expr({:in, _, [_left, []]}, _sources, _query) do - "false" - end + defp expr({:&, _, [idx]}, sources, query) do + {source, _name, _schema} = elem(sources, idx) - defp expr({:in, _, [left, right]}, sources, query) when is_list(right) do - args = intersperse_map(right, ?,, &expr(&1, sources, query)) - [expr(left, sources, query), " IN (", args, ?)] - end + error!( + query, + "Redshift does not support selecting all fields from #{source} without a schema. " <> + "Please specify a schema or specify exactly which fields you want to select" + ) + end - defp expr({:in, _, [_, {:^, _, [_, 0]}]}, _sources, _query) do - "false" - end + defp expr({:in, _, [_left, []]}, _sources, _query) do + "false" + end - defp expr({:in, _, [left, {:^, _, [ix, length]}]}, sources, query) do - args = (ix + 1)..(ix + length) |> Enum.map(&"$#{&1}") |> Enum.intersperse(?,) - [expr(left, sources, query), " IN (", args, ?)] - end + defp expr({:in, _, [left, right]}, sources, query) when is_list(right) do + args = intersperse_map(right, ?,, &expr(&1, sources, query)) + [expr(left, sources, query), " IN (", args, ?)] + end - defp expr({:in, _, [left, right]}, sources, query) do - [expr(left, sources, query), " IN ", paren_expr(right, sources, query)] - end + defp expr({:in, _, [_, {:^, _, [_, 0]}]}, _sources, _query) do + "false" + end - defp expr({:is_nil, _, [arg]}, sources, query) do - [expr(arg, sources, query) | " IS NULL"] - end + defp expr({:in, _, [left, {:^, _, [ix, length]}]}, sources, query) do + args = (ix + 1)..(ix + length) |> Enum.map(&"$#{&1}") |> Enum.intersperse(?,) + [expr(left, sources, query), " IN (", args, ?)] + end - defp expr({:not, _, [expr]}, sources, query) do - ["NOT (", expr(expr, sources, query), ?)] - end + defp expr({:in, _, [left, right]}, sources, query) do + [expr(left, sources, query), " IN ", paren_expr(right, sources, query)] + end - defp expr(%Ecto.SubQuery{query: query}, _sources, _query) do - all(query) - end + defp expr({:is_nil, _, [arg]}, sources, query) do + [expr(arg, sources, query) | " IS NULL"] + end - defp expr({:fragment, _, [kw]}, _sources, query) when is_list(kw) or tuple_size(kw) == 3 do - error!(query, "Redshift adapter does not support keyword or interpolated fragments") - end + defp expr({:not, _, [expr]}, sources, query) do + ["NOT (", expr(expr, sources, query), ?)] + end - defp expr({:fragment, _, parts}, sources, query) do - Enum.map(parts, fn - {:raw, part} -> part - {:expr, expr} -> expr(expr, sources, query) - end) - end + defp expr(%Ecto.SubQuery{query: query}, _sources, _query) do + all(query) + end - defp expr({:datetime_add, _, [datetime, count, interval]}, sources, query) do - [ - ?(, - expr(datetime, sources, query), - "::timestamp + ", - interval(count, interval, sources, query) | ")::timestamp" - ] - end + defp expr({:fragment, _, [kw]}, _sources, query) when is_list(kw) or tuple_size(kw) == 3 do + error!(query, "Redshift adapter does not support keyword or interpolated fragments") + end - defp expr({:date_add, _, [date, count, interval]}, sources, query) do - [ - ?(, - expr(date, sources, query), - "::date + ", - interval(count, interval, sources, query) | ")::date" - ] - end + defp expr({:fragment, _, parts}, sources, query) do + Enum.map(parts, fn + {:raw, part} -> part + {:expr, expr} -> expr(expr, sources, query) + end) + end - defp expr({fun, _, args}, sources, query) when is_atom(fun) and is_list(args) do - {modifier, args} = - case args do - [rest, :distinct] -> {"DISTINCT ", [rest]} - _ -> {[], args} - end + defp expr({:datetime_add, _, [datetime, count, interval]}, sources, query) do + [ + ?(, + expr(datetime, sources, query), + "::timestamp + ", + interval(count, interval, sources, query) | ")::timestamp" + ] + end - case handle_call(fun, length(args)) do - {:binary_op, op} -> - [left, right] = args - [op_to_binary(left, sources, query), op | op_to_binary(right, sources, query)] + defp expr({:date_add, _, [date, count, interval]}, sources, query) do + [ + ?(, + expr(date, sources, query), + "::date + ", + interval(count, interval, sources, query) | ")::date" + ] + end - {:fun, fun} -> - [fun, ?(, modifier, intersperse_map(args, ", ", &expr(&1, sources, query)), ?)] + defp expr({fun, _, args}, sources, query) when is_atom(fun) and is_list(args) do + {modifier, args} = + case args do + [rest, :distinct] -> {"DISTINCT ", [rest]} + _ -> {[], args} end - end - defp expr(list, _sources, query) when is_list(list) do - error!(query, "Array type is not supported by Redshift") - end + case handle_call(fun, length(args)) do + {:binary_op, op} -> + [left, right] = args + [op_to_binary(left, sources, query), op | op_to_binary(right, sources, query)] - defp expr(%Decimal{} = decimal, _sources, _query) do - Decimal.to_string(decimal, :normal) + {:fun, fun} -> + [fun, ?(, modifier, intersperse_map(args, ", ", &expr(&1, sources, query)), ?)] end + end - defp expr(%Ecto.Query.Tagged{value: binary, type: :binary}, _sources, query) - when is_binary(binary) do - error!(query, "The Redshift Adapter doesn't support binaries") - end + defp expr(list, _sources, query) when is_list(list) do + error!(query, "Array type is not supported by Redshift") + end - defp expr(%Ecto.Query.Tagged{value: other, type: type}, sources, query) do - [expr(other, sources, query), ?:, ?: | tagged_to_db(type)] - end + defp expr(%Decimal{} = decimal, _sources, _query) do + Decimal.to_string(decimal, :normal) + end - defp expr(nil, _sources, _query), do: "NULL" - defp expr(true, _sources, _query), do: "TRUE" - defp expr(false, _sources, _query), do: "FALSE" + defp expr(%Ecto.Query.Tagged{value: binary, type: :binary}, _sources, query) + when is_binary(binary) do + error!(query, "The Redshift Adapter doesn't support binaries") + end - defp expr(literal, _sources, _query) when is_binary(literal) do - [?\', escape_string(literal), ?\'] - end + defp expr(%Ecto.Query.Tagged{value: other, type: type}, sources, query) do + [expr(other, sources, query), ?:, ?: | tagged_to_db(type)] + end - defp expr(literal, _sources, _query) when is_integer(literal) do - Integer.to_string(literal) - end + defp expr(nil, _sources, _query), do: "NULL" + defp expr(true, _sources, _query), do: "TRUE" + defp expr(false, _sources, _query), do: "FALSE" - defp expr(literal, _sources, _query) when is_float(literal) do - [Float.to_string(literal) | "::float"] - end + defp expr(literal, _sources, _query) when is_binary(literal) do + [?\', escape_string(literal), ?\'] + end - # Always use the largest possible type for integers - defp tagged_to_db(:id), do: "bigint" - defp tagged_to_db(:integer), do: "bigint" - defp tagged_to_db(type), do: ecto_to_db(type) + defp expr(literal, _sources, _query) when is_integer(literal) do + Integer.to_string(literal) + end - defp interval(count, interval, _sources, _query) when is_integer(count) do - ["interval '", String.Chars.Integer.to_string(count), ?\s, interval, ?\'] - end + defp expr(literal, _sources, _query) when is_float(literal) do + [Float.to_string(literal) | "::float"] + end - defp interval(count, interval, _sources, _query) when is_float(count) do - count = :erlang.float_to_binary(count, [:compact, decimals: 16]) - ["interval '", count, ?\s, interval, ?\'] - end + # Always use the largest possible type for integers + defp tagged_to_db(:id), do: "bigint" + defp tagged_to_db(:integer), do: "bigint" + defp tagged_to_db(type), do: ecto_to_db(type) - defp interval(count, interval, sources, query) do - [?(, expr(count, sources, query), "::numeric * ", interval(1, interval, sources, query), ?)] - end + defp interval(count, interval, _sources, _query) when is_integer(count) do + ["interval '", String.Chars.Integer.to_string(count), ?\s, interval, ?\'] + end - defp op_to_binary({op, _, [_, _]} = expr, sources, query) when op in @binary_ops do - paren_expr(expr, sources, query) - end + defp interval(count, interval, _sources, _query) when is_float(count) do + count = :erlang.float_to_binary(count, [:compact, decimals: 16]) + ["interval '", count, ?\s, interval, ?\'] + end - defp op_to_binary(expr, sources, query) do - expr(expr, sources, query) - end + defp interval(count, interval, sources, query) do + [?(, expr(count, sources, query), "::numeric * ", interval(1, interval, sources, query), ?)] + end - defp sources_unaliased(%{prefix: prefix, sources: sources}) do - sources_unaliased(prefix, sources, 0, tuple_size(sources)) |> List.to_tuple() - end + defp op_to_binary({op, _, [_, _]} = expr, sources, query) when op in @binary_ops do + paren_expr(expr, sources, query) + end - defp sources_unaliased(prefix, sources, pos, limit) when pos < limit do - current = - case elem(sources, pos) do - {table, schema, _alias} -> - quoted = quote_table(prefix, table) - {quoted, quoted, schema} - - {:fragment, _, _} -> - error!( - nil, - "Redshift doesn't support fragment sources in UPDATE and DELETE statements" - ) - - %Ecto.SubQuery{} -> - error!( - nil, - "Redshift doesn't support subquery sources in UPDATE and DELETE statements" - ) - end - - [current | sources_unaliased(prefix, sources, pos + 1, limit)] - end + defp op_to_binary(expr, sources, query) do + expr(expr, sources, query) + end - defp sources_unaliased(_prefix, _sources, pos, pos) do - [] - end + defp sources_unaliased(%{prefix: prefix, sources: sources}) do + sources_unaliased(prefix, sources, 0, tuple_size(sources)) |> List.to_tuple() + end - defp create_names(%{prefix: prefix, sources: sources}) do - create_names(prefix, sources, 0, tuple_size(sources)) |> List.to_tuple() - end + defp sources_unaliased(prefix, sources, pos, limit) when pos < limit do + current = + case elem(sources, pos) do + {table, schema, _alias} -> + quoted = quote_table(prefix, table) + {quoted, quoted, schema} + + {:fragment, _, _} -> + error!( + nil, + "Redshift doesn't support fragment sources in UPDATE and DELETE statements" + ) + + %Ecto.SubQuery{} -> + error!( + nil, + "Redshift doesn't support subquery sources in UPDATE and DELETE statements" + ) + end - defp create_names(prefix, sources, pos, limit) when pos < limit do - current = - case elem(sources, pos) do - {table, schema, _alias} -> - name = [create_alias(table) | Integer.to_string(pos)] - {quote_table(prefix, table), name, schema} + [current | sources_unaliased(prefix, sources, pos + 1, limit)] + end - {:fragment, _, _} -> - {nil, [?f | Integer.to_string(pos)], nil} + defp sources_unaliased(_prefix, _sources, pos, pos) do + [] + end - %Ecto.SubQuery{} -> - {nil, [?s | Integer.to_string(pos)], nil} - end + defp create_names(%{prefix: prefix, sources: sources}) do + create_names(prefix, sources, 0, tuple_size(sources)) |> List.to_tuple() + end - [current | create_names(prefix, sources, pos + 1, limit)] - end + defp create_names(prefix, sources, pos, limit) when pos < limit do + current = + case elem(sources, pos) do + {table, schema, _alias} -> + name = [create_alias(table) | Integer.to_string(pos)] + {quote_table(prefix, table), name, schema} - defp create_names(_prefix, _sources, pos, pos), do: [] + {:fragment, _, _} -> + {nil, [?f | Integer.to_string(pos)], nil} - defp create_alias(<>) when first in ?a..?z when first in ?A..?Z do - <> - end + %Ecto.SubQuery{} -> + {nil, [?s | Integer.to_string(pos)], nil} + end - defp create_alias(_), do: "t" + [current | create_names(prefix, sources, pos + 1, limit)] + end - ## DDL + defp create_names(_prefix, _sources, pos, pos), do: [] - alias Ecto.Migration.{Table, Index, Reference, Constraint} + defp create_alias(<>) when first in ?a..?z when first in ?A..?Z do + <> + end - @drops [:drop, :drop_if_exists] + defp create_alias(_), do: "t" - def execute_ddl({command, %Table{} = table, columns}) - when command in [:create, :create_if_not_exists] do - table_name = quote_table(table.prefix, table.name) + ## DDL - query = [ - "CREATE TABLE ", - if_do(command == :create_if_not_exists, "IF NOT EXISTS "), - table_name, - ?\s, - ?(, - column_definitions(table, columns), - pk_definition(columns, ", "), - ?), - options_expr(table.options) - ] + alias Ecto.Migration.{Table, Index, Reference, Constraint} - [query] ++ - comments_on("TABLE", table_name, table.comment) ++ - comments_for_columns(table_name, columns) - end + @drops [:drop, :drop_if_exists] - def execute_ddl({command, %Table{} = table}) when command in @drops do - [ - [ - "DROP TABLE ", - if_do(command == :drop_if_exists, "IF EXISTS "), - quote_table(table.prefix, table.name) - ] - ] - end + def execute_ddl({command, %Table{} = table, columns}) + when command in [:create, :create_if_not_exists] do + table_name = quote_table(table.prefix, table.name) - # TODO: split multiple changes into multiple queries as Redshift doesn't - # support more than one change in a single ALTER TABLE - def execute_ddl({:alter, %Table{} = table, changes}) do - table_name = quote_table(table.prefix, table.name) + query = [ + "CREATE TABLE ", + if_do(command == :create_if_not_exists, "IF NOT EXISTS "), + table_name, + ?\s, + ?(, + column_definitions(table, columns), + pk_definition(columns, ", "), + ?), + options_expr(table.options) + ] - query = [ - "ALTER TABLE ", - table_name, - ?\s, - column_changes(table, changes), - pk_definition(changes, ", ADD ") + [query] ++ + comments_on("TABLE", table_name, table.comment) ++ + comments_for_columns(table_name, columns) + end + + def execute_ddl({command, %Table{} = table}) when command in @drops do + [ + [ + "DROP TABLE ", + if_do(command == :drop_if_exists, "IF EXISTS "), + quote_table(table.prefix, table.name) ] + ] + end - [query] ++ - comments_on("TABLE", table_name, table.comment) ++ - comments_for_columns(table_name, changes) - end + # TODO: split multiple changes into multiple queries as Redshift doesn't + # support more than one change in a single ALTER TABLE + def execute_ddl({:alter, %Table{} = table, changes}) do + table_name = quote_table(table.prefix, table.name) + + query = [ + "ALTER TABLE ", + table_name, + ?\s, + column_changes(table, changes), + pk_definition(changes, ", ADD ") + ] - def execute_ddl({_command, %Index{} = _index}) do - error!(nil, "CREATE INDEX and DROP INDEX are not supported by Redshift") - end + [query] ++ + comments_on("TABLE", table_name, table.comment) ++ + comments_for_columns(table_name, changes) + end - def execute_ddl({:rename, %Table{} = current_table, %Table{} = new_table}) do + def execute_ddl({_command, %Index{} = _index}) do + error!(nil, "CREATE INDEX and DROP INDEX are not supported by Redshift") + end + + def execute_ddl({:rename, %Table{} = current_table, %Table{} = new_table}) do + [ [ - [ - "ALTER TABLE ", - quote_table(current_table.prefix, current_table.name), - " RENAME TO ", - quote_table(nil, new_table.name) - ] + "ALTER TABLE ", + quote_table(current_table.prefix, current_table.name), + " RENAME TO ", + quote_table(nil, new_table.name) ] - end + ] + end - def execute_ddl({:rename, %Table{} = table, current_column, new_column}) do + def execute_ddl({:rename, %Table{} = table, current_column, new_column}) do + [ [ - [ - "ALTER TABLE ", - quote_table(table.prefix, table.name), - " RENAME ", - quote_name(current_column), - " TO ", - quote_name(new_column) - ] + "ALTER TABLE ", + quote_table(table.prefix, table.name), + " RENAME ", + quote_name(current_column), + " TO ", + quote_name(new_column) ] - end + ] + end - def execute_ddl({:create, %Constraint{}}) do - error!(nil, "CHECK and EXCLUDE constraints are not supported by Redshift") - end + def execute_ddl({:create, %Constraint{}}) do + error!(nil, "CHECK and EXCLUDE constraints are not supported by Redshift") + end - def execute_ddl({:drop, %Constraint{} = constraint}) do + def execute_ddl({:drop, %Constraint{} = constraint}) do + [ [ - [ - "ALTER TABLE ", - quote_table(constraint.prefix, constraint.table), - " DROP CONSTRAINT ", - quote_name(constraint.name) - ] + "ALTER TABLE ", + quote_table(constraint.prefix, constraint.table), + " DROP CONSTRAINT ", + quote_name(constraint.name) ] - end + ] + end - def execute_ddl(string) when is_binary(string), do: [string] + def execute_ddl(string) when is_binary(string), do: [string] - def execute_ddl(keyword) when is_list(keyword), - do: error!(nil, "PostgreSQL adapter does not support keyword lists in execute") + def execute_ddl(keyword) when is_list(keyword), + do: error!(nil, "PostgreSQL adapter does not support keyword lists in execute") - defp pk_definition(columns, prefix) do - pks = for {_, name, _, opts} <- columns, opts[:primary_key], do: name + defp pk_definition(columns, prefix) do + pks = for {_, name, _, opts} <- columns, opts[:primary_key], do: name - case pks do - [] -> [] - _ -> [prefix, "PRIMARY KEY (", intersperse_map(pks, ", ", "e_name/1), ")"] - end + case pks do + [] -> [] + _ -> [prefix, "PRIMARY KEY (", intersperse_map(pks, ", ", "e_name/1), ")"] end + end - defp comments_on(_object, _name, nil), do: [] + defp comments_on(_object, _name, nil), do: [] - defp comments_on(object, name, comment) do - [["COMMENT ON ", object, ?\s, name, " IS ", single_quote(comment)]] - end + defp comments_on(object, name, comment) do + [["COMMENT ON ", object, ?\s, name, " IS ", single_quote(comment)]] + end - defp comments_for_columns(table_name, columns) do - Enum.flat_map(columns, fn - {_operation, column_name, _column_type, opts} -> - column_name = [table_name, ?. | quote_name(column_name)] - comments_on("COLUMN", column_name, opts[:comment]) + defp comments_for_columns(table_name, columns) do + Enum.flat_map(columns, fn + {_operation, column_name, _column_type, opts} -> + column_name = [table_name, ?. | quote_name(column_name)] + comments_on("COLUMN", column_name, opts[:comment]) - _ -> - [] - end) - end + _ -> + [] + end) + end - defp column_definitions(table, columns) do - intersperse_map(columns, ", ", &column_definition(table, &1)) - end + defp column_definitions(table, columns) do + intersperse_map(columns, ", ", &column_definition(table, &1)) + end - defp column_definition(table, {:add, name, %Reference{} = ref, opts}) do - [ - quote_name(name), - ?\s, - reference_column_type(ref.type, opts), - column_options(ref.type, opts), - reference_expr(ref, table, name) - ] - end + defp column_definition(table, {:add, name, %Reference{} = ref, opts}) do + [ + quote_name(name), + ?\s, + reference_column_type(ref.type, opts), + column_options(ref.type, opts), + reference_expr(ref, table, name) + ] + end - defp column_definition(_table, {:add, name, type, opts}) do - [quote_name(name), ?\s, column_type(type, opts), column_options(type, opts)] - end + defp column_definition(_table, {:add, name, type, opts}) do + [quote_name(name), ?\s, column_type(type, opts), column_options(type, opts)] + end - defp column_changes(table, columns) do - intersperse_map(columns, ", ", &column_change(table, &1)) - end + defp column_changes(table, columns) do + intersperse_map(columns, ", ", &column_change(table, &1)) + end - defp column_change(table, {:add, name, %Reference{} = ref, opts}) do - [ - "ADD COLUMN ", - quote_name(name), - ?\s, - reference_column_type(ref.type, opts), - column_options(ref.type, opts), - reference_expr(ref, table, name) - ] - end + defp column_change(table, {:add, name, %Reference{} = ref, opts}) do + [ + "ADD COLUMN ", + quote_name(name), + ?\s, + reference_column_type(ref.type, opts), + column_options(ref.type, opts), + reference_expr(ref, table, name) + ] + end - defp column_change(_table, {:add, name, type, opts}) do - ["ADD COLUMN ", quote_name(name), ?\s, column_type(type, opts), column_options(type, opts)] - end + defp column_change(_table, {:add, name, type, opts}) do + ["ADD COLUMN ", quote_name(name), ?\s, column_type(type, opts), column_options(type, opts)] + end - defp column_change(table, {:modify, name, %Reference{} = ref, _opts}) do - constraint_expr(ref, table, name) - end + defp column_change(table, {:modify, name, %Reference{} = ref, _opts}) do + constraint_expr(ref, table, name) + end - defp column_change(_table, {:modify, _name, _type, _opts}) do - error!(nil, "ALTER COLUMN is not supported by Redshift") - end + defp column_change(_table, {:modify, _name, _type, _opts}) do + error!(nil, "ALTER COLUMN is not supported by Redshift") + end - defp column_change(_table, {:remove, name}), do: ["DROP COLUMN ", quote_name(name)] + defp column_change(_table, {:remove, name}), do: ["DROP COLUMN ", quote_name(name)] - defp column_options(type, opts) do - default = Keyword.fetch(opts, :default) + defp column_options(type, opts) do + default = Keyword.fetch(opts, :default) - {opts, _rest} = - Keyword.split(opts, [:identity, :encode, :distkey, :sortkey, :null, :unique]) + {opts, _rest} = + Keyword.split(opts, [:identity, :encode, :distkey, :sortkey, :null, :unique]) - [default_expr(default, type), Enum.map(opts, &column_option/1)] - end + [default_expr(default, type), Enum.map(opts, &column_option/1)] + end - @compression_encodings [ - :raw, - :bytedict, - :delta, - :delta32k, - :lzo, - :mostly8, - :mostly16, - :mostly32, - :runlength, - :text255, - :text32k, - :zstd - ] + @compression_encodings [ + :raw, + :bytedict, + :delta, + :delta32k, + :lzo, + :mostly8, + :mostly16, + :mostly32, + :runlength, + :text255, + :text32k, + :zstd + ] + + defp column_option({:encode, encoding}) when encoding in @compression_encodings do + [" ENCODE ", to_string(encoding)] + end - defp column_option({:encode, encoding}) when encoding in @compression_encodings do - [" ENCODE ", to_string(encoding)] - end + defp column_option({:identity, {seed, step}}) when is_integer(seed) and is_integer(step) do + [" IDENTITY(", to_string(seed), ?,, to_string(step), ?)] + end - defp column_option({:identity, {seed, step}}) when is_integer(seed) and is_integer(step) do - [" IDENTITY(", to_string(seed), ?,, to_string(step), ?)] - end + defp column_option({:distkey, true}), do: " DISTKEY" + defp column_option({:sortkey, true}), do: " SORTKEY" + defp column_option({:null, false}), do: " NOT NULL" + defp column_option({:null, true}), do: " NULL" + defp column_option({:unique, true}), do: " UNIQUE" - defp column_option({:distkey, true}), do: " DISTKEY" - defp column_option({:sortkey, true}), do: " SORTKEY" - defp column_option({:null, false}), do: " NOT NULL" - defp column_option({:null, true}), do: " NULL" - defp column_option({:unique, true}), do: " UNIQUE" + defp column_option(expr) do + error!(nil, "unrecognized column option `#{inspect(expr)}`") + end - defp column_option(expr) do - error!(nil, "unrecognized column option `#{inspect(expr)}`") - end + defp default_expr({:ok, nil}, _type), do: " DEFAULT NULL" - defp default_expr({:ok, nil}, _type), do: " DEFAULT NULL" + defp default_expr({:ok, literal}, _type) when is_binary(literal), + do: [" DEFAULT '", escape_string(literal), ?'] - defp default_expr({:ok, literal}, _type) when is_binary(literal), - do: [" DEFAULT '", escape_string(literal), ?'] + defp default_expr({:ok, literal}, _type) when is_number(literal) or is_boolean(literal), + do: [" DEFAULT ", to_string(literal)] - defp default_expr({:ok, literal}, _type) when is_number(literal) or is_boolean(literal), - do: [" DEFAULT ", to_string(literal)] + defp default_expr({:ok, %{} = map}, :map) do + default = json_library().encode!(map) + [" DEFAULT ", single_quote(default)] + end - defp default_expr({:ok, %{} = map}, :map) do - default = json_library().encode!(map) - [" DEFAULT ", single_quote(default)] - end + defp default_expr({:ok, {:fragment, expr}}, _type), do: [" DEFAULT ", expr] - defp default_expr({:ok, {:fragment, expr}}, _type), do: [" DEFAULT ", expr] + defp default_expr({:ok, expr}, type), + do: + raise( + ArgumentError, + "unknown default `#{inspect(expr)}` for type `#{inspect(type)}`. " <> + ":default may be a string, number, boolean, map (when type is Map), or a fragment(...)" + ) - defp default_expr({:ok, expr}, type), - do: - raise( - ArgumentError, - "unknown default `#{inspect(expr)}` for type `#{inspect(type)}`. " <> - ":default may be a string, number, boolean, map (when type is Map), or a fragment(...)" - ) + defp default_expr(:error, _), do: [] - defp default_expr(:error, _), do: [] + defp options_expr(nil), do: [] + defp options_expr(keyword) when is_list(keyword), do: Enum.map(keyword, &table_attribute/1) + defp options_expr(options), do: [?\s, options] - defp options_expr(nil), do: [] - defp options_expr(keyword) when is_list(keyword), do: Enum.map(keyword, &table_attribute/1) - defp options_expr(options), do: [?\s, options] + defp table_attribute({:diststyle, :even}), do: " DISTSTYLE EVEN" + defp table_attribute({:diststyle, :all}), do: " DISTSTYLE ALL" + defp table_attribute({:diststyle, :key}), do: " DISTSTYLE KEY" - defp table_attribute({:diststyle, :even}), do: " DISTSTYLE EVEN" - defp table_attribute({:diststyle, :all}), do: " DISTSTYLE ALL" - defp table_attribute({:diststyle, :key}), do: " DISTSTYLE KEY" + defp table_attribute({:distkey, key}) when is_atom(key) or is_binary(key) do + [" DISTKEY (", quote_name(key), ?)] + end - defp table_attribute({:distkey, key}) when is_atom(key) or is_binary(key) do - [" DISTKEY (", quote_name(key), ?)] - end + defp table_attribute({:sortkey, key}) when is_atom(key) or is_binary(key) do + [" SORTKEY (", quote_name(key), ?)] + end - defp table_attribute({:sortkey, key}) when is_atom(key) or is_binary(key) do - [" SORTKEY (", quote_name(key), ?)] - end + defp table_attribute({:sortkey, keys}) when is_list(keys) do + [" SORTKEY (", intersperse_map(keys, ", ", "e_name/1), ?)] + end - defp table_attribute({:sortkey, keys}) when is_list(keys) do - [" SORTKEY (", intersperse_map(keys, ", ", "e_name/1), ?)] - end + defp table_attribute({:sortkey, {:compound, keys}}) do + [" COMPOUND", table_attribute({:sortkey, keys})] + end - defp table_attribute({:sortkey, {:compound, keys}}) do - [" COMPOUND", table_attribute({:sortkey, keys})] - end + defp table_attribute({:sortkey, {:interleaved, keys}}) do + [" INTERLEAVED", table_attribute({:sortkey, keys})] + end - defp table_attribute({:sortkey, {:interleaved, keys}}) do - [" INTERLEAVED", table_attribute({:sortkey, keys})] - end + defp table_attribute(table_attribute) do + error!(nil, "unrecognized table attribute `#{inspect(table_attribute)}`") + end - defp table_attribute(table_attribute) do - error!(nil, "unrecognized table attribute `#{inspect(table_attribute)}`") - end + defp column_type(type, opts) do + size = Keyword.get(opts, :size) + precision = Keyword.get(opts, :precision) + scale = Keyword.get(opts, :scale) + type_name = ecto_to_db(type) - defp column_type(type, opts) do - size = Keyword.get(opts, :size) - precision = Keyword.get(opts, :precision) - scale = Keyword.get(opts, :scale) - type_name = ecto_to_db(type) - - cond do - size -> [type_name, ?(, to_string(size), ?)] - precision -> [type_name, ?(, to_string(precision), ?,, to_string(scale || 0), ?)] - type == :string -> [type_name, "(255)"] - true -> type_name - end + cond do + size -> [type_name, ?(, to_string(size), ?)] + precision -> [type_name, ?(, to_string(precision), ?,, to_string(scale || 0), ?)] + type == :string -> [type_name, "(255)"] + true -> type_name end + end - defp reference_expr(%Reference{on_delete: :nothing, on_update: :nothing} = ref, table, name), - do: [ - " CONSTRAINT ", - reference_name(ref, table, name), - " REFERENCES ", - quote_table(table.prefix, ref.table), - ?(, - quote_name(ref.column), - ?) - ] - - defp reference_expr(%Reference{on_delete: :nothing}, _table, _name), - do: error!(nil, "ON UPDATE is not supported by Redshift") - - defp reference_expr(%Reference{}, _table, _name), - do: error!(nil, "ON DELETE is not supported by Redshift") - - defp constraint_expr(%Reference{on_delete: :nothing, on_update: :nothing} = ref, table, name), - do: [ - "ADD CONSTRAINT ", - reference_name(ref, table, name), - ?\s, - "FOREIGN KEY (", - quote_name(name), - ") REFERENCES ", - quote_table(table.prefix, ref.table), - ?(, - quote_name(ref.column), - ?) - ] + defp reference_expr(%Reference{on_delete: :nothing, on_update: :nothing} = ref, table, name), + do: [ + " CONSTRAINT ", + reference_name(ref, table, name), + " REFERENCES ", + quote_table(table.prefix, ref.table), + ?(, + quote_name(ref.column), + ?) + ] - defp constraint_expr(%Reference{on_delete: :nothing}, _table, _name), - do: error!(nil, "ON UPDATE is not supported by Redshift") + defp reference_expr(%Reference{on_delete: :nothing}, _table, _name), + do: error!(nil, "ON UPDATE is not supported by Redshift") + + defp reference_expr(%Reference{}, _table, _name), + do: error!(nil, "ON DELETE is not supported by Redshift") + + defp constraint_expr(%Reference{on_delete: :nothing, on_update: :nothing} = ref, table, name), + do: [ + "ADD CONSTRAINT ", + reference_name(ref, table, name), + ?\s, + "FOREIGN KEY (", + quote_name(name), + ") REFERENCES ", + quote_table(table.prefix, ref.table), + ?(, + quote_name(ref.column), + ?) + ] - defp constraint_expr(%Reference{}, _table, _name), - do: error!(nil, "ON DELETE is not supported by Redshift") + defp constraint_expr(%Reference{on_delete: :nothing}, _table, _name), + do: error!(nil, "ON UPDATE is not supported by Redshift") - defp reference_name(%Reference{name: nil}, table, column), - do: quote_name("#{table.name}_#{column}_fkey") + defp constraint_expr(%Reference{}, _table, _name), + do: error!(nil, "ON DELETE is not supported by Redshift") - defp reference_name(%Reference{name: name}, _table, _column), do: quote_name(name) + defp reference_name(%Reference{name: nil}, table, column), + do: quote_name("#{table.name}_#{column}_fkey") - defp reference_column_type(:serial, _opts), do: "integer" - defp reference_column_type(:bigserial, _opts), do: "bigint" - defp reference_column_type(type, opts), do: column_type(type, opts) + defp reference_name(%Reference{name: name}, _table, _column), do: quote_name(name) - ## Helpers + defp reference_column_type(:serial, _opts), do: "integer" + defp reference_column_type(:bigserial, _opts), do: "bigint" + defp reference_column_type(type, opts), do: column_type(type, opts) - defp get_source(query, sources, ix, source) do - {expr, name, _schema} = elem(sources, ix) - {expr || paren_expr(source, sources, query), name} - end + ## Helpers - defp quote_qualified_name(name, sources, ix) do - {_, source, _} = elem(sources, ix) - [source, ?. | quote_name(name)] - end + defp get_source(query, sources, ix, source) do + {expr, name, _schema} = elem(sources, ix) + {expr || paren_expr(source, sources, query), name} + end - defp quote_name(name) when is_atom(name) do - quote_name(Atom.to_string(name)) - end + defp quote_qualified_name(name, sources, ix) do + {_, source, _} = elem(sources, ix) + [source, ?. | quote_name(name)] + end - defp quote_name(name) do - if String.contains?(name, "\"") do - error!(nil, "bad field name #{inspect(name)}") - end + defp quote_name(name) when is_atom(name) do + quote_name(Atom.to_string(name)) + end - [?", name, ?"] + defp quote_name(name) do + if String.contains?(name, "\"") do + error!(nil, "bad field name #{inspect(name)}") end - defp quote_table(nil, name), do: quote_table(name) - defp quote_table(prefix, name), do: [quote_table(prefix), ?., quote_table(name)] + [?", name, ?"] + end - defp quote_table(name) when is_atom(name), do: quote_table(Atom.to_string(name)) + defp quote_table(nil, name), do: quote_table(name) + defp quote_table(prefix, name), do: [quote_table(prefix), ?., quote_table(name)] - defp quote_table(name) do - if String.contains?(name, "\"") do - error!(nil, "bad table name #{inspect(name)}") - end + defp quote_table(name) when is_atom(name), do: quote_table(Atom.to_string(name)) - [?", name, ?"] + defp quote_table(name) do + if String.contains?(name, "\"") do + error!(nil, "bad table name #{inspect(name)}") end - defp single_quote(value), do: [?', escape_string(value), ?'] + [?", name, ?"] + end - defp intersperse_map(list, separator, mapper, acc \\ []) - defp intersperse_map([], _separator, _mapper, acc), do: acc - defp intersperse_map([elem], _separator, mapper, acc), do: [acc | mapper.(elem)] + defp single_quote(value), do: [?', escape_string(value), ?'] - defp intersperse_map([elem | rest], separator, mapper, acc), - do: intersperse_map(rest, separator, mapper, [acc, mapper.(elem), separator]) + defp intersperse_map(list, separator, mapper, acc \\ []) + defp intersperse_map([], _separator, _mapper, acc), do: acc + defp intersperse_map([elem], _separator, mapper, acc), do: [acc | mapper.(elem)] - defp intersperse_reduce(list, separator, user_acc, reducer, acc \\ []) - defp intersperse_reduce([], _separator, user_acc, _reducer, acc), do: {acc, user_acc} + defp intersperse_map([elem | rest], separator, mapper, acc), + do: intersperse_map(rest, separator, mapper, [acc, mapper.(elem), separator]) - defp intersperse_reduce([elem], _separator, user_acc, reducer, acc) do - {elem, user_acc} = reducer.(elem, user_acc) - {[acc | elem], user_acc} - end + defp intersperse_reduce(list, separator, user_acc, reducer, acc \\ []) + defp intersperse_reduce([], _separator, user_acc, _reducer, acc), do: {acc, user_acc} - defp intersperse_reduce([elem | rest], separator, user_acc, reducer, acc) do - {elem, user_acc} = reducer.(elem, user_acc) - intersperse_reduce(rest, separator, user_acc, reducer, [acc, elem, separator]) - end + defp intersperse_reduce([elem], _separator, user_acc, reducer, acc) do + {elem, user_acc} = reducer.(elem, user_acc) + {[acc | elem], user_acc} + end - defp if_do(condition, value) do - if condition, do: value, else: [] - end + defp intersperse_reduce([elem | rest], separator, user_acc, reducer, acc) do + {elem, user_acc} = reducer.(elem, user_acc) + intersperse_reduce(rest, separator, user_acc, reducer, [acc, elem, separator]) + end - defp escape_string(value) when is_binary(value) do - :binary.replace(value, "'", "''", [:global]) - end + defp if_do(condition, value) do + if condition, do: value, else: [] + end - defp ecto_to_db({:array, _}), do: error!(nil, "Array type is not supported by Redshift") - defp ecto_to_db(:id), do: "bigint" - defp ecto_to_db(:serial), do: "integer" - defp ecto_to_db(:bigserial), do: "bigint" - defp ecto_to_db(:binary_id), do: "char(36)" - defp ecto_to_db(:uuid), do: "char(36)" - defp ecto_to_db(:string), do: "varchar" - defp ecto_to_db(:binary), do: "varchar(max)" - defp ecto_to_db(:map), do: "varchar(max)" - defp ecto_to_db({:map, _}), do: "varchar(max)" - defp ecto_to_db(:utc_datetime), do: "timestamp" - defp ecto_to_db(:naive_datetime), do: "timestamp" - defp ecto_to_db(other), do: Atom.to_string(other) - - defp error!(nil, message) do - raise ArgumentError, message - end + defp escape_string(value) when is_binary(value) do + :binary.replace(value, "'", "''", [:global]) + end - defp error!(query, message) do - raise Ecto.QueryError, query: query, message: message - end + defp ecto_to_db({:array, _}), do: error!(nil, "Array type is not supported by Redshift") + defp ecto_to_db(:id), do: "bigint" + defp ecto_to_db(:serial), do: "integer" + defp ecto_to_db(:bigserial), do: "bigint" + defp ecto_to_db(:binary_id), do: "char(36)" + defp ecto_to_db(:uuid), do: "char(36)" + defp ecto_to_db(:string), do: "varchar" + defp ecto_to_db(:binary), do: "varchar(max)" + defp ecto_to_db(:map), do: "varchar(max)" + defp ecto_to_db({:map, _}), do: "varchar(max)" + defp ecto_to_db(:utc_datetime), do: "timestamp" + defp ecto_to_db(:naive_datetime), do: "timestamp" + defp ecto_to_db(other), do: Atom.to_string(other) + + defp error!(nil, message) do + raise ArgumentError, message + end - defp json_library do - Application.get_env(:postgrex, :json_library) || - raise "REPLACE WITH BETTER ERROR ABOUT CONFIGURING JSON LIBRARY" - end + defp error!(query, message) do + raise Ecto.QueryError, query: query, message: message + end + + defp json_library do + Application.get_env(:aws, :json_library, Ecto.Adapter.json_library()) end end diff --git a/mix.exs b/mix.exs index 4936f87..2df2b5e 100644 --- a/mix.exs +++ b/mix.exs @@ -35,11 +35,10 @@ defmodule RedshiftEcto.MixProject do defp deps do [ {:ecto_sql, "~> 3.2"}, - {:postgrex, "~> 0.15.1"}, {:ecto_replay_sandbox, "~> 2.0"}, {:ex_doc, "~> 0.18", only: :dev, runtime: false}, {:jason, "~> 1.1", optional: true}, - {:aws, "~> 0.13"} + {:aws, "~> 1.0"} ] end From fc02791ae17bc809635eb19d756e210c0c2dd856 Mon Sep 17 00:00:00 2001 From: Evan O'Brien Date: Fri, 12 Sep 2025 11:14:48 +0200 Subject: [PATCH 3/5] docs: add Data API connection instructions --- README.md | 40 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 141e8f8..6369572 100644 --- a/README.md +++ b/README.md @@ -13,17 +13,51 @@ Add `redshift_ecto` to your list of dependencies in `mix.exs`: ```elixir def deps do [ - {:redshift_ecto, "~> 0.1.0"} + {:redshift_ecto, "~> 0.2.0"} ] end ``` -### Example configuration +## Connecting + +`RedshiftEcto` communicates with Redshift through the +[Redshift Data API](https://docs.aws.amazon.com/redshift/latest/mgmt/data-api.html) +via the [`AWS.RedshiftData`](https://hexdocs.pm/aws/AWS.RedshiftData.html) +module. The repository configuration accepts the connection options required +by the AWS client. + +### Production cluster + +Provide your AWS credentials through the environment or explicitly in the +configuration. Specify the cluster identifier, database name and either a +database user or a Secrets Manager ARN: + +```elixir +config :my_app, MyApp.Repo, + adapter: RedshiftEcto, + region: "us-east-1", + cluster_identifier: "my-redshift-cluster", + database: "dev", + db_user: "awsuser" + # or: secret_arn: "arn:aws:secretsmanager:..." +``` + +### Localstack + +When developing locally you can point the adapter at a Localstack instance +that provides the Redshift Data API. Use the Localstack endpoint and dummy +credentials: ```elixir config :my_app, MyApp.Repo, adapter: RedshiftEcto, - url: "ecto://user:pass@data-warehouse.abc123.us-east-1.redshift.amazonaws.com:5439/db" + region: "us-east-1", + cluster_identifier: "local", + database: "dev", + db_user: "test", + access_key_id: "test", + secret_access_key: "test", + endpoint: "http://localhost:4566" ``` ## Testing From 915bea8c80c18351665ab3596b97a50baef1609f Mon Sep 17 00:00:00 2001 From: Evan O'Brien Date: Fri, 12 Sep 2025 11:23:49 +0200 Subject: [PATCH 4/5] Fix nested capture in Data API --- lib/redshift_ecto/data_api.ex | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/redshift_ecto/data_api.ex b/lib/redshift_ecto/data_api.ex index e0140b9..a7a917b 100644 --- a/lib/redshift_ecto/data_api.ex +++ b/lib/redshift_ecto/data_api.ex @@ -101,7 +101,12 @@ defmodule RedshiftEcto.DataAPI do case RedshiftData.get_statement_result(client, %{id: id}) do {:ok, %{body: body}} -> columns = Enum.map(body["ColumnMetadata"], & &1["name"]) - rows = Enum.map(body["Records"], &Enum.map(&1, &value_from_field/1)) + + rows = + Enum.map(body["Records"], fn record -> + Enum.map(record, &value_from_field/1) + end) + result = %DBConnection.Result{columns: columns, rows: rows, num_rows: length(rows)} {:ok, result} From 02e39608d66c4eb1c158c561a7fb2fa9c4aea1cb Mon Sep 17 00:00:00 2001 From: Evan O'Brien Date: Fri, 12 Sep 2025 12:42:18 +0200 Subject: [PATCH 5/5] implement missing DBConnection callbacks --- lib/redshift_ecto/data_api.ex | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/lib/redshift_ecto/data_api.ex b/lib/redshift_ecto/data_api.ex index a7a917b..01ce614 100644 --- a/lib/redshift_ecto/data_api.ex +++ b/lib/redshift_ecto/data_api.ex @@ -31,6 +31,30 @@ defmodule RedshiftEcto.DataAPI do # DBConnection callbacks -------------------------------------------------- + @impl true + def checkout(state), do: {:ok, state} + + @impl true + def checkin(state), do: {:ok, state} + + @impl true + def handle_begin(_opts, state), do: {:ok, state} + + @impl true + def handle_commit(_opts, state), do: {:ok, state} + + @impl true + def handle_rollback(_opts, state), do: {:ok, state} + + @impl true + def handle_status(_opts, state), do: {:idle, state} + + @impl true + def ping(state), do: {:ok, state} + + @impl true + def handle_info(_msg, state), do: {:ok, state} + @impl true def connect(opts) do client_opts =