Pooja Singh

Manually committing offsets in Kafka

May 20 2021 12:25 AM
Hi,
 
I have a list of offsets with their corresponding partitions, and I need to commit them manually.

To do so, I loop through the list, assign each partition to the consumer, and then seek to the given offset.

Then I consume the message and pass the resulting ConsumeResult to the Commit method.

Sometimes it runs smoothly, but sometimes it throws a "Local:Waiting for Coordinator" exception.

In both cases, when I consume messages afterwards, I re-consume the same series of messages that I had already committed, or rather tried to commit. So the commits never actually took effect :(
    // The enclosing method's signature was not included in the post.
    try
    {
        foreach (var item in cmdparamslist)
        {
            Partition p = new Partition(Int16.Parse(item.PartitionID));
            TopicPartition tp = new TopicPartition(configuration.GetSection("KafkaSettings").GetSection("Topic").Value, p);
            Offset o = new Offset(long.Parse(item.Offset));
            TopicPartitionOffset tpo = new TopicPartitionOffset(tp, o);
            //Ltpo.Add(tpo);
            try
            {
                // Assign this partition, seek to the listed offset, and read one message.
                KafkaConsumer.Assign(tpo);
                await Task.Delay(TimeSpan.FromSeconds(1));
                KafkaConsumer.Seek(tpo);
                var cr = KafkaConsumer.Consume(cts.Token);
                try
                {
                    // Commit based on the message that was just consumed.
                    KafkaConsumer.Commit(cr);
                }
                catch (TopicPartitionOffsetException e1)
                {
                    Console.WriteLine("exception " + e1);
                }
                catch (KafkaException e)
                {
                    Console.WriteLine("exception " + e);
                }
            }
            catch (KafkaException e)
            {
                Console.WriteLine("exception " + e);
            }
        }
        KafkaConsumer.Close();
    }
    catch (Exception e)
    {
        Console.WriteLine("exception " + e);
    }
    } // end of the enclosing method

Consumer / Client configuration:

    var conf = new ConsumerConfig
    {
        GroupId = Guid.NewGuid().ToString(),
        BootstrapServers = configuration.GetSection("KafkaSettings").GetSection("RemoteServers").Value,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        SaslMechanism = SaslMechanism.Gssapi,
        SecurityProtocol = SecurityProtocol.SaslPlaintext,
        EnableAutoCommit = false
        //EnableAutoOffsetStore = false
    };

I am using Confluent.Kafka version 1.6.2.

Could someone please help me?
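
For comparison, here is a minimal sketch of committing the listed offsets in a single call, without seeking and consuming first. It relies on two facts about Kafka: a committed offset is the position of the next message to read (which is why Commit(ConsumeResult) commits the consumed offset + 1), and committed offsets are stored per consumer group, so they are only found again by a consumer that uses the same GroupId. The class and method names, the tuple shape of the input, the groupId parameter, and the "highest offset wins" rule for repeated partitions are all assumptions made for illustration. The SASL settings from the configuration above are left out for brevity. Whether an Assign call or a short wait for the group coordinator is needed before Commit was not verified here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public static class ManualCommitSketch
{
    // Hypothetical helper (not from the post): maps (partition, offset) pairs to
    // the positions to commit. A committed offset is the position of the next
    // message to read, so each value is the listed offset + 1.
    // Assumption: if a partition appears more than once, the highest offset wins.
    public static List<TopicPartitionOffset> BuildCommitOffsets(
        string topic, IEnumerable<(int PartitionId, long LastOffset)> items)
    {
        return items
            .GroupBy(i => i.PartitionId)
            .Select(g => new TopicPartitionOffset(
                topic, new Partition(g.Key), new Offset(g.Max(i => i.LastOffset) + 1)))
            .ToList();
    }

    // Hypothetical entry point: commits the offsets, then reads them back.
    public static void CommitAndVerify(
        string bootstrapServers, string groupId, string topic,
        IEnumerable<(int PartitionId, long LastOffset)> items)
    {
        var conf = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,          // assumption: the same value on every run
            EnableAutoCommit = false
        };

        using var consumer = new ConsumerBuilder<Ignore, Ignore>(conf).Build();

        var offsets = BuildCommitOffsets(topic, items);
        consumer.Commit(offsets);       // synchronous; throws KafkaException on failure

        // Read the committed positions back from the broker to confirm them.
        var committed = consumer.Committed(
            offsets.Select(o => o.TopicPartition), TimeSpan.FromSeconds(10));
        foreach (var tpo in committed)
            Console.WriteLine($"{tpo.TopicPartition} committed at {tpo.Offset}");

        consumer.Close();
    }
}
```

A call such as ManualCommitSketch.CommitAndVerify(servers, "my-fixed-group", topic, items) would then be followed by a normal consumer created with the same "my-fixed-group" GroupId (a placeholder name). With a freshly generated GroupId, the new group has no committed offsets, and AutoOffsetReset.Earliest starts it from the beginning of each partition.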