BIML. Включаем автоматизацию.

By | February 25, 2015

Как создать один пакет для инкрементальной загрузки данных мы разобрали. Но пусть теперь у нас будет четыре таблицы, для которых надо создать аналогичные пакеты. Давайте создадим эти таблицы и заполним данными:

USE SSISIncrementalLoad_Source
GO

— создаем таблицу Source1

If Not Exists(Select name From sys.tables Where name = ‘Source1’)

CREATE TABLE dbo.Source1
(ColID int NOT NULL
,ColA varchar(10NULL
,ColB datetime NULL
,ColC int NULL
,constraint PK_Source1 primary key clustered (ColID))
GO

— заполняем Source1

INSERT INTO dbo.Source1
(ColID,ColA,ColB,ColC)
VALUES (0‘A’, ‘1/1/2007 12:01 AM’1),
(1‘B’, ‘1/1/2007 12:02 AM’2),
(2‘C’, ‘1/1/2007 12:03 AM’3),
(3‘D’, ‘1/1/2007 12:04 AM’4),
(4‘E’, ‘1/1/2007 12:05 AM’5),
(5‘F’, ‘1/1/2007 12:06 AM’6)

— создаем таблицу Source2

If Not Exists(Select name From sys.tables Where name ‘Source2’)
CREATE TABLE dbo.Source2
(ColID int NOT NULL
,Name varchar(25NULL
,Value int NULL
,constraint PK_Source2 primary key clustered (ColID)) 

GO

— заполняем Source2

INSERT INTO dbo.Source2
(ColID,Name,Value)
VALUES
(0‘Willie’, 11),
(1‘Waylon’, 22),
(2‘Stevie Ray’, 33),
(3‘Johnny’, 44),
(4‘Kris’, 55)

— создаем таблицу Source3
If Not Exists(Select name From sys.tables Where name ‘Source3’)
CREATE TABLE dbo.Source3
(ColID int NOT NULL
,Value int NULL
,Name varchar(100NULL
,constraint PK_Source3 primary key clustered (ColID))
GO

— загружаем Source3

INSERT INTO dbo.Source3
(ColID,Value,Name)
VALUES
(0, 101‘Good-Hearted Woman’),
(1, 202‘Lonesome, Onry, and Mean’),
(2, 303‘The Sky Is Crying’),
(3, 404‘Ghost Riders in the Sky’),
(4, 505‘Sunday Morning, Coming Down’)

А также создадим промежуточную базу данных и наполним её данными.

USE master
GO

If Not Exists(Select name From sys.databases Where name ‘SSISIncrementalLoad_Stage’)
Create Database SSISIncrementalLoad_Stage
GO

USE SSISIncrementalLoad_Stage
GO

CREATE TABLE dbo.tblSource(
ColID int NOT NULL,
ColA varchar(10NULL,
ColB datetime NULL,
ColC int NULL )

CREATE TABLE dbo.stgUpdates_tblSource(
ColID int NOT NULL,
ColA varchar(10NULL,
ColB datetime NULL,
ColC int NULL )
GO

INSERT INTO dbo.tblSource
(ColID,ColA,ColB,ColC)
VALUES
(0‘A’, ‘1/1/2007 12:01 AM’1),
(1‘B’, ‘1/1/2007 12:02 AM’2),
(2‘N’, ‘1/1/2007 12:03 AM’3)
GO

CREATE TABLE dbo.Source1(
ColID int NOT NULL,
ColA varchar(10NULL,
ColB datetime NULL,
ColC int NULL )

CREATE TABLE dbo.stgUpdates_Source1(
ColID int NOT NULL,
ColA varchar(10NULL,
ColB datetime NULL,
ColC int NULL )
GO

INSERT INTO dbo.Source1
(ColID,ColA,ColB,ColC)
VALUES
(0‘A’, ‘1/1/2007 12:01 AM’1),
(1‘Z’, ‘1/1/2007 12:02 AM’2)
GO

CREATE TABLE dbo.Source2(
ColID int NOT NULL,
Name varchar(25NULL,
Value int NULL )

CREATE TABLE dbo.stgUpdates_Source2(
ColID int NOT NULL,
Name varchar(25NULL,
Value int NULL )

GO

INSERT INTO dbo.Source2
(ColID,Name,Value)
VALUES
(0‘Willie’, 11),
(1‘Waylon’, 22),
(2‘Stevie’, 33)
GO

CREATE TABLE dbo.Source3(
ColID int NOT NULL,
Value int NULL,
Name varchar(100NULL )

CREATE TABLE dbo.stgUpdates_Source3(
ColID int NOT NULL,
Value int NULL,
Name varchar(100NULL ) 

GO

INSERT INTO dbo.Source3
(ColID,Value,Name)
VALUES
(0, 101‘GOod-Hearted Woman’),
(1, 202‘Are You Sure Hank Done It This Way?’)
GO

Теперь добавим новый Biml файл в наш проект (при желании можете создать новый проект). Назовем этот файл GenerateStagingPackages.biml. Перед тэгом <Biml> добавьте следующие строки:

<#import=“” namespace=System.Data” #>
<
#import=“” namespace=Varigence.Hadron.CoreLowerer.SchemaManagement” #>
<# var connection = SchemaManager.CreateConnectionNode(“SchemaProvider“, 
Data Source=SQL\\SQL14;Initial Catalog=SSISIncrementalLoad_Source;Provider=SQLNCLI11.1;Integrated Security=True;“); #>
<var tables = connection.GenerateTableNodes(); #>

Обратите внимание на строку соединения, вам нужно её исправить в соответствии с вашим сервером. Вероятно вы заметили, что редактор выделил многое как ошибки. Не обращайте на них внимание. Чтобы проверить скрипт на ошибки, кликните правой клавишей мыши на имени соответствующего файла и выберите из контекстного меню Check Biml for Errors. И вы либо получите сообщение что все хорошо, либо диалоговое окно с описанием ошибки.

Давайте разберем подробней что мы добавили. В этих строках импортируются пространства имен System.Data и Varigence.Hadron.CoreLowerer.SchemaManagement. В переменной connection создается объект ,связанный с базой дпнных, указанной в строке соединения. Этот объект далее используется в следующей переменной, загружающей в себя список таблиц из базы данных SSISIncrementalLoad_Source.

Теперь после тэка <Biml> добавьте соответствующие узлы для двух Connection, подключающихся к базам данных SSISIncrementalLoad_Source и SSISIncrementalLoad_Stage.

<Connections>
<Connection Name=SSISIncrementalLoad_Source” ConnectionString=Data Source=SQL\SQL14;Initial Catalog=SSISIncrementalLoad_Source;Provider=SQLNCLI11.1;Integrated Security=SSPI; />
<Connection Name=SSISIncrementalLoad_Stage” ConnectionString=Data Source=SQL\SQL14;Initial Catalog=SSISIncrementalLoad_Stage;Provider=SQLNCLI11.1;OLE DB Services=1;Integrated Security=SSPI; />
</Connections>

А вот теперь начинается самое интересное. Мы будем использовать в скрипте элементы языка C#.

<Packages>
<foreach (var table in tables#>

Этот цикл перебирает все таблицы из переменной tables, определенной ранее.

Далее будет строка, формирующая имя пакета. Но так как она будет выполняться внутри цикла, то пакетов будет сформированно столько же сколько и таблиц и в именах пакетов будут присутствовать имена таблиц.

<Package Name=IncrementalLoad_<#=table.Name#>” ConstraintMode=”Linear” ProtectionLevel=”EncryptSensitiveWithUserKey”>

Обратите внимание на значение ConstraintMode. Есть еще одно значение – Parallel. При Parallel нужно вручную указывать последовательность выполнения задач и условия перехода. В нашем же случае переходы будут созданы автоматически и все будут иметь условие OnSuccess. Кроме того задачи будут выполняться в той же последовательности в какой они описаны в скрипте.

Далее ставим тэг для задач и опишем первую задачу, выполняющую команду T-SQL.

<Tasks>

<ExecuteSQL Name=Truncate stgUpdates_<#=table.Name#>” ConnectionName=”SSISIncrementalLoad_Stage”>

<DirectInput> Truncate Table stgUpdates_<#=table.Name#</DirectInput>

</ExecuteSQL>

 

Во время выполнения цикла каждый раз имя этой задачи будет сформировано в зависимости от имени таблицы с помощью конструкции <#=table.Name#>.

Далее рассмотрим задачу Data Flow:

<Dataflow Name=Load <#=table.Name#>“>

<Transformations>

<OleDbSource Name=” <#=table.Name#> Source” ConnectionName=”SSISIncrementalLoad_Source”>

<DirectInput>

SELECT <#=table.GetColumnList()#> FROM <#=table.SchemaQualifiedName #>

</DirectInput>

</OleDbSource>

<Lookup Name=Correlate” OleDbConnectionName=SSISIncrementalLoad_Stage” NoMatchBehavior=RedirectRowsToNoMatchOutput>
<DirectInput>

SELECT <#=table.GetColumnList()#> FROM dbo.<#=table.Name#>

</DirectInput>

<Inputs>

<foreach (var keyColumn in table.Keys[0].Columns) { #>
<Column SourceColumn=<#=keyColumn.Column#>” TargetColumn=”<#=keyColumn.Column#>” />
<} #>
</Inputs>

<Outputs>

<# foreach (var col in table.Columns) { #>

<Column SourceColumn=<#=col#>” TargetColumn=”Dest_<#=col#>” />
<# } #>
</Outputs>

</Lookup>

<ConditionalSplit Name=Filter>

<OutputPaths>

<OutputPath Name=Changed Rows>

<# string exp =“;
foreach (var colex in table.Columns) { exp += (+ colex + != Dest_ + colex + ) || “; } #>
<Expression>

<#=exp.Substring(0, exp.Length 4)#>

</Expression>

</OutputPath>

</OutputPaths>

</ConditionalSplit>

<OleDbDestination Name=stgUpdates_
<#=table.Name#>ConnectionName=”SSISIncrementalLoad_Stage”>
<InputPath OutputPathName=Filter.Changed Rows />
<ExternalTableOutput Table=dbo.stgUpdates_<#=table.Name#>” />
</OleDbDestination>
<OleDbDestination Name=” <#=table.Name#> Destination” ConnectionName=”SSISIncrementalLoad_Stage”>
<InputPath OutputPathName=Correlate.NoMatch />
<ExternalTableOutput Table=dbo.<#=table.Name#>” />
</OleDbDestination>
</Transformations>

</Dataflow>
 

В этой части скрипта используются новые для вас элементы <#=table.GetColumnList()#>,
<#=table.SchemaQualifiedName#>, <foreach (var keyColumn in table.Keys[0].Columns{ #>, <#=exp.Substring(0, exp.Length 4)#>. Их использование, впрочем как и остального в Biml, интуитивно понятно.

А вот дальше будет интересно. Рассмотрим оставшуюся задачу для загрузки обновившихся данных в таблицы в базу данных получателя из промежуточной базы.

<ExecuteSQL Name=Apply stgUpdates_<#=table.Name#>” ConnectionName=”SSISIncrementalLoad_Stage”>
<string upd =Update Dest Set “; foreach (var colex in table.Columns.Where(column =>!table.Keys[0].Columns.Select(keyColumn => keyColumn.Column).Contains(column))) {upd = upd + “Dest.” + colex + ” = Upd.” + colex + “,”;} var updc = upd.Substring(0,upd.Length-1) + ” From ” + table.SchemaQualifiedName + ” Dest Join [” + table.Schema.Name + “].[stgUpdates_” + table.Name + “] Upd On Upd.” + table.Keys[0].Columns[0].Column + ” = Dest.” + table.Keys[0].Columns[0].Column;#>
<DirectInput>
<#=updc#>

</DirectInput>

</ExecuteSQL>
 

В этом коде активно используется C#, в частности работа с данными с помощью Linq. Описание работы с Linq стоит отдельных статей, думаю я напишу их позже.

Остается добавить в скрипт ещё несколько строчек и он готов.

</Tasks>
</Package>

<} #>

</Packages>

</Biml>

Теперь следует проверить наш скрипт. Помните как сформировать пакет? Но вот сюрприз (ожидаемый). Сформировалось 4 пакета.

Таким образом мы сократили работу по разработке пакетов в несколько раз. В следующих статьях я планирую рассмотреть интересные сценарии использования Biml.

P.S. в статье использованы примеры из книги “SQL Server Integration Services Design Patterns

3 thoughts on “BIML. Включаем автоматизацию.

  1. Pingback: BIML. Сборник статей в этом блоге | Korshikov's

  2. Pingback: BIML. Создаем мастер-пакет. | Korshikov's

  3. Pingback: BIML. Разбираем ConstraintMode | Korshikov's

Comments are closed.