Add conditions to upsert

Eduardo B. Alexandre
2023-07-31

Eduardo B. Alexandre:

Is there any way to add conditionals when doing an upsert in a create action?

For example, I have this resource:

defmodule Pacman.Markets.Blibs do
  use Ash.Resource, data_layer: AshPostgres.DataLayer

  code_interface do
    define_for Pacman.Markets

    define :create
  end

  attributes do
    attribute :id, :uuid do
      default &Ash.UUID.generate/0
      primary_key? true
      allow_nil? false
    end

    attribute :name, :string do
      allow_nil? false

      constraints max_length: 255
    end

    attribute :transaction_time, :integer, allow_nil?: false

    attribute :deleted?, :boolean, allow_nil?: false, default: false

    timestamps(private?: false)
  end

  postgres do
    table "blibs"

    repo Pacman.Repo

    migration_types transaction_time: :bigint, name: {:varchar, 255}
  end

  actions do
    defaults [:read, :destroy]
    
    create :create do
      primary? true

      upsert? true
    end
  end
end

Eduardo B. Alexandre:

If I create a new instance with the same id, it will do an upsert. with a query more or less like this:

INSERT INTO blibs (id, name, inserted_at, updated_at, transaction_time, "deleted?")
VALUES ('c48a8f31-9c55-4861-8465-f489db50e08f', 'a', '2023-07-31 20:07:27.660396Z', '2023-07-31 20:07:27.660396Z', 2, false)
ON CONFLICT (id) DO UPDATE SET id = 'c48a8f31-9c55-4861-8465-f489db50e08f', name = 'a', transaction_time = 2
RETURNING updated_at, inserted_at, "deleted?", transaction_time, name, id;

The thing is, I want to add a condition to the upsert part, basically I want to only apply the upsert if the transaction_time that I’m updating is bigger than the one in the DB.

Basically I want to change the query to this one:

INSERT INTO blibs (id, name, inserted_at, updated_at, transaction_time, "deleted?")
VALUES ('c48a8f31-9c55-4861-8465-f489db50e08f', 'a', '2023-07-31 20:07:27.660396Z', '2023-07-31 20:07:27.660396Z', 2, false)
ON CONFLICT (id) DO UPDATE SET id = 'c48a8f31-9c55-4861-8465-f489db50e08f', name = 'a', transaction_time = 2
WHERE blibs.transaction_time < 2
RETURNING updated_at, inserted_at, "deleted?", transaction_time, name, id;

Looking at the Ash documentation, I couldn’t find any option to add conditionals to my create action, is there a way for me to achieve this?

Eduardo B. Alexandre:

Just for completeness, in this forum post it shows how it is done with Ecto https://elixirforum.com/t/upsert-conditional-update-e-g-update-only-when-existing-data-is-outdated/55503/2

zachdaniel:

We don’t have that option currently

zachdaniel:

you’ll need to build a manual action

zachdaniel:

can you open a request to ash?

Eduardo B. Alexandre:

Yep, I will create it tomorrow when I’m back to work

Eduardo B. Alexandre:

Do you mind giving me just some tips on how to proceed with the manual action?

I never understood how exactly I’m supposed to actually insert the changeset in the DB with a manual create action. My guess is that there is some way to manually apply all checks from the resource attributes to the changeset, if there is no error, retrieve the end attribute map from it, insert it using Ecto Repo.insert directly, and then somehow converting the result (especially in case of an error) back to an error that is equivalent to one that Ash.Changeset would give.

I’m just not sure what is the best approach to do this and I can’t recall an example of something like that in the documentation.

zachdaniel:

create :your_upsert do
  upsert? true
  manual fn changeset, _ -> 
    case Ash.Changeset.apply_attributes(changeset) do
      {:ok, record} -> upsert_struct_with_ecto(record, ...)
      {:error, error} -> {:error, error}
    end
  end
end

Eduardo B. Alexandre:

