BizTalk Custom Pipeline for Splitting Messages

This article will show how a custom pipeline component can be used to split an incoming message into smaller individual messages for further processing. In this sample you will be shown how to use simple but powerful techniques that can be built upon and developed for use in a production environment.

Pre-requisites
This article assumes a mid level knowledge of BizTalk Server 2006 and BizTalk artefacts such as pipelines and custom pipeline components. As such, some knowledge for creating artefacts will be assumed.

What we will create
We will be creating the following

A BizTalk solution consisting of 3 projects:

A BizTalk project containing:
A message schema for the
An orchestration
A pipeline
A VB.Net class library project containing:
A custom pipeline component that does the debatching of the incoming message into smaller messages.
A VB Net class library containing:
The schema classes derived from the xsd in the BizTalk project

In addition we will need to:

Create the incoming message flat file sample
Configure the BizTalk application in Admin console

Create the BbizTalk project

Open Visual studio and create an empty BizTalk Project. Right click the project and add a new Schema.

Build the schema so it resembles the following screenshot:



In the Schema Properties window make sure you select the value Flat File Extension for the property Schema Editor Extensions.

Highligh the CompanyStats node and adjust it’s properties to be like the screenshot below. Pay particular attention to the Child delimiter properties (circled)



Next highlight the CompanyRow node and adjust it’s properties so they are as shown in the following screensot



Add the SchemaClasses Project

Next we create the schema class file from the CompanyStats.xsd. Open a command prompt and execute the following command:

Xsd.exe <path_to_CompanyStats.xsd> -c –l:VB

This will produce the CompanyStats.vb class file. This needs to be added to the project so add a new VB class library project to the solution and call it SchemaClasses. Add the CompanyStats.vb class file.

We now need to edit the CompanyStats.vb file slightly as xsd.exe uses an overly verbose naming convention. Open the CompanyStats.vb file in Visual Studio and replace all instances of CompanyStatsCompanyRow with CompanyRow. There should be four in all. Save the file, sign the project and build.

Create the Debatch custom component
This is the part that does all the work. It reads through an incoming flat file, detecting where to split the data into smaller messages and sends them down the receive pipeline to the messagebox. The orchestration we will build next will then subscribe to theses messages.

Add a new VB class library project to the solution and call it PipelineComponents.
Add references to the project so they are as the following screenshot:



Rename the Class1.vb file to Debatcher.vb. Highlight everything in the file and copy and paste the following code over it.

Imports System.IO
Imports Microsoft.BizTalk.Component.Interop
Imports Microsoft.BizTalk.Message.Interop
Imports System.ComponentModel
Imports System.Text
Imports System.Text.ASCIIEncoding
Imports System.Reflection
Imports System.Diagnostics
Imports System.Xml
Imports Microsoft.BizTalk.Component
Imports System.Text.RegularExpressions
Imports SchemaClasses

<ComponentCategory(CategoryTypes.CATID_PipelineComponent), _
ComponentCategory(CategoryTypes.CATID_DisassemblingParser), _
System.Runtime.InteropServices.Guid("F848D0A0-B087-4af7-B207-738DDD2B682A")> _
Public Class Debatcher

    
Implements Microsoft.BizTalk.Component.Interop.IBaseComponent
    
Implements Microsoft.BizTalk.Component.Interop.IPersistPropertyBag
    
Implements Microsoft.BizTalk.Component.Interop.IComponentUI
    
Implements Microsoft.BizTalk.Component.Interop.IDisassemblerComponent

    
Private qOutputMsgs As System.Collections.Queue = New System.Collections.Queue()
    
Private inMsg As IBaseMessage
    
Private companyRow As CompanyRow


#Region "IBaseComponent members"

    
Public ReadOnly Property Description() As String Implements Microsoft.BizTalk.Component.Interop.IBaseComponent.Description
        
Get
            
Return "Component to debatch incoming messages into multiple smaller messages."
        
End Get
    
End Property

    
Public ReadOnly Property Name() As String Implements Microsoft.BizTalk.Component.Interop.IBaseComponent.Name
        
Get
            
Return "Debatch message Component"
        
End Get
    
End Property

    
Public ReadOnly Property Version() As String Implements Microsoft.BizTalk.Component.Interop.IBaseComponent.Version
        
Get
            
Return "1.0.0.0"
        
End Get
    
