Home .NET Core and RabbitMQ
Post
Cancel

.NET Core and RabbitMQ

.NET Core and RabbitMQ

Having a system which is composed by distributed applications is a great idea, but a way to communicate with each other is required. A very popular architecture is the so called MDA or Message Driven Architecture, where a system is composed from autonomous components that communicate with each other via messages. The part which facilitates communication is the message broker, effectively decoupling applications, which don’t communicate directly, rather they publish messages to the message broker and the latter is responsible to forward them to interested parties, i.e. other applications.

A message broker that is particularly powerful and interesting is RabbitMQ, one of the most popular open source tools for that job, used worldwide by large enterprises to small startups.

In this post, I’m going to explore RabbitMQ’s basics, by creating a simple RabbitMQ producer and consumer in .NET Core with C#. Then, I will take discussion to how AMQP works at a low-level, using the code example demonstrated.

Creating a producer and consumer application in .NET Core

I am going to create a producer application, which publishes messages to the broker and also a consumer application, which consumes messages from a RabbitMQ queue.

One fine way to create a producer and a consumer application in .NET Core is to use Hosted Services. A hosted service is a background task, which comes in three flavors:

  1. Scheduled

  2. Long running task

  3. Queued

We’re not going to be bothered with #1 and #3 in this post. I am going to create a hosted service for the producer application first. This hosted service is going to connect to RabbitMQ, declare an exchange, a queue and bind them. Every 20 seconds, the producer is going to publish a message to the queue.

Regarding the consumer, again, I am going to create a hosted service, which also is going to connect to RabbitMQ, declare an exchange (the same exchange as previously), a queue (the same queue) and bind them (same binding). As this is a simple consumer application which just logs the consumed messages.

Connecting to RabbitMQ

First thing first, install the RabbitMQ.Client nuget package, this is an implementation of the AMQP 0–9–1 client library for C#.

In order to connect to RabbitMQ, the client library provides a ConnectionFactory class, which API contains a method called CreateConnection. This allows the client to connect to RabbitMQ and also perform the rest of the required steps which are

  1. Open a channel between the client and the broker

  2. Declare an exchange

  3. Declare a queue

  4. Bind the queue to the exchange

To create a connection factory it’s pretty simple, you just need the RabbitMQ connection string and voila! The connection string below follows this format:

amqp://{username}:{password}@{host}:{port}/{virtual_host}

I’ve omitted rest of registrations as they are not important on this demonstration. Be advised, I’m going to create two different client applications, so I’ve created a connection factory on both of them. As a note, the producer configuration is slightly different, due to the fact that my producers are asynchronous. More on that later.

Now back to the really important bit. I’ve created an abstract base class for connecting to RabbitMQ and then added that to a common class library. Reason is I want to reuse the same logic across my producer and consumer applications. All consumers and producers are going to derive from that base class.

On breaking down the above, on ConnectToRabbitMq method, I’m first looking if that connection is already established, if yes then no reason to open this again.

Next, I’m doing the same for Channel. To open a channel, I’m using the CreateModel method on the connection object I’ve created earlier from the factory.

Finally, I declare an exchange with the ExchangeDeclare method with some arguments, which are briefly discussed below.

  • The exchange name.

  • The type of the exchange. I’ve chosen direct, but there are other types as well, like Fanout, Topic and Headers

  • Durability of the exchange. Means that the exchange will not be deleted if RabbitMQ restarts.

  • Auto delete instruction, which makes the exchange automatically delete if all of its queues unbind.

I’ve also declared a queue, with the following arguments.

  • The queue name.

  • Durability of the queue. This is whether the queue will survive when RabbitMQ restarts.

  • Exclusivity of the queue. This tells the queue to be used by only one connection and automatically delete when that connection closes.

  • Auto delete instruction. Means that the queue, given that it had at least one consumer, will be deleted when all of its consumers unsubcribe.

And then I’ve bidden the queue to the exchange with the following arguments.

  • The queue name

  • The exchange name

  • The routing key. Incoming messages will have their own routing keys, the exchange will use the routing key on its binding to evaluate incoming messages and deliver to the appropriate queue.

The producer

For the producer application I have created an empty ASP.NET Core project, with a class that that implements BackgroundService. This is a long running hosted service and it’s responsibility is to publish a message to the broker queue every 20 seconds. As long as the task is not cancelled it’s going to run forever. Let’s look at that first.

