diff --git a/lib/philomena/images.ex b/lib/philomena/images.ex index fe3f27d01..2ab2d282e 100644 --- a/lib/philomena/images.ex +++ b/lib/philomena/images.ex @@ -96,22 +96,25 @@ defmodule Philomena.Images do |> Multi.insert(:image, image) |> Multi.run(:added_tag_count, fn repo, %{image: image} -> tag_ids = image.added_tags |> Enum.map(& &1.id) - tags = Tag |> where([t], t.id in ^tag_ids) - {count, nil} = repo.update_all(tags, inc: [images_count: 1]) + count = Tags.update_image_counts(repo, 1, tag_ids) {:ok, count} end) |> maybe_subscribe_on(:image, attribution[:user], :watch_on_upload) |> Repo.transaction() |> case do - {:ok, %{image: image}} = result -> - async_upload(image, attrs["image"]) + {:ok, %{image: image}} -> + upload_pid = async_upload(image, attrs["image"]) reindex_image(image) Tags.reindex_tags(image.added_tags) maybe_approve_image(image, attribution[:user]) - result + # Return the upload PID along with the created image so that the caller + # can control the lifecycle of the upload if needed. It's useful, for + # example for the seeding process to know when to delete the temp file + # used for uploading. + {:ok, %{image: image, upload_pid: upload_pid}} result -> result @@ -138,6 +141,8 @@ defmodule Philomena.Images do # Free up the linked process send(linked_pid, :ready) + + linked_pid end defp try_upload(image, retry_count) when retry_count < 100 do @@ -666,9 +671,8 @@ defmodule Philomena.Images do repo, %{image: {_image, _added, removed_tags}} -> tag_ids = removed_tags |> Enum.map(& &1.id) - tags = Tag |> where([t], t.id in ^tag_ids) - {count, nil} = repo.update_all(tags, inc: [images_count: -1]) + count = Tags.update_image_counts(repo, -1, tag_ids) {:ok, count} end) @@ -955,9 +959,8 @@ defmodule Philomena.Images do # to way too much drift, and the index has to be # maintained. 
tag_ids = Enum.map(image.tags, & &1.id) - query = where(Tag, [t], t.id in ^tag_ids) - repo.update_all(query, inc: [images_count: -1]) + Tags.update_image_counts(repo, -1, tag_ids) {:ok, image.tags} end) diff --git a/lib/philomena/tags.ex b/lib/philomena/tags.ex index 11595f3a7..62cc874a9 100644 --- a/lib/philomena/tags.ex +++ b/lib/philomena/tags.ex @@ -4,6 +4,7 @@ defmodule Philomena.Tags do """ import Ecto.Query, warn: false + alias Ecto.Multi alias Philomena.Repo alias PhilomenaQuery.Search @@ -24,6 +25,47 @@ defmodule Philomena.Tags do alias Philomena.DnpEntries.DnpEntry alias Philomena.Channels.Channel + # There is a really delicate nuance that must be known to avoid deadlocks in + # vectorized mutation queries such as `INSERT ON CONFLICT UPDATE`, `UPDATE`, + # `DELETE`, `SELECT FOR [NO KEY] UPDATE` that touch multiple records. Note that + # `INSERT ON CONFLICT DO NOTHING` doesn't lock the conflicting records, so this + # nuance doesn't apply in that case (https://dba.stackexchange.com/questions/322912/will-insert-on-conflict-do-nothing-lock-the-row-in-case-of-conflict) + # + # If a vectorized mutation is run without a consistent locking order of the records, + # it can end up with a deadlock where one transaction locks a set of records + # that overlap with the other transaction while the other transaction locks + # the other set that overlaps with the first transaction. Thus, both transactions + # wait for each other to release the locks on records they locked resulting in + # a deadlock. + # + # For raw `UPDATE/DELETE ... WHERE ... IN (...)` queries, the items inside `IN (...)` + # don't influence the order of locking. These queries also don't have an `ORDER BY` + # clause. Thus, this function returns a `SELECT [lock_type]` query that establishes a + # consistent order of records by primary keys that must be used with all vectorized + # mutation queries to avoid deadlocks. 
This query can be used as a subquery in + # the `WHERE` clause for the vectorized mutation. + # + # If no locking order is set, the deadlock can appear randomly and its probability + # increases with the number of items in the vectorized mutation query and with + # the number of overlapping records in concurrent transactions. + # + # This phenomenon was discovered when @MareStare was trying to parallelize + # the image creation process for seeding the images during development, where + # tons of image uploads are issued in parallel with many overlapping tags + # (https://github.com/philomena-dev/philomena/pull/481). + # + # Big thanks to this StackOverflow post for explanations: + # https://stackoverflow.com/questions/27262900/postgres-update-and-lock-ordering/27263824#27263824 + defmacro vectorized_mutation_lock(lock_type, tag_ids) do + quote do + Tag + |> select([t], t.id) + |> lock(unquote(lock_type)) + |> where([t], t.id in ^unquote(tag_ids)) + |> order_by([t], t.id) + end + end + @doc """ Gets existing tags or creates new ones from a tag list string. 
@@ -39,43 +81,72 @@ defmodule Philomena.Tags do """ @spec get_or_create_tags(String.t()) :: list() def get_or_create_tags(tag_list) do - tag_names = Tag.parse_tag_list(tag_list) - - existent_tags = - Tag - |> where([t], t.name in ^tag_names) - |> preload([:implied_tags, aliased_tag: :implied_tags]) - |> Repo.all() - |> Enum.uniq_by(& &1.name) - - existent_tag_names = - existent_tags - |> Map.new(&{&1.name, true}) + case Tag.parse_tag_list(tag_list) do + [] -> [] + tag_names -> get_or_create_non_empty_tags_list(tag_names) + end + end - nonexistent_tag_names = + @spec get_or_create_non_empty_tags_list(list(String.t())) :: list() + defp get_or_create_non_empty_tags_list(tag_names) do + tags = tag_names - |> Enum.reject(&existent_tag_names[&1]) - - # Now get rid of the aliases - existent_tags = - existent_tags - |> Enum.map(&(&1.aliased_tag || &1)) - - new_tags = - nonexistent_tag_names - |> Enum.map(fn name -> - {:ok, tag} = - %Tag{} - |> Tag.creation_changeset(%{name: name}) - |> Repo.insert() - - %{tag | implied_tags: []} + |> Enum.sort() + |> Enum.dedup() + |> Enum.map(fn tag_name -> + %Tag{} + |> Tag.creation_changeset(%{name: tag_name}) + |> Ecto.Changeset.apply_changes() + |> Map.take([ + :slug, + :name, + :category, + :images_count, + :description, + :short_description, + :namespace, + :name_in_namespace, + :image, + :image_format, + :image_mime_type, + :mod_notes + ]) + |> Map.merge(%{ + created_at: {:placeholder, :timestamp}, + updated_at: {:placeholder, :timestamp} + }) end) + %{new_tags: {_rows_affected, new_tags}, all_tags: all_tags} = + Multi.new() + |> Multi.insert_all( + :new_tags, + Tag, + tags, + placeholders: %{timestamp: DateTime.utc_now() |> DateTime.truncate(:second)}, + on_conflict: :nothing, + returning: [:id] + ) + |> Multi.all( + :all_tags, + Tag + |> where([t], t.name in ^tag_names) + |> distinct([t], t.name) + |> preload([:implied_tags, aliased_tag: :implied_tags]) + ) + |> Repo.transaction() + |> case do + {:ok, ok} -> + ok + + result -> + 
raise "get_or_create_tags failed: #{inspect(result)}\ntag_names: #{inspect(tag_names)}" + end + new_tags |> reindex_tags() - existent_tags ++ new_tags + all_tags end @doc """ @@ -545,9 +616,7 @@ defmodule Philomena.Tags do tag_ids = Enum.map(taggings, & &1.tag_id) - Tag - |> where([t], t.id in ^tag_ids) - |> Repo.update_all(inc: [images_count: 1]) + update_image_counts(Repo, 1, tag_ids) tag_ids end) @@ -557,6 +626,25 @@ defmodule Philomena.Tags do |> Repo.all() end + @doc """ + Accepts IDs of tags and adds `diff` (which may be negative) to each tag's `images_count`, returning the number of rows updated. + """ + @spec update_image_counts(term(), integer(), [integer()]) :: integer() + def update_image_counts(repo, diff, tag_ids) + + def update_image_counts(_repo, _diff, []), do: 0 + + def update_image_counts(repo, diff, tag_ids) do + locked_tags = vectorized_mutation_lock("FOR NO KEY UPDATE", tag_ids) + + {rows_affected, _} = + Tag + |> where([t], t.id in subquery(locked_tags)) + |> repo.update_all(inc: [images_count: diff]) + + rows_affected + end + @doc """ Returns an `%Ecto.Changeset{}` for tracking tag changes.