Forum Discussion
MERGE on delta table with source structure change
It sounds like you are looking for a way to run a MERGE on a Delta table when the source structure has changed.
- You can use the *MERGE INTO* operation to upsert data from a source table, view, or DataFrame into a target Delta table. This operation allows you to insert, update, and delete data based on a matching condition. https://learn.microsoft.com/en-us/azure/databricks/delta/merge
- You can also enable *schema evolution* for your Delta table, which allows the table schema to change automatically when you write new data with a different schema. This feature supports resolving struct fields by name and evolving schemas for arrays of structs. https://docs.delta.io/latest/delta-update.html
- To enable schema evolution, you need to set the configuration **spark.databricks.delta.schema.autoMerge.enabled** to true before writing data to your Delta table. You can also use the *mergeSchema* option when writing data with the DataFrame API; see the sketch after this list. https://learn.microsoft.com/en-us/azure/databricks/delta/update-schema
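For illustration, here is a minimal sketch of both options. It assumes a source DataFrame named source_df and a Delta table at the placeholder path /mnt/delta/target; adjust the names and paths to your setup.
# Enable automatic schema evolution for MERGE in this Spark session
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/mnt/delta/target")  # placeholder path
target.alias("t") \
    .merge(source_df.alias("s"), "t.Key = s.Key") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

# Alternatively, evolve the schema on a plain DataFrame write with mergeSchema
source_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/mnt/delta/target")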
- pete441610 (Copper Contributor), Jul 12, 2023
Hi WATTANACHAI,
Thanks for your help!
I'm indeed trying to use the spark.databricks.delta.schema.autoMerge.enabled configuration.
I set the config using the following command
spark.conf.set("spark.databricks.delta.schema.autoMerge.enable","true")
and wrote my merge command as below:
from delta.tables import DeltaTable

Target_Table = DeltaTable.forPath(spark, Target_Table_path)
# Insert non-existing records into the target table; update existing records with end_date and ActiveRecord = 0
Target_Table.alias('dwh') \
    .merge(
        Source_Table_dataframe.alias('updates'),
        '(dwh.Key == updates.Key)'
    ) \
    .whenMatchedUpdate(set =
        {
            "end_date": "date_sub(current_date(), 1)",
            "ActiveRecord": "0"
        }
    ) \
    .whenNotMatchedInsertAll() \
    .execute()
but I get an error message "cannot resolve column1 in INSERT clause given columns ...", where the listed columns are those of the source table, in which column1 does not exist anymore.
Regards
Pete
- H2O (Iron Contributor), Jul 12, 2023
The error message you are getting is because you are trying to insert a column into the target table that does not exist in the source table. This is not allowed by Delta Lake, because it could corrupt the data in the target table.
To fix this, you need to make sure that the columns in the source table match the columns in the target table. If the source table has a column that does not exist in the target table, you need to either drop the column from the source table or add the column to the target table.
In your case, the source table has a column called column1 that does not exist in the target table. You can either drop the column from the source table or add the column to the target table.
If you drop the column from the source table, you will need to update your merge command so that the INSERT clause does not include the column, for example by using whenNotMatchedInsert with an explicit column mapping:
Code snippet
Target_Table.alias('dwh') \
    .merge(
        Source_Table_dataframe.alias('updates'),
        '(dwh.Key == updates.Key)'
    ) \
    .whenMatchedUpdate(set =
        {
            "end_date": "date_sub(current_date(), 1)",
            "ActiveRecord": "0"
        }
    ) \
    .whenNotMatchedInsert(values =
        {
            # map only the columns that exist in the source; column1 is omitted
            "Key": "updates.Key"
        }
    ) \
    .execute()
If you add the column to the target table, you will need to specify the column explicitly in the INSERT clause. For example:
Code snippet
Target_Table.alias('dwh') \
    .merge(
        Source_Table_dataframe.alias('updates'),
        '(dwh.Key == updates.Key)'
    ) \
    .whenMatchedUpdate(set =
        {
            "end_date": "date_sub(current_date(), 1)",
            "ActiveRecord": "0"
        }
    ) \
    .whenNotMatchedInsert(values =
        {
            "Key": "updates.Key",
            "column1": "updates.column1"
        }
    ) \
    .execute()
Once you have made the necessary changes, the merge should run without this error.
- pete441610 (Copper Contributor), Jul 12, 2023
Hi Wattanachi,
Thanks for your response.
It's the opposite: I'm trying to insert new values from the data source, but column1 does not exist in the source; it still exists in the target table.
There are no new columns in the data source.
If it is required to explicitly specify the implied columns, then in my opinion the Delta Lake documentation is not clear enough on this point. Anyway, I will test this solution.
In the Automatic schema evolution for Delta Lake merge section of the documentation it is written:
Schema evolution allows users to resolve schema mismatches between the target and source table in merge. It handles the following two cases:
- A column in the source table is not present in the target table. The new column is added to the target schema, and its values are inserted or updated using the source values.
- A column in the target table is not present in the source table. The target schema is left unchanged; the values in the additional target column are either left unchanged (for UPDATE) or set to NULL (for INSERT).
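The second case is exactly my situation, so with schema evolution enabled I would expect column1 to simply be set to NULL for inserted rows. A minimal sketch of what I will test, reusing the table and DataFrame names from my earlier post (the NULL behaviour is the documented one, not something I have verified yet):
# Enable automatic schema evolution before the merge
# (full configuration name per the update-schema docs linked above)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

Target_Table = DeltaTable.forPath(spark, Target_Table_path)
Target_Table.alias('dwh') \
    .merge(Source_Table_dataframe.alias('updates'), '(dwh.Key == updates.Key)') \
    .whenMatchedUpdate(set =
        {
            "end_date": "date_sub(current_date(), 1)",
            "ActiveRecord": "0"
        }
    ) \
    .whenNotMatchedInsertAll() \
    .execute()
# With schema evolution on, column1 (present only in the target) should be
# left out of the generated INSERT and set to NULL for newly inserted rows.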
Thanks for your help once again!