The LogIntegrationEvent is the message carried from one application to the other with RabbitMq acting as a medium between them. As you can see below it’s just a class with couple of properties and nothing special about it.

What is particularly interesting though is the producer. Let’s see first producer’s contract which just exposes a Publish method. Each producer publishes its own type of message, so it’s essential to make the publish method generic in order to publish a variety of messages.

Next, I’ve created an abstract base producer class, which contains all required logic to publish a message. I can implement different producers by deriving from this class. Driver’s BasicPublish method issues an RPC command to publish a message on that channel. RabbitMQ requires to know to which exchange you publish that message and what is the routing key. By having these two, the broker can determine where to deliver that message.

Finally, I’ve created a LogProducer, which derives from the ProducerBase class. I’ve also provided the exchange name and routing key required for that particular producer. Using that base class I can create any producer, as long as I can provide information on the exchange, the routing key and the generic parameter, which is the integration event, a.k.a. the message.

The consumer

To consume the messages that are delivered by the publisher above, I need a consumer application to consume from an existing queue. I’ve created an ASP.NET application with a HostedService, in which I perform the actions described above. The following is the log queue specific consumer.

This is a simple background task and to make one I just need to implement the IHostedService interface.

This consumer handles asynchronous code, so I had to create one using the AsyncEventingBasicConsumer class, passing the current open channel as an argument. To make the channel consume messages from a specific queue, I need to call the BasicConsume method and provide the queue name and the consumer object as arguments. The autoAck argument tells the channel whether to automatically return an acknowledgment back to the broker. This is important, as it tells the broker that the message is indeed consumed and doesn’t need to be in the queue anymore, effectively dequeued. In this scenario, I’ve decided to sent an ack manually back to RabbitMQ.

Please note, it is required to set the DispatchConsumersAsync property to true on connection factory initialization, if any of the consumers is of type AsyncEventingBasicConsumer.

Finally, I need a way to handle the incoming messages, so for this I’m handling the Received event on the consumer. This event is invoked only when new messages enqueue on the target queue. The implementation lies on the following base class.

I’ve serialized the body, which comes in a binary format, to the appropriate command, which is just a plain C# class. Notice also that I’m using the MediatR library, which implements the mediator pattern beautifully and it’s particularly useful here. I’ve created a command handler for the LogCommand, which runs some business code when the command is send to the mediator. This instantly decouples the consumers and the business logic required for each incoming message.

To create a command, a plain C# class is needed, with few properties of course to store the data. Plus it’s required to implement the IRequest interface from MediatR library.

Finally, I’ve created a command handler which implements the IRequestHandler interface. Notice, that the generic parameter is the command class I’ve created earlier, this is to tell MediatR which command this handler is going to handle. In the Handle method, I’m logging the incoming message from RabbitMQ.

Running the applications with docker-compose

The code is ready, it’s time to start the applications and consume the messages. I love docker because it makes things so easy for a developer, with one command I’m spinning up both consumer and producer applications and I also start a RabbitMQ server. Imagine the pain if no docker ever existed, I would have to run each application separately and install RabbitMQ on my machine. Ugly.

Following is the docker-compose file to run my applications, which only contains definitions for the images. The override file contains specific definitions for each container. This is very useful if you want to have different definitions for local, development and production environments for example.

Following is the override file, which is pretty standard. However one thing that stands out is the depends_on definition on each ASP.NET application. This tells the containers to not start until RabbitMQ container enters a healthy state.

I’m using v2.4 on docker-compose as in 3.x depends_on is deprecated.

To run the apps, I’m running the following command on the root folder.

It works pretty well, I can see messages being consumed (open full screen to see the logs).

Why?

Why all these steps? How it works behind the scenes? I am interested to know how RabbitMQ communicates with the AMQP protocol on a low-level.

Let’s look at AMQP and how this dance between the broker and client unfolds on the next post.

Full code of the above example can be found on my GitHub repository.


If you liked this blog, please like and share! For more, follow me on Twitter.

This post is licensed under CC BY 4.0 by the author.

.NET Core and RabbitMQ Part 2 - Communication via AMQP

How to Build a RabbitMQ Cluster on Raspberry Pi

Comments powered by Disqus.