From 945b6a326fe98294d0e5c174ecbc6ee5bba9a0fa Mon Sep 17 00:00:00 2001 From: MareStare Date: Sun, 30 Mar 2025 19:42:05 +0000 Subject: [PATCH 1/7] Fix the tag updates atomicity and deadlocks --- lib/philomena/images.ex | 21 +++--- lib/philomena/tags.ex | 160 +++++++++++++++++++++++++++++++--------- 2 files changed, 138 insertions(+), 43 deletions(-) diff --git a/lib/philomena/images.ex b/lib/philomena/images.ex index fe3f27d01..605cc3b85 100644 --- a/lib/philomena/images.ex +++ b/lib/philomena/images.ex @@ -96,22 +96,25 @@ defmodule Philomena.Images do |> Multi.insert(:image, image) |> Multi.run(:added_tag_count, fn repo, %{image: image} -> tag_ids = image.added_tags |> Enum.map(& &1.id) - tags = Tag |> where([t], t.id in ^tag_ids) - {count, nil} = repo.update_all(tags, inc: [images_count: 1]) + count = Tags.update_images_counts(repo, +1, tag_ids) {:ok, count} end) |> maybe_subscribe_on(:image, attribution[:user], :watch_on_upload) |> Repo.transaction() |> case do - {:ok, %{image: image}} = result -> - async_upload(image, attrs["image"]) + {:ok, %{image: image}} -> + upload_pid = async_upload(image, attrs["image"]) reindex_image(image) Tags.reindex_tags(image.added_tags) maybe_approve_image(image, attribution[:user]) - result + # Return the upload PID along with the created image so that the caller + # can control the lifecycle of the upload if needed. It's useful, for + # example for the seeding process to know when to delete the temp file + # used for uploading. + {:ok, %{image: image, upload_pid: upload_pid}} result -> result @@ -138,6 +141,8 @@ defmodule Philomena.Images do # Free up the linked process send(linked_pid, :ready) + + linked_pid end defp try_upload(image, retry_count) when retry_count < 100 do @@ -666,9 +671,8 @@ defmodule Philomena.Images do repo, %{image: {_image, _added, removed_tags}} -> tag_ids = removed_tags |> Enum.map(& &1.id) - tags = Tag |> where([t], t.id in ^tag_ids) - {count, nil} = repo.update_all(tags, inc: [images_count: -1]) + count = Tags.update_images_counts(repo, -1, tag_ids) {:ok, count} end) @@ -955,9 +959,8 @@ defmodule Philomena.Images do # to way too much drift, and the index has to be # maintained. tag_ids = Enum.map(image.tags, & &1.id) - query = where(Tag, [t], t.id in ^tag_ids) - repo.update_all(query, inc: [images_count: -1]) + Tags.update_images_counts(repo, -1, tag_ids) {:ok, image.tags} end) diff --git a/lib/philomena/tags.ex b/lib/philomena/tags.ex index 11595f3a7..dc484748d 100644 --- a/lib/philomena/tags.ex +++ b/lib/philomena/tags.ex @@ -4,6 +4,7 @@ defmodule Philomena.Tags do """ import Ecto.Query, warn: false + alias Ecto.Multi alias Philomena.Repo alias PhilomenaQuery.Search @@ -24,6 +25,47 @@ defmodule Philomena.Tags do alias Philomena.DnpEntries.DnpEntry alias Philomena.Channels.Channel + # There is a really delicate nuance that must be known to avoid deadlocks in + # vectorized mutation queries such as `INSERT ON CONFLICT UPDATE`, `UPDATE`, + # `DELETE`, `SELECT FOR [NO KEY] UPDATE` that touch multiple records. Note that + # `INSERT ON CONFLICT DO NOTHING` doesn't lock the conflicting records, so this + # nuance doesn't apply in that case (https://dba.stackexchange.com/questions/322912/will-insert-on-conflict-do-nothing-lock-the-row-in-case-of-conflict) + # + # If a vectorized mutation is run without a consistent locking order of the records, + # it can end up with a deadlock where one transaction locks a set of records + # that overlap with the other transaction while the other transaction locks + # the other set that overlaps with the first transaction. Thus, both transactions + # wait for each other to release the locks on records they locked resulting in + # a deadlock. + # + # For raw `UPDATE/DELETE ... WHERE ... IN (...)` queries, the items inside `IN (...)` + # don't influence the order of locking. These queries also don't have an `ORDER BY` + # clause. Thus, this function returns a `SELECT [lock_type]` query that establishes a + # consistent order of records by primary keys that must be used with all vectorized + # mutation queries to avoid deadlocks. This query can be used as a subquery in + # the `WHERE` clause for the vectorized mutation. + # + # If no locking order is set, the deadlock can appear randomly and its probability + # increases with the amount of items in the vectorized mutation query and with + # the number of overlapping records in concurrent transactions. + # + # This phenomena was discovered when @MareStare was trying to parallelize + # the image creation process for seeding the images during development, where + # tons of image uploads are issued in parallel with many overlapping tags + # (https://github.com/philomena-dev/philomena/pull/481). + # + # Big thanks to this StackOverflow post for explanations: + # https://stackoverflow.com/questions/27262900/postgres-update-and-lock-ordering/27263824#27263824 + defmacro vectorized_mutation_lock(lock_type, tag_ids) do + quote do + Tag + |> select([t], t.id) + |> lock(unquote(lock_type)) + |> where([t], t.id in ^unquote(tag_ids)) + |> order_by([t], t.id) + end + end + @doc """ Gets existing tags or creates new ones from a tag list string. @@ -39,43 +81,74 @@ defmodule Philomena.Tags do """ @spec get_or_create_tags(String.t()) :: list() def get_or_create_tags(tag_list) do - tag_names = Tag.parse_tag_list(tag_list) - - existent_tags = - Tag - |> where([t], t.name in ^tag_names) - |> preload([:implied_tags, aliased_tag: :implied_tags]) - |> Repo.all() - |> Enum.uniq_by(& &1.name) - - existent_tag_names = - existent_tags - |> Map.new(&{&1.name, true}) + case Tag.parse_tag_list(tag_list) do + [] -> [] + tag_names -> get_or_create_non_empty_tags_list(tag_names) + end + end - nonexistent_tag_names = + @spec get_or_create_non_empty_tags_list(list(String.t())) :: list() + defp get_or_create_non_empty_tags_list(tag_names) do + tags = tag_names - |> Enum.reject(&existent_tag_names[&1]) - - # Now get rid of the aliases - existent_tags = - existent_tags - |> Enum.map(&(&1.aliased_tag || &1)) - - new_tags = - nonexistent_tag_names - |> Enum.map(fn name -> - {:ok, tag} = - %Tag{} - |> Tag.creation_changeset(%{name: name}) - |> Repo.insert() - - %{tag | implied_tags: []} - end) + |> Enum.sort() + |> Enum.dedup() + |> Enum.map( + &(%Tag{ + created_at: {:placeholder, :timestamp}, + updated_at: {:placeholder, :timestamp} + } + |> Tag.creation_changeset(%{name: &1}) + |> Ecto.Changeset.apply_changes() + |> Map.from_struct() + |> Map.drop([ + :id, + :__meta__, + :aliases, + :aliased_tag, + :channels, + :implied_tags, + :implied_by_tags, + :verified_links, + :public_links, + :hidden_links, + :dnp_entries, + :uploaded_image, + :removed_image, + :implied_tag_list + ])) + ) + + %{new_tags: {_rows_affected, new_tags}, all_tags: all_tags} = + Multi.new() + |> Multi.insert_all( + :new_tags, + Tag, + tags, + placeholders: %{timestamp: DateTime.utc_now() |> DateTime.truncate(:second)}, + on_conflict: :nothing, + returning: [:id] + ) + |> Multi.all( + :all_tags, + Tag + |> where([t], t.name in ^tag_names) + |> distinct([t], t.name) + |> preload([:implied_tags, aliased_tag: :implied_tags]) + ) + |> Repo.transaction() + |> case do + {:ok, ok} -> + ok + + {:error, err} -> + raise "get_or_create_tags failed: #{inspect(err)}\ntag_names: #{inspect(tag_names)}" + end new_tags |> reindex_tags() - existent_tags ++ new_tags + all_tags end @doc """ @@ -545,9 +618,7 @@ defmodule Philomena.Tags do tag_ids = Enum.map(taggings, & &1.tag_id) - Tag - |> where([t], t.id in ^tag_ids) - |> Repo.update_all(inc: [images_count: 1]) + update_images_counts(Repo, +1, tag_ids) tag_ids end) @@ -557,6 +628,27 @@ defmodule Philomena.Tags do |> Repo.all() end + @doc """ + Accepts IDs of tags and increments their `images_count` by 1. + """ + @spec update_images_counts(term(), integer(), [integer()]) :: integer() + def update_images_counts(repo, diff, tag_ids) do + case tag_ids do + [] -> + 0 + + _ -> + locked_tags = vectorized_mutation_lock("FOR NO KEY UPDATE", tag_ids) + + {rows_affected, _} = + Tag + |> where([t], t.id in subquery(locked_tags)) + |> repo.update_all(inc: [images_count: diff]) + + rows_affected + end + end + @doc """ Returns an `%Ecto.Changeset{}` for tracking tag changes. From e72f7db356a7715cc4bbb0ca520349d481e9b4cb Mon Sep 17 00:00:00 2001 From: MareStare Date: Sun, 30 Mar 2025 21:03:50 +0000 Subject: [PATCH 2/7] Replace `Map.drop()` with `Map.take()` as per Liam's feedback in DM --- lib/philomena/tags.ex | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/lib/philomena/tags.ex b/lib/philomena/tags.ex index dc484748d..55422e723 100644 --- a/lib/philomena/tags.ex +++ b/lib/philomena/tags.ex @@ -94,29 +94,27 @@ defmodule Philomena.Tags do |> Enum.sort() |> Enum.dedup() |> Enum.map( - &(%Tag{ - created_at: {:placeholder, :timestamp}, - updated_at: {:placeholder, :timestamp} - } + &(%Tag{} |> Tag.creation_changeset(%{name: &1}) |> Ecto.Changeset.apply_changes() - |> Map.from_struct() - |> Map.drop([ - :id, - :__meta__, - :aliases, - :aliased_tag, - :channels, - :implied_tags, - :implied_by_tags, - :verified_links, - :public_links, - :hidden_links, - :dnp_entries, - :uploaded_image, - :removed_image, - :implied_tag_list - ])) + |> Map.take([ + :slug, + :name, + :category, + :images_count, + :description, + :short_description, + :namespace, + :name_in_namespace, + :image, + :image_format, + :image_mime_type, + :mod_notes + ]) + |> Map.merge(%{ + created_at: {:placeholder, :timestamp}, + updated_at: {:placeholder, :timestamp} + })) ) %{new_tags: {_rows_affected, new_tags}, all_tags: all_tags} = From a7f4105cce5c713a73c629e49f26499b87aa3992 Mon Sep 17 00:00:00 2001 From: MareStare Date: Mon, 31 Mar 2025 20:37:26 +0000 Subject: [PATCH 3/7] Fix error handling since there will be a tuple of more than 2 items returned as per docs --- lib/philomena/tags.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/philomena/tags.ex b/lib/philomena/tags.ex index 55422e723..9e542929d 100644 --- a/lib/philomena/tags.ex +++ b/lib/philomena/tags.ex @@ -139,8 +139,8 @@ defmodule Philomena.Tags do {:ok, ok} -> ok - {:error, err} -> - raise "get_or_create_tags failed: #{inspect(err)}\ntag_names: #{inspect(tag_names)}" + result -> + raise "get_or_create_tags failed: #{inspect(result)}\ntag_names: #{inspect(tag_names)}" end new_tags From 79c794c0c4fa62f7da021482600c77217c23bfc7 Mon Sep 17 00:00:00 2001 From: MareStare Date: Mon, 12 May 2025 08:29:33 +0000 Subject: [PATCH 4/7] Replace `+1` with `1` for update_images_counts --- lib/philomena/images.ex | 2 +- lib/philomena/tags.ex | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/philomena/images.ex b/lib/philomena/images.ex index 605cc3b85..73c9f00cd 100644 --- a/lib/philomena/images.ex +++ b/lib/philomena/images.ex @@ -97,7 +97,7 @@ defmodule Philomena.Images do |> Multi.run(:added_tag_count, fn repo, %{image: image} -> tag_ids = image.added_tags |> Enum.map(& &1.id) - count = Tags.update_images_counts(repo, +1, tag_ids) + count = Tags.update_images_counts(repo, 1, tag_ids) {:ok, count} end) diff --git a/lib/philomena/tags.ex b/lib/philomena/tags.ex index 9e542929d..bd0b979e2 100644 --- a/lib/philomena/tags.ex +++ b/lib/philomena/tags.ex @@ -616,7 +616,7 @@ defmodule Philomena.Tags do tag_ids = Enum.map(taggings, & &1.tag_id) - update_images_counts(Repo, +1, tag_ids) + update_images_counts(Repo, 1, tag_ids) tag_ids end) From 270825c5ccae6a26370fe72ee97cdee699c62cc2 Mon Sep 17 00:00:00 2001 From: MareStare Date: Mon, 12 May 2025 08:43:57 +0000 Subject: [PATCH 5/7] Rename `update_images_counts` to `update_image_counts` and replace `case` with a function-level match --- lib/philomena/images.ex | 6 +++--- lib/philomena/tags.ex | 26 ++++++++++++-------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/lib/philomena/images.ex b/lib/philomena/images.ex index 73c9f00cd..2ab2d282e 100644 --- a/lib/philomena/images.ex +++ b/lib/philomena/images.ex @@ -97,7 +97,7 @@ defmodule Philomena.Images do |> Multi.run(:added_tag_count, fn repo, %{image: image} -> tag_ids = image.added_tags |> Enum.map(& &1.id) - count = Tags.update_images_counts(repo, 1, tag_ids) + count = Tags.update_image_counts(repo, 1, tag_ids) {:ok, count} end) @@ -672,7 +672,7 @@ defmodule Philomena.Images do repo, %{image: {_image, _added, removed_tags}} -> tag_ids = removed_tags |> Enum.map(& &1.id) - count = Tags.update_images_counts(repo, -1, tag_ids) + count = Tags.update_image_counts(repo, -1, tag_ids) {:ok, count} end) @@ -960,7 +960,7 @@ defmodule Philomena.Images do # maintained. tag_ids = Enum.map(image.tags, & &1.id) - Tags.update_images_counts(repo, -1, tag_ids) + Tags.update_image_counts(repo, -1, tag_ids) {:ok, image.tags} end) diff --git a/lib/philomena/tags.ex b/lib/philomena/tags.ex index bd0b979e2..e735a632f 100644 --- a/lib/philomena/tags.ex +++ b/lib/philomena/tags.ex @@ -616,7 +616,7 @@ defmodule Philomena.Tags do tag_ids = Enum.map(taggings, & &1.tag_id) - update_images_counts(Repo, 1, tag_ids) + update_image_counts(Repo, 1, tag_ids) tag_ids end) @@ -629,22 +629,20 @@ defmodule Philomena.Tags do @doc """ Accepts IDs of tags and increments their `images_count` by 1. """ - @spec update_images_counts(term(), integer(), [integer()]) :: integer() - def update_images_counts(repo, diff, tag_ids) do - case tag_ids do - [] -> - 0 + @spec update_image_counts(term(), integer(), [integer()]) :: integer() + def update_image_counts(repo, diff, tag_ids) - _ -> - locked_tags = vectorized_mutation_lock("FOR NO KEY UPDATE", tag_ids) + def update_image_counts(nil, _diff, []), do: 0 - {rows_affected, _} = - Tag - |> where([t], t.id in subquery(locked_tags)) - |> repo.update_all(inc: [images_count: diff]) + def update_image_counts(repo, diff, tag_ids) do + locked_tags = vectorized_mutation_lock("FOR NO KEY UPDATE", tag_ids) - rows_affected - end + {rows_affected, _} = + Tag + |> where([t], t.id in subquery(locked_tags)) + |> repo.update_all(inc: [images_count: diff]) + + rows_affected end @doc """ From 343b6e1751d7553733da0168d770928f40166cf2 Mon Sep 17 00:00:00 2001 From: MareStare Date: Mon, 12 May 2025 08:52:59 +0000 Subject: [PATCH 6/7] Replace the capture operator with an explicit `fn tag_name ->` --- lib/philomena/tags.ex | 46 +++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/lib/philomena/tags.ex b/lib/philomena/tags.ex index e735a632f..fb53cfd35 100644 --- a/lib/philomena/tags.ex +++ b/lib/philomena/tags.ex @@ -93,29 +93,29 @@ defmodule Philomena.Tags do tag_names |> Enum.sort() |> Enum.dedup() - |> Enum.map( - &(%Tag{} - |> Tag.creation_changeset(%{name: &1}) - |> Ecto.Changeset.apply_changes() - |> Map.take([ - :slug, - :name, - :category, - :images_count, - :description, - :short_description, - :namespace, - :name_in_namespace, - :image, - :image_format, - :image_mime_type, - :mod_notes - ]) - |> Map.merge(%{ - created_at: {:placeholder, :timestamp}, - updated_at: {:placeholder, :timestamp} - })) - ) + |> Enum.map(fn tag_name -> + %Tag{} + |> Tag.creation_changeset(%{name: tag_name}) + |> Ecto.Changeset.apply_changes() + |> Map.take([ + :slug, + :name, + :category, + :images_count, + :description, + :short_description, + :namespace, + :name_in_namespace, + :image, + :image_format, + :image_mime_type, + :mod_notes + ]) + |> Map.merge(%{ + created_at: {:placeholder, :timestamp}, + updated_at: {:placeholder, :timestamp} + }) + end) %{new_tags: {_rows_affected, new_tags}, all_tags: all_tags} = Multi.new() From 1a601a91727516ea9f21b842d15f4cb986ab5ac8 Mon Sep 17 00:00:00 2001 From: MareStare Date: Mon, 12 May 2025 08:53:37 +0000 Subject: [PATCH 7/7] Fix remove repo nil match autocompleted by Github Copilot (bruh) --- lib/philomena/tags.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/philomena/tags.ex b/lib/philomena/tags.ex index fb53cfd35..62cc874a9 100644 --- a/lib/philomena/tags.ex +++ b/lib/philomena/tags.ex @@ -632,7 +632,7 @@ defmodule Philomena.Tags do @spec update_image_counts(term(), integer(), [integer()]) :: integer() def update_image_counts(repo, diff, tag_ids) - def update_image_counts(nil, _diff, []), do: 0 + def update_image_counts(_repo, _diff, []), do: 0 def update_image_counts(repo, diff, tag_ids) do locked_tags = vectorized_mutation_lock("FOR NO KEY UPDATE", tag_ids)