Reading a Kafka Topic Periodically (Every T Hours) and Converting Topic JSON to CSV in .NET

DotNet Full Stack Dev
2 min read · Mar 13, 2024


In this comprehensive guide, we’ll walk through the process of reading messages from a Kafka topic at regular 12-hour intervals, extracting only the values from the JSON messages (excluding column names), converting them to CSV format, and appending them to a CSV file. We’ll provide detailed C# code snippets for each step, enabling you to implement this solution effectively.

Embark on a journey of continuous learning and exploration with DotNet-FullStack-Dev. Uncover more by visiting https://dotnet-fullstack-dev.blogspot.com, or reach out for further information.

Step 1: Setting Up Kafka Consumer

First, let’s set up a Kafka consumer in our C# application to read messages from the Kafka topic.

Kafka Consumer Configuration:

using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "json-to-csv-consumer-group",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    consumer.Subscribe("json-topic");

    while (true)
    {
        // Consume blocks until a message arrives.
        var consumeResult = consumer.Consume();
        var jsonMessage = consumeResult.Message.Value;

        // Process JSON message...
    }
}

Step 2: JSON to CSV Conversion

Next, let’s implement logic to extract only the values from the incoming JSON message and convert them to CSV format.

JSON to CSV Conversion Logic:

using System;
using System.Linq;
using Newtonsoft.Json.Linq;

public string ConvertJsonValuesToCsv(string jsonMessage)
{
    var jsonObject = JObject.Parse(jsonMessage);

    // Extract the top-level property values and join them with commas.
    // string.Join avoids the trailing-comma cleanup (and is safe for an
    // empty object); values containing commas or quotes are CSV-quoted.
    var values = jsonObject.Properties()
        .Select(p => EscapeCsvValue(p.Value.ToString()));

    return string.Join(",", values) + Environment.NewLine;
}

private static string EscapeCsvValue(string value)
{
    if (value.Contains(',') || value.Contains('"') || value.Contains('\n'))
        return "\"" + value.Replace("\"", "\"\"") + "\"";
    return value;
}
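To make the behavior concrete, here is a minimal usage sketch. The sample message and its field names are illustrative assumptions, not part of any real topic:

```csharp
// Illustrative input - field names and values are made up for this example.
var jsonMessage = "{\"id\":1,\"name\":\"Alice\",\"city\":\"Hyderabad\"}";

var csvLine = ConvertJsonValuesToCsv(jsonMessage);

// Property values are emitted in document order, comma-separated,
// without the column names: 1,Alice,Hyderabad
Console.WriteLine(csvLine);
```

Note that only top-level values are extracted; nested objects would be serialized as JSON text, so flatten them first if your messages are hierarchical.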

Step 3: Writing CSV Data to File

Now, let’s implement code to write the CSV data (values only) to a file, appending it to the existing contents.

Writing CSV Data to File:

public void AppendCsvToFile(string csvData, string filePath)
{
    // The 'true' argument opens the file in append mode,
    // creating it if it does not exist.
    using (var writer = new StreamWriter(filePath, true))
    {
        writer.Write(csvData);
    }
}
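As a design note, for small writes the same result can be had in a single call with `File.AppendAllText`, which opens, appends, and closes the file for you. A minimal alternative sketch:

```csharp
using System.IO;

public static class CsvFileWriter
{
    // Appends the CSV fragment to the file, creating the file on first use.
    public static void AppendCsvToFile(string csvData, string filePath)
    {
        File.AppendAllText(filePath, csvData);
    }
}
```

The StreamWriter version is preferable if you write many lines per pass, since it opens the file once per batch rather than once per call.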

Step 4: Periodic Execution Every 12 Hours

Finally, let’s ensure that the Kafka topic is read and the JSON to CSV conversion process occurs every 12 hours.

Periodic Execution Logic:

public void ExecutePeriodically(Action action, TimeSpan interval)
{
    // Task.Run is used instead of Task.Factory.StartNew: with an async
    // lambda, StartNew returns a Task<Task>, and LongRunning only applies
    // up to the first await, so it buys nothing here.
    _ = Task.Run(async () =>
    {
        while (true)
        {
            action();
            await Task.Delay(interval);
        }
    });
}

// Call ExecutePeriodically with the Kafka consumer and conversion logic.
// Note: the consume loop must exit once the topic is drained; otherwise
// the action never returns and the 12-hour delay never takes effect.
ExecutePeriodically(() =>
{
    using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        consumer.Subscribe("json-topic");

        while (true)
        {
            // Consume with a timeout; a null result means the topic is
            // (for now) drained, so finish this pass.
            var consumeResult = consumer.Consume(TimeSpan.FromSeconds(5));
            if (consumeResult == null)
                break;

            var csvData = ConvertJsonValuesToCsv(consumeResult.Message.Value);
            AppendCsvToFile(csvData, "output.csv");
        }
    }
}, TimeSpan.FromHours(12));
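On .NET 6 or later, `PeriodicTimer` is a cleaner way to express the same 12-hour cadence, as it avoids the fire-and-forget task entirely. A hedged sketch, where `ProcessTopicOnce` is a placeholder for the consume-and-convert pass shown above:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static async Task RunEveryTwelveHoursAsync(CancellationToken token)
{
    using var timer = new PeriodicTimer(TimeSpan.FromHours(12));

    // WaitForNextTickAsync returns false when the timer is disposed and
    // throws OperationCanceledException when the token is cancelled,
    // giving the loop a clean shutdown path.
    while (await timer.WaitForNextTickAsync(token))
    {
        ProcessTopicOnce(); // placeholder for the consume-and-convert logic
    }
}
```

Awaiting this method from your host (for example, a `BackgroundService`) also surfaces exceptions, which the fire-and-forget `Task.Run` approach silently swallows.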

Conclusion

By following these steps and implementing the provided C# code snippets, you can create a robust solution for reading messages from a Kafka topic at regular 12-hour intervals, extracting only the values from the JSON messages, converting them to CSV format, and appending them to a CSV file.

This approach enables efficient data processing and storage, facilitating analysis and insights generation from the collected data. Whether you’re handling large-scale data streams or implementing periodic data export tasks, this guide equips you with the knowledge and tools to accomplish your objectives effectively.

Happy coding!