<@197905764424089601> done: https://github.com/ash-project/ash/issues/667

Eduardo B. Alexandre:

This is my current workaround:

create :create do
  primary? true

  manual fn changeset, _ ->
    import Ecto.Query
    
    case Ash.Changeset.apply_attributes(changeset) do
      {:ok, record} ->
        all_fields = record |> Map.take(__MODULE__.__schema__(:fields)) |> Enum.to_list()
        
        on_conflict =
          from r in __MODULE__,
            where: r.transaction_time < ^record.transaction_time,
            update: [set: ^all_fields]

        Pacman.Repo.insert(record,
          conflict_target: [:id],
          on_conflict: on_conflict,
          stale_error_field: :transaction_time,
          returning: true
        )
      
      {:error, error} ->
        {:error, error}
    end
  end
end

It is super ugly, but seems to work fine.

The only thing that I didn’t figure out yet is how to transform the ecto changeset back to a ash changeset when there is errors.

For example, if the upsert condition fails, it will return this changeset:

#Ecto.Changeset<action: :insert, changes: %{id: "9dd0fe5c-3fe4-4036-bb50-b189169b4028", name: "b", transaction_time: 6, deleted?: false, inserted_at: ~U[2023-08-01 14:23:46.238721Z], updated_at: ~U[2023-08-01 14:23:46.238721Z]}, errors: [transaction_time: {"is stale", [stale: true]}], data: #Pacman.Markets.Blibs<>, valid?: false>

Do we have some helper function to convert an Ecto.Changeset error into an Ash.Changeset error? In other words, I want to get the errors from Ecto.Changeset ( [transaction_time: {"is stale", [stale: true]}] ) and convert into a meaningful error inside Ash.Changeset.

Eduardo B. Alexandre:

I tried just passing the Ecto.Changeset errors field to Ash.Changeset using the add_error function, that kinda works but it gives me a more “generic” error:

     %Ash.Error.Changes.InvalidChanges{
       fields: [],
       message: nil,
       validation: nil,
       changeset: nil,
       query: nil,
       error_context: [],
       vars: [transaction_time: {"is stale", [stale: true]}],
       path: [],
       stacktrace: #Stacktrace<>,
       class: :invalid
     }

I was wondering if there is some “parser” that would fill all the error fields correctly

zachdaniel:

There isn’t one exposed, but what AshPostgres does looks something like this:

  defp handle_errors({:error, %Ecto.Changeset{errors: errors}}) do
    {:error, Enum.map(errors, &to_ash_error/1)}
  end

  defp handle_errors({:ok, val}), do: {:ok, val}

  defp to_ash_error({field, {message, vars}}) do
    Ash.Error.Changes.InvalidAttribute.exception(
      field: field,
      message: message,
      private_vars: vars
    )
  end

Eduardo B. Alexandre:

Ah, thanks! Based on that, my final workaround is the following:

create :create do
  primary? true

  manual fn changeset, _ ->
    import Ecto.Query

    case Ash.Changeset.apply_attributes(changeset) do
      {:ok, record} ->
        all_fields = record |> Map.take(__MODULE__.__schema__(:fields)) |> Enum.to_list()
    
        on_conflict =
          from r in __MODULE__,
            where: r.transaction_time < ^record.transaction_time and not r.deleted?,
            update: [set: ^all_fields]

        result = Pacman.Repo.insert(record,
          conflict_target: [:id],
          on_conflict: on_conflict,
          stale_error_field: :upsert,
          stale_error_message: "upsert condition not met",
          returning: true
        )

        with {:error, %{errors: [upsert: {error, stale: true}]}} <- result do
          error = Ash.Error.Unknown.UnknownError.exception(field: :upsert, error: error, changeset: changeset)
          
          {:error, Ash.Changeset.add_error(changeset, error)}
        end
  
      {:error, error} ->
        {:error, error}
    end
  end
end