March 4, 2016 · Cloud Computing kafka

Kafka Consumer Offset Management

Apache Kafka is a leading data landing platform and the de facto standard for collecting data and streaming it to different systems. It is also often used as a filter system, where messages are read from one topic, processed, and then put on a different topic, much like Unix pipes. It is a messaging/queuing system with at-least-once delivery semantics.

Kafka differs from other messaging systems in that it delegates offset management to consumers. Generally, a messaging system maintains a counter of how far each consumer has read in a topic. By moving offset management to each client, Kafka simplifies processing and reduces complexity (compared to systems that offer exactly-once delivery semantics). More details on the philosophy behind Kafka can be found on its website.

Kafka works with topics, and each topic is divided into one or more partitions. Messages within each partition are ordered. When a consumer reads a topic, it actually reads data from all of the partitions. Kafka also provides a means to read data from a single partition if the client wants to. As a consumer reads data from a partition, it advances its offset; this offset, once stored, becomes an acknowledgement. Kafka does not have traditional message acknowledgements.
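To make the idea of "a stored offset is the acknowledgement" concrete, here is a minimal sketch in Python. It is a toy in-memory model, not a Kafka client: the names PartitionedTopic, ToyConsumer, poll and store_offset are made up for illustration.

```python
from collections import defaultdict


class PartitionedTopic:
    """Toy in-memory stand-in for a topic; this is not a Kafka client."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition, message):
        log = self.partitions[partition]
        log.append(message)
        return len(log) - 1  # offset of the message just written

    def read(self, partition, offset, max_messages=10):
        # Messages inside one partition come back in the order they were written.
        return self.partitions[partition][offset:offset + max_messages]


class ToyConsumer:
    """Keeps an in-memory position per partition plus a separately stored offset."""

    def __init__(self, topic):
        self.topic = topic
        self.position = defaultdict(int)  # next offset to read
        self.stored = {}                  # offsets that were persisted

    def poll(self, partition, max_messages=10):
        batch = self.topic.read(partition, self.position[partition], max_messages)
        self.position[partition] += len(batch)
        return batch

    def store_offset(self, partition):
        # Persisting the position plays the role of an acknowledgement.
        self.stored[partition] = self.position[partition]


topic = PartitionedTopic(num_partitions=2)
for i in range(3):
    topic.append(0, f"a{i}")
topic.append(1, "b0")

consumer = ToyConsumer(topic)
first = consumer.poll(0, max_messages=2)
consumer.store_offset(0)
second = consumer.poll(0)
```

In this model the consumer has read three messages from partition 0, but only the first two are "acknowledged", because the offset was stored before the second poll.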

Consumer Offset Management

With Kafka, consumers have different options to manage topic offsets:

As we can see, offset management gets a lot of attention in Kafka. This is because a consumer seldom just reads messages from a topic. Most of the time we have:

These requirements do not sit well with Kafka's definition of 'consumed'. Kafka considers a message consumed as soon as it has been given to the consumer. In most situations, an application would define 'consumed' differently. That is why applications seek greater control over offset management.

To complicate matters further, Kafka (< 0.9.0) has two consumer clients: a Highlevel Consumer and another called the Simple Consumer. The Simple Consumer is anything but simple. With Kafka 0.9.0, though, there is a new consumer API that replaces both of the previous APIs. Depending on the client you use, certain configuration parameters behave differently.

Configuration Parameters

Before we move ahead and look at each type of offset management, note that a few configuration switches behave peculiarly, and some others are important to understand.

auto.offset.reset

This configuration parameter behaves differently for Highlevel and Simple consumers.

A. If you are working with Highlevel Consumer, reading from a topic foo and you have:

B. If you are working with the Simple Consumer, reading from a topic foo, then on every run the consumer checks the auto.offset.reset value and starts reading messages accordingly. The consumer has to manage offsets itself and can ask Kafka to read from any available position.

Offsets in Zookeeper

Storing offsets in Zookeeper was the default behaviour in Kafka < 0.8.1 for the high-level consumer. Zookeeper may become a bottleneck under heavy load, so storing offsets there is strongly discouraged.

Still, with auto.commit.enable set to false, one can control manually when offsets are committed, which lets the application define 'consumed'. The commitOffsets API allows a high-level consumer (HLC) to commit offsets at a time of its choosing, BUT it commits offsets for all partitions. It does not allow control over which partitions' offsets are committed. That may be fine for straightforward applications. For applications that run multiple threads to process messages, this single commit of all offsets may not work.

Changing an offset would require manually changing the offsets stored in Zookeeper.

With Kafka 0.9.0, the new consumer takes a map with the specific offsets to be committed. The API methods are called commitSync and commitAsync. This release also adds a commitOffsets method to the old high-level consumer API.
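As a rough illustration of what such a map looks like, here is a small Python sketch. It is plain Python and makes no Kafka call; the helper name offsets_to_commit and the sample numbers are made up. It assumes the new consumer's convention that the committed offset is the position of the next message to read, that is, the last processed offset plus one. In the Java API the corresponding map is keyed by TopicPartition with OffsetAndMetadata values.

```python
def offsets_to_commit(last_processed):
    """Build the per-partition commit map.

    last_processed maps (topic, partition) to the offset of the last message
    that was fully processed. The value to commit is that offset plus one,
    i.e. the position the consumer should resume from.
    """
    return {tp: offset + 1 for tp, offset in last_processed.items()}


# Hypothetical state: partitions 0 and 2 of topic foo have finished work,
# partition 1 is still busy and is deliberately left out of the commit.
last_processed = {("foo", 0): 41, ("foo", 2): 7}
commit_map = offsets_to_commit(last_processed)
```

Partitions that are missing from the map keep whatever offset was committed for them earlier, which is exactly the per-partition control the older commitOffsets call lacked.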

Offsets in Kafka

With 0.8.2, Kafka made it possible to store offsets inside a Kafka topic named __consumer_offsets. This is one of the most recommended methods of storing offsets. It relies on the Offset Management API, which stores offsets in Kafka by default. A high-level consumer that wants to store offsets in Zookeeper but still use the Offset Management API should set offsets.storage to zookeeper.

A Simple Consumer that wishes to store offsets in Zookeeper but use this API needs to write the offsets to Zookeeper itself.

Offsets elsewhere

Storing offsets anywhere else simply means using the Simple Consumer for Kafka < 0.9.0. This means writing more code and managing Kafka internals. But by storing offsets outside Kafka and Zookeeper, the application need not use the Offset Management API. One can store offsets on S3, in local files, at FTP locations, etc.

Changing offsets just requires starting from a different offset than the one held in the offset store.
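A local-file offset store can be very small. The following Python sketch is one possible shape for it, not a prescribed format: the class name FileOffsetStore, the JSON layout and the file name are assumptions. It writes to a temporary file and renames it over the old one so that a crash in the middle of a save does not leave a half-written offset file behind.

```python
import json
import os
import tempfile


class FileOffsetStore:
    """Hypothetical offset store: one JSON file mapping partition -> next offset."""

    def __init__(self, path):
        self.path = path

    def load(self):
        if not os.path.exists(self.path):
            return {}  # nothing stored yet; the caller decides where to start
        with open(self.path, "r", encoding="utf-8") as fh:
            raw = json.load(fh)
        # JSON object keys are strings, so convert partition numbers back to int.
        return {int(p): int(o) for p, o in raw.items()}

    def save(self, offsets):
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump({str(p): o for p, o in offsets.items()}, fh)
            fh.flush()
            os.fsync(fh.fileno())
        # Rename over the previous file so readers see old or new, never a mix.
        os.replace(tmp_path, self.path)


workdir = tempfile.mkdtemp()
store = FileOffsetStore(os.path.join(workdir, "foo-offsets.json"))
initial = store.load()
store.save({0: 120, 1: 98})
reloaded = store.load()

# "Changing offsets" is then just saving different numbers, e.g. to replay
# partition 0 from the beginning:
store.save({0: 0, 1: 98})
replay = store.load()
```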

Offsets in Kafka 0.9.0

With 0.9.0, Kafka has done away with the two different consumers. There is only one consumer API, which simplifies offset management greatly. Detailed Java code example here.

The new API still allows offsets to be committed automatically, but it also allows manual commits with control over what to commit for each partition.

Changing offsets with the new API is easy with the seek, seekToBeginning and seekToEnd methods.

What is the bottom line?

Going with the new consumer API is the easy choice. There is less code to write, and that code is clean and simple.

Regardless of which method an application goes with, the fundamental question of when a message is considered 'consumed' remains, and it forces the application towards at-least-once delivery. The application has to handle the gap between the moment a message is considered consumed (stored in a database, in Hadoop, or in a local file) and the moment its offset is stored. A failure may occur between message storage and offset storage, and when it does, we are left with stale offsets in our store of choice. The next time, the application starts from that stale state, which introduces duplicate messages. Hence at-least-once delivery.
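The failure window is easy to reproduce in a toy simulation. The Python sketch below uses a plain list as the topic, a list as the message store and a dict as the offset store; run_consumer, Crash and crash_after are illustrative names, and nothing here talks to Kafka.

```python
class Crash(Exception):
    """Simulated process failure."""


def run_consumer(log, sink, offset_store, crash_after=None):
    """Store each message, then store its offset; optionally crash in between."""
    offset = offset_store.get("offset", 0)
    while offset < len(log):
        sink.append(log[offset])             # step 1: message is stored
        if crash_after is not None and offset == crash_after:
            raise Crash("died before the offset was stored")
        offset += 1
        offset_store["offset"] = offset      # step 2: offset is stored


log = ["m0", "m1", "m2", "m3"]
sink = []
offset_store = {}

try:
    run_consumer(log, sink, offset_store, crash_after=2)
except Crash:
    pass  # the stored offset is now stale: it still points at m2

stale_offset = offset_store["offset"]

# Restart: the consumer resumes from the stale offset and stores m2 again.
run_consumer(log, sink, offset_store)
```

After the restart, sink holds m2 twice. No message is lost, but one is duplicated, which is what at-least-once means in practice.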

Making message storage and offset storage atomic, in any combination of the techniques above, would give us exactly-once delivery. If the application uses something like HBase and makes sure that there is a unique key for each message, we get each message 'exactly once', though technically that is not the same as exactly-once delivery.
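Both ideas can be sketched with SQLite from the Python standard library standing in for the real store. This is only an illustration under assumed names (the tables messages and offsets, the function store_atomically): the message and the offset are written in one transaction, and the primary key (topic, partition, offset) makes a redelivered message a no-op.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages ("
    " topic TEXT, part INTEGER, msg_offset INTEGER, payload TEXT,"
    " PRIMARY KEY (topic, part, msg_offset))"
)
conn.execute(
    "CREATE TABLE offsets ("
    " topic TEXT, part INTEGER, next_offset INTEGER,"
    " PRIMARY KEY (topic, part))"
)
conn.commit()


def store_atomically(conn, topic, part, msg_offset, payload):
    """Write the message and its offset in a single transaction."""
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute(
            "INSERT OR IGNORE INTO messages VALUES (?, ?, ?, ?)",
            (topic, part, msg_offset, payload),
        )
        conn.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
            (topic, part, msg_offset + 1),
        )


store_atomically(conn, "foo", 0, 5, "hello")
store_atomically(conn, "foo", 0, 5, "hello")  # simulated redelivery

message_count = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
next_offset = conn.execute(
    "SELECT next_offset FROM offsets WHERE topic = ? AND part = ?", ("foo", 0)
).fetchone()[0]
```

On restart the application would read next_offset from the same database and resume from there, so the offset can never be staler than the data it describes.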

Why do I get ERROR OOME with size error?

Regardless of which consumer API you use and which offset management you go with, a few use cases produce this error in the consumer:
ERROR OOME with size <dddddd>(kafka.network.BoundedByteBufferReceive)

This error has nothing to do with offset management. Kafka uses internal byte buffers to store messages (message chunks, to be precise) for each consumer. These buffers are governed by the configurations below:

These settings determine how much memory a consumer is going to take. For older clients, it is:
(number of consumer threads) * (queuedchunks.max) * (fetch.size) or (number of consumer threads) * (queued.max.message.chunks) * (fetch.message.max.bytes)

For the new consumer API it is:
(number of partitions) * (max.partition.fetch.bytes)
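The two formulas are easy to turn into a quick estimate. In the Python sketch below the function names are made up and the numbers are illustrative assumptions, not Kafka defaults; the parameters mirror the configuration names used above.

```python
MIB = 1024 * 1024


def old_consumer_buffer_bytes(consumer_threads, queued_max_message_chunks,
                              fetch_message_max_bytes):
    """(consumer threads) * (queued.max.message.chunks) * (fetch.message.max.bytes)"""
    return consumer_threads * queued_max_message_chunks * fetch_message_max_bytes


def new_consumer_buffer_bytes(num_partitions, max_partition_fetch_bytes):
    """(number of partitions) * (max.partition.fetch.bytes)"""
    return num_partitions * max_partition_fetch_bytes


# Illustrative values only: 4 threads, 10 queued chunks, 1 MiB fetch size.
old_estimate = old_consumer_buffer_bytes(4, 10, 1 * MIB)

# Illustrative values only: 12 partitions, 1 MiB per partition fetch.
new_estimate = new_consumer_buffer_bytes(12, 1 * MIB)
```

With these sample values the old client would need about 40 MiB for its buffers and the new client about 12 MiB, on top of whatever the application itself uses.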

When consumers get busy with processing and are slow (due to RAM and/or CPU pressure) to consume and remove data from the internal buffers, an internal call to pre-fetch into these buffers ends in the error above.

The solution to this error would be to:

The last option above is handy if we have started a new consumer/consumer group knowing that

Slowing down would mean measuring message consumption rates in the application and putting threads to sleep :-)
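As a minimal sketch of that idea in Python, assuming a target rate chosen by the application (the names pause_needed, consume_throttled and max_rate are hypothetical): after each message we compare how long the work should have taken at the target rate with how long it actually took, and sleep for the difference.

```python
import time


def pause_needed(messages_done, elapsed_seconds, max_rate):
    """Seconds to sleep so that the average rate stays at or below max_rate."""
    if max_rate <= 0:
        raise ValueError("max_rate must be positive")
    target_elapsed = messages_done / max_rate
    return max(0.0, target_elapsed - elapsed_seconds)


def consume_throttled(messages, handle, max_rate,
                      clock=time.monotonic, sleep=time.sleep):
    """Handle each message, sleeping whenever we run ahead of max_rate."""
    start = clock()
    for count, message in enumerate(messages, start=1):
        handle(message)
        delay = pause_needed(count, clock() - start, max_rate)
        if delay > 0:
            sleep(delay)


handled = []
# 1000 messages per second is an arbitrary example rate.
consume_throttled(["a", "b", "c"], handled.append, max_rate=1000.0)
```

The clock and sleep parameters are only there so the loop can be exercised with fake time; a real application would plug this logic into its own consume loop.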
