Sunday, December 23, 2012

Windows Azure 1.8 SDK– Queue to Queue Transfers

I was recently watching an episode of the Cloud Cover show where Abhishek Lal was talking about some of the recent features that are available in SDK 1.8.  One of the features that stood out for me was Queue to Queue transfers. A Queue to Queue transfer allows for a publisher to push content to a Queue.  Next, the Queue that just received the message can now push it out to another Queue automatically.

This new capability supports a couple design patterns:

  • Fan In – Where you have multiple systems publishing messages and you want to reduce the receiving endpoint surface down to a smaller number.
  • Fan Out – Where you have a single source message that you want to disseminate to more parties

The focus of this post is the Fan In scenario.  The diagram below describes the messaging pattern that we would like to enforce.  In this case we have 4 publishers.  Let’s pretend these are retailers who are now requesting more supply of a particular product.  If we want isolation between publishers then we would create a queue for each Publisher.  However, if we are interested in order delivery we now have race conditions that exist on the Receiver side.  Since this is a BizTalk Blog, BizTalk is acting as my Receiver.  Since we have 4 queues with 4 unique URIs this translates into 4 BizTalk Receive Locations (the blue RL boxes below).  We do not have any control over when and how those messages are received.  In my opinion this problem gets worse if we are building our own .Net client that is checking each queue looking for new messages.  Even if we are trying to be “fair” about the way in which we check the queues for new messages we don’t have any assurances of in order delivery.

image

Let’s make our lives easier and let the Service Bus maintain the order of messages through Queue to Queue  transfers and have a single endpoint that we need to consume from.  It will also simplify our BizTalk configuration as we will only need 1 Receive Location.

image

 

Solution

Within Visual Studio I am going to create a Solution that has 3 C# console applications.

image

QueueToQueuePublisher Project

The core of the overall solution is the QueueToQueuePublisher project.  Within it there are two classes:

  • DataContracts.cs -  contains our class that we will use as our PurchaseOrder
  • Program.cs -  is where we will create our Queues and establish our Queue to Queue forwarding.

image

DataContracts Class

If we further examine the DataContracts class we will discover the following object:

namespace QueueToQueuePublisher
{
    public class PurchaseOrder
    {
        public string ProductID { get; set; }
        public int QuantityRequired { get; set; }
        public string CompanyID { get; set; }
    }
}

 

Program Class

In Program.cs things get a little more interesting


using System;
using System.Collections.Generic;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

namespace QueueToQueuePublisher
{
    public class Program
    {
        const string CommonQueueName = "CommonQueue";
        const string PubAQueueName = "PubAQueue";
        const string PubBQueueName = "PubBQueue";
        const string ServiceNamespace = "<your_namespace>";
        const string IssuerName = "owner";
        const string IssuerKey ="<your_key>";

        static void Main(string[] args)
        {

            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(Program.IssuerName, Program.IssuerKey);
            Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", Program.ServiceNamespace, string.Empty);

            try
            {
                //*************************************************************************************************
                //                                   Management Operations
                //**************************************************************************************************         
                NamespaceManager namespaceClient = new NamespaceManager(serviceUri, credentials);
                if (namespaceClient == null)
                {
                    Console.WriteLine("\nUnexpected Error: NamespaceManager is NULL");
                    return;
                }

                Console.WriteLine("\nCreating Queue '{0}'...", Program.CommonQueueName);

                // Create Queue if it doesn't exist.
                //This Queue must exist prior to another
                //Queue forwarding messages to it
                if (!namespaceClient.QueueExists(Program.CommonQueueName))
                {
                    namespaceClient.CreateQueue(Program.CommonQueueName);
                }

                // Create Publisher A's Queue if it doesn't exist
                if (!namespaceClient.QueueExists(Program.PubAQueueName))
                {
                    QueueDescription qd = new QueueDescription(Program.PubAQueueName);

                    //This is where we establish our forwarding
                    qd.ForwardTo = Program.CommonQueueName;
                    namespaceClient.CreateQueue(qd);
                }

                // Create Publisher B's Queue if it doesn't exist
                if (!namespaceClient.QueueExists(Program.PubBQueueName))
                {
                     QueueDescription qd = new QueueDescription(Program.PubBQueueName);
                    {
                         //This is where we establish our forwarding

                         qd.ForwardTo = Program.CommonQueueName;
                         namespaceClient.CreateQueue(qd);
                    };
                  
                }

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }
    }
}

 

Within this class, the purpose is to:

  1. Create the Common Queue, if it does not already exist.
  2. If Publisher A’s Queue does not exist, create a new Queue Description and include the ForwardTo directive that will forward messages from the Publisher A Queue to the Common Queue. We will then use this Queue Description to create the Publisher A Queue.
  3. If Publisher B’s Queue does not exist, create a new Queue Description and include the ForwardTo directive that will forward messages from the Publisher B Queue to the Common Queue. We will then use this Queue Description to create the Publisher B Queue.

The Program.cs class that is part of this project only needs to run once in order to setup and configure our queues.

 

Publisher A Project

The purpose of this Project is very simple.  We want to create an instance of a Purchase Order and publish this message to our Publisher A Queue.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using QueueToQueuePublisher;
using System.Runtime.Serialization;


namespace PublisherA
{
    class Program
    {

