First published on MSDN on Jan 02, 2009
This sample creates a data flow package with an OLE DB Source component feeding into a Lookup transform. The Lookup transform is set to Full Cache mode and uses [DimCustomer] as its reference table.
Items of interest:
- CustomerKey and GeographyKey are used as the index (join) columns. This is configured by using the JoinToReferenceColumn property.
- The FirstName column is being overwritten by the value retrieved by the Lookup transform.
- The LastName2 column is being added as a new output column.
/// <summary>
/// Builds an in-memory SSIS package containing a single Data Flow Task in which an
/// OLE DB Source reading [DimCustomer] feeds a Lookup transform (full cache mode)
/// that also uses [DimCustomer] as its reference query.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    // The package is released deterministically when the using block ends.
    // Anything that needs the finished package must run inside this block.
    using (Package package = new Package())
    {
        // Add Data Flow Task
        Executable dataFlowTask = package.Executables.Add("STOCK:PipelineTask");

        // Set the name (otherwise it will be a random GUID value).
        // Direct casts are used so that an unexpected type fails right here with an
        // InvalidCastException rather than later with a NullReferenceException.
        TaskHost taskHost = (TaskHost)dataFlowTask;
        taskHost.Name = "Data Flow Task";

        // We need a reference to the InnerObject to add items to the data flow
        MainPipe pipeline = (MainPipe)taskHost.InnerObject;

        //
        // Add connection manager
        //
        ConnectionManager connection = package.Connections.Add("OLEDB");
        connection.Name = "localhost";
        connection.ConnectionString = "Data Source=localhost;Initial Catalog=AdventureWorksDW2008;Provider=SQLNCLI10.1;Integrated Security=SSPI;Auto Translate=False;";

        //
        // Add OLEDB Source
        //
        IDTSComponentMetaData100 srcComponent = pipeline.ComponentMetaDataCollection.New();
        srcComponent.ComponentClassID = "DTSAdapter.OleDbSource";
        srcComponent.ValidateExternalMetadata = true;
        IDTSDesigntimeComponent100 srcDesignTimeComponent = srcComponent.Instantiate();
        srcDesignTimeComponent.ProvideComponentProperties();
        srcComponent.Name = "OleDb Source";

        // Configure it to read from the given table
        srcDesignTimeComponent.SetComponentProperty("AccessMode", 0);
        srcDesignTimeComponent.SetComponentProperty("OpenRowset", "[DimCustomer]");

        // Set the connection manager
        srcComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
        srcComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;

        // Retrieve the column metadata
        RefreshMetadata(srcDesignTimeComponent);

        // Add transform
        IDTSComponentMetaData100 lookupComponent = pipeline.ComponentMetaDataCollection.New();
        lookupComponent.ComponentClassID = "DTSTransform.Lookup";
        lookupComponent.Name = "Lookup";
        CManagedComponentWrapper lookupWrapper = lookupComponent.Instantiate();
        lookupWrapper.ProvideComponentProperties();

        // Connect the source and the transform
        IDTSPath100 path = pipeline.PathCollection.New();
        path.AttachPathAndPropagateNotifications(srcComponent.OutputCollection[0], lookupComponent.InputCollection[0]);

        //
        // Configure the transform
        //

        // Set the connection manager
        lookupComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
        lookupComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;

        // Cache Type - Full = 0, Partial = 1, None = 2
        lookupWrapper.SetComponentProperty("CacheType", 0);
        lookupWrapper.SetComponentProperty("SqlCommand", "select * from [DimCustomer]");

        // initialize metadata
        RefreshMetadata(lookupWrapper);

        // Mark the columns we are joining on
        IDTSInput100 lookupInput = lookupComponent.InputCollection[0];
        IDTSVirtualInput100 lookupVirtualInput = lookupInput.GetVirtualInput();
        IDTSVirtualInputColumnCollection100 lookupVirtualInputColumns = lookupVirtualInput.VirtualInputColumnCollection;

        // We are joining on CustomerKey and GeographyKey
        // Note: join columns should be marked as READONLY
        var joinColumns = new string[] { "CustomerKey", "GeographyKey" };
        foreach (string columnName in joinColumns)
        {
            IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];
            IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);
            lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", columnName);
        }

        // Overwrite the existing FirstName column value with the one returned by the Lookup.
        // To do this, we need to flag the column as READWRITE, and set the CopyFromReferenceColumn property on the input
        var overwriteColumns = new string[] { "FirstName" };
        foreach (string columnName in overwriteColumns)
        {
            IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];
            IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READWRITE);
            lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "CopyFromReferenceColumn", columnName);
        }

        // First output is the Match output
        IDTSOutput100 lookupMatchOutput = lookupComponent.OutputCollection[0];

        // Add a new LastName2 column from the "LastName" column returned by the lookup.
        // Key = reference column to copy from, Value = name of the new output column.
        var newColumns = new Dictionary<string, string> { { "LastName", "LastName2" } };
        foreach (KeyValuePair<string, string> mapping in newColumns)
        {
            string sourceColumn = mapping.Key;
            string newColumnName = mapping.Value;
            string description = string.Format("Copy of {0}", sourceColumn);

            // insert the new column
            IDTSOutputColumn100 outputColumn = lookupWrapper.InsertOutputColumnAt(lookupMatchOutput.ID, 0, newColumnName, description);
            lookupWrapper.SetOutputColumnProperty(lookupMatchOutput.ID, outputColumn.ID, "CopyFromReferenceColumn", sourceColumn);
        }
    }
}

/// <summary>
/// Acquires the component's connections, reinitializes its metadata, and releases
/// the connections again.
/// </summary>
/// <param name="component">Design-time component whose metadata should be refreshed.</param>
private static void RefreshMetadata(IDTSDesigntimeComponent100 component)
{
    component.AcquireConnections(null);
    try
    {
        component.ReinitializeMetaData();
    }
    finally
    {
        // Release even when ReinitializeMetaData throws, so the connection is not left acquired.
        component.ReleaseConnections();
    }
}