Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions lib/philomena/images.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,25 @@ defmodule Philomena.Images do
|> Multi.insert(:image, image)
|> Multi.run(:added_tag_count, fn repo, %{image: image} ->
tag_ids = image.added_tags |> Enum.map(& &1.id)
tags = Tag |> where([t], t.id in ^tag_ids)

{count, nil} = repo.update_all(tags, inc: [images_count: 1])
count = Tags.update_image_counts(repo, 1, tag_ids)

{:ok, count}
end)
|> maybe_subscribe_on(:image, attribution[:user], :watch_on_upload)
|> Repo.transaction()
|> case do
{:ok, %{image: image}} = result ->
async_upload(image, attrs["image"])
{:ok, %{image: image}} ->
upload_pid = async_upload(image, attrs["image"])
reindex_image(image)
Tags.reindex_tags(image.added_tags)
maybe_approve_image(image, attribution[:user])

result
# Return the upload PID along with the created image so that the caller
# can control the lifecycle of the upload if needed. It's useful, for
# example, for the seeding process to know when to delete the temp file
# used for uploading.
{:ok, %{image: image, upload_pid: upload_pid}}

result ->
result
Expand All @@ -138,6 +141,8 @@ defmodule Philomena.Images do

# Free up the linked process
send(linked_pid, :ready)

linked_pid
end

defp try_upload(image, retry_count) when retry_count < 100 do
Expand Down Expand Up @@ -666,9 +671,8 @@ defmodule Philomena.Images do

repo, %{image: {_image, _added, removed_tags}} ->
tag_ids = removed_tags |> Enum.map(& &1.id)
tags = Tag |> where([t], t.id in ^tag_ids)

{count, nil} = repo.update_all(tags, inc: [images_count: -1])
count = Tags.update_image_counts(repo, -1, tag_ids)

{:ok, count}
end)
Expand Down Expand Up @@ -955,9 +959,8 @@ defmodule Philomena.Images do
# to way too much drift, and the index has to be
# maintained.
tag_ids = Enum.map(image.tags, & &1.id)
query = where(Tag, [t], t.id in ^tag_ids)

repo.update_all(query, inc: [images_count: -1])
Tags.update_image_counts(repo, -1, tag_ids)

{:ok, image.tags}
end)
Expand Down
154 changes: 121 additions & 33 deletions lib/philomena/tags.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Philomena.Tags do
"""

import Ecto.Query, warn: false
alias Ecto.Multi
alias Philomena.Repo

alias PhilomenaQuery.Search
Expand All @@ -24,6 +25,47 @@ defmodule Philomena.Tags do
alias Philomena.DnpEntries.DnpEntry
alias Philomena.Channels.Channel

# There is a really delicate nuance that must be known to avoid deadlocks in
# vectorized mutation queries such as `INSERT ON CONFLICT UPDATE`, `UPDATE`,
# `DELETE`, `SELECT FOR [NO KEY] UPDATE` that touch multiple records. Note that
# `INSERT ON CONFLICT DO NOTHING` doesn't lock the conflicting records, so this
# nuance doesn't apply in that case (https://dba.stackexchange.com/questions/322912/will-insert-on-conflict-do-nothing-lock-the-row-in-case-of-conflict)
#
# If a vectorized mutation is run without a consistent locking order of the records,
# it can end up with a deadlock where one transaction locks a set of records
# that overlap with the other transaction while the other transaction locks
# the other set that overlaps with the first transaction. Thus, both transactions
# wait for each other to release the locks on records they locked resulting in
# a deadlock.
#
# For raw `UPDATE/DELETE ... WHERE ... IN (...)` queries, the items inside `IN (...)`
# don't influence the order of locking. These queries also don't have an `ORDER BY`
# clause. Thus, this function returns a `SELECT [lock_type]` query that establishes a
# consistent order of records by primary keys that must be used with all vectorized
# mutation queries to avoid deadlocks. This query can be used as a subquery in
# the `WHERE` clause for the vectorized mutation.
#
# If no locking order is set, the deadlock can appear randomly and its probability
# increases with the number of items in the vectorized mutation query and with
# the number of overlapping records in concurrent transactions.
#
# This phenomenon was discovered when @MareStare was trying to parallelize
# the image creation process for seeding the images during development, where
# tons of image uploads are issued in parallel with many overlapping tags
# (https://github.com/philomena-dev/philomena/pull/481).
#
# Big thanks to this StackOverflow post for explanations:
# https://stackoverflow.com/questions/27262900/postgres-update-and-lock-ordering/27263824#27263824
# Expands to a query that selects the IDs of the tags listed in `tag_ids`,
# ordered by primary key and carrying the row-level lock clause given in
# `lock_type` (for example "FOR NO KEY UPDATE"). The result is meant to be
# embedded as a subquery in the `WHERE` clause of a multi-row mutation so
# that concurrent transactions acquire tag row locks in the same order.
defmacro vectorized_mutation_lock(lock_type, tag_ids) do
  quote do
    from(t in Tag,
      where: t.id in ^unquote(tag_ids),
      order_by: t.id,
      lock: unquote(lock_type),
      select: t.id
    )
  end
end

@doc """
Gets existing tags or creates new ones from a tag list string.

