Forum Discussion

pete441610
Copper Contributor
Jul 12, 2023

MERGE on delta table with source structure change

Hi everybody, 

I'm working on a Lakehouse in Synapse and want to merge two Delta tables in a PySpark notebook.


We are working with Apache Spark version 3.3.

The structure of the source table may change; some columns may be deleted, for instance.

I tried to set the configuration "spark.databricks.delta.schema.autoMerge.enabled" to true.


But I keep getting an error message such as "cannot resolve column1 in INSERT clause given columns source.column2, source.column3" when I try to load new source data with only column2 and column3.

Thanks for your help.

Pete


5 Replies

  • H2O
    Iron Contributor

    pete441610  

    It seems like you are looking for a way to merge into a Delta table when the source structure changes.

    - You can use the *MERGE INTO* operation to upsert data from a source table, view, or DataFrame into a target Delta table. This operation allows you to insert, update, and delete data based on a matching condition. https://learn.microsoft.com/en-us/azure/databricks/delta/merge
    - You can also enable *schema evolution* for your Delta table, which allows the table schema to change automatically when you write new data with a different schema. This feature supports resolving struct fields by name and evolving schemas for arrays of structs. https://docs.delta.io/latest/delta-update.html
    - To enable schema evolution, you need to set the configuration **spark.databricks.delta.schema.autoMerge.enabled** to true before writing data to your Delta table. You can also use the *mergeSchema* option when writing data using the DataFrame API (see the sketch after this list). https://learn.microsoft.com/en-us/azure/databricks/delta/update-schema
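
    Here is a minimal sketch of these steps, assuming a Spark session with Delta Lake available, a hypothetical target table path "/delta/target_table", and a hypothetical source DataFrame source_df (these names are illustrative, not taken from your notebook):

    Code snippet

    from delta.tables import DeltaTable

    # Allow MERGE to evolve the target schema automatically.
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    # Hypothetical target table and source DataFrame, for illustration only.
    target = DeltaTable.forPath(spark, "/delta/target_table")

    (target.alias("t")
        .merge(source_df.alias("s"), "t.Key = s.Key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())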

    • pete441610
      Copper Contributor

      Hi WATTANACHAI,

      Thanks for your help!

      I'm indeed trying to use the spark.databricks.delta.schema.autoMerge.enabled configuration.
      I set the config using the following command:
      spark.conf.set("spark.databricks.delta.schema.autoMerge.enable","true")

      and wrote my merge command as below:
      Target_Table = DeltaTable.forPath(spark, Target_Table_path)

      # Insert non-existing records in the Target table, update existing records with end_date and ActiveRecord = 0
      Target_Table.alias('dwh') \
          .merge(
              Source_Table_dataframe.alias('updates'),
              '(dwh.Key == updates.Key)'
          ) \
          .whenMatchedUpdate(set =
              {
                  "end_date": "date_sub(current_date(), 1)",
                  "ActiveRecord": "0"
              }
          ) \
          .whenNotMatchedInsertAll() \
          .execute()

      but I get an error message "cannot resolve column1 in INSERT clause given columns ..." followed by the list of the source table's columns, in which column1 does not exist anymore.


      Regards

      Pete

      • H2O
        Iron Contributor

        pete441610  

        The error message you are getting means the MERGE is trying to insert a target column that the source no longer provides. whenNotMatchedInsertAll builds the INSERT clause from the target table's columns, and column1 cannot be resolved because it no longer exists in the source.

        To fix this, the columns used in the INSERT clause have to match what the source actually contains. If the target table has a column that does not exist in the source, you can either add the column back to the source (for example as a null placeholder) or list only the source's columns explicitly in the INSERT clause.

        In your case, the target table has a column called column1 that no longer exists in the source.

        Since the column has been dropped from the source, you can update your merge command so the INSERT clause only maps the columns the source still has, using whenNotMatchedInsert with an explicit values mapping instead of whenNotMatchedInsertAll. Any target column left out of the mapping (such as column1) is set to NULL for the inserted rows, assuming it is nullable. For example:

        Code snippet

        Target_Table.alias('dwh') \
            .merge(
                Source_Table_dataframe.alias('updates'),
                '(dwh.Key == updates.Key)'
            ) \
            .whenMatchedUpdate(set =
                {
                    "end_date": "date_sub(current_date(), 1)",
                    "ActiveRecord": "0"
                }
            ) \
            .whenNotMatchedInsert(values =
                {
                    "Key": "updates.Key",
                    "column2": "updates.column2",
                    "column3": "updates.column3"
                }
            ) \
            .execute()


        Alternatively, if you want to keep whenNotMatchedInsertAll, add the missing column back to the source DataFrame as a null placeholder before the merge, so that every target column can be resolved in the source. For example:

        Code snippet

        from pyspark.sql.functions import lit

        # Re-add the dropped column with nulls; adjust the cast to column1's actual type in the target table.
        Source_Table_dataframe = Source_Table_dataframe.withColumn("column1", lit(None).cast("string"))

        Target_Table.alias('dwh') \
            .merge(
                Source_Table_dataframe.alias('updates'),
                '(dwh.Key == updates.Key)'
            ) \
            .whenMatchedUpdate(set =
                {
                    "end_date": "date_sub(current_date(), 1)",
                    "ActiveRecord": "0"
                }
            ) \
            .whenNotMatchedInsertAll() \
            .execute()
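
        Since the source structure may keep changing, a further sketch (assuming the join key is Key and that every remaining source column still exists in the target) is to build the insert mapping from whatever columns the source currently has:

        Code snippet

        # Build the INSERT mapping dynamically from the source DataFrame's current columns.
        insert_values = {c: "updates." + c for c in Source_Table_dataframe.columns}

        Target_Table.alias('dwh') \
            .merge(
                Source_Table_dataframe.alias('updates'),
                '(dwh.Key == updates.Key)'
            ) \
            .whenMatchedUpdate(set =
                {
                    "end_date": "date_sub(current_date(), 1)",
                    "ActiveRecord": "0"
                }
            ) \
            .whenNotMatchedInsert(values = insert_values) \
            .execute()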


        Once you have made the necessary changes, the MERGE should run without the "cannot resolve column1" error.
