.NET Core and RabbitMQ

Building a system out of distributed applications is a great idea, but those applications need a way to communicate with each other. A very popular approach is the so-called message-driven architecture (MDA), where a system is composed of autonomous components that communicate via messages. The part that facilitates communication is the message broker. It effectively decouples the applications: they don’t talk to each other directly, but publish messages to the broker, which is responsible for forwarding them to interested parties, i.e. other applications.

A particularly powerful and interesting message broker is RabbitMQ, one of the most popular open source tools for the job, used worldwide by everyone from small startups to large enterprises.

In this post, I’m going to explore RabbitMQ’s basics by creating a simple RabbitMQ producer and consumer in .NET Core with C#. In the next post, I will use the same code example to discuss how AMQP works at a low level.

Creating a producer and consumer application in .NET Core

I am going to create a producer application, which publishes messages to the broker and also a consumer application, which consumes messages from a RabbitMQ queue.

One fine way to create a producer and a consumer application in .NET Core is to use Hosted Services. A hosted service is a background task, which comes in three flavors:

  1. Scheduled
  2. Long running task
  3. Queued

We’re not going to bother with #1 and #3 in this post. I am going to create a hosted service for the producer application first. This hosted service is going to connect to RabbitMQ, declare an exchange and a queue, and bind them. Every 20 seconds, the producer is going to publish a message to the exchange, which routes it to the queue.

For the consumer, I am again going to create a hosted service, which also connects to RabbitMQ, declares an exchange (the same exchange as before) and a queue (the same queue), and binds them (same binding). This is a simple consumer application which just logs the consumed messages.

Connecting to RabbitMQ

First things first: install the RabbitMQ.Client NuGet package, the AMQP 0-9-1 client library for C#.

To connect to RabbitMQ, the client library provides a ConnectionFactory class, whose API contains a method called CreateConnection. This allows the client to connect to RabbitMQ and then perform the rest of the required steps, which are:

  1. Open a channel between the client and the broker
  2. Declare an exchange
  3. Declare a queue
  4. Bind the queue to the exchange

Creating a connection factory is pretty simple: you just need the RabbitMQ connection string and voila! The connection string below follows this format:

amqp://{username}:{password}@{host}:{port}/{virtual_host}

I’ve omitted the rest of the registrations as they are not important for this demonstration. Be advised, I’m going to create two different client applications, so I’ve created a connection factory in both of them. As a note, the consumer configuration is slightly different, because my consumers are asynchronous. More on that later.

public void ConfigureServices(IServiceCollection services)
{
    services
        // other registrations omitted for brevity
        .AddSingleton(serviceProvider =>
        {
            var uri = new Uri("amqp://guest:guest@rabbit:5672/CUSTOM_HOST");
            return new ConnectionFactory
            {
                Uri = uri
            };
        });
}
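To make the connection string format concrete, here is a minimal sketch that splits such a URI into its parts using only System.Uri. The AmqpUriParts class and its Parse method are hypothetical names made up for this illustration; they are not part of RabbitMQ.Client, which does its own parsing when you assign ConnectionFactory.Uri.

```csharp
using System;

// Hypothetical helper (not part of RabbitMQ.Client): splits an AMQP URI
// into the pieces named in the format above, using only System.Uri.
public static class AmqpUriParts
{
    public static (string User, string Password, string Host, int Port, string VirtualHost) Parse(string amqpUri)
    {
        var uri = new Uri(amqpUri);

        // UserInfo holds "username:password"; split on the first colon only.
        var credentials = uri.UserInfo.Split(new[] { ':' }, 2);
        var user = Uri.UnescapeDataString(credentials[0]);
        var password = credentials.Length > 1
            ? Uri.UnescapeDataString(credentials[1])
            : string.Empty;

        // The path after the leading slash is the virtual host.
        var virtualHost = Uri.UnescapeDataString(uri.AbsolutePath.TrimStart('/'));

        return (user, password, uri.Host, uri.Port, virtualHost);
    }
}
```

Calling AmqpUriParts.Parse("amqp://guest:guest@rabbit:5672/CUSTOM_HOST") yields the user guest, the password guest, the host rabbit, the port 5672 and the virtual host CUSTOM_HOST.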

Now back to the really important bit. I’ve created an abstract base class for connecting to RabbitMQ and added it to a common class library. The reason is that I want to reuse the same logic across my producer and consumer applications. All consumers and producers are going to derive from that base class.

public abstract class RabbitMqClientBase : IDisposable
{
    protected const string VirtualHost = "CUSTOM_HOST";
    protected readonly string LoggerExchange = $"{VirtualHost}.LoggerExchange";
    protected readonly string LoggerQueue = $"{VirtualHost}.log.message";
    protected const string LoggerQueueAndExchangeRoutingKey = "log.message";

    protected IModel Channel { get; private set; }
    private IConnection _connection;
    private readonly ConnectionFactory _connectionFactory;
    private readonly ILogger<RabbitMqClientBase> _logger;

    protected RabbitMqClientBase(
        ConnectionFactory connectionFactory,
        ILogger<RabbitMqClientBase> logger)
    {
        _connectionFactory = connectionFactory;
        _logger = logger;
        ConnectToRabbitMq();
    }

    private void ConnectToRabbitMq()
    {
        if (_connection == null || _connection.IsOpen == false)
        {
            _connection = _connectionFactory.CreateConnection();
        }

        if (Channel == null || Channel.IsOpen == false)
        {
            Channel = _connection.CreateModel();
            Channel.ExchangeDeclare(
                exchange: LoggerExchange,
                type: "direct",
                durable: true,
                autoDelete: false);
            Channel.QueueDeclare(
                queue: LoggerQueue,
                durable: false,
                exclusive: false,
                autoDelete: false);
            Channel.QueueBind(
                queue: LoggerQueue,
                exchange: LoggerExchange,
                routingKey: LoggerQueueAndExchangeRoutingKey);
        }
    }

    public void Dispose()
    {
        try
        {
            Channel?.Close();
            Channel?.Dispose();
            Channel = null;

            _connection?.Close();
            _connection?.Dispose();
            _connection = null;
        }
        catch (Exception ex)
        {
            _logger.LogCritical(ex, "Cannot dispose RabbitMQ channel or connection");
        }
    }
}

Breaking down the above: in the ConnectToRabbitMq method, I first check whether the connection is already established. If it is, there’s no reason to open it again.

Next, I’m doing the same for Channel. To open a channel, I’m using the CreateModel method on the connection object I’ve created earlier from the factory.

Finally, I declare an exchange with the ExchangeDeclare method with some arguments, which are briefly discussed below.

  • The exchange name.
  • The type of the exchange. I’ve chosen direct, but there are other types as well.
    • Fanout
    • Topic
    • Headers
  • Durability of the exchange. A durable exchange survives a RabbitMQ restart instead of being deleted.
  • Auto delete instruction, which makes the exchange delete itself once all of its queues have been unbound.

I’ve also declared a queue, with the following arguments.

  • The queue name.
  • Durability of the queue. This is whether the queue will survive when RabbitMQ restarts.
  • Exclusivity of the queue. An exclusive queue can be used by only one connection and is automatically deleted when that connection closes.
  • Auto delete instruction. It means that the queue, given that it had at least one consumer, will be deleted when all of its consumers unsubscribe.

And then I’ve bound the queue to the exchange with the following arguments.

  • The queue name
  • The exchange name
  • The routing key. Every incoming message carries its own routing key; the exchange compares it with the routing key on each binding and delivers the message to the matching queue.
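The way a direct exchange uses bindings can be sketched with a toy in-memory model. This is not RabbitMQ code: DirectExchangeModel and its members are hypothetical names invented for this illustration, and the model ignores everything except exact routing key matching, which is what the direct exchange type does.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of a direct exchange; not RabbitMQ code.
// A message goes to every queue whose binding key equals
// the message's routing key exactly.
public sealed class DirectExchangeModel
{
    // binding key -> names of the queues bound with that key
    private readonly Dictionary<string, HashSet<string>> _bindings =
        new Dictionary<string, HashSet<string>>(StringComparer.Ordinal);

    // queue name -> messages delivered so far
    private readonly Dictionary<string, List<string>> _queues =
        new Dictionary<string, List<string>>(StringComparer.Ordinal);

    public void QueueDeclare(string queue)
    {
        if (!_queues.ContainsKey(queue)) _queues[queue] = new List<string>();
    }

    public void QueueBind(string queue, string routingKey)
    {
        QueueDeclare(queue);
        if (!_bindings.TryGetValue(routingKey, out var bound))
        {
            bound = new HashSet<string>(StringComparer.Ordinal);
            _bindings[routingKey] = bound;
        }
        bound.Add(queue);
    }

    // Returns how many queues received the message; 0 means unroutable.
    public int Publish(string routingKey, string body)
    {
        if (!_bindings.TryGetValue(routingKey, out var bound)) return 0;
        foreach (var queue in bound) _queues[queue].Add(body);
        return bound.Count;
    }

    public IReadOnlyList<string> Messages(string queue) => _queues[queue];
}
```

With the queue CUSTOM_HOST.log.message bound under the key log.message, publishing with the routing key log.message reaches that queue, while publishing with any other key reaches no queue at all.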

The producer

For the producer application I have created an empty ASP.NET Core project, with a class that implements BackgroundService. This is a long-running hosted service and its responsibility is to publish a message to the broker every 20 seconds. As long as the task is not cancelled it’s going to run forever. Let’s look at that first.

public class LogBackgroundTask : BackgroundService
{
    private readonly IRabbitMqProducer<LogIntegrationEvent> _producer;

    public LogBackgroundTask(IRabbitMqProducer<LogIntegrationEvent> producer) => _producer = producer;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var @event = new LogIntegrationEvent
            {
                Id = Guid.NewGuid(),
                Message = $"Hello! Message generated at {DateTime.Now.ToString("O")}"
            };

            _producer.Publish(@event);
            await Task.Delay(20000, stoppingToken);
        }

        await Task.CompletedTask;
    }
}
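The loop-until-cancelled pattern can be tried in isolation, without a host or a broker. The following is a minimal sketch under that assumption: PeriodicLoop and RunAsync are hypothetical names, the work is passed in as a delegate instead of publishing to RabbitMQ, and the cancellation thrown by Task.Delay is caught here so the method can return how many iterations ran.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the body of ExecuteAsync: runs `work` immediately,
// then once per `interval`, until the token is cancelled.
public static class PeriodicLoop
{
    public static async Task<int> RunAsync(
        Action work, TimeSpan interval, CancellationToken stoppingToken)
    {
        var iterations = 0;
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                work();
                iterations++;

                // Task.Delay throws an OperationCanceledException
                // if the token fires while waiting.
                await Task.Delay(interval, stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation during the delay is the normal way out of the loop.
        }

        return iterations;
    }
}
```

Passing a token from a CancellationTokenSource that is cancelled after a short timeout makes the loop stop, much like the host does when it signals stoppingToken on shutdown.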

The LogIntegrationEvent is the message carried from one application to the other, with RabbitMQ acting as the medium between them. As you can see below, it’s just a class with a couple of properties and nothing special about it.

public class LogIntegrationEvent
{
    public Guid Id { get; set; }
    public string Message { get; set; }
}

What is particularly interesting, though, is the producer. Let’s first look at the producer’s contract, which just exposes a Publish method. Each producer publishes its own type of message, so the interface is generic in the message type.

public interface IRabbitMqProducer<in T>
{
    void Publish(T @event);
}

Next, I’ve created an abstract base producer class, which contains all the logic required to publish a message. I can implement different producers by deriving from this class. The driver’s BasicPublish method sends a command to the broker to publish a message on that channel. RabbitMQ needs to know which exchange you are publishing the message to and what the routing key is. With these two, the broker can determine where to deliver the message.

public abstract class ProducerBase<T> : RabbitMqClientBase, IRabbitMqProducer<T>
{
    private readonly ILogger<ProducerBase<T>> _logger;
    protected abstract string ExchangeName { get; }
    protected abstract string RoutingKeyName { get; }
    protected abstract string AppId { get; }

    protected ProducerBase(
        ConnectionFactory connectionFactory,
        ILogger<RabbitMqClientBase> logger,
        ILogger<ProducerBase<T>> producerBaseLogger) :
        base(connectionFactory, logger) => _logger = producerBaseLogger;

    public virtual void Publish(T @event)
    {
        try
        {
            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event));
            var properties = Channel.CreateBasicProperties();
            properties.AppId = AppId;
            properties.ContentType = "application/json";
            properties.DeliveryMode = 1; // Doesn't persist to disk
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
            Channel.BasicPublish(exchange: ExchangeName, routingKey: RoutingKeyName, body: body, basicProperties: properties);
        }
        catch (Exception ex)
        {
            _logger.LogCritical(ex, "Error while publishing");
        }
    }
}

Finally, I’ve created a LogProducer, which derives from the ProducerBase class. I’ve also provided the exchange name and routing key required for that particular producer. Using that base class I can create any producer, as long as I can provide information on the exchange, the routing key and the generic parameter, which is the integration event, a.k.a. the message.

public class LogProducer : ProducerBase<LogIntegrationEvent>
{
    public LogProducer(
        ConnectionFactory connectionFactory,
        ILogger<RabbitMqClientBase> logger,
        ILogger<ProducerBase<LogIntegrationEvent>> producerBaseLogger) :
        base(connectionFactory, logger, producerBaseLogger)
    {
    }

    protected override string ExchangeName => "CUSTOM_HOST.LoggerExchange";
    protected override string RoutingKeyName => "log.message";
    protected override string AppId => "LogProducer";
}

The consumer

To consume the messages delivered by the publisher above, I need a consumer application that consumes from an existing queue. I’ve created an ASP.NET Core application with a hosted service, in which I perform the actions described above. The following is the consumer specific to the log queue.

public class LogConsumer : ConsumerBase, IHostedService
{
    protected override string QueueName => "CUSTOM_HOST.log.message";

    public LogConsumer(
        IMediator mediator,
        ConnectionFactory connectionFactory,
        ILogger<LogConsumer> logConsumerLogger,
        ILogger<ConsumerBase> consumerLogger,
        ILogger<RabbitMqClientBase> logger) :
        base(mediator, connectionFactory, consumerLogger, logger)
    {
        try
        {
            var consumer = new AsyncEventingBasicConsumer(Channel);
            consumer.Received += OnEventReceived<LogCommand>;
            Channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
        }
        catch (Exception ex)
        {
            logConsumerLogger.LogCritical(ex, "Error while consuming message");
        }
    }

    public virtual Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    public virtual Task StopAsync(CancellationToken cancellationToken)
    {
        Dispose();
        return Task.CompletedTask;
    }
}

This is a simple background task and to make one I just need to implement the IHostedService interface.

This consumer handles asynchronous code, so I had to create one using the AsyncEventingBasicConsumer class, passing the currently open channel as an argument. To make the channel consume messages from a specific queue, I need to call the BasicConsume method and provide the queue name and the consumer object as arguments. The autoAck argument tells the channel whether messages are acknowledged to the broker automatically. This is important, as the acknowledgment tells the broker that the message has indeed been consumed and doesn’t need to stay in the queue anymore, so it is effectively dequeued. In this scenario, I’ve decided to send the ack back to RabbitMQ manually.
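What manual acknowledgment buys you can be sketched with a toy in-memory model. This is not RabbitMQ code and AckQueueModel is a hypothetical name; the sketch only assumes the behaviour that a delivered message stays with the broker as unacknowledged until the consumer acks it, and becomes available again if the consumer goes away first.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of manual acknowledgements; not RabbitMQ code.
public sealed class AckQueueModel
{
    private readonly Queue<string> _ready = new Queue<string>();
    private readonly Dictionary<ulong, string> _unacked = new Dictionary<ulong, string>();
    private ulong _nextDeliveryTag = 1;

    public int ReadyCount => _ready.Count;
    public int UnackedCount => _unacked.Count;

    public void Enqueue(string body) => _ready.Enqueue(body);

    // Hands the next message to a consumer; the model keeps it as "unacked".
    public bool TryDeliver(out ulong deliveryTag, out string body)
    {
        if (_ready.Count == 0)
        {
            deliveryTag = 0;
            body = null;
            return false;
        }

        body = _ready.Dequeue();
        deliveryTag = _nextDeliveryTag++;
        _unacked[deliveryTag] = body;
        return true;
    }

    // The consumer confirms it is done; only now is the message gone for good.
    public bool Ack(ulong deliveryTag) => _unacked.Remove(deliveryTag);

    // Models the consumer's channel closing: unacked messages become ready again.
    public void ConsumerDisconnected()
    {
        foreach (var body in _unacked.Values) _ready.Enqueue(body);
        _unacked.Clear();
    }
}
```

In this model, a message that was delivered but never acked is back in the ready state after ConsumerDisconnected, whereas an acked message is gone for good. With autoAck set to true there would be no unacked state at all, so a consumer that crashes mid-processing would lose the message.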

Please note, it is required to set the DispatchConsumersAsync property to true on connection factory initialization, if any of the consumers is of type AsyncEventingBasicConsumer.

public void ConfigureServices(IServiceCollection services)
{
    services
        // omitted for brevity
        .AddSingleton(serviceProvider =>
        {
            var uri = new Uri("amqp://guest:guest@rabbit:5672/CUSTOM_HOST");
            return new ConnectionFactory
            {
                Uri = uri,
                DispatchConsumersAsync = true
            };
        });
}

Finally, I need a way to handle the incoming messages, so I’m handling the Received event on the consumer. This event is raised whenever a message is delivered from the target queue. The implementation lives in the following base class.

public abstract class ConsumerBase : RabbitMqClientBase
{
    private readonly IMediator _mediator;
    private readonly ILogger<ConsumerBase> _logger;
    protected abstract string QueueName { get; }

    public ConsumerBase(
        IMediator mediator,
        ConnectionFactory connectionFactory,
        ILogger<ConsumerBase> consumerLogger,
        ILogger<RabbitMqClientBase> logger) :
        base(connectionFactory, logger)
    {
        _mediator = mediator;
        _logger = consumerLogger;
    }

    protected virtual async Task OnEventReceived<T>(object sender, BasicDeliverEventArgs @event)
    {
        try
        {
            var body = Encoding.UTF8.GetString(@event.Body.ToArray());
            var message = JsonConvert.DeserializeObject<T>(body);

            await _mediator.Send(message);
        }
        catch (Exception ex)
        {
            _logger.LogCritical(ex, "Error while retrieving message from queue.");
        }
        finally
        {
            Channel.BasicAck(@event.DeliveryTag, false);
        }
    }
}

I’ve deserialized the body, which arrives in binary format, into the appropriate command, which is just a plain C# class. Notice also that I’m using the MediatR library, which implements the mediator pattern beautifully and is particularly useful here. I’ve created a command handler for the LogCommand, which runs some business code when the command is sent to the mediator. This instantly decouples the consumers from the business logic required for each incoming message.

To create a command, a plain C# class is needed, with a few properties to store the data. It also has to implement the IRequest interface from the MediatR library.

public class LogCommand : IRequest<Unit>
{
    public Guid Id { get; set; }
    public string Message { get; set; }
}
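The reason the consumer can deserialize into LogCommand a body that the producer serialized from LogIntegrationEvent is that both classes have the same property names. Here is a minimal sketch of that round trip. Note the assumptions: it uses System.Text.Json from the base class library instead of the Json.NET calls in the article's code, MessageBodyCodec is a hypothetical helper name, and the two classes are local copies (LogCommand without the MediatR interface) so that the block compiles on its own.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Local copies of the two message classes so the sketch compiles on its own.
public class LogIntegrationEvent
{
    public Guid Id { get; set; }
    public string Message { get; set; }
}

public class LogCommand
{
    public Guid Id { get; set; }
    public string Message { get; set; }
}

// Hypothetical helper, not part of the article's code base.
public static class MessageBodyCodec
{
    // Producer side: object -> JSON text -> UTF-8 bytes (the message body).
    public static byte[] Encode<T>(T message) =>
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));

    // Consumer side: UTF-8 bytes -> JSON text -> object of the requested type.
    public static T Decode<T>(byte[] body) =>
        JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(body));
}
```

Encoding a LogIntegrationEvent and decoding the resulting bytes as a LogCommand gives back the same Id and Message, because the JSON only carries property names and values, not the type that produced it.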

Finally, I’ve created a command handler which implements the IRequestHandler interface. Notice that the generic parameter is the command class I’ve created earlier; this tells MediatR which command this handler is going to handle. In the Handle method, I’m logging the incoming message from RabbitMQ.

public class LogCommandHandler : IRequestHandler<LogCommand>
{
    private readonly ILogger<LogCommandHandler> _logger;

    public LogCommandHandler(ILogger<LogCommandHandler> logger) => _logger = logger;

    public Task<Unit> Handle(LogCommand request, CancellationToken cancellationToken)
    {
        _logger.LogInformation("---- Received message: {Message} ----", request.Message);
        return Task.FromResult(Unit.Value);
    }
}
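To see why a mediator decouples the consumer from the business logic, here is a toy version of the pattern. It is only an illustration: TinyMediator and its Register and Send methods are hypothetical names, and MediatR's real API is richer (dependency injection, responses, pipeline behaviors). The sender only knows the mediator, and the mediator picks the handler from the runtime type of the request.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Toy mediator, only to illustrate the pattern; MediatR's real API is richer.
public sealed class TinyMediator
{
    // request type -> handler that processes requests of that type
    private readonly Dictionary<Type, Func<object, Task>> _handlers =
        new Dictionary<Type, Func<object, Task>>();

    public void Register<TRequest>(Func<TRequest, Task> handler) =>
        _handlers[typeof(TRequest)] = request => handler((TRequest)request);

    // The sender never references a handler; dispatch is by runtime type.
    public Task Send(object request)
    {
        if (request == null) throw new ArgumentNullException(nameof(request));

        if (!_handlers.TryGetValue(request.GetType(), out var handler))
        {
            throw new InvalidOperationException(
                $"No handler registered for {request.GetType().Name}");
        }

        return handler(request);
    }
}
```

A consumer built on this would call Send with whatever it deserialized, and adding a new message type would mean registering one more handler rather than touching the consumer.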

Running the applications with docker-compose

The code is ready, so it’s time to start the applications and consume the messages. I love Docker because it makes things so easy for a developer: with one command I’m spinning up both the consumer and producer applications and also starting a RabbitMQ server. Imagine the pain if Docker didn’t exist; I would have to run each application separately and install RabbitMQ on my machine. Ugly.

Following is the docker-compose file to run my applications, which only contains definitions for the images. The override file contains specific definitions for each container. This is very useful if you want to have different definitions for local, development and production environments for example.

version: '2.4'

services:
  tcpdump:
    image: kaazing/tcpdump

  rabbit:
    image: rabbitmq:3-management-alpine

  rmq.application.producer:
    image: ${DOCKER_REGISTRY-}rmqapplicationproducer
    build:
      context: .
      dockerfile: src/Rmq.Application.Producer/Dockerfile

  rmq.application.consumer:
    image: ${DOCKER_REGISTRY-}rmqapplicationconsumer
    build:
      context: .
      dockerfile: src/Rmq.Application.Consumer/Dockerfile

Following is the override file, which is pretty standard. One thing that stands out, however, is the depends_on definition on each ASP.NET application. This tells Docker Compose not to start those containers until the RabbitMQ container enters a healthy state.

I’m using version 2.4 of the docker-compose file format because the 3.x format doesn’t support the condition form of depends_on.

version: '2.4'

services:
  tcpdump:
    network_mode: "host"
    volumes:
      - ./tcpdump:/tcpdump

  rabbit:
    restart: always
    ports:
      - "15672:15672"
      - "5672:5672"
    healthcheck:
      test: [ "CMD", "nc", "-z", "rabbit", "5672" ]
      interval: 30s
      timeout: 10s
      retries: 5
    environment:
      - RABBITMQ_DEFAULT_VHOST=CUSTOM_HOST
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest

  rmq.application.producer:
    restart: on-failure
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
      - ASPNETCORE_URLS=https://+:443;http://+:80
      - ASPNETCORE_HTTPS_PORT=44303
    ports:
      - "56719:80"
      - "44303:443"
    volumes:
      - ${APPDATA}/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro
      - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro
    depends_on:
      rabbit:
        condition: service_healthy

  rmq.application.consumer:
    restart: on-failure
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
      - ASPNETCORE_URLS=https://+:443;http://+:80
      - ASPNETCORE_HTTPS_PORT=44303
    ports:
      - "56720:80"
      - "44304:443"
    volumes:
      - ${APPDATA}/Microsoft/UserSecrets:/root/.microsoft/usersecrets:ro
      - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro
    depends_on:
      rabbit:
        condition: service_healthy

To run the apps, I’m running the following command from the root folder.

docker-compose up --build --force-recreate

It works pretty well, and I can see messages being consumed.

Why?

Why all these steps? How does it work behind the scenes? I am interested to know how the client and RabbitMQ communicate over the AMQP protocol at a low level.

Let’s look at AMQP and how this dance between the broker and the client unfolds in the next post.

Full code of the above example can be found on my GitHub repository.


If you liked this blog, please like and share! For more, follow me on Twitter.