Add conditions to upsert
Eduardo B. Alexandre:
Is there any way to add conditionals when doing an upsert in a create action?
For example, I have this resource:
defmodule Pacman.Markets.Blibs do
  use Ash.Resource, data_layer: AshPostgres.DataLayer

  code_interface do
    define_for Pacman.Markets
    define :create
  end

  attributes do
    attribute :id, :uuid do
      default &Ash.UUID.generate/0
      primary_key? true
      allow_nil? false
    end

    attribute :name, :string do
      allow_nil? false
      constraints max_length: 255
    end

    attribute :transaction_time, :integer, allow_nil?: false
    attribute :deleted?, :boolean, allow_nil?: false, default: false
    timestamps(private?: false)
  end

  postgres do
    table "blibs"
    repo Pacman.Repo
    migration_types transaction_time: :bigint, name: {:varchar, 255}
  end

  actions do
    defaults [:read, :destroy]

    create :create do
      primary? true
      upsert? true
    end
  end
end
Eduardo B. Alexandre:
If I create a new instance with the same id, it will do an upsert, with a query more or less like this:
INSERT INTO blibs (id, name, inserted_at, updated_at, transaction_time, "deleted?")
VALUES ('c48a8f31-9c55-4861-8465-f489db50e08f', 'a', '2023-07-31 20:07:27.660396Z', '2023-07-31 20:07:27.660396Z', 2, false)
ON CONFLICT (id) DO UPDATE SET id = 'c48a8f31-9c55-4861-8465-f489db50e08f', name = 'a', transaction_time = 2
RETURNING updated_at, inserted_at, "deleted?", transaction_time, name, id;
The thing is, I want to add a condition to the upsert: the update should only be applied if the transaction_time I’m writing is greater than the one already in the DB.
Basically, I want to change the query to this one:
INSERT INTO blibs (id, name, inserted_at, updated_at, transaction_time, "deleted?")
VALUES ('c48a8f31-9c55-4861-8465-f489db50e08f', 'a', '2023-07-31 20:07:27.660396Z', '2023-07-31 20:07:27.660396Z', 2, false)
ON CONFLICT (id) DO UPDATE SET id = 'c48a8f31-9c55-4861-8465-f489db50e08f', name = 'a', transaction_time = 2
WHERE blibs.transaction_time < 2
RETURNING updated_at, inserted_at, "deleted?", transaction_time, name, id;
Looking at the Ash documentation, I couldn’t find any option to add conditionals to my create action. Is there a way for me to achieve this?
Eduardo B. Alexandre:
Just for completeness, this forum post shows how it is done with Ecto: https://elixirforum.com/t/upsert-conditional-update-e-g-update-only-when-existing-data-is-outdated/55503/2
zachdaniel:
We don’t have that option currently
zachdaniel:
you’ll need to build a manual action
zachdaniel:
can you open a request to ash?
Eduardo B. Alexandre:
Yep, I will create it tomorrow when I’m back at work.
Eduardo B. Alexandre:
Do you mind giving me just some tips on how to proceed with the manual action?
I never understood how exactly I’m supposed to insert the changeset into the DB with a manual create action. My guess is that there is some way to manually apply all the checks from the resource attributes to the changeset and, if there is no error, retrieve the final attribute map from it, insert it using Ecto’s Repo.insert directly, and then somehow convert the result (especially in case of an error) back into an error equivalent to the one Ash.Changeset would give.
I’m just not sure what the best approach is, and I can’t recall an example of something like that in the documentation.
zachdaniel:
create :your_upsert do
  upsert? true

  manual fn changeset, _ ->
    case Ash.Changeset.apply_attributes(changeset) do
      {:ok, record} -> upsert_struct_with_ecto(record, ...)
      {:error, error} -> {:error, error}
    end
  end
end
Eduardo B. Alexandre:
Done: https://github.com/ash-project/ash/issues/667
Eduardo B. Alexandre:
This is my current workaround:
create :create do
  primary? true

  manual fn changeset, _ ->
    import Ecto.Query

    case Ash.Changeset.apply_attributes(changeset) do
      {:ok, record} ->
        all_fields = record |> Map.take(__MODULE__.__schema__(:fields)) |> Enum.to_list()

        on_conflict =
          from r in __MODULE__,
            where: r.transaction_time < ^record.transaction_time,
            update: [set: ^all_fields]

        Pacman.Repo.insert(record,
          conflict_target: [:id],
          on_conflict: on_conflict,
          stale_error_field: :transaction_time,
          returning: true
        )

      {:error, error} ->
        {:error, error}
    end
  end
end
It is super ugly, but seems to work fine.
The only thing I haven’t figured out yet is how to transform the Ecto changeset back into an Ash changeset when there are errors.
For example, if the upsert condition fails, it will return this changeset:
#Ecto.Changeset<action: :insert, changes: %{id: "9dd0fe5c-3fe4-4036-bb50-b189169b4028", name: "b", transaction_time: 6, deleted?: false, inserted_at: ~U[2023-08-01 14:23:46.238721Z], updated_at: ~U[2023-08-01 14:23:46.238721Z]}, errors: [transaction_time: {"is stale", [stale: true]}], data: #Pacman.Markets.Blibs<>, valid?: false>
Do we have a helper function to convert an Ecto.Changeset error into an Ash.Changeset error? In other words, I want to take the errors from the Ecto.Changeset ([transaction_time: {"is stale", [stale: true]}]) and convert them into a meaningful error inside the Ash.Changeset.
Eduardo B. Alexandre:
I tried just passing the Ecto.Changeset errors field to Ash.Changeset using the add_error function. That kind of works, but it gives me a more “generic” error:
%Ash.Error.Changes.InvalidChanges{
  fields: [],
  message: nil,
  validation: nil,
  changeset: nil,
  query: nil,
  error_context: [],
  vars: [transaction_time: {"is stale", [stale: true]}],
  path: [],
  stacktrace: #Stacktrace<>,
  class: :invalid
}
I was wondering if there is some “parser” that would fill in all the error fields correctly.
zachdaniel:
There isn’t one exposed, but what AshPostgres does looks something like this:
defp handle_errors({:error, %Ecto.Changeset{errors: errors}}) do
  {:error, Enum.map(errors, &to_ash_error/1)}
end

defp handle_errors({:ok, val}), do: {:ok, val}

defp to_ash_error({field, {message, vars}}) do
  Ash.Error.Changes.InvalidAttribute.exception(
    field: field,
    message: message,
    private_vars: vars
  )
end
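To see the shape of that conversion in isolation, here is a minimal sketch that runs without Ash or Ecto. It assumes the errors arrive as the keyword list Ecto stores in a changeset’s errors field. The ErrorShape module and the plain maps it returns are hypothetical stand-ins for Ash.Error.Changes.InvalidAttribute, not part of either library.

```elixir
defmodule ErrorShape do
  # Hypothetical stand-in: builds plain maps instead of Ash error structs.

  # An Ecto-style failure carries a keyword list of {field, {message, vars}}.
  def handle_errors({:error, %{errors: errors}}) do
    {:error, Enum.map(errors, &to_error_map/1)}
  end

  # Successful results pass through untouched.
  def handle_errors({:ok, val}), do: {:ok, val}

  defp to_error_map({field, {message, vars}}) do
    %{field: field, message: message, private_vars: vars}
  end
end

# A plain map mimicking the stale-upsert changeset shown earlier.
stale = {:error, %{errors: [transaction_time: {"is stale", [stale: true]}]}}
{:error, [converted]} = ErrorShape.handle_errors(stale)
IO.inspect(converted)
```

Each entry keeps its field name, so the caller can tell which attribute failed instead of getting one generic error with everything packed into vars.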
Eduardo B. Alexandre:
Ah, thanks! Based on that, my final workaround is the following:
create :create do
  primary? true

  manual fn changeset, _ ->
    import Ecto.Query

    case Ash.Changeset.apply_attributes(changeset) do
      {:ok, record} ->
        all_fields = record |> Map.take(__MODULE__.__schema__(:fields)) |> Enum.to_list()

        on_conflict =
          from r in __MODULE__,
            where: r.transaction_time < ^record.transaction_time and not r.deleted?,
            update: [set: ^all_fields]

        result =
          Pacman.Repo.insert(record,
            conflict_target: [:id],
            on_conflict: on_conflict,
            stale_error_field: :upsert,
            stale_error_message: "upsert condition not met",
            returning: true
          )

        with {:error, %{errors: [upsert: {error, stale: true}]}} <- result do
          error = Ash.Error.Unknown.UnknownError.exception(field: :upsert, error: error, changeset: changeset)
          {:error, Ash.Changeset.add_error(changeset, error)}
        end

      {:error, error} ->
        {:error, error}
    end
  end
end
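One detail of this version is worth spelling out: a with that has no else returns any value that does not match its clause unchanged. So {:ok, record} comes back as-is, and so does an {:error, ...} that carries anything other than the single :upsert stale error, which means the raw Ecto changeset is returned in that case. A minimal sketch using plain maps in place of Ecto.Changeset (the StaleCheck module and its return shapes are hypothetical, for illustration only):

```elixir
defmodule StaleCheck do
  # Hypothetical helper: only the single stale :upsert error is rewritten.
  def classify(result) do
    # {message, stale: true} is sugar for {message, [stale: true]}.
    with {:error, %{errors: [upsert: {message, stale: true}]}} <- result do
      {:error, {:stale_upsert, message}}
    end
  end
end

# Success does not match the clause, so it falls through unchanged.
IO.inspect(StaleCheck.classify({:ok, %{id: 1}}))

# The stale upsert error matches and is rewritten.
stale = {:error, %{errors: [upsert: {"upsert condition not met", [stale: true]}]}}
IO.inspect(StaleCheck.classify(stale))

# Any other error shape also falls through unchanged.
other = {:error, %{errors: [name: {"is invalid", []}]}}
IO.inspect(StaleCheck.classify(other))
```

If other Ecto errors can reach this point, they would need a conversion of their own before being handed back to Ash.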