BIML. Усложняем задачу.

By | February 25, 2015

Мы научились создавать простые пакеты с помощью BIML. Теперь усложним задачу. Создадим скрипт BIML для пакета, осуществляющего инкрементальную загрузку данных. Для этого сначала проведем предварительную работу. Выполните скрипт T-SQL (наверняка вы будете использовать для этого SQL Server Management Studio) для создания баз данных и таблиц с данными:

USE master

Go

— база данных источник

If Not Exists(Select name From sys.databases Where name ‘SSISIncrementalLoad_Source’)

CREATE DATABASE [SSISIncrementalLoad_Source]

— база данных получатель

If Not Exists(Select name From sys.databases Where name ‘SSISIncrementalLoad_Dest’)
CREATE DATABASE [SSISIncrementalLoad_Dest]

GO

USE SSISIncrementalLoad_Source
GO
— таблица с исходными данными
If Not Exists(Select name From sys.tables Where name = ‘tblSource’)
CREATE TABLE dbo.tblSource
(ColID int NOT NULL
,ColA varchar(10NULL
,ColB datetime NULL
constraint df_ColB default (getDate())
,ColC int NULL
,constraint PK_tblSource primary key clustered (ColID))

USE
SSISIncrementalLoad_Dest
GO

— данные в таблице получателе
If Not Exists(Select name From sys.tables Where name = ‘tblDest’)
CREATE TABLE dbo.tblDest
(ColID int NOT NULL
,ColA varchar(10NULL
,ColB datetime NULL
,ColC int NULL)
— таблица для частично изменившихся данных
If Not Exists(Select name From sys.tables Where name = ‘stgUpdates’)
CREATE TABLE dbo.stgUpdates
(ColID int NULL
,ColA varchar(10NULL
,ColB datetime NULL
,ColC int NULL)

USE
SSISIncrementalLoad_Source
GO
— вставляем “неизмененная”, “измененная” и “новая” строки в источнике
INSERT INTO dbo.tblSource
(ColID,ColA,ColB,ColC)
VALUES (0, ‘A’, ‘1/1/2007 12:01 AM’, 1),
(1, ‘B’, ‘1/1/2007 12:02 AM’, 2),
(2, ‘N’, ‘1/1/2007 12:03 AM’3)

USE
SSISIncrementalLoad_Dest
GO
— вставляем “измененная” и “неизмененная” строки в получателе
INSERT INTO dbo.tblDest
(ColID,ColA,ColB,ColC)
VALUES
(0, ‘A’, ‘1/1/2007 12:01 AM’, 1),
(1, ‘C’, ‘1/1/2007 12:02 AM’, 2)

Думаю тут понятно, что проверка данных на новизну и изменение будет происходить по ключу ColID.

Пришла пора создать новый Biml файл, назовите его IncrementalLoad.biml. Добавьте в него узел Connection с двумя подузлами для соединений, называемых SSISIncrementalLoad_Source и SSISIncrementalLoad_Dest и осуществляющих соединение с базами данных, созданных на предыдущем этапе. У меня этот узел выглядит следующим образом (строку соединения напишите в соответствии с вашими сервером и способом аутентификации):

<Connections>

<Connection Name=SSISIncrementalLoad_Source ConnectionStringData Source=SQL\SQL14;Initial Catalog=SSISIncrementalLoad_Source;Provider=SQLNCLI11.1;Integrated Security=SSPI; />
<Connection Name=SSISIncrementalLoad_Dest” ConnectionStringData Source=SQL\SQL14;Initial Catalog=SSISIncrementalLoad_Dest;Provider=SQLNCLI11.1;Integrated Security=SSPI; />

</Connections>

Добавим описание пакета с именем IncrementalLoadPackage и свойствами ConstraintMode=

“Parallel” и ProtectionLevel=”EncryptSensitiveWithUserKey”. В этом пакете будет выполняться задача выполнения команды T-SQL: Truncate table stgUpdates.

Вот так выглядит теперь скрипт Biml:

<Biml xmlns=http://schemas.varigence.com/biml.xsd>

<Connections>

<Connection  Name=SSISIncrementalLoad_Source” ConnectionString=Data Source=SQL\SQL14;Initial Catalog=SSISIncrementalLoad_Source;Provider=SQLNCLI11.1;Integrated Security=SSPI; /> 

<Connection Name=SSISIncrementalLoad_Dest” ConnectionString=Data Source=SQL\SQL14;Initial Catalog=SSISIncrementalLoad_Dest;Provider=SQLNCLI11.1;Integrated Security=SSPI; />

</Connections>

<Packages>

<Package Name=IncrementalLoadPackage” ConstraintMode=Parallel” ProtectionLevel=EncryptSensitiveWithUserKey>

<Tasks>

<ExecuteSQL Name=Truncate stgUpdates” ConnectionName=SSISIncrementalLoad_Dest>

<DirectInput>Truncate Table stgUpdates</DirectInput>

</ExecuteSQL>

</Tasks>

</Package>

</Packages>

</Biml>

Добавим теперь задачу Data Flow. Для этого после тэга </ExecuteSQL> добавим узел Dataflow. Внутри него создадим узел PrecedenceConstraints, в нем будет указываться из какой задачи и при каком условии пакет будет переходить к этой задаче.

<Dataflow Name=Load tblDest>

<PrecedenceConstraints>

<Inputs>

<Input OutputPathName=Truncate stgUpdates.Output />

</Inputs>

</PrecedenceConstraints>

</Dataflow>

На следующем этапе нам нужно указать метаданные, которые определят преобразования, являющиеся сердцем задачи Data Flow. Мы будем реализовывать инкрементальную загрузку, которая включает в себя OLE DB Source адаптер, преобразование Lookup, преобразование Condition Split и OLE DB Destination адаптеры. Должен получиться вот такой вариант задачи Data Flow.

Т.е. сначала мы получаем данные из таблицы dbo.tblSource, затем проверяем какие из полученных строк есть в таблице dbo.tblDest (проверку делаем по полю ColID). Не найденные строки (т.е. новые строки) отправляем сразу в таблицу dbo.tblDest, а среди найденных строк проводим фильтрацию на предмет того есть ли изменения в остальных столбцах. Строки с изменениями пишем в таблицу dbo.stgUpdates. В таблицу dbo.tblDest измененные строки запишем позже.

Не вдаваясь в подробности языка Biml приведу готовый скрипт для преобразований в задаче Data Flow. Его достаточно легко понять.

<Transformations>

<OleDbSource Name=tblSource Source” ConnectionName=SSISIncrementalLoad_Source>

<ExternalTableInput Table=dbo.tblSource />

</OleDbSource>

<Lookup Name=Correlate” OleDbConnectionName=SSISIncrementalLoad_Dest” NoMatchBehavior=RedirectRowsToNoMatchOutput>

<InputPath OutputPathName=tblSource Source.Output />

<DirectInput>SELECT ColID, ColA, ColB, ColC FROM dbo.tblDest</DirectInput>

<Inputs>

<Column SourceColumn=ColID” TargetColumn=ColID />

</Inputs>

<Outputs>

<Column SourceColumn=ColA” TargetColumn=Dest_ColA />

<Column SourceColumn=ColB” TargetColumn=Dest_ColB />

<Column SourceColumn=ColC” TargetColumn=Dest_ColC />

</Outputs>

</Lookup>

<OleDbDestination Name=tblDest Destination” ConnectionName=SSISIncrementalLoad_Dest>

<InputPath OutputPathName=Correlate.NoMatch />

<ExternalTableOutput Table=dbo.tblDest />

</OleDbDestination>

<ConditionalSplit Name=Filter>

<InputPath OutputPathName=Correlate.Match/>

<OutputPaths>

<OutputPath Name=Changed Rows>

<Expression>

(ColA != Dest_ColA) || (ColB != Dest_ColB) || (ColC != Dest_ColC)

</Expression>

</OutputPath>

</OutputPaths>

</ConditionalSplit>

<OleDbDestination Name=stgUpdates” ConnectionName=SSISIncrementalLoad_Dest>

<InputPath OutputPathName=Filter.Changed Rows />

<ExternalTableOutput Table=dbo.stgUpdates />

</OleDbDestination>

</Transformations>

И теперь нам остается добавить задачу, которая будет обновлять в получателе те строки, которые были изменены в источнике. Как вы помните, мы выгрузили эти строки в таблицу dbo.stgUpdates. Будем использовать задачу типа ExecuteSQL.

<ExecuteSQL Name=Apply stgUpdates” ConnectionName=SSISIncrementalLoad_Dest>

<PrecedenceConstraints>

<Inputs>

<Input OutputPathName=Load tblDest.Output />

</Inputs>

</PrecedenceConstraints>

<DirectInput>

Update Dest

Set Dest.ColA = Upd.ColA

,Dest.ColB = Upd.ColB

,Dest.ColC = Upd.ColC

From tblDest Dest

Join stgUpdates Upd

On Upd.ColID = Dest.ColID

</DirectInput>

</ExecuteSQL>

Давайте приведу полный скрипт для этого решения:

<Biml xmlns=http://schemas.varigence.com/biml.xsd>

<Connections>

<Connection Name=SSISIncrementalLoad_Source” ConnectionString=Data Source=SQL\SQL14;Initial Catalog=SSISIncrementalLoad_Source;Provider=SQLNCLI11.1;Integrated Security=SSPI; />

<Connection Name=SSISIncrementalLoad_Dest” ConnectionString=Data Source=SQL\SQL14;Initial Catalog=SSISIncrementalLoad_Dest;Provider=SQLNCLI11.1;Integrated Security=SSPI; />

</Connections>

<Packages>

<Package Name=IncrementalLoadPackage” ConstraintMode=Parallel” ProtectionLevel=EncryptSensitiveWithUserKey>

<Tasks>

<ExecuteSQL Name=Truncate stgUpdates” ConnectionName=SSISIncrementalLoad_Dest>

<DirectInput>Truncate Table stgUpdates</DirectInput>

</ExecuteSQL>

<Dataflow Name=Load tblDest>

<PrecedenceConstraints>

<Inputs>

<Input OutputPathName=Truncate stgUpdates.Output />

</Inputs>

</PrecedenceConstraints>

<Transformations>

<OleDbSource Name=tblSource Source” ConnectionName=SSISIncrementalLoad_Source>

<ExternalTableInput Table=dbo.tblSource />

</OleDbSource>

<Lookup Name=Correlate” OleDbConnectionName=SSISIncrementalLoad_Dest” NoMatchBehavior=RedirectRowsToNoMatchOutput>

<InputPath OutputPathName=tblSource Source.Output />

<DirectInput>SELECT ColID, ColA, ColB, ColC FROM dbo.tblDest</DirectInput>

<Inputs>

<Column SourceColumn=ColID” TargetColumn=ColID />

</Inputs>

<Outputs>

<Column SourceColumn=ColA” TargetColumn=Dest_ColA />

<Column SourceColumn=ColB” TargetColumn=Dest_ColB />

<Column SourceColumn=ColC” TargetColumn=Dest_ColC />

</Outputs>

</Lookup>

<OleDbDestination Name=tblDest Destination” ConnectionName=SSISIncrementalLoad_Dest>

<InputPath OutputPathName=Correlate.NoMatch />

<ExternalTableOutput Table=dbo.tblDest />

</OleDbDestination>

<ConditionalSplit Name=Filter>

<InputPath OutputPathName=Correlate.Match/>

<OutputPaths>

<OutputPath Name=Changed Rows>

<Expression>

(ColA != Dest_ColA) || (ColB != Dest_ColB) || (ColC != Dest_ColC)

</Expression>

</OutputPath>

</OutputPaths>

</ConditionalSplit>

<OleDbDestination Name=stgUpdates” ConnectionName=SSISIncrementalLoad_Dest>

<InputPath OutputPathName=Filter.Changed Rows />

<ExternalTableOutput Table=dbo.stgUpdates />

</OleDbDestination>

</Transformations>

</Dataflow>

<ExecuteSQL Name=Apply stgUpdates” ConnectionName=SSISIncrementalLoad_Dest>

<PrecedenceConstraints>

<Inputs>

<Input OutputPathName=Load tblDest.Output />

</Inputs>

</PrecedenceConstraints>

<DirectInput>

Update Dest

Set Dest.ColA = Upd.ColA

,Dest.ColB = Upd.ColB

,Dest.ColC = Upd.ColC

From tblDest Dest

Join stgUpdates Upd

On Upd.ColID = Dest.ColID

</DirectInput>

</ExecuteSQL>

</Tasks>

</Package>

</Packages>

</Biml>

Теперь надо протестировать, то что у нас получилось. Сперва подготовим скрипт T-SQL для проверки значений в таблицах:

USE SSISIncrementalLoad_Source

GO

SELECT TableName ‘tblSource’ ,ColID ,ColA ,ColB ,ColC

FROM dbo.tblSource

GO

USE SSISIncrementalLoad_Dest

GO

SELECT TableName ‘tblDest’ ,[ColID] ,[ColA] ,[ColB] ,[ColC] 

FROM [dbo].[tblDest]

SELECT TableName = ‘stgUpdates’ ,[ColID] ,[ColA] ,[ColB] ,[ColC]

FROM [dbo].[stgUpdates]

GO

Он должен вернуть нам вот такие значения:

Ну а теперь сформируйте пакет и запустите его на выполнение. Если все сделано правильно, то пакет выполнится без проблем.

Кстати, если вы решите снова запустить этот пакет, то имеет смысл подготовить скрипт T-SQL для повторной инициализации данных:

USE SSISIncrementalLoad_Source

GO

TRUNCATE TABLE dbo.tblSource

— вставляем “неизмененная”, “измененная” и “новая” строки в источнике
INSERT INTO dbo.tblSource
(ColID,ColA,ColB,ColC)
VALUES (0, ‘A’, ‘1/1/2007 12:01 AM’, 1),
(1, ‘B’, ‘1/1/2007 12:02 AM’, 2),
(2, ‘N’, ‘1/1/2007 12:03 AM’3)
USE SSISIncrementalLoad_Dest

GO

TRUNCATE TABLE dbo.stgUpdates

TRUNCATE TABLE dbo.tblDest

— вставляем “измененная” и “неизмененная” строки в получателе
INSERT INTO dbo.tblDest
(ColID,ColA,ColB,ColC)
VALUES
(0, ‘A’, ‘1/1/2007 12:01 AM’, 1),
(1, ‘C’, ‘1/1/2007 12:02 AM’, 2) 

Итак, мы разобрали как сделать пакет из нескольких задач. Если вы уже давно работаете с SSIS, то возникнет вопрос зачем все это надо было? Ведь проще сразу сделать пакет SSIS. Но представьте что у вас десятки, а возможно и сотни подобных таблиц для которых надо сделать однотипную загрузку данных. В следующей статье я покажу как задействовать функционал автоматизации, заключенный в Biml.

P.S. в статье использованы примеры из книги “SQL Server Integration Services Design Patterns

2 thoughts on “BIML. Усложняем задачу.

  1. Pingback: BIML. Сборник статей в этом блоге | Korshikov's

  2. Pingback: BIML. Включаем автоматизацию. | Korshikov's

Comments are closed.