Add appropriate timeouts for Repo.transactions

Signed-off-by: Thomas Citharel <tcit@tcit.fr>
This commit is contained in:
Thomas Citharel 2022-05-04 08:21:15 +02:00
parent 90158f1112
commit 87214b038f
No known key found for this signature in database
GPG key ID: A061B9DDE0CA0773
4 changed files with 67 additions and 58 deletions

View file

@ -64,7 +64,7 @@ defmodule Mobilizon.Service.ActorSuspension do
Logger.debug("Going to run the transaction") Logger.debug("Going to run the transaction")
case Repo.transaction(multi) do case Repo.transaction(multi, timeout: 60_000) do
{:ok, %{actor: %Actor{} = actor}} -> {:ok, %{actor: %Actor{} = actor}} ->
{:ok, true} = Cachex.del(:activity_pub, "actor_#{actor.preferred_username}") {:ok, true} = Cachex.del(:activity_pub, "actor_#{actor.preferred_username}")
Cachable.clear_all_caches(actor) Cachable.clear_all_caches(actor)

View file

@ -29,35 +29,38 @@ defmodule Mobilizon.Service.SiteMap do
gzip: false gzip: false
] ]
Repo.transaction(fn -> Repo.transaction(
Events.stream_events_for_sitemap() fn ->
|> Stream.concat(Actors.list_groups_for_stream()) Events.stream_events_for_sitemap()
|> Stream.concat(Posts.list_posts_for_stream()) |> Stream.concat(Actors.list_groups_for_stream())
|> Stream.concat( |> Stream.concat(Posts.list_posts_for_stream())
Enum.map(static_routes, fn route -> |> Stream.concat(
{url, frequency} = Enum.map(static_routes, fn route ->
case route do {url, frequency} =
{url, frequency} -> {url, frequency} case route do
url when is_binary(url) -> {url, @default_static_frequency} {url, frequency} -> {url, frequency}
end url when is_binary(url) -> {url, @default_static_frequency}
end
%{url: url, updated_at: nil, frequence: frequency} %{url: url, updated_at: nil, frequence: frequency}
end)
)
|> Stream.map(fn %{url: url, updated_at: updated_at} = args ->
frequence = Map.get(args, :frequence, :weekly)
%Sitemapper.URL{
loc: url,
changefreq: frequence,
lastmod: check_date_time(updated_at)
}
end) end)
) |> Sitemapper.generate(config)
|> Stream.map(fn %{url: url, updated_at: updated_at} = args -> |> Sitemapper.persist(config)
frequence = Map.get(args, :frequence, :weekly) |> Sitemapper.ping(config)
|> Stream.run()
%Sitemapper.URL{ end,
loc: url, timeout: :infinity
changefreq: frequence, )
lastmod: check_date_time(updated_at)
}
end)
|> Sitemapper.generate(config)
|> Sitemapper.persist(config)
|> Sitemapper.ping(config)
|> Stream.run()
end)
end end
# Sometimes we use naive datetimes # Sometimes we use naive datetimes

View file

@ -20,27 +20,30 @@ defmodule Mobilizon.Service.Workers.SendActivityRecapWorker do
@impl Oban.Worker @impl Oban.Worker
def perform(%Job{}) do def perform(%Job{}) do
Repo.transaction(fn -> Repo.transaction(
Users.stream_users_for_recap() fn ->
|> Enum.to_list() Users.stream_users_for_recap()
|> Repo.preload([:settings, :activity_settings]) |> Enum.to_list()
|> Enum.filter(&filter_elegible_users/1) |> Repo.preload([:settings, :activity_settings])
|> Enum.map(fn %User{} = user -> |> Enum.filter(&filter_elegible_users/1)
%{ |> Enum.map(fn %User{} = user ->
activities: activities_for_user(user), %{
user: user activities: activities_for_user(user),
} user: user
end) }
|> Enum.filter(fn %{activities: activities, user: _user} -> length(activities) > 0 end) end)
|> Enum.map(fn %{ |> Enum.filter(fn %{activities: activities, user: _user} -> length(activities) > 0 end)
activities: activities, |> Enum.map(fn %{
user: activities: activities,
%User{settings: %Setting{group_notifications: group_notifications}} = user:
user %User{settings: %Setting{group_notifications: group_notifications}} =
} -> user
Email.send(user, activities, recap: group_notifications) } ->
end) Email.send(user, activities, recap: group_notifications)
end) end)
end,
timeout: :infinity
)
end end
defp activities_for_user( defp activities_for_user(

View file

@ -82,15 +82,18 @@ defmodule Mobilizon.Web.Email.Event do
|> MapSet.new() |> MapSet.new()
if MapSet.size(diff) > 0 do if MapSet.size(diff) > 0 do
Repo.transaction(fn -> Repo.transaction(
event_id fn ->
|> Events.list_local_emails_user_participants_for_event_query() event_id
|> Repo.stream() |> Events.list_local_emails_user_participants_for_event_query()
|> Enum.to_list() |> Repo.stream()
|> Enum.each( |> Enum.to_list()
&send_notification_for_event_update_to_participant(&1, old_event, event, diff) |> Enum.each(
) &send_notification_for_event_update_to_participant(&1, old_event, event, diff)
end) )
end,
timeout: 120_000
)
else else
{:ok, :ok} {:ok, :ok}
end end