Kafka and .NET - Part 3 - Finally at .NET
It has taken us two seemingly unrelated posts to get here, but we've finally made it to the point where we can actually run .NET and interact with Kafka. We need to create two basic programs: one that writes to Kafka and another that reads from it. There is a Kafka client for .NET available on NuGet, and that's where our story will start.
C# Advent
This post is one of many that are part of 2021's C# Advent. There are a ton of really great posts this year and some new bloggers to discover. I'd strongly encourage you to check it out.
Producer
So let's start with the producer program, which will write to a Kafka topic. I'm using .NET 6 because it is awesome. So either from Visual Studio or from the command line, create a new console application and add the Confluent.Kafka package to it.
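From the command line that looks something like this (the project name here is just an example):
> dotnet new console -o KafkaProducer
> cd KafkaProducer
> dotnet add package Confluent.Kafka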
Now in the main Program.cs file you can include a couple of namespaces.
using Confluent.Kafka;
using System.Net;
The first is the Kafka client's namespace and the second is System.Net, which we need for reasons that will become clear shortly.
Next we need a config. In our case it looks like
var config = new ProducerConfig
{
    BootstrapServers = "localhost:29092",
    ClientId = Dns.GetHostName()
};
The two important things here are the bootstrap servers and the client id. The bootstrap servers are the addresses the Kafka client initially connects to in order to discover the rest of the cluster. We have just one here, but in a real production environment you'd want several, so that if one server is down the client can fall back to the others. The client id identifies this client to the Kafka cluster. Here we're using the host name of the machine, but in production you might want to append some additional information to uniquely identify multiple clients running on one machine.
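As a rough sketch of what that might look like (the broker addresses here are made up, and appending the process id is just one way of disambiguating clients on the same machine):

var config = new ProducerConfig
{
    // Several brokers, comma separated, so the client can still bootstrap
    // when one of them is down.
    BootstrapServers = "broker-1:9092,broker-2:9092,broker-3:9092",
    // Host name plus process id so two producers on the same machine
    // don't report the same client id.
    ClientId = $"{Dns.GetHostName()}-{Environment.ProcessId}"
};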
Now we write a message to the topic
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    try
    {
        var dr = await producer.ProduceAsync("user-added",
            new Message<Null, string> { Value = $"Sent message at {DateTime.Now}" });
        Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
    }
    catch (ProduceException<Null, string> e)
    {
        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
    }
}
The important parts here are the topic we're delivering to, user-added, and the message we're sending, which is a key/value pair with no key (that's the Null type parameter) and a string value.
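If you did want to key your messages, you'd swap the Null type parameter for a real key type. Here's a minimal sketch assuming string keys and a made-up user id; Kafka sends messages with the same key to the same partition, which preserves ordering per key:

using var keyedProducer = new ProducerBuilder<string, string>(config).Build();

// Messages sharing a key always land on the same partition,
// so their relative order is preserved.
var result = await keyedProducer.ProduceAsync(
    "user-added",
    new Message<string, string> { Key = "user-42", Value = "some payload" });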
You should now be able to run this program. Each new run will add a message to the topic.
> dotnet run
Delivered 'Sent message at 12/15/2021 7:49:20 AM' to 'user-added [[0]] @0'
> dotnet run
Delivered 'Sent message at 12/15/2021 7:49:26 AM' to 'user-added [[0]] @1'
> dotnet run
Delivered 'Sent message at 12/15/2021 7:49:31 AM' to 'user-added [[0]] @2'
Now let’s flip over and look at consuming.
Consumer
This is another command line application. It looks a bit longer but isn’t too bad. Again we start with pulling in the namespace and setting up a config.
using Confluent.Kafka;
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:29092",
    GroupId = "consumer",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    // Auto commit is on by default; turn it off because we'll commit
    // manually after processing each message (see below).
    EnableAutoCommit = false
};
The new things here are the GroupId, the AutoOffsetReset and the EnableAutoCommit flag (more on that last one when we get to committing offsets). The GroupId names the consumer group this consumer belongs to; Kafka tracks committed offsets per group, and consumers sharing a group id split the topic's partitions between them. The AutoOffsetReset tells the consumer where to start reading when the group has no committed offset yet for a partition. Earliest means start from the first message in the topic; Latest means start from the end of the topic and only read messages produced after the consumer joins.
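If you only cared about new messages, the same config with the reset policy flipped would look like this (the group name is made up, and remember the setting only kicks in when the group has no committed offsets):

var latestConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:29092",
    GroupId = "consumer-latest", // a fresh group with no stored offsets
    // Start from the end of each partition: only messages produced after
    // this consumer joins will be delivered.
    AutoOffsetReset = AutoOffsetReset.Latest
};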
Next we need some plumbing just for our example. This will allow us to ctrl-c out of the consumer so it doesn't run forever, which is the default behaviour.
var source = new CancellationTokenSource();
var keepConsuming = true;
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // prevent the process from terminating immediately
    keepConsuming = false;
    Console.WriteLine("Stopping...");
    source.Cancel();
};
With that out of the way we can actually consume messages. Again there is a little bit of plumbing here around cancellation but you can ignore that.
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    try
    {
        consumer.Subscribe("user-added");
        while (keepConsuming)
        {
            try
            {
                var consumeResult = consumer.Consume(source.Token);
                Console.WriteLine($"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
                // process the message in here
                consumer.Commit(consumeResult);
                Console.WriteLine("Committed offset.");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // thrown by Consume when ctrl-c cancels the token
        Console.WriteLine("Operation canceled.");
    }
    finally
    {
        Console.WriteLine("Closing consumer.");
        consumer.Close();
    }
}
In here we set up a consumer, subscribe to the user-added topic and then consume the messages. This line here:
var consumeResult = consumer.Consume(source.Token);
is a blocking call and will wait forever for new messages on the topic. When one comes in from the producer we print that message out. Finally we mark the message as having been consumed by the consumer group by committing its offset. The .NET client will commit offsets automatically on a background thread if you leave EnableAutoCommit at its default of true, but I don't recommend that: the commits happen at unpredictable intervals, so a message could be marked as consumed before you've actually finished processing it. That's why the config above turns it off and we commit manually.
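As an aside, if you'd rather poll than block on a cancellation token, the client also has a Consume overload that takes a timeout and returns null when nothing arrived in the window. A minimal sketch:

// Poll with a timeout instead of blocking on a token.
// Consume returns null if no message arrived within the window.
var maybeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));
if (maybeResult != null)
{
    Console.WriteLine($"Consumed '{maybeResult.Message.Value}'");
}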
Running the consumer will get you something like
> dotnet run
Consumed message 'Sent message at 12/15/2021 7:49:20 AM' at: 'user-added [[0]] @0'.
Committed offset.
Consumed message 'Sent message at 12/15/2021 7:49:26 AM' at: 'user-added [[0]] @1'.
Committed offset.
Consumed message 'Sent message at 12/15/2021 7:49:31 AM' at: 'user-added [[0]] @2'.
Committed offset.
Pretty nifty!
In the next entry we’ll get a bit further into how we can use Kafka to solve some real world problems.