Expand All @@ -39,43 +81,72 @@ defmodule Philomena.Tags do
"""
@spec get_or_create_tags(String.t()) :: list()
def get_or_create_tags(tag_list) do
tag_names = Tag.parse_tag_list(tag_list)

existent_tags =
Tag
|> where([t], t.name in ^tag_names)
|> preload([:implied_tags, aliased_tag: :implied_tags])
|> Repo.all()
|> Enum.uniq_by(& &1.name)

existent_tag_names =
existent_tags
|> Map.new(&{&1.name, true})
case Tag.parse_tag_list(tag_list) do
[] -> []
tag_names -> get_or_create_non_empty_tags_list(tag_names)
end
end

nonexistent_tag_names =
@spec get_or_create_non_empty_tags_list(list(String.t())) :: list()
defp get_or_create_non_empty_tags_list(tag_names) do
tags =
tag_names
|> Enum.reject(&existent_tag_names[&1])

# Now get rid of the aliases
existent_tags =
existent_tags
|> Enum.map(&(&1.aliased_tag || &1))

new_tags =
nonexistent_tag_names
|> Enum.map(fn name ->
{:ok, tag} =
%Tag{}
|> Tag.creation_changeset(%{name: name})
|> Repo.insert()

%{tag | implied_tags: []}
|> Enum.sort()
|> Enum.dedup()
|> Enum.map(fn tag_name ->
%Tag{}
|> Tag.creation_changeset(%{name: tag_name})
|> Ecto.Changeset.apply_changes()
|> Map.take([
:slug,
:name,
:category,
:images_count,
:description,
:short_description,
:namespace,
:name_in_namespace,
:image,
:image_format,
:image_mime_type,
:mod_notes
])
|> Map.merge(%{
created_at: {:placeholder, :timestamp},
updated_at: {:placeholder, :timestamp}
})
end)

%{new_tags: {_rows_affected, new_tags}, all_tags: all_tags} =
Multi.new()
|> Multi.insert_all(
:new_tags,
Tag,
tags,
placeholders: %{timestamp: DateTime.utc_now() |> DateTime.truncate(:second)},
on_conflict: :nothing,
returning: [:id]
)
|> Multi.all(
:all_tags,
Tag
|> where([t], t.name in ^tag_names)
|> distinct([t], t.name)
|> preload([:implied_tags, aliased_tag: :implied_tags])
)
|> Repo.transaction()
|> case do
{:ok, ok} ->
ok

result ->
raise "get_or_create_tags failed: #{inspect(result)}\ntag_names: #{inspect(tag_names)}"
end

new_tags
|> reindex_tags()

existent_tags ++ new_tags
all_tags
end

@doc """
Expand Down Expand Up @@ -545,9 +616,7 @@ defmodule Philomena.Tags do

tag_ids = Enum.map(taggings, & &1.tag_id)

Tag
|> where([t], t.id in ^tag_ids)
|> Repo.update_all(inc: [images_count: 1])
update_image_counts(Repo, 1, tag_ids)

tag_ids
end)
Expand All @@ -557,6 +626,25 @@ defmodule Philomena.Tags do
|> Repo.all()
end

@doc """
Adds `diff` to the `images_count` of every tag whose ID is in `tag_ids`.

`diff` may be negative to decrement the counters. The update is issued through
`repo`, so it can take part in a surrounding transaction. The target rows are
first locked in primary key order (`FOR NO KEY UPDATE`) to keep concurrent
callers from deadlocking on overlapping sets of tags.

Returns the number of rows that were updated. An empty `tag_ids` list returns
`0` without issuing any query.
"""
@spec update_image_counts(term(), integer(), [integer()]) :: integer()
def update_image_counts(repo, diff, tag_ids)

def update_image_counts(_repo, _diff, []), do: 0

def update_image_counts(repo, diff, tag_ids) do
  # Lock the affected rows in a deterministic order before mutating them.
  lock_query = vectorized_mutation_lock("FOR NO KEY UPDATE", tag_ids)
  update_query = where(Tag, [t], t.id in subquery(lock_query))

  {updated, _returned} = repo.update_all(update_query, inc: [images_count: diff])

  updated
end

@doc """
Returns an `%Ecto.Changeset{}` for tracking tag changes.

Expand Down