            const string SendQueueName = "pubaqueue";
            const string ServiceNamespace = "<your_namespace>";
            const string IssuerName ="owner";
            const string IssuerKey = "<your_key>";
      
               
       static void Main(string[] args)
        {

          

     
            //***************************************************************************************************
            //                                   Get Credentials
            //***************************************************************************************************          
            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider  (Program.IssuerName, Program.IssuerKey);
            Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", Program.ServiceNamespace, string.Empty);

            MessagingFactory factory = null;

            try
            {
                PurchaseOrder po = new PurchaseOrder();

                po.CompanyID = "PublisherA";
                po.ProductID = "A1234";
                po.QuantityRequired = 300;

                factory = MessagingFactory.Create(serviceUri, credentials);

                QueueClient myQueueClient = factory.CreateQueueClient(Program.SendQueueName);

                BrokeredMessage message = new BrokeredMessage(po, new DataContractSerializer(typeof(PurchaseOrder)));
                Console.WriteLine("Publisher A sending message");
                myQueueClient.Send(message);


            }
            catch(Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }

        }
    }
}

Publisher B Project

This project is pretty much a carbon copy of the Publisher A Project with the difference being that we are going to send messages to our Publisher B Queue instead of the Publisher A Queue.  I have included this project for completeness.

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using QueueToQueuePublisher;
using System.Runtime.Serialization;


namespace PublisherA
{
    class Program
    {

        const string SendQueueName = "pubbqueue";
        const string ServiceNamespace = "<your_namespace>";
        const string IssuerName = "owner";
        const string IssuerKey = "<your_key>";


        static void Main(string[] args)
        {

 


            //***************************************************************************************************
            //                                   Get Credentials
            //***************************************************************************************************          
            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(Program.IssuerName, Program.IssuerKey);
            Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", Program.ServiceNamespace, string.Empty);

            MessagingFactory factory = null;

            try
            {
                PurchaseOrder po = new PurchaseOrder();

                po.CompanyID = "PublisherB";
                po.ProductID = "B1234";
                po.QuantityRequired = 300;

                factory = MessagingFactory.Create(serviceUri, credentials);

                QueueClient myQueueClient = factory.CreateQueueClient(Program.SendQueueName);


                BrokeredMessage message = new BrokeredMessage(po, new DataContractSerializer(typeof(PurchaseOrder)));
                Console.WriteLine("Publisher B sending message");
                myQueueClient.Send(message);


            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }

        }
    }
}

BizTalk

From a BizTalk perspective we are going to keep this very simple.  We will simply create a Send Port Subscription and write a copy of any message that is retrieved from the Common Service Bus Queue to disk.

In order to configure a Send Port Subscription we need to first create a Receive Port and Receive Location.  This Receive Location will connect to our ServiceBus Namespace and will be looking for messages form the CommonQueue only.  As you may recall the messages that are sent to the individual Publisher A and B Queues will be forwarded to this Common Queue. 

Also note, since I am not deploying any Schemas we want to use the PassThruReceive pipeline.  If you specify XMLReceive then BizTalk will be looking for a schema that doesn’t exist.

image

Our Send Port will consist of using the FILE Adapter to write our messages to the local file system.

image

In order for our Send Port Subscription to work we do need to create a Filter based upon our Receive Port Name.

image

At this point we can enable our BizTalk Application.

 

Testing

In order to test our application we do need to make sure that the QueueToQueuePublisher console application is run.  This will create our common queue and our two publisher queues.  After running this application we should see the following within our namespace.

image

If we want to test our Queue To Queue forwarding we can simply create a test message in our pubaqueue and then receive the test message from our commonqueue.

image

image

image

Now that our Queue configuration has been verified we can run an instance of our PublisherA console application.

image

If we check our file folder that our send port is writing to we should see a new file has been written.

image

We can now perform the same actions with PublisherB.

image

image

Conclusion

As you can see the Queue to Queue forwarding is a pretty neat feature.  We can use it for Fan In, as in our scenario, messaging scenarios that forces Service Bus to worry about in order delivery and simplifies a Receiver’s endpoint surface.  Arguably it creates more configuration in the cloud so there may be a bit of a trade off in that regard.

Currently the only way to configure the ForwardTo property is through the Management APIs.  There is currently no GUI that allows you to take care of this.  But, without having any private knowledge, I am sure that this is something that Microsoft will address in a future update.

Something else to be aware of is that BizTalk has no understanding of ForwardTo.  Nor would any other client of the Service Bus.  This “configuration” is occurring outside of client configurations which is the way it should be.  Any perceived complexities that exist should be abstracted away from systems that are communicating with Service Bus.

Saturday, November 10, 2012

BizTalk 2013 Beta: Azure Service Bus Integration–Dynamic Sent Ports

 

I started writing this blog post before the Beta was announced and it was probably a good thing that I did not publish it as I now have something else to discuss.  Microsoft has made an enhancement to the configuration that supports Dynamic Send Ports.  More on this later.

This blog post is really about 3.5 years in the making.  You are probably saying what, how is that possible?  BizTalk 2013, nor the Azure Service Bus(in its current form) did not even exist back then.  This is true but a colleague and myself were already thinking of a cloud based pattern to solve a ‘trading partner’ scenario that we were dealing with.

Business Problem

I work in the Utilities industry within a province in Canada.  We have a ‘deregulated’ market when it comes to Electricity and Natural Gas.  What this means is that the market is more open and it provides other opportunities for companies to participate within it.  This deregulation occurred over 10 years ago and the overall goal was to reduce prices to the end consumer.  (The jury is still out on that).

The market is primarily broken up into 4 segments:

