Building a Scalable Kafka Consumer in .NET Core Without Duplicates: A Complete Guide

A simple change can avoid duplicates in Kafka

DotNet Full Stack Dev
5 min readOct 20, 2024

In a world where real-time data processing is critical, Kafka serves as a robust, distributed event-streaming platform. For .NET developers, building an efficient Kafka consumer that scales well and processes messages without duplicates can be tricky. Especially when scaling your application horizontally — by adding more replicas — you might encounter duplicate messages, even with proper offset handling. But fear not! In this blog, we’ll dive into how to effectively tackle this issue by leveraging best practices with Confluent Kafka in .NET Core, while ensuring your system remains scalable, reliable, and performant.

Whether you’re scaling your consumer instances to improve throughput or simply looking to optimize the message processing workflow, this blog will help you navigate the common pitfalls of Kafka consumer rebalancing, offset management, and potential race conditions.

📌Explore more at: https://dotnet-fullstack-dev.blogspot.com/
🌟 Clapping and/or sharing would be appreciated! 🚀

The Problem: Duplicate Messages When Scaling Kafka Consumers

When you scale your Kafka consumers to handle higher loads (e.g., increasing replicas in a microservices architecture), you might notice something disturbing: duplicate messages. Even with proper offset management, Kafka can sometimes reassign partitions to different consumers, resulting in the same message being processed twice.

Let’s break down the possible culprits:

  • Consumer Group Rebalancing: When scaling your consumer group by adding more instances (replicas), Kafka redistributes partitions among the consumers. If offsets aren’t committed at the right time, a new consumer might reprocess messages.
  • Race Conditions on Offset Storage: In multi-threaded environments, improperly handling asynchronous message processing and offset commits can lead to race conditions, causing the same message to be processed multiple times.

Solution Overview

To resolve this, you need to:

  1. Handle Consumer Group Rebalancing properly to avoid reprocessing messages during scaling events.
  2. Synchronously commit message offsets after processing to ensure the correct state is stored in Kafka.
  3. Ensure Idempotency of your message processing logic to handle duplicates gracefully (in case of failure or retries).

Let’s dive into how to apply these concepts in your Kafka consumer using .NET Core.

Step 1: Understanding Kafka Consumer Offsets

Offsets are the backbone of Kafka message processing. They represent the position of a message within a Kafka partition. If offsets are not stored or committed correctly, Kafka will assume messages have not been processed and resend them.

There are two common approaches for offset management:

  • Auto-Commit Offsets: Kafka automatically commits offsets at regular intervals. This is simple, but introduces risks of message reprocessing if the consumer crashes before the auto-commit interval is reached.
  • Manual Offset Commit: The consumer manually commits offsets after processing each message. This gives you more control but requires careful synchronization.

To ensure that no duplicate messages are processed when scaling Kafka consumers, we need to manually commit offsets after processing each message. This avoids the race conditions that could occur with auto-committed offsets.

Step 2: The Core of the Solution — Synchronous Offset Commit

Let’s take a look at a common pattern for processing Kafka messages in .NET Core using Confluent Kafka:

try
{
var consumeResult = consumer.Consume(cancellationToken);

// Process the message asynchronously
await Task.Run(() => ProcessMessage(consumeResult.Message));

// Synchronously commit the offset after processing
consumer.Commit(consumeResult); // This commits the offset immediately to Kafka
}
catch (ConsumeException ex)
{
Console.WriteLine($"Error occurred: {ex.Error.Reason}");
}

Why Synchronous Commit?

Using Commit(consumeResult) ensures that the offset is synchronously written to Kafka after the message is successfully processed. This eliminates the risk of a message being processed but not acknowledged, which could lead to the same message being processed by another consumer after a rebalance.

Step 3: Handle Consumer Rebalancing Gracefully

Rebalancing is one of Kafka’s core features for fault tolerance and scalability, but it can cause issues if your consumer is not prepared to handle it properly.

The Problem

When Kafka rebalances a consumer group, it pauses all consumer instances, reassigns partitions, and then resumes the consumers. If your consumer is in the middle of processing a message but hasn’t committed the offset yet, the same message might be picked up by another consumer after the rebalance.

The Solution: Pausing During Rebalance

You can subscribe to the OnPartitionsRevoked and OnPartitionsAssigned events to ensure that your consumer is paused during rebalancing and resumes processing only after the partitions are reassigned:

consumer.OnPartitionsRevoked += (sender, partitions) =>
{
Console.WriteLine($"Partitions revoked: [{string.Join(", ", partitions)}]");
consumer.Pause(partitions);
};

consumer.OnPartitionsAssigned += (sender, partitions) =>
{
Console.WriteLine($"Partitions assigned: [{string.Join(", ", partitions)}]");
consumer.Resume(partitions);
};

By doing this, you ensure that no message is processed during the rebalance, reducing the risk of message duplication.

Step 4: Optimize Performance — Batch Offset Commits

Committing the offset after each message provides reliability but can introduce some performance overhead, especially in high-throughput environments. To mitigate this, you can batch the offset commits.

Batching Offset Commits

Instead of committing the offset after every message, you can commit after processing a certain number of messages or after a time interval. This reduces the number of network requests to Kafka and improves throughput:

int messageCounter = 0;

try
{
var consumeResult = consumer.Consume(cancellationToken);

await Task.Run(() => ProcessMessage(consumeResult.Message));

messageCounter++;

// Commit after processing 10 messages
if (messageCounter % 10 == 0)
{
consumer.Commit(consumeResult);
messageCounter = 0; // Reset the counter after commit
}
}
catch (ConsumeException ex)
{
Console.WriteLine($"Error occurred: {ex.Error.Reason}");
}

This approach strikes a balance between performance and reliability. If the consumer fails after processing 9 messages, only those uncommitted messages will be reprocessed.

Step 5: Ensure Idempotency

Even with the best offset management strategies, there is always a possibility of duplicate messages due to unforeseen failures. Thus, idempotency is key.

What Is Idempotency?

Idempotency means that processing the same message multiple times will not result in incorrect behavior. For instance, if your consumer inserts records into a database, you could use a unique constraint or check if the message has already been processed based on a unique identifier.

bool IsMessageProcessed(string messageId)
{
// Check if the message has already been processed (e.g., via database or cache)
return processedMessages.Contains(messageId);
}

void ProcessMessage(Message<string, string> message)
{
if (!IsMessageProcessed(message.Key))
{
// Process the message
Console.WriteLine($"Processing message with key: {message.Key}");
// Mark the message as processed
processedMessages.Add(message.Key);
}
}

By ensuring idempotent message processing, you can safely handle duplicates without causing issues in your system.

Final Thoughts: Building a Reliable and Scalable Kafka Consumer

Building a Kafka consumer that scales horizontally while avoiding duplicate message processing is essential for robust real-time data pipelines. By synchronously committing offsets after processing, handling consumer rebalancing, batching offset commits for performance, and ensuring idempotent message processing, you can create a reliable Kafka consumer in .NET Core.

Engage with Us! What challenges have you faced with Kafka consumer scaling? How do you handle message processing in your distributed systems? Share your thoughts and solutions in the comments below!

--

--

DotNet Full Stack Dev
DotNet Full Stack Dev

Written by DotNet Full Stack Dev

Join me to master .NET Full Stack Development & boost your skills by 1% daily with insights, examples, and techniques! https://dotnet-fullstack-dev.blogspot.com

No responses yet