End Property

#End Region

    
Public Sub Disassemble(ByVal pContext As IPipelineContext, ByVal pInMsg As IBaseMessage) Implements IDisassemblerComponent.Disassemble

         
Dim bodyPart As IBaseMessagePart = pInMsg.BodyPart
         
Dim originalStrm As Stream = Nothing
        
Dim rowString As New StringBuilder
        
Dim completeRow As String = String.Empty
        
Dim xmlWriter As XmlWriter = Nothing
        
Dim namespaceURI As String = "http://EnvelopeDebatcher.CompanyStats"
        
Dim rootElement As String = "CompanyStats"
        
Dim colDataHash As New Hashtable
        
Dim rowArray As String()

        inMsg = pInMsg

        
Try
            
If Not (bodyPart Is Nothing) Then
                originalStrm = bodyPart.
Data

                 
If Not (originalStrm Is Nothing) Then

                     originalStrm.
Position = 0
                     
Dim data As String = ""
                     
Dim sreader As StreamReader = New StreamReader(originalStrm, System.Text.Encoding.UTF8)
                     
Dim strLine As String

                    strLine = sreader.
ReadLine()

                     
While Not strLine Is Nothing
                         
' If the line contains alphanumeric characters, it is not a seperator line made up of semi-colons
                         
If RegularExpressions.Regex.Match(strLine.Trim(), "[a-zA-Z0-9]").Success Then

                            rowArray = strLine.
Split(";")
                            companyRow =
New CompanyRow

                            AssignHashTable(companyRow, colDataHash, rowArray)

                              companyRow.
CompanyName = colDataHash("CompanyName")
                            companyRow.SalesQ1 = colDataHash(
"SalesQ1")
                            companyRow.SalesQ2 = colDataHash(
"SalesQ2")
                            companyRow.SalesQ3 = colDataHash(
"SalesQ3")
                            companyRow.SalesQ4 = colDataHash(
"SalesQ4")
                              companyRow.
Year = colDataHash("Year")

                              rowString.
Append(ClassToString(companyRow))

                              colDataHash.
Clear()
                         
Else
                              
' We've reached a seperator line in the file. Create a message
                              
Dim ms As New MemoryStream
                            CreateXmlWriter(xmlWriter, ms)

                              xmlWriter.
WriteStartDocument()
                              xmlWriter.
WriteStartElement("ns0", rootElement, namespaceURI)
                              xmlWriter.WriteAttributeString(
"xmlns", "ns0", Nothing, namespaceURI)
                              xmlWriter.
WriteRaw(rowString.ToString.Replace(",", "."))
                              xmlWriter.
WriteEndElement()
                              xmlWriter.
Flush()
                              ms.
Seek(0, SeekOrigin.Begin)
                              
data = Unicode.GetChars(ms.ToArray)
                              xmlWriter.
Close()

                            CreateOutgoingMessage(pContext,
data, namespaceURI, rootElement)
                              rowString.
Remove(0, rowString.Length)
                              
data = String.Empty
                         
End If
                        strLine = sreader.
ReadLine()
                     
End While
                 
End If
            
End If
        
Catch ex As Exception
             
System.Diagnostics.EventLog.WriteEntry("Debatcher - Disassemble.", ex.Message + "\n" + ex.StackTrace, System.Diagnostics.EventLogEntryType.Error)
        
End Try
    
End Sub

    
''' <summary>
    
''' This method cycles through all the properties of the companyRow and populates them with the contents of the hash table
    
''' </summary>
    
''' <param name="companyRow"></param>
    
''' <param name="arrColData"></param>
    
''' <param name="colData"></param>
    