  • Generation – Organizations who generate electricity via coal, gas, wind, solar etc
  • Transmission – High voltage Transmission of energy from Generation plants to Distribution Networks
  • Distribution – Organizations that take High voltage feeds and distribute low end voltage to customer’s homes and businesses.  These Distribution businesses are also responsible for capturing  Customers’ usage and providing this information to retailers.
  • Retailers – Are responsible for billing customers for their usage.  Customers can choose who their retailer is but cannot choose who their Distribution company is as it is broken up by Service territory due to the amount of capital investment required in order to provide infrastructure.

image

Note: Image is used for illustrative purposes only and does not necessarily reflect the actual number of companies involved.

As you can probably imagine, there is a need for information to be exchanged amongst these parties.  Amongst the Generation, Transmission and Distribution companies, each company must be aware of demand that their customers are requesting so that the Generation company can produce enough energy.  The Transmission and Distribution companies must also be able of supporting that load on their networks.

From a Distribution standpoint, the segment of the industry that I work in, we need to be aware of Customer management/Enrollment, Energizing and De-energizing customers,  Meter reading and providing consumption data to Retailers so they can bill the customers.

A few years ago the industry was looking to replace its Managed File Transfer solution that facilitated the exchange of this information.  Think of an FTP type application that had an Address book with some scheduling type activities.  This tool moved around flat files that were all based upon different transaction types.  The tool at the time suffered from major reliability issues

At the time, the Service Bus just wasn’t what it is today.  Also if you think there is a lot of skepticism  about the cloud today, it was much, much worse 3.5 years ago.  People were not ready for the cloud back then and as an industry we went a different direction, I thought it would be fun to revisit this problem and look at a way that we could solve this problem today using cutting edge tools like the Azure Service Bus and BizTalk Server 2013.

Within the context of this post we are going to focus on a fairly common problem and that is enrolling new customers.  As I previously mentioned, customers can choose their retailer.  Like most businesses retailers are very interested in on-boarding new customers with incentives.  When a customer does choose to move their business to a new retailer, this retailer needs to inform the Distribution provider that the customer has chosen to switch retailers.  The Distribution provider is then responsible for notifying the old retailer that this customer is now longer a customer of theirs and the Distributor is also responsible for confirming with the new retailer that this customer is now theirs.  From here on in, the Distribution company will send this customer’s consumption (Meter Reads) to the new retailer so that they can bill the customer.

This process is defined as a set of transactions known as the Enrollment transactions and is comprised of three messages:

  • SRR – Switch Retailer Request
  • SRN – Switch Retailer Confirmation for new Retailer
  • SRO – Switch Retailer Confirmation for old Retailer

Implementation

All participants within the market place have their own company identifier.  For this blog post I have created fictitious ids.  Our New Retailer will have an id of 1111, our Old Retailer will have an id of 2222 and our Distribution Company will have an id of 3333.These ids will map to Queues that have been created within the Service Bus.

  1. The New Retailer will send an SRR request to the Distribution Company’s queue called 3333.
  2. The Distribution Company will pull this message, via BizTalk,  off of its queue and communicate with its back end ERP system (not described within this blog post).
  3. BizTalk will then need to send a notification (SRN) back to the new Retailer confirming the enrollment.
  4. BizTalk will also need to send a notification (SRO) to the Old Retailer letting them know that they have lost a customer.

image

 

Service Bus Configuration

Within my Azure account I have created a new Namespace called Electricity and created 3 different queues:

  • 1111
  • 2222
  • 3333

image

.Net Retailer Clients

Similar to some of my other recent posts we are going to use the .Net Brokered Message API from client applications that will interact with BizTalk through the Service Bus.

Within our .Net Solution there are 3 projects:

  • DataContracts -  where we will find our message specifications for the SRR, SRN and SRO message types.
  • Retailer1111 – where we will find our New Retailer  functionality.  This project will Send the SRR and receive the SRN response.
  • Retailer2222 – where we will find our Old Retailer functionality.  This project will Receive the SRO confirmation.

image

DataContracts

The first class that we want to create represents the Switch Retailer Request (SRR).  Within this message we have properties for SiteID (Customer identifier), New Retailer ID, Old Retailer ID and the Enrollment Date (effective date).

Normally the New Retailer would not necessarily be aware of the Old Retailer ID but since I am leaving the ERP functionality out of the scope of this blog post we will assume that the New Retailer is aware of this information.

namespace DataContracts
{
    public class SwitchRetailerReqeust
    {
        public string SiteID {get;set;}
        public string NewRetailerID {get;set;}
        public string OldRetailerID { get; set; }
        public DateTime EnrollmentDate { get; set; }
    }
}

 

Next we will get into the Switch Retailer Notification (SRN).  This is the message that BizTalk will send to the New Retailer confirming that the customer is now theirs.

namespace DataContracts
{
    public class SwitchNewRetailerNotification
    {
        public string SiteID { get; set; }
        public string NewRetailerID { get; set; }
        public DateTime EnrollmentDate { get; set; }
     
    }
}

 

Finally, we have the Switch Retailer Notification (SRO) that needs to be sent to the Old Retailer letting them know that this customer is no longer theirs.

namespace DataContracts
{
    public class SwitchOldRetailerNotification
    {
        public string SiteID { get; set; }
        public string OldRetailerID { get; set; }
        public DateTime EnrollmentDate { get; set; }
    }
}


Retailer1111 Project

As previously stated, the purpose of this project is to send the SRR transaction to the Distribution company and then receive the corresponding SRN transaction back.

using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using DataContracts;
using System.Runtime.Serialization;
using System.IO;

namespace RetailerNew
{
    class Retailer1111
    {
   

            const string SendQueueName = "3333";
            const string ReceiveQueueName = "1111";
            const string ServiceNamespace = "<your_namespace>";
            const  string IssuerName ="owner";
            const string IssuerKey = "<your_key>”;

        static void Main(string[] args)
        {
            //***************************************************************************************************
            //                                   Get Credentials
            //***************************************************************************************************          
            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider  (Retailer1111.IssuerName, Retailer1111.IssuerKey);
            Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", Retailer1111.ServiceNamespace, string.Empty);

            MessagingFactory factory = null;

            try
            {
                //***************************************************************************************************
                //                                   Management Operations
                //***************************************************************************************************          
                NamespaceManager namespaceClient = new NamespaceManager(serviceUri, credentials);
                if (namespaceClient == null)
                {
                    Console.WriteLine("\nUnexpected Error: NamespaceManager is NULL");
                    return;
                }

                Console.WriteLine("\nChecking to see if Queue '{0}' exists...", Retailer1111.SendQueueName);

                // If Queue doesn't exist, then let's create it
                if (!namespaceClient.QueueExists(Retailer1111.SendQueueName))
                {
                    QueueDescription qd = new QueueDescription(Retailer1111.SendQueueName);
                   

                    namespaceClient.CreateQueue(qd);
                  
                }

                //***************************************************************************************************
                //                                   Runtime Operations
                //***************************************************************************************************
                factory = MessagingFactory.Create(serviceUri, credentials);

                QueueClient myQueueClient = factory.CreateQueueClient(Retailer1111.SendQueueName);

                //***************************************************************************************************
                //                                   Sending messages to a Queue
                //***************************************************************************************************
               

                Console.WriteLine("\nSending messages to Queue...");

                //Create a Switch Retailer Request
                SwitchRetailerRequest srr = new SwitchRetailerRequest();
                srr.SiteID = "3333123456789";
                srr.NewRetailerID = "1111";
                srr.OldRetailerID = "2222";
                srr.EnrollmentDate = DateTime.Now.AddDays(1);
          

     //Serialize the request so that BizTalk can process it correctly

                BrokeredMessage message = new BrokeredMessage(srr, new DataContractSerializer(typeof(SwitchRetailerRequest)));


                //Here we specify which URI we are expecting our response
                message.ReplyTo = serviceUri.AbsoluteUri + Retailer1111.ReceiveQueueName;
                 myQueueClient.Send(message);


                 //**************************************************************************************************
                 //                                   Receive messages from Queue
                 //**************************************************************************************************

                TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(
                    Retailer1111.IssuerName, Retailer1111.IssuerKey);
                Uri uri = ServiceBusEnvironment.CreateServiceUri("sb", Retailer1111.ServiceNamespace, string.Empty);
                MessagingFactory messagingFactory = MessagingFactory.Create(uri, tokenProvider);
                QueueClient qc = messagingFactory.CreateQueueClient(Retailer1111.ReceiveQueueName, ReceiveMode.ReceiveAndDelete);
                BrokeredMessage bm;
                while ((bm = qc.Receive(new TimeSpan(hours: 0, minutes: 0, seconds: 30))) != null)
                {
                    var data = bm.GetBody<SwitchNewRetailerNotification>(new DataContractSerializer(typeof(SwitchNewRetailerNotification)));
                    Console.WriteLine(String.Format("Customer with SiteID {0} has now been enrolled as of {1}", data.SiteID, data.EnrollmentDate.ToString() ));
                }


                Console.WriteLine("\nAfter running the entire sample, press ENTER to clean up and exit.");
                Console.ReadLine();
            }
            catch (Exception e)
            {
                Console.WriteLine("Unexpected exception {0}", e.ToString());
                throw;
            }
            finally
            {
                // Closing factory close all entities created from the factory.
                if(factory != null)
                    factory.Close();
            }
           
        }

    }

}

 

Probably the most interesting/significant line of code in there is where we set the Brokered Message ReplyTo property.  The reason why this code is significant is that BizTalk can use the value of this property to set our URI that our Dynamic Send port is going to use in order to send the response back out.  You will see how this is set in the BizTalk section.

//Here we specify which URI we are expecting our response
message.ReplyTo = serviceUri.AbsoluteUri + Retailer1111.ReceiveQueueName;

 

Retailer2222 Project

The purpose of this project is to retrieve the SRO notifications that occur when we lose a customer to another retailer.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using DataContracts;
using System.Runtime.Serialization;
using System.IO;

namespace RetailerOld
{
    class Retailer2222
    {

       
        const string ReceiveQueueName = "2222";
        const string ServiceNamespace = "<your_namespace>";
        const  string IssuerName ="owner";
        const string IssuerKey = "<your_key>";

