Skip to main content

Kafka performance tuning

Performance Tuning of Kafka is critical when your cluster grow in size. Below are few points to consider to improve Kafka performance:
  • Consumer group ID: Never use same exact consumer group ID for dozens of machines consuming from different topics. All of those commits will end up on the same exact partition of __consumer_offsets, hence the same broker, and this might in turn cause performance problems. Choose the consumer group ID to group_id+topic_name.
  • Skewed: A broker is skewed if its number of partitions is greater that the average of partitions per broker on the given topic. Example: 2 brokers share 4 partitions, if one of them has 3 partitions, it is skewed (3 > 2). Try to make sure that none of the brokers is skewed.
  • Spread: Brokers spread is the percentage of brokers in the cluster that has partitions for the given topic. Example: 3 brokers share a topic that has 2 partitions, so 66% of the brokers have partitions for this topic. Try to achieve 100% broker spread.
  • Leader skewedThe cluster is in leader skewed state when a node is leader for more partitions than the # of partitions/# of brokers. Change broker config to auto.leader.rebalance.enable=true to enable auto-rebalancing of leader (from ISR) every 5 mins. You can also click on button "Preferred replica election" to elect another replica as leader automatically for all topics in a cluster.
  • # of partitions: The first thing to understand is that a topic partition is the unit of parallelism in Kafka. On both the producer and the broker side, writes to different partitions can be done fully in parallel. So expensive operations such as compression can utilize more hardware resources. On the consumer side, Kafka always gives a single partition’s data to one consumer thread. Thus, the degree of parallelism in the consumer (within a consumer group) is bounded by the number of partitions being consumed. Therefore, in general, the more partitions there are in a Kafka cluster, the higher the throughput one can achieve.
    A rough formula for picking the number of partitions is based on throughput. You measure the throughout that you can achieve on a single partition for production (call it p) and consumption (call it c). Let’s say your target throughput is t. Then you need to have at least max(t/p, t/c) partitions.
  • More Partitions Requires More Open File Handles: Each partition maps to a directory in the file system in the broker. Within that log directory, there will be two files (one for the index and another for the actual data) per log segment. Currently, in Kafka, each broker opens a file handle of both the index and the data file of every log segment. So, the more partitions, the higher that one needs to configure the open file handle limit in the underlying operating system. This is mostly just a configuration issue. We have seen production Kafka clusters running with more than 30 thousand open file handles per broker.
  • Limit the number of partitions per broker to 2000 - 4000 and the total number of partitions in the cluster to low tens of thousands.
  • Controller: Kafka supports intra-cluster replication, which provides higher availability and durability. A partition can have multiple replicas, each stored on a different broker. One of the replicas is designated as the leader and the rest of the replicas are followers. Internally, Kafka manages all those replicas automatically and makes sure that they are kept in sync. Both the producer and the consumer requests to a partition are served on the leader replica. When a broker fails, partitions with a leader on that broker become temporarily unavailable. Kafka will automatically move the leader of those unavailable partitions to some other replicas to continue serving the client requests. This process is done by one of the Kafka brokers designated as the controller. It involves reading and writing some metadata for each affected partition in ZooKeeper. Currently, operations to ZooKeeper are done serially in the controller.
    The more the number of partitions on unavailable node, more time will be taken by controller to pick up new leader.
  • More Partitions May Increase End-to-end Latency: The end-to-end latency in Kafka is defined by the time from when a message is published by the producer to when the message is read by the consumer. Kafka only exposes a message to a consumer after it has been committed, i.e., when the message is replicated to all the in-sync replicas. So, the time to commit a message can be a significant portion of the end-to-end latency. By default, a Kafka broker only uses a single thread to replicate data from another broker, for all partitions that share replicas between the two brokers. Our experiments show that replicating 1000 partitions from one broker to another can add about 20 ms latency, which implies that the end-to-end latency is at least 20 ms. This can be too high for some real-time applications.

    Note that this issue is alleviated on a larger cluster. For example, suppose that there are 1000 partition leaders on a broker and there are 10 other brokers in the same Kafka cluster. Each of the remaining 10 brokers only needs to fetch 100 partitions from the first broker on average. Therefore, the added latency due to committing a message will be just a few ms, instead of tens of ms.

    As a rule of thumb, if you care about latency, it’s probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in a Kafka cluster and r is the replication factor.
  • More Partitions May Require More Memory In the Client: If one increases the number of partitions, message will be accumulated in more partitions in the producer. The aggregate amount of memory used may now exceed the configured memory limit. When this happens, the producer has to either block or drop any new message, neither of which is ideal. To prevent this from happening, one will need to reconfigure the producer with a larger memory size.
    As a rule of thumb, to achieve good throughput, one should allocate at least a few tens of KB per partition being produced in the producer and adjust the total amount of memory if the number of partitions increases significantly.
  • Zookeeper: Post Kafka-0.8, Zookeeper is only used for the brokers management (failures, discovery), not for the offsets management. Though for legacy systems, Kafka can dual-writes the offsets into Zookeeper and Kafka’s __consumer_offsets (see dual.commit.enabled=true and offsets.storage=kafka)

Comments

Popular posts from this blog

Procedure for name and date of birth change (Pune)

For change of name, the form (scribd) is available free of cost at Government Book Depot (Shaskiya Granthagar), which is located near Collector’s office, next to Saint Helena's School. The postal address is:
Government Photozinco Press Premises and Book Depot,
5, Photozinco Press Road, Pune, MH, 411001.
Wikimapia link

Charges for name or date of birth change, in the Maharashtra Government Gazette:
INR 120.00 per insertion (for two copies of the Gazette)
For backward class applicants: INR 60.00
Charges for extra copy of the Gazette: INR 15.00 per copy (two copies are enough, so you may not want to pay extra for extra copies).

Backward class applicants are required to submit a xerox of caste certificate of old name as issued by the Collector of the District concerned.

Once the form is duly submitted, it normally takes 10 to 15 days for publication of advertisement in the Maharashtra Government Gazette. The Gazette copy reaches to the address filled in the form within next 7 to 15 day…

MPlayer subtitle font problem in Windows

While playing a video with subtitles in mplayer, I was getting the following problem:
New_Face failed. Maybe the font path is wrong. Please supply the text font file (~/.mplayer/subfont.ttf).
Solution is as follows:
Right click on "My Computer".Select "Properties".Go to "Advanced" tab.Click on "Environment Variables".Delete "HOME" variable from User / System variables.

ElasticSearch max file descriptors too low error

ElasticSearch 5.x requires a minimum of Max file descriptors 65536 and Max virtual memory areas 262144.
It throws an error on start-up if these are set to very low value.
ERROR: bootstrap checks failed max file descriptors [16384] for elasticsearch process is too low, increase to at least [65536] max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
Check current values using:
$ cat /proc/sys/fs/file-max 16384 $ cat /proc/sys/vm/max_map_count 65530 $ ulimit -Hn 16384 $ ulimit -Sn 4096
To fix this, following files need to change/add below settings:
Recommended: Add a new file 99-elastic.conf under /etc/security/limits.d with following settings:
elasticsearch - nofile 800000 elasticsearch - nproc 16384 defaultusername - nofile 800000 defaultusername - nproc 16384 Alternatively, edit /etc/sysctl.conf with following settings:
fs.file-max = 800000 vm.max_map_count=300000