MERGE on delta table with source structure change


Hi everybody, 

I'm working on a Lakehouse on Synapse and want to merge two delta tables in a pyspark notebook.


We are working on Apache Spark version 3.3.

The structure of the source table may change; some columns may be deleted, for instance.

I tried setting the configuration "spark.databricks.delta.schema.autoMerge.enabled" to true,


but I keep getting an error message such as "cannot resolve column1 in INSERT clause given columns source.column2, source.column3" when I try to load new source data with only column2 and column3.

Thanks for your help.

Pete

5 Replies

@pete441610  

It seems like you are looking for a way to merge into a delta table when the source structure changes.

- You can use the *MERGE INTO* operation to upsert data from a source table, view, or DataFrame into a target delta table. This operation allows you to insert, update, and delete data based on a matching condition. For more details, please see this article.
- You can also enable *schema evolution* for your delta table, which allows the table schema to change automatically when you write new data with a different schema. This feature supports resolving struct fields by name and evolving schemas for arrays of structs. For more details, please see this article.
- To enable schema evolution, you need to set the configuration **spark.databricks.delta.schema.autoMerge.enabled** to true before writing data to your delta table. You can also use the *mergeSchema* option when writing data with the DataFrame API, as shown in the sketch after this list. For more details, please see this article.
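
For example, a minimal sketch (assuming an existing SparkSession named spark and a source DataFrame named df; the table path is a placeholder):

# Enable automatic schema evolution for MERGE operations;
# must be set before the merge runs
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# For plain appends outside MERGE, the mergeSchema option evolves
# the table schema for that single write instead
df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/path/to/delta/table")  # placeholder path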

Hi WATTANACHAI,

Thanks for your help!

I'm indeed trying to use the spark.databricks.delta.schema.autoMerge.enabled configuration.
I set the config using the following command:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

and wrote my merge command as below:
Target_Table = DeltaTable.forPath(spark, Target_Table_path)

# Insert non-existing records in the target table; update existing
# records with end_date and ActiveRecord = 0
Target_Table.alias('dwh') \
    .merge(
        Source_Table_dataframe.alias('updates'),
        '(dwh.Key == updates.Key)'
    ) \
    .whenMatchedUpdate(set =
        {
            "end_date": "date_sub(current_date(), 1)",
            "ActiveRecord": "0"
        }
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

but I get an error message "cannot resolve column1 in INSERT clause given columns ..." followed by the list of the source table's columns, in which column1 no longer exists.


Regards

Pete

@pete441610  

The error message you are getting appears because you are trying to insert a column into the target table that does not exist in the source table. Delta Lake does not allow this, because it could corrupt the data in the target table.

To fix this, you need to make sure that the columns in the source table match the columns in the target table. If the source table has a column that does not exist in the target table, you need to either drop the column from the source table or add the column to the target table.

In your case, the column in question is column1. You can either drop it from the source table or add it to the target table, as sketched below.
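
Either way, the two schemas need to line up first. A minimal sketch reusing the variable names from your merge (column1's STRING type here is just an assumption; use its actual type):

# Option 1: drop the extra column from the source DataFrame
Source_Table_dataframe = Source_Table_dataframe.drop("column1")

# Option 2: add the missing column to the target table instead
spark.sql(f"ALTER TABLE delta.`{Target_Table_path}` ADD COLUMNS (column1 STRING)")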

If you drop the column from the source table, you will need to update your merge command so the INSERT clause does not include the column; whenNotMatchedInsert with an explicit values map does this. For example:

Code snippet

Target_Table.alias('dwh') \
    .merge(
        Source_Table_dataframe.alias('updates'),
        '(dwh.Key == updates.Key)'
    ) \
    .whenMatchedUpdate(set =
        {
            "end_date": "date_sub(current_date(), 1)",
            "ActiveRecord": "0"
        }
    ) \
    .whenNotMatchedInsert(values =
        {
            # every column except column1, listed explicitly
            "Key": "updates.Key",
            "column2": "updates.column2",
            "column3": "updates.column3"
        }
    ) \
    .execute()

If you add the column to the target table, you will need to specify the column in the INSERT clause. For example:

Code snippet

Target_Table.alias('dwh') \
    .merge(
        Source_Table_dataframe.alias('updates'),
        '(dwh.Key == updates.Key)'
    ) \
    .whenMatchedUpdate(set =
        {
            "end_date": "date_sub(current_date(), 1)",
            "ActiveRecord": "0"
        }
    ) \
    .whenNotMatchedInsert(values =
        {
            "Key": "updates.Key",
            "column1": "updates.column1"
        }
    ) \
    .execute()

Once you have made the necessary changes, the merge should run without the error.

Hi Wattanachai,

Thanks for your response.
It's the opposite: I'm trying to insert new values from the data source, but column1 no longer exists in the source; it still exists in the target table.
There are no new columns in the data source.

If specifying the implied columns is required, then to me the Delta Lake documentation is not clear enough on this point. Anyway, I will test this solution.

In the documentation section *Automatic schema evolution for Delta Lake merge* it is written:

Schema evolution allows users to resolve schema mismatches between the target and source table in merge. It handles the following two cases:

A column in the source table is not present in the target table. The new column is added to the target schema, and its values are inserted or updated using the source values.

A column in the target table is not present in the source table. The target schema is left unchanged; the values in the additional target column are either left unchanged (for UPDATE) or set to NULL (for INSERT).
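
Reading that, case 2 matches my situation exactly, so per the documentation a merge like this minimal sketch (made-up data; Target_Table_path is the same variable as in my command above) should insert NULL for column1:

from delta.tables import DeltaTable

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Target has Key, column1, column2; the new source only has Key and column2
source_df = spark.createDataFrame([(1, "a")], ["Key", "column2"])

DeltaTable.forPath(spark, Target_Table_path).alias("dwh") \
    .merge(source_df.alias("updates"), "dwh.Key == updates.Key") \
    .whenNotMatchedInsertAll() \
    .execute()
# Per case 2 above, inserted rows should get NULL for column1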

Thanks for your help once again!

@pete441610 

We previously did this using whenNotMatchedInsertAll with automerge enabled and it worked well. HOWEVER, we've noticed in our recently created pipelines that it does NOT work. We saw the same errors that you're seeing.

To get around the issue, we still have auto merge enabled, but we do the following:

1. Check the update DF's columns against the target DF's columns.
2. If the update's columns aren't a subset of the target's, append the update DF's .limit(0) to the target table so the schema evolves without writing any rows.
3. Give it a few seconds to finish the schema evolution (if necessary), then
4. Proceed with the merge as usual.

Hopefully this helps you, but YMMV. Good luck!

import time

from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, table_full_name)

# If the update DF brings columns the target doesn't have yet, append an
# empty slice with mergeSchema so the target schema evolves first
if not set(df.columns).issubset(set(delta_table.toDF().columns)):
    df.limit(0).write.format("delta").mode("append") \
        .option("mergeSchema", True).saveAsTable(table_full_name)
    time.sleep(5)  # possibly not required
    delta_table = DeltaTable.forName(spark, table_full_name)

delta_table.alias("target") \
    .merge(
        source = df.alias("updates"),
        condition = match_condition
    ).whenNotMatchedInsertAll().execute()