        static void Main(string[] args)
        {

               try
               {

                //Create instance of tokenProvider using our credentials
                TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(
                    Retailer2222.IssuerName, Retailer2222.IssuerKey);
                Uri uri = ServiceBusEnvironment.CreateServiceUri("sb", Retailer2222.ServiceNamespace, string.Empty);
                MessagingFactory messagingFactory = MessagingFactory.Create(uri, tokenProvider);
                QueueClient qc = messagingFactory.CreateQueueClient(Retailer2222.ReceiveQueueName, ReceiveMode.ReceiveAndDelete);
                BrokeredMessage bm;


                //***************************************************************************************************
                //                                   Receive messages from Queue
                //***************************************************************************************************

                Console.WriteLine("About to connect to the Queue");
                while ((bm = qc.Receive(new TimeSpan(hours: 0, minutes: 0, seconds: 30))) != null)
                {
                    var data = bm.GetBody<SwitchOldRetailerNotification>(new DataContractSerializer(typeof(SwitchOldRetailerNotification)));
                    Console.WriteLine(String.Format("Customer with SiteID {0} is no longer our customr as of {1}", data.SiteID, data.EnrollmentDate.ToString() ));
                }

                Console.WriteLine("\nAfter running the entire sample, press ENTER to clean up and exit.");
                Console.ReadLine();
            }
            catch (Exception e)
            {
                Console.WriteLine("Unexpected exception {0}", e.ToString());
                throw;
            }
       
        }
}
}

 

BizTalk Project

Within this solution I am going to demonstrate what is required to send messages to Service Bus Queues using a dynamic send port.  I am going to use two slightly different approaches so keep your eyes open for that.

Schemas

I have created 3 schemas to match those classes that were included in our DataContracts project.  In order to create these schemas, I did use the Auto Generate Schemas feature that is included in Visual Studio (when you have BizTalk installed of course).  I discussed this approach in a previous post if you are interested in more details.

image

Maps

I have two very simple maps, the first one will transform an instance of our SRR message to an instance of our SRN message.

image

The second map will transform an instance of our SRR message to an instance of our SRO message.

image

Orchestration

Here is where most of the “magic” happens.  We have a receive shape where we will receive an instance of our SRR message.  We will then transform it and set some context properties, within a Message Assignment shape, so that we can use a Dynamic Send Port.  What is special about this Message Assignment shape is that we are going to use a Brokered Message property called ReplyTo as our URI.  So this is a pretty need “out of the box” feature that allows a client to dictate where we need to send a response message.

Even thought this ReplyTo property is beneficial, it only gets us part way.  We will need to provide a transport type and indicate that it is the new SB-Messaging adapter.  We also need to provide our Service Bus credentials and the URI to the Access Control Service.  It, of course, is a good idea to maintain these values in some sort of configuration store as opposed to hardcoding within our Orchestration.

image

Once we have sent out our SRN message back to the Service Bus Queue, we now need to process the SRO message.  We will once again leverage a map to instantiate this message in the absence of an ERP system were this message would ordinarily be created.  We will once again take advantage of Dynamic Send Ports but in this case we do not have the ReplyTo brokered message property because this retailer never sent us the message.  We will need to provide the base uri for the Service Bus but we can augment this by distinguishing the OldRetailerID node that is part of the SRO schema.  This will allow the solution to work for any retailer that needs to receive an SRO.

Much like the previous scenario we will need to specify a TransportType, our credentials and our Access Control Service URI.

image

When we are done putting these building blocks together we should have something that resembles the following.  We can now deploy our BizTalk application.

image

Receive Location

We will configure our Receive Location much like we did in my previous posts that discuss BizTalk integration with the Service Bus.

image

image

Send Ports 

As outlined in the Orchestration section of this post, we have two different Send Ports.  Part of the reason why I have included two is to demonstrate the following NEW features that has been included in the BizTalk 2013 Beta.  In previous BizTalk versions, if you used a Dynamic Send Port then it would automatically use the Adapter’s Default Send Handler to send out the message.  This isn’t ideal, especially in a shared environment where there can be many different groups or applications using this same Send Handler(Host).

Going forward we now have a Configure button that we can click within a Dynamic Send Port’s configuration.  When we do this a Configure Send Handler dialogue will appear that allows us to set different Send Handlers depending upon the Adapter.

image

Even within our own Application we can set a different Send Handler if you have more than 1 Dynamic Send Port.  For our second Dynamic Send Port I have specified BizTalkServerApplication as the Send Handler for this Dynamic Send Port.

There is probably not a really good reason to do this but the point I am trying to make is that we have gone from being very restricted to having a lot of flexibility.

image

Testing

We are now ready to test our application.  The steps that we need to take in order to do so are:

  • Run an instance of our Retailer1111 client.  This application will send the SRR to the Service Bus Queue where BizTalk will retrieve it.
  • BizTalk in turn will look for the ReplyTo property and send an SRN message back to the ServiceBus but this time to the the Retailer’s 1111 queue.
  • Next, BizTalk will generate an SRO message and send it to the old Retailer’s 2222 queue.

image

Conclusion

The point of this blog post was to take a real world problem and discuss how the combination of Azure Service Bus Queues and BizTalk 2013 can help us solve it.  Of course I had to over-simplify things to make this blog post some-what reasonable in length.  This also is not the only approach that we can take in order to solve this problem.  Using Azure Service Bus Topics is another option as well but I thought it was important to demonstrate how we can use the SB-Messaging context properties and Dynamic Send Ports.  I also felt it was a great opportunity to highlight the new Send Port Configuration for Dynamic Send Ports.

Monday, November 5, 2012

BizTalk 2010 R2 CTP meet BizTalk 2013 Beta

Microsoft released some exciting news on Monday for people who live and breathe Middleware.  They have released BizTalk 2010 R2 CTP’s successor: BizTalk 2013 Beta.  The new name signifies a major release and rightfully so, it does deserve it.

Back in June 2012, I was at TechEd North America where the BizTalk team provided a glimpse into the future of BizTalk.  At that point, I did feel that they were adding enough new functionality to warrant the full release but it was only until today that they officially announced the name change.

What is included in this release?

Many of the features that they did speak to at TechEd have been included in the CTP including:

  • Integration with Cloud Services via SB-Messaging Adapter
  • Support for RESTful Services (both Send and Receive)
  • Platform Support (Windows Server 2012, Visual Studio 2012 and SQL Server 2012)
  • Azure BizTalk VMs (IaaS)

The BizTalk team has now added some more features within the Beta including:

  • Enhanced SharePoint Adapter (No longer do we need to install an agent(web service) on the SharePoint Server)
  • Secure FTP (SFTP) Adapter
  • ESB Toolkit Integration
  • Dependency Tracking (The dependencies between artifacts can now be viewed and navigated in Admin console)
  • Improvements to Dynamic Send Ports (ability to set host handler per adapter, instead of always using the default send handler of the adapters)

After I discovered the Microsoft post I sent it off to my team and a lively discussion was held.   There was a bit of a debate over which of the feature we can benefit from the most.  The interesting thing is that we can benefit from each of these new features. 

SharePoint Adapter

We do a lot of work with SharePoint and in addition to running a multi-node BizTalk farm we also have a multi-node SharePoint farm.  The BizTalk team does not like installing the Adapter Web Service on the SharePoint Web Front ends so you can imagine that the SharePoint team isn’t a big fan of it either.  To make things worse, try to perform an upgrade from BizTalk 2009 to BizTalk 2010 and ask a SharePoint person to come in on the weekend to assist with the install.  Not going to make many friends that way.

Secure FTP (SFTP) Adapter

It is great to see a native SFTP adapter being included “in the box”. Back in BizTalk 2009, Microsoft provided an FTPS adapter but that protocol is not nearly as pervasive as SFTP, in my opinion anyways. As you may have seen on my blog previously, I do have several posts about a 3rd party SFTP adapter.  The adapter has been a pretty good adapter for us.  Yes there have been a few bugs, but the vendor has always provided top notch support.  However, it is a pain to deal with another vendor, pay another invoice and get license keys. 

ESB Toolkit Integration

We don’t do much with ESB itineraries but we do heavily leverage the Exception Management Portal that is included in the toolkit.  We have made some small modifications, to the exception portal,  and many of our Business Units rely upon it for Business level exceptions that occur within the Middleware or other systems like SAP.  There are many opportunities for improvement and the installation is certainly one of them.  So even though the description on the Microsoft link is rather vague, I am really hoping for a seamless experience this time around.

Dependency Tracking

We have a fairly large BizTalk environment.  It is something that we have been building upon for the last 7 years.  I can confidently say that our Business runs on top of BizTalk.  If BizTalk doesn’t function, there are widespread impacts to our employees and customers.  A Vice President was asking me about BizTalk and he was purely amazed that it supported so many Business processes within the company.  With that said, we try to leverage common components and leverage previous investments.  Overtime this can be difficult to manage so this feature is definitely welcomed.

Improvements to Dynamic Send Ports

We do have a few different scenarios where we need to determine the route that a message will take at run-time and has been a bit of a pain that these messages will always be processed by the default send handler for that Adapter.  It will be nice to have some more granularity.

RESTful Services

Looking back there have been a few different integration scenarios where could have benefited by a ‘lighter weight’ service construct.  I don’t expect this to be our de facto standard when exposing services but I do recognize the benefit in having a more flexible option than SOAP.

Integrations with Cloud Services via SB-Messaging Adapter

If you have been following my blog recently, I have quite a few posts (Post 1, Post 2, Post 3, Post 4 and Post 5)  on the new SB-Messaging Adapter which allows us to connect Service Bus Queues and Topics.  I actually have a blog post currently in the works with the CTP version but I will now save it and implement it with the new Beta. So far I have been very impressed with this new capability.  It is very easy to get this to work.  It will be this adapter that allows for many(but not all) hybrid integration scenarios with on-premise Line of Business (LOB) systems.  I fully expect BizTalk PaaS to participate in LOB integration scenarios but for those customers who have heavily invested in BizTalk, leveraging the SB-Messaging adapter has many benefits.

Azure VM BizTalk IaaS

I haven’t actually had a chance to try this out.  We do a lot of On-Premise integration so it just hasn’t been a big of a priority for me. I do recognize the value for some people though.  It creates a much lower barrier of entry for some organizations and allows them to get their “feet wet” with BizTalk before investing more significant capital in a larger environment.

Host Integration Services 2013 Beta

In addition to all of these BizTalk features a Beta of Host Integration Services (HIS) has also been released.  This version includes the following updates:
• Data Integration – Several key updates in integration with DB2.
• Application Integration – Updates to Transaction Integrator designer, tools and accounting.
• Network Integration – Direct connectivity from Server/Client to IBM mainframe systems using fully managed TN3270E runtime.
• Message Integration – WCF Channel Support for WebSphere MQ v7.5 and v7.1.
• Platform Support – Introducing the support for Window Server 2012, Visual Studio 2012, SQL Server 2012, BizTalk 2013 and IBM Systems.

Conclusion

From what I have seen with the BizTalk 2010 R2 CTP, the BizTalk team has been working very hard on this next release of BizTalk.  I think they are already demonstrating that the end product will be very solid as they have put together two compelling “pre-releases” of BizTalk.   I really encourage you to download the new bits and take the new BizTalk for a spin.  There are a lot of great features included.

Saturday, October 20, 2012

BizTalk 2010 R2 CTP: Azure Service Bus Integration–Part 5 Sending messages to Service Bus Queues using Sessions

 

What are Service Bus Sessions?

Service Bus Sessions are actually a rather broad subject as there are a few different scenarios in which they can be used.  At its simplest description I consider Service Bus Sessions to be a way to relate messages together.  More specifically here are a few ways in which they can be used:

