Event Streaming and Messaging in .NET Microservices with Kafka, Zookeeper, RabbitMQ

DotNet Full Stack Dev
Jul 29, 2024

In a microservices architecture, services often need to communicate with each other to ensure data consistency and trigger actions across different parts of the system. Event streaming and messaging are two popular techniques for achieving this inter-service communication. In this blog post, we will explore how to implement event streaming and messaging in .NET microservices using a product and order service example.

Event streaming and messaging are techniques used to enable asynchronous communication between services.

  • Event Streaming: This involves publishing events (changes in state) to a stream, which can then be consumed by multiple services.
  • Messaging: This involves sending messages between services via a message broker. Messages can be commands or events, and they can be sent to one or many recipients.
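
To make the difference concrete, here is a minimal in-memory sketch. The InMemoryStream and InMemoryQueue classes are hypothetical stand-ins written for this post; they are not Kafka or RabbitMQ APIs and leave out persistence, partitions, and networking. The point is only the delivery semantics: a stream keeps events and lets every consumer group read them, while a queue hands each message to a single receiver.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-ins, for illustration only.

// Event streaming: an append-only log. Each consumer group tracks its own
// offset, so every group sees every event.
public class InMemoryStream
{
    private readonly List<string> _log = new();
    private readonly Dictionary<string, int> _offsets = new();

    public void Publish(string evt) => _log.Add(evt);

    // Returns the events this group has not read yet and advances its offset.
    public List<string> Poll(string groupId)
    {
        _offsets.TryGetValue(groupId, out var offset); // 0 for a new group
        var unread = _log.GetRange(offset, _log.Count - offset);
        _offsets[groupId] = _log.Count;
        return unread;
    }
}

// Messaging: a work queue. Each message is handed to one receiver and is
// removed from the queue once it has been received.
public class InMemoryQueue
{
    private readonly Queue<string> _messages = new();

    public void Send(string message) => _messages.Enqueue(message);

    // Returns true and the next message, or false when the queue is empty.
    public bool TryReceive(out string message) => _messages.TryDequeue(out message);
}
```

With these classes, two groups polling the same stream both get a published event, whereas a message sent to the queue can be received only once.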

Setting Up the Project

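We will set up two microservices: ProductService and OrderService. We’ll use Kafka as the event streaming platform and RabbitMQ as the messaging broker. The Kafka and RabbitMQ sections each define their own ProductsController and OrderService, so treat them as two alternative implementations rather than as code for a single project.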

Implementing Event Streaming with Kafka

Step 1: Setting Up Kafka

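First, make sure Kafka and ZooKeeper are running. One way is to run both in Docker. The wurstmeister/kafka image has to be told where ZooKeeper is and which address to advertise to clients, so pass those settings as environment variables.

docker run -d --name zookeeper -p 2181:2181 zookeeper
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 wurstmeister/kafka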

Step 2: Installing Kafka NuGet Packages

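Add the Kafka client to both projects. ProductService also serializes events with JsonConvert, which comes from Newtonsoft.Json.

dotnet add package Confluent.Kafka
dotnet add package Newtonsoft.Json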

Step 3: Creating ProductService

Product.cs

public class Product
{
    public int Id { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

KafkaProducer.cs

using Confluent.Kafka;
using System;
using System.Threading.Tasks;

public class KafkaProducer : IDisposable
{
    private readonly IProducer<Null, string> _producer;

    public KafkaProducer(string brokerList)
    {
        var config = new ProducerConfig { BootstrapServers = brokerList };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    // Sends one message and waits for the broker's delivery report.
    public async Task ProduceAsync(string topic, string message)
    {
        try
        {
            var dr = await _producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
            Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
        }
        catch (ProduceException<Null, string> e)
        {
            // The failure is only logged, so the caller is not told that delivery failed.
            Console.WriteLine($"Delivery failed: {e.Error.Reason}");
        }
    }

    // Flushes anything still queued, then releases the underlying client.
    public void Dispose()
    {
        _producer.Flush(TimeSpan.FromSeconds(10));
        _producer.Dispose();
    }
}

ProductsController.cs

using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
using System.Threading.Tasks;

[ApiController]
[Route("api/[controller]")]
public class ProductsController : ControllerBase
{
    private readonly KafkaProducer _kafkaProducer;

    public ProductsController(KafkaProducer kafkaProducer)
    {
        _kafkaProducer = kafkaProducer;
    }

    [HttpPost]
    public async Task<IActionResult> CreateProduct([FromBody] Product product)
    {
        // Save product to database (omitted for brevity)

        // Publish event to Kafka
        var productEvent = JsonConvert.SerializeObject(product);
        await _kafkaProducer.ProduceAsync("product-events", productEvent);

        return Ok(product);
    }
}

Step 4: Creating OrderService

KafkaConsumer.cs

using Confluent.Kafka;
using System;
using System.Threading;

public class KafkaConsumer
{
    private readonly IConsumer<Null, string> _consumer;

    public KafkaConsumer(string brokerList, string topic, string groupId)
    {
        var config = new ConsumerConfig
        {
            GroupId = groupId,
            BootstrapServers = brokerList,
            // Start from the oldest available message when the group has no committed offset.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        _consumer = new ConsumerBuilder<Null, string>(config).Build();
        _consumer.Subscribe(topic);
    }

    public void Consume(CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Blocks until a message arrives or the token is cancelled.
                var cr = _consumer.Consume(cancellationToken);
                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");

                // Handle the event (e.g., update order details)
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the token is cancelled during shutdown.
        }
        finally
        {
            // Always leave the consumer group cleanly, even if the loop ends with an error.
            _consumer.Close();
        }
    }
}
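
The "Handle the event" placeholder is where the JSON payload would be turned back into an object. The following is a minimal sketch of that step, under a few assumptions: ProductEvent and ProductEventHandler are hypothetical names, ProductEvent is a local copy of the Product class with the same property names that JsonConvert.SerializeObject writes, and System.Text.Json is used here only to keep the sketch free of extra packages (Newtonsoft.Json would work as well).

```csharp
using System;
using System.Text.Json;

// Hypothetical consumer-side copy of the Product contract published by ProductService.
public class ProductEvent
{
    public int Id { get; set; }
    public string Name { get; set; } = string.Empty;
    public decimal Price { get; set; }
}

// Hypothetical helper; not part of Confluent.Kafka.
public static class ProductEventHandler
{
    // Returns the parsed event, or null when the payload is not valid JSON.
    public static ProductEvent? TryParse(string json)
    {
        try
        {
            return JsonSerializer.Deserialize<ProductEvent>(json);
        }
        catch (JsonException)
        {
            // A malformed message should not crash the consume loop.
            return null;
        }
    }
}
```

Inside the consume loop this could be called as ProductEventHandler.TryParse(cr.Value), skipping or logging the message when the result is null.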

OrderService.cs

using System.Threading;

public class OrderService
{
    private readonly KafkaConsumer _kafkaConsumer;

    public OrderService(KafkaConsumer kafkaConsumer)
    {
        _kafkaConsumer = kafkaConsumer;
    }

    // Blocks the calling thread until the token is cancelled.
    public void StartConsuming(CancellationToken cancellationToken)
    {
        _kafkaConsumer.Consume(cancellationToken);
    }
}
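
Because StartConsuming blocks until the token is cancelled, it is usually run off the main thread. Here is one possible sketch of that, assuming a hypothetical ConsumerRunner helper that accepts any blocking loop as a delegate. In an ASP.NET Core application, a BackgroundService would be a more conventional home for the same logic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs a blocking consume loop on a dedicated background task.
public sealed class ConsumerRunner
{
    private readonly CancellationTokenSource _cts = new();
    private Task? _task;

    // consumeLoop is expected to block until the token is cancelled,
    // e.g. token => orderService.StartConsuming(token).
    public void Start(Action<CancellationToken> consumeLoop)
    {
        _task = Task.Factory.StartNew(
            () => consumeLoop(_cts.Token),
            _cts.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    // Signals cancellation and waits for the loop to exit.
    public async Task StopAsync()
    {
        _cts.Cancel();
        if (_task != null)
        {
            try
            {
                await _task;
            }
            catch (OperationCanceledException)
            {
                // Expected when the loop exits because of cancellation.
            }
        }
    }
}
```

A caller would create one runner at startup, pass it the consume loop, and call StopAsync during shutdown so the consumer has a chance to close cleanly.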

Implementing Messaging with RabbitMQ

Step 1: Setting Up RabbitMQ

Ensure RabbitMQ is installed and running. You can use Docker to run RabbitMQ.

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Step 2: Installing RabbitMQ NuGet Packages

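Add the RabbitMQ client to both projects. The code below uses the synchronous API from the 6.x client (IModel, CreateModel). Version 7 of the client replaced it with an async API built around IChannel, so pin a 6.x version.

dotnet add package RabbitMQ.Client --version 6.8.1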

Step 3: Creating ProductService

RabbitMQProducer.cs

using RabbitMQ.Client;
using System;
using System.Text;

public class RabbitMQProducer : IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public RabbitMQProducer(string hostname)
    {
        var factory = new ConnectionFactory { HostName = hostname };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        // durable: false means the queue does not survive a broker restart.
        _channel.QueueDeclare(queue: "product-queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
    }

    public void SendMessage(string message)
    {
        var body = Encoding.UTF8.GetBytes(message);
        // The default exchange ("") routes the message to the queue named by routingKey.
        _channel.BasicPublish(exchange: "", routingKey: "product-queue", basicProperties: null, body: body);
        Console.WriteLine($"[x] Sent {message}");
    }

    public void Dispose()
    {
        _channel.Dispose();
        _connection.Dispose();
    }
}

ProductsController.cs

using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;

[ApiController]
[Route("api/[controller]")]
public class ProductsController : ControllerBase
{
    private readonly RabbitMQProducer _rabbitMQProducer;

    public ProductsController(RabbitMQProducer rabbitMQProducer)
    {
        _rabbitMQProducer = rabbitMQProducer;
    }

    [HttpPost]
    public IActionResult CreateProduct([FromBody] Product product)
    {
        // Save product to database (omitted for brevity)

        // Publish message to RabbitMQ
        var productMessage = JsonConvert.SerializeObject(product);
        _rabbitMQProducer.SendMessage(productMessage);

        return Ok(product);
    }
}

Step 4: Creating OrderService

RabbitMQConsumer.cs

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

public class RabbitMQConsumer
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public RabbitMQConsumer(string hostname)
    {
        var factory = new ConnectionFactory { HostName = hostname };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        // Declare the queue with the same settings as the producer, in case the consumer starts first.
        _channel.QueueDeclare(queue: "product-queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
    }

    public void StartListening()
    {
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine($"[x] Received {message}");

            // Handle the message (e.g., update order details)
        };
        // autoAck: true removes each message from the queue as soon as it is delivered,
        // even if the handler above throws.
        _channel.BasicConsume(queue: "product-queue", autoAck: true, consumer: consumer);
    }
}

OrderService.cs

public class OrderService
{
    private readonly RabbitMQConsumer _rabbitMQConsumer;

    public OrderService(RabbitMQConsumer rabbitMQConsumer)
    {
        _rabbitMQConsumer = rabbitMQConsumer;
    }

    public void StartConsuming()
    {
        _rabbitMQConsumer.StartListening();
    }
}

Conclusion

Event streaming and messaging are powerful techniques for enabling asynchronous communication between microservices. Kafka and RabbitMQ are popular tools for implementing these patterns. In this blog post, we demonstrated how to set up event streaming with Kafka and messaging with RabbitMQ in .NET microservices using product and order services. By using these techniques, you can build scalable and resilient microservices that can handle complex inter-service communication.

You may also like: https://medium.com/@siva.veeravarapu/rabbitmq-vs-kafka-a-detailed-comparison-with-c-code-snippets-038d6baf8c1a
