Kafka
PM> Install-Package Shuttle.Esb.Kafka
Configuration
The URI structure is kafka://configuration-name/queue-name.
c#
services.AddKafka(builder =>
{
var kafkaOptions = new KafkaOptions
{
BootstrapServers = "localhost:9092",
ReplicationFactor = 1,
NumPartitions = 1,
MessageSendMaxRetries = 3,
RetryBackoff = TimeSpan.FromSeconds(1),
EnableAutoCommit = false,
EnableAutoOffsetStore = false,
FlushEnqueue = false,
UseCancellationToken = true,
ConsumeTimeout = TimeSpan.FromSeconds(30),
OperationTimeout = TimeSpan.FromSeconds(30),
ConnectionsMaxIdle = TimeSpan.Zero,
Acks = Acks.All,
EnableIdempotence = true
};
kafkaOptions.ConfigureConsumer += (sender, args) =>
{
Console.WriteLine($"[event] : ConfigureConsumer / Uri = '{((IQueue)sender).Uri}'");
};
kafkaOptions.ConfigureProducer += (sender, args) =>
{
Console.WriteLine($"[event] : ConfigureProducer / Uri = '{((IQueue)sender).Uri}'");
};
builder.AddOptions("local", kafkaOptions);
});
The ConfigureConsumer event args arugment exposes the ConsumerConfig directly for any specific options that need to be set. Similarly, the ConfigureProducer event args arugment exposes the ProducerConfig.
The default JSON settings structure is as follows:
json
{
"Shuttle": {
"Kafka": {
"local": {
"BootstrapServers": "localhost:9092",
"ReplicationFactor": 1,
"NumPartitions": 1,
"MessageSendMaxRetries": 3,
"RetryBackoff": "00:00:01",
"EnableAutoCommit": false,
"EnableAutoOffsetStore": false,
"FlushEnqueue": false,
"UseCancellationToken": true,
"ConsumeTimeout": "00:00:30",
"OperationTimeout": "00:00:30",
"ConnectionsMaxIdle": "00:00:00",
"Acks": "All",
"EnableIdempotence": true,
}
}
}
}
Options
| Option | Default | Description |
|---|---|---|
BootstrapServers | Initial list of brokers as a CSV list of broker host or host:port. | |
ReplicationFactor | 1 | The replication factor for the new topic or -1 (the default) if a replica assignment is specified instead. |
NumPartitions | 1 | The number of partitions for the new topic or -1 (the default) if a replica assignment is specified. |
MessageSendMaxRetries | 3 | How many times to retry sending a failing Message. Note: retrying may cause reordering unless enable.idempotence is set to true. |
RetryBackoff | "00:00:01" | The backoff time before retrying a protocol request. |
EnableAutoCommit | false | Automatically and periodically commit offsets in the background. |
EnableAutoOffsetStore | false | Automatically store offset of last message provided to application. |
FlushEnqueue | false | If true will call Flush on the producer after a message has been enqueued. |
UseCancellationToken | true | Indicates whether a cancellation token is used for relevant methods. |
ConsumeTimeout | "00:00:30" | The duration to poll for messages before returning null, when the cancellation token is not used. |
OperationTimeout | "00:00:30" | The duration to wait for relevant async methods to complete before timing out. |
ConnectionsMaxIdle | "00:00:00" | Close broker connections after the specified time of inactivity. |
Acks | "All" | This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request. |
EnableIdempotence | true | When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. |