  • To address the Maximum message size constraint.  Service Bus Queues can support messages that have a size of 256 KB or smaller.  Using Sessions allow us to break a larger message down into smaller messages and then send them over the wire.  A consumer, or receiver, can then receive all of these message “chunks” and aggregate them together.
  • To support receiving a related set of messages in First In First Out (FIFO) fashion
  • Allows for affinity between a consumer and a Service Bus Queue in competing consumer scenarios.  Imagine having 3 consuming clients all trying to receive messages from the same Service Bus Queue.  Under normal circumstances you cannot be assured that one receiver will receive all messages within a message set.  One can expect the messages to be distributed amongst the clients as each consumer “competes” for the right to process a particular message.  In this scenario, once a receiver has started to process a message within a session, that consumer will process all messages within that session barring some sort of application crash.
  • In some scenarios, using a Service Bus Session allows for routing of messages.  Within a receiver, you can specify an explicit Session that you are interested in.  So in some ways a Session can be used almost like a filter.  I am not suggesting that this approach be used instead of Service Bus Topics/Subscriptions, but there may be a specific business requirement to do this.

Why are Service Bus Sessions important in BizTalk processing?

BizTalk deals with a variety of different messaging scenarios in many different industry verticals.  Supporting Service Bus Sessions is just another tool in the the BizTalk toolbox for supporting new requirements.  A scenario that I came up with is dispatching messages.  For instance if we wanted to load up a field worker with all of his orders, wouldn’t it be nice to have all of his orders sent to him as a batch?  As opposed to him receiving some of his orders only to receive more orders later on.  For instance he may have driven by one of his customers already because the messages that he receive were out of order and other field workers were also receiving their orders which delayed him in receiving all of his.

image

Putting it together – Modifying Service Bus Queue

A pre-requisite for this type of messaging scenario to work is configuring our Service Bus Queue to support Sessions.  This can be enabled in a couple different ways:

  • When creating a queue from within the Azure Portal, we can check the Enable sessions checkbox.

image

  • When using the QueueDescription class we can set the RequiresSession property to true.

NamespaceManager namespaceClient = new NamespaceManager(serviceUri, credentials);

if (!namespaceClient.QueueExists(Sender.QueueName))
        {

            QueueDescription queueDescription = new QueueDescription(Sender.QueueName)
            {
                RequiresSession = true
            };
            namespaceClient.CreateQueue(queueDescription);
          
        }

BizTalk Configuration

In order to keep the solution very simple, we will create:

  • Two Receive Ports
  • A Receive Location for each Receive Port.  The purpose of these Receive Locations is to simply inject messages into BizTalk so that we can set the SessionID property on the Send Ports.
  • 2 Send Ports
    • One for Mike
    • One for Joe
  • Each Send Port will have a filter for the corresponding Receive Port.  Within each Send Port we will configure a SessionID .

image

The other Send Port will use the same URI, however it will have a different SessionID value which will be Joe.

image

 

Service Bus Queue Client

The code below will make a connection to our session-ful Service Bus Queue and retrieve all messages that have a SessionId of Mike.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System.Runtime.Serialization;
using BrokeredMessageToBizTalk;

namespace RetrieveServiceBusSession
{
    class Receiver
    {

        const string QueueName = "<your_sessionful_queue>";
        static string ServiceNamespace = "<your_namespace>";
        static string IssuerName = "<your_issuerName>";
        static string IssuerKey = "<your_IssuerKey>";
        static void Main(string[] args)
        {
            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(Receiver.IssuerName, Receiver.IssuerKey);
            Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", Receiver.ServiceNamespace, string.Empty);

            MessagingFactory factory = null;

            factory = MessagingFactory.Create(serviceUri, credentials);

            QueueClient sessionQueueClient = factory.CreateQueueClient(Receiver.QueueName);

            //Create sessionQueueClient and subscribe to SessionIDs that have a value of "Mike"
            MessageSession sessionReceiver = sessionQueueClient.AcceptMessageSession("Mike", TimeSpan.FromSeconds(60));
            BrokeredMessage receivedMessage;

                while ((receivedMessage = sessionReceiver.Receive(TimeSpan.FromSeconds(60))) != null)
                {
                    var data = receivedMessage.GetBody<PowerOut>(new DataContractSerializer(typeof(PowerOut)));
                    Console.WriteLine(String.Format("Customer Name: {0}", data.CustomerName));
                    Console.WriteLine("SessionID: {0}", receivedMessage.SessionId);
                    //remove message from Topic
                    receivedMessage.Complete();
                }
          
            Console.WriteLine("All received on this session...press enter to exit");
            Console.Read();
        }
    }
}

 

The code itself is very similar to that of some of my previous blog posts on ServiceBus integration.  The main difference is instantiating a MessionSession object.

MessageSession sessionReceiver = sessionQueueClient.AcceptMessageSession("Mike", TimeSpan.FromSeconds(60));

Within this line of we are indicating that we want to receive messages that belong to the Mike Session.  We can also provide a TimeSpan as an argument to specify the duration in which we want to receive from this Session.  Setting this value is more useful when we are looking for any available Session as it allows all messages within a Session to be processed before moving onto the next Session.

Testing

I have two sets of messages here.  Two of the messages will be routed through the Joe Send Port and subsequently the SessionID for these two messages will be set to Joe. The other two messages will be routed through the Mike Send Port and subsequently will have its SessionID property set to  Mike.

image

As mentioned previously, both Send Ports are configured to send to the same Service Bus Queue.  When we do run our client application, the expectation is that messages belonging to the Mike Session will be retrieved.  The Joe Messages will remain in the Queue until another receiver pulls them down or the the Time To Live (TTL) threshold has been exceeded.

When we start our Consumer application we do discover that the “Mike” messages are processed.

image

So what happened to the other messages?

The “Joe” messages are still in our Queue.  If we navigate to our Windows Azure Portal, we will discover that our Queue Length is set to 2.

image

So how do we get these messages out?

We have a couple options, we can create another MessageSession instance and retrieve all Messages belonging to the Mike Session or we can not specify a Session and our client will look for the next available Session which in this case will be the Mike Session.

Let’s go with the second option and retrieve the next available session.  In order to do so we need to change the following line of code from

  MessageSession sessionReceiver = sessionQueueClient.AcceptMessageSession("Mike", TimeSpan.FromSeconds(60));

to

  MessageSession sessionReceiver = sessionQueueClient.AcceptMessageSession(TimeSpan.FromSeconds(60));

We essentially are no longer specifying a specific Session that we are interested in and are now interested in any Session.

I will now process another 4 files; 2 will belong to the Joe Session and 2 will belong to the Mike Session.  What we expect to happen is that all 4 Joe messages will be processed since it is the next available Session.

image

So this proves that we have In Order Delivery occurring at the Session level.  Initially our Mike Session was processed which left our Joe messages outstanding.  We then loaded 4 more messages to the Queue and since the Joe messages were first in, they were processed first.  Our remaining 2 messages that now belong to Mike can be retrieved by starting up our Client Application once again.

image

Note:

Within my BizTalk Send Ports I statically configured my SessionID.  This isn’t very practical in the “Real World” but it was easy to demonstrate for the purpose of this blog post.  Much like other BizTalk context properties the SessionID property is available and can be set dynamically within an Orchestration Message Assignment shape or a Pipeline Component.

image

Conclusion

Overall I found this functionality pretty neat. I do think that it is another capability that we can leverage to support more granular control over message processing.  I do like the opportunity to group messages together and treat them as a batch.  This also works when dealing with message size limitations as we can stich a bunch of smaller messages together that collective make up a large message.