''' <remarks></remarks>
    
Private Sub AssignHashTable(ByRef companyRow As CompanyRow, ByRef arrColData As Hashtable, ByVal colData As String())
        
Dim index As Integer
        
Dim type As Type = companyRow.GetType()
        
Dim p As PropertyInfo
        
Dim props() As PropertyInfo = type.GetProperties()
        
Try
            
For index = 0 To props.Length - 1
                p =
props.GetValue(index)
                 arrColData.
Add(p.Name, colData(index))
            
Next
        
Catch ex As Exception
             
System.Diagnostics.EventLog.WriteEntry("Debatcher - AssignHashTable", ex.Message + "\n" + ex.StackTrace, System.Diagnostics.EventLogEntryType.Error)
        
End Try
    
End Sub

    
Private Sub CreateXmlWriter(ByRef xmlWriter As XmlWriter, ByRef ms As MemoryStream)
        
Dim settings As New XmlWriterSettings()
        
Try
            
settings.Encoding = New UnicodeEncoding(False, False)
            
settings.CloseOutput = True
            xmlWriter = XmlTextWriter.
Create(ms, settings)
        
Catch ex As Exception
             
System.Diagnostics.EventLog.WriteEntry(ex.Source, ex.Message + "\n" + ex.StackTrace, System.Diagnostics.EventLogEntryType.Error)
        
End Try

    
End Sub

    
Private Sub CreateOutgoingMessage(ByVal pContext As IPipelineContext, ByVal row As String, ByVal namespaceURI As String, ByVal rootElement As String)
        
Dim outMsg As IBaseMessage
        
Dim systemPropertiesNamespace As String = "http://schemas.microsoft.com/BizTalk/2003/system-properties"
        
Try
            
''create outgoing message (utf-16)
            
Dim bufferOoutgoingMessage As Byte() = System.Text.UnicodeEncoding.Unicode.GetBytes(row)
            outMsg = pContext.GetMessageFactory().
CreateMessage()
            
' Assign context of incoming message to queued message
            outMsg.
Context = inMsg.Context
            outMsg.AddPart(
"Body", pContext.GetMessageFactory().CreateMessagePart(), True)
            outMsg.
Context.Promote("MessageType", systemPropertiesNamespace, namespaceURI + "#" + rootElement.Replace("ns0:", ""))
            outMsg.BodyPart.
Data = New MemoryStream(bufferOoutgoingMessage)
             qOutputMsgs.
Enqueue(outMsg)
        
Catch ex As Exception
             
Throw New ApplicationException("Error in queueing outgoing messages: " + ex.Message)
        
End Try

    
End Sub

    
Private Function ClassToString(ByRef row As CompanyRow) As String
        
Dim ns As Serialization.XmlSerializerNamespaces = New Serialization.XmlSerializerNamespaces()
        
Dim x As New Xml.Serialization.XmlSerializer(row.GetType)
        
Dim sw As New IO.StringWriter()
        
Dim writer As XmlWriter = New XmlTextWriterFormattedNoDeclaration(sw)
         ns.
Add("", "")
        
x.Serialize(writer, row, ns)

        
Return sw.ToString()

    
End Function

    
Public Function GetNext(ByVal pContext As IPipelineContext) As IBaseMessage Implements IDisassemblerComponent.GetNext
         
Try
            
If (qOutputMsgs.Count > 0) Then
                 
Dim outMsg As IBaseMessage
                outMsg =
CType(qOutputMsgs.Dequeue(), IBaseMessage)
                 outMsg.BodyPart.
Data.Seek(0, SeekOrigin.Begin)
                 
Return outMsg
             
Else
                 
Return Nothing
            
End If
        
Catch ex As Exception
             
System.Diagnostics.EventLog.WriteEntry("EPEX Debatcher - GetNext", ex.Message + "\n" + ex.StackTrace, System.Diagnostics.EventLogEntryType.Error)
        
End Try

        
Return Nothing

    
End Function


#Region "IPersistPropertyBag members"

    
Public Sub GetClassID(ByRef classID As System.Guid) Implements Microsoft.BizTalk.Component.Interop.IPersistPropertyBag.GetClassID
        classID =
New Guid("F706702A-E78F-485c-AE5C-AB08AF19BED5")
    
End Sub

    
Public Sub InitNew() Implements Microsoft.BizTalk.Component.Interop.IPersistPropertyBag.InitNew

    
End Sub

    
Public Sub Load(ByVal propertyBag As Microsoft.BizTalk.Component.Interop.IPropertyBag, ByVal errorLog As Integer) Implements Microsoft.BizTalk.Component.Interop.IPersistPropertyBag.Load

    
End Sub

    
Public Sub Save(ByVal propertyBag As Microsoft.BizTalk.Component.Interop.IPropertyBag, ByVal clearDirty As Boolean, ByVal saveAllProperties As Boolean) Implements Microsoft.BizTalk.Component.Interop.IPersistPropertyBag.Save

    
End Sub

#End Region

#Region "IComponentUI members"
    <
Browsable(False)> _
        
Public ReadOnly Property Icon() As IntPtr Implements Microsoft.BizTalk.Component.Interop.IComponentUI.Icon
        
Get
            
Return Nothing
        
End Get
    
End Property

    
Public Function Validate(ByVal obj As Object) As System.Collections.IEnumerator Implements Microsoft.BizTalk.Component.Interop.IComponentUI.Validate
        
Return Nothing
    
End Function
#End Region

End Class

''' <summary>
''' Class to to supress XML Processor instruction "<?xml version="1.0"?>"
''' </summary>
''' <remarks></remarks>
Public Class XmlTextWriterFormattedNoDeclaration
    
Inherits System.Xml.XmlTextWriter

    
Private w As System.IO.TextWriter

    
Public Sub New(ByVal w As System.IO.TextWriter)
        
MyBase.New(w)
        Formatting =
Xml.Formatting.Indented
    
End Sub

    
''' <summary>
    
''' "null" implementation to supress writing "<?xml version="1.0"?>"
    
''' </summary>
    
''' <remarks></remarks>
    
Public Overrides Sub WriteStartDocument()

    
End Sub
End Class

The Disassemble method of this class is the powerhouse and does most of the work. In simple terms this method reads the incoming message one line at a time. If the line is not a separator line (in this case if the line contains alphanumeric characters) then create a new CompanyRow class object, deserialize it to a stringbuilder. When a separator line is hit the contents of the stringbuilder are used to create  a xml file conforming to the CompanyRow schema. The xml message, a ‘debatched’ instance, is added to an outgoing queue of messages.

Once the file has been completely processed the GetNext method of the component is called automatically once for each debatched message in the queue. Each of these messages is sent down the pipeline to the message box.

Sign the Pipelinecomponents project and build. Navigate to the directory where the PipelineComponents dll was saved and copy it to the pipelineComponents directory of your BizTalk installation. This is normally C:\Program Files\Microsoft BizTalk 2006\PipelineComponents. This makes it visible to the toolbox of Visual Studio when we create the pipeline. The last thing to do here is to add a reference to the pipelines component project to the BizTalk project. Right click the references folder in the BizTalk project and add a project reference.

Create the Receive Pipeline
In the BizTalk project add a new Receive pipeline calling it DebatchPipeline.btp. Right click the toolbox and add the pipeline component we just created. Drag the component onto the pipeline and drop it onto the Disassemble node.

Your pipeline should look something like this. Leave all the properties at their defaults.



Note: Using this custom debatcher is essentially using your own Dissassembler and so there is no need to include a disassessmbler component in the receive
pipeline

Create the Orchestration
The orchestration in this example is a very simple one which just receives the file from the pipeline and sends it to a directory located on your PC. In a real world environment more complicated business processes would more than likely take place. The orchestration simply writes out the debatched files to the file system.

Add a new orchestration to the BizTalk project and call it SendDebatchedMsg.

It will be a very simple orchestration and will end up looking like this:



Add and configure the following receive port to the orchestration



Add and configure the following Receive shape to the orchestration:



Add and configure the following send shape:



Add and configure the following Send port to the Orchestration:



Add and configure a new message type in the orchestration view window



For the message type click the button that appears with three elipses and expand the Schemas option to show the EnvelopeDebatcher.CompanyStats reference.

Now all that needs to be done is to compile and deploy the solution. Make sure all projects are signed and that the Application Name in the BizTalk project properties is set (Properties ->Configuration properties->Deployment) otherwise it will deployed into the default BizTalk Application 1 application.
One last thing we need to create is the incoming flat file. This example assumes a flat file looking like the following:

Company A;12250.25;15690.65;895632.1;78965.08;2009
Company A;24589.25;65213.02;98526.33;11489.66;2008
Company A;741526.25;896541.88;636398.64;78912.54;2007
Company A;654998.22;123458.07;784512.63;552214.89;2006
;;;;;;
Company B;965.2;452.69;852.14;654.99;2009
Company B;456.21;101.02;321.85;663.99;2008
Company B;785;459.96;213.77;995.52;2007
;;;;;;
Company C;1010.55;7954.62;4665.88;2001.63;2009
Company C;7894.25;3571.59;4658.2;6715.62;2008
;;;;;;

As you’ll notice this is a very stripped down file and its format is very similar to files that are processed by BizTalk in many real world applications I have worked on.

The lines comprised of semi-colons are the separator lines. When processed this file will generate three separate messages that in turn generate three separate files that will get written to your file system courtesy of your send pipeline

Once you’ve deployed and started your application, drop the flat file in your receive directory. After a few seconds check your outbound directory. You should see three separate files named <GUID>.xml. Examining the contents of these files you’ll see that the pipeline component we just built has split the messages, creating one for each company in the input file.

<?xml version="1.0" encoding="utf-16"?>
<ns0:CompanyStats
xmlns:ns0="http://EnvelopeDebatcher.CompanyStats">
<CompanyRow>
<
CompanyName>Company A</CompanyName>
<SalesQ1>12250.25</SalesQ1>
<SalesQ2>15690.65</SalesQ2>
<SalesQ3>895632.1</SalesQ3>
<SalesQ4>78965.08</SalesQ4>
<
Year>2009</Year>
</CompanyRow>
<CompanyRow>
<
CompanyName>Company A</CompanyName>
<SalesQ1>24589.25</SalesQ1>
<SalesQ2>65213.02</SalesQ2>
<SalesQ3>98526.33</SalesQ3>
<SalesQ4>11489.66</SalesQ4>
<
Year>2008</Year>
</CompanyRow>
<CompanyRow>
<
CompanyName>Company A</CompanyName>
<SalesQ1>741526.25</SalesQ1>
<SalesQ2>896541.88</SalesQ2>
<SalesQ3>636398.64</SalesQ3>
<SalesQ4>78912.54</SalesQ4>
<
Year>2007</Year>
</CompanyRow>
<CompanyRow>
<
CompanyName>Company A</CompanyName>
<SalesQ1>654998.22</SalesQ1>
<SalesQ2>123458.07</SalesQ2>
<SalesQ3>784512.63</SalesQ3>
<SalesQ4>552214.89</SalesQ4>
<
Year>2006</Year>
</CompanyRow>
</ns0:CompanyStats>



<?xml version="1.0" encoding="utf-16"?>
<ns0:CompanyStats
xmlns:ns0="http://EnvelopeDebatcher.CompanyStats">
<CompanyRow>
<
CompanyName>Company B</CompanyName>
<SalesQ1>965.2</SalesQ1>
<SalesQ2>452.69</SalesQ2>
<SalesQ3>852.14</SalesQ3>
<SalesQ4>654.99</SalesQ4>
<
Year>2009</Year>
</CompanyRow>
<CompanyRow>
<
CompanyName>Company B</CompanyName>
<SalesQ1>456.21</SalesQ1>
<SalesQ2>101.02</SalesQ2>
<SalesQ3>321.85</SalesQ3>
<SalesQ4>663.99</SalesQ4>
<
Year>2008</Year>
</CompanyRow>
<CompanyRow>
<
CompanyName>Company B</CompanyName>
<SalesQ1>785</SalesQ1>
<SalesQ2>459.96</SalesQ2>
<SalesQ3>213.77</SalesQ3>
<SalesQ4>995.52</SalesQ4>
<
Year>2007</Year>
</CompanyRow>
</ns0:CompanyStats>


<?
xml version="1.0" encoding="utf-16"?>
<ns0:CompanyStats
xmlns:ns0="http://EnvelopeDebatcher.CompanyStats">
<CompanyRow>
<
CompanyName>Company C</CompanyName>
<SalesQ1>1010.55</SalesQ1>
<SalesQ2>7954.62</SalesQ2>
<SalesQ3>4665.88</SalesQ3>
<SalesQ4>2001.63</SalesQ4>
<
Year>2009</Year>
</CompanyRow>
<CompanyRow>
<
CompanyName>Company C</CompanyName>
<SalesQ1>7894.25</SalesQ1>
<SalesQ2>3571.59</SalesQ2>
<SalesQ3>4658.2</SalesQ3>
<SalesQ4>6715.62</SalesQ4>
<
Year>2008</Year>
</CompanyRow>
</ns0:CompanyStats>


This article shows you how to use simple but powerful techniques to build your own custom pipeline component that will split an incoming flat file into smaller messages. An orchestration will then write your ‘debatched’ files to your file system. It would be relatively easy to build on and add more processing steps to the solution and customise it to our own needs

By BiZTech Know   Popularity  (11936 Views)