diff --git a/lib/philomena_media/objects.ex b/lib/philomena_media/objects.ex index f537c52f1..e9fef8a6b 100644 --- a/lib/philomena_media/objects.ex +++ b/lib/philomena_media/objects.ex @@ -42,9 +42,11 @@ defmodule PhilomenaMedia.Objects do """ alias PhilomenaMedia.Mime + alias ExAws.S3 require Logger @type key :: String.t() + @typep operation_fn :: (... -> ExAws.Operation.S3.t()) @doc """ Fetch a key from the storage backend and write it into the destination path. @@ -61,11 +63,12 @@ defmodule PhilomenaMedia.Objects do contents = backends() |> Enum.find_value(fn opts -> - ExAws.S3.get_object(opts[:bucket], key) - |> ExAws.request(opts[:config_overrides]) - |> case do - {:ok, result} -> result - _ -> nil + case request(&S3.get_object/2, [opts[:bucket], key], opts) do + {:ok, contents} -> + contents + + {:error, _} -> + nil end end) @@ -87,10 +90,10 @@ defmodule PhilomenaMedia.Objects do {_, mime} = Mime.file(file_path) contents = File.read!(file_path) - run_all(fn opts -> - ExAws.S3.put_object(opts[:bucket], key, contents, content_type: mime) - |> ExAws.request!(opts[:config_overrides]) - end) + replicate_request( + &S3.put_object/4, + &[&1[:bucket], key, {:log_byte_size, contents}, [content_type: mime]] + ) end @doc """ @@ -127,11 +130,7 @@ defmodule PhilomenaMedia.Objects do def copy(source_key, dest_key) do # Potential workaround for inconsistent PutObjectCopy on R2 # - # run_all(fn opts-> - # ExAws.S3.put_object_copy(opts[:bucket], dest_key, opts[:bucket], source_key) - # |> ExAws.request!(opts[:config_overrides]) - # end) - + # replicate_request(ExAws.S3.put_object_copy/4, &[&1[:bucket], dest_key, &1[:bucket], source_key]) try do file_path = Briefly.create!() download_file(source_key, file_path) @@ -154,10 +153,7 @@ defmodule PhilomenaMedia.Objects do """ @spec delete(key()) :: :ok def delete(key) do - run_all(fn opts -> - ExAws.S3.delete_object(opts[:bucket], key) - |> ExAws.request!(opts[:config_overrides]) - end) + replicate_request(&S3.delete_object/2, &[&1[:bucket], key]) end @doc """ @@ -176,32 +172,108 @@ defmodule PhilomenaMedia.Objects do """ @spec delete_multiple([key()]) :: :ok def delete_multiple(keys) do - run_all(fn opts -> - ExAws.S3.delete_multiple_objects(opts[:bucket], keys) - |> ExAws.request!(opts[:config_overrides]) - end) + replicate_request(&S3.delete_multiple_objects/2, &[&1[:bucket], keys]) end - defp run_all(wrapped) do - fun = fn opts -> - try do - wrapped.(opts) - :ok - catch - _kind, _value -> :error - end - end + # Run the S3 operation with the given list of arguments. The `opts` parameter + # is used to select the specific S3 backend to run the operation against. See + # the functions `primary_opts/0` and `replica_opts/0` responsible for reading + # the config for the primary and replica S3-compatible storages. + # + # This function serves as a thin wrapper over this call: + # ```ex + # operation_fn(...args) |> ExAws.request(opts[:config_overrides]) + # ``` + # + # Everything else in this function is just logging the request and the + # potential error. + # + # # Huge Payloads Logging + # + # There is a special case of the `s3:PutObject` request that accepts a binary + # payload to upload with the maximum size of 5GB (according to AWS limits). + # For this use case, this function specially handles arguments in the `args` + # list of shape `{:log_byte_size, binary}`. This is used to log the size of the + # binary payload in MB instead of logging the entire payload itself, which + # would be wasteful and useless. + @spec request(operation_fn(), [term()], keyword()) :: term() + defp request(operation, args, opts) do + {:name, operation_name} = Function.info(operation, :name) + + Logger.debug(fn -> + args_debug = + args + |> Enum.map(fn + {:log_byte_size, arg} -> "#{(byte_size(arg) / 1_000_000) |> Float.round(2)} MB" + arg -> inspect(arg) + end) + |> Enum.join(", ") + + "S3.#{operation_name}(#{args_debug})" + end) - backends() - |> Task.async_stream(fun, timeout: :infinity) - |> Enum.any?(fn {_, v} -> v == :error end) - |> if do - Logger.warning("Failed to operate on all backends") - else - :ok + args = + args + |> Enum.map(fn + {:log_byte_size, arg} -> arg + arg -> arg + end) + + operation + |> apply(args) + |> ExAws.request(opts[:config_overrides]) + |> case do + {:ok, output} -> + {:ok, output} + + # Specially handle the `:http_error` case. This is the most frequent error + # that can happen when the S3 backend responds with an error like + # `BucketNotFound` or `InvalidRequest`. In this case we are most + # interested in the response status and body which fully describe the + # error. We do it this way to provide nicer formatting for such errors. + {:error, {:http_error, status_code, %{body: body}} = err} -> + Logger.warning( + "S3.#{operation_name} failed (HTTP #{inspect(status_code)})\nError: #{body}" + ) + + {:error, err} + + # This is a less likely generic case of an error like connection timeout + {:error, err} -> + Logger.warning("S3.#{operation_name} failed\nError: #{inspect(err, pretty: true)}") + {:error, err} end + end - :ok + # Run the S3 request across the primary and replica backends. This is only + # useful for mutating operations that need to write the new changes to both + # destinations. Any errors will be just logged and **not** propagated to the + # caller. + # + # Ideally, a pro-active alert could be triggered to notify the ops about the + # issue immediately so they fix the problem and retry the upload. We'll leave + # this improvement for another day. + @spec replicate_request(operation_fn(), (keyword() -> [term()])) :: :ok + defp replicate_request(operation, args) do + {:name, operation_name} = Function.info(operation, :name) + backends = backends() + + total_err = + backends + |> Task.async_stream(&request(operation, args.(&1), &1), timeout: :infinity) + |> Enum.filter(&(not match?({:ok, {:ok, _}}, &1))) + |> Enum.count() + + cond do + total_err > 0 and total_err == length(backends) -> + Logger.error("S3.#{operation_name} failed for all (#{total_err}) backends") + + total_err > 0 -> + Logger.warning("S3.#{operation_name} failed for #{total_err} backends") + + true -> + :ok + end end defp backends do