
Kafka MirrorMaker in Kafka 0.10.0.1+

Check MirrorMaker.scala for more details.

Target cluster setup

  1. Download and install Kafka on the target cluster. Select the appropriate version, download its tgz from the Kafka Downloads page, and extract it:
    tar -zxf kafka_2.11-0.10.0.1.tgz
    cd kafka_2.11-0.10.0.1

  2. Configure the target Kafka cluster's ZooKeeper:
    vim ./config/zookeeper.properties

    # the directory where the snapshot is stored.
    dataDir=/work/kafka_2.11-0.10.0.1/zookeeper-data
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0

  3. Start the target Kafka cluster's ZooKeeper:
    ./bin/zookeeper-server-start.sh config/zookeeper.properties

  4. Configure the target Kafka cluster's server:
    vim ./config/server.properties

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    # The number of threads handling network requests
    num.network.threads=3
    # The number of threads doing disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    # A comma separated list of directories under which to store log files
    log.dirs=/work/kafka_2.11-0.10.0.1/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=2
    log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according to the retention policies
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    zookeeper.connect=localhost:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000

    log.cleaner.enable=true
    delete.topic.enable=true
    controlled.shutdown.enable=true
    fetch.message.max.bytes=20000000
    replica.fetch.max.bytes=20000000
    message.max.bytes=20000000
    unclean.leader.election.enable=true

    ########################## SSL settings (optional) ###############################
    listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/server.keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=changeit
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/server.truststore.jks
    ssl.truststore.password=changeit
    ssl.client.auth=required
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    ssl.secure.random.implementation=SHA1PRNG
    ssl.enabled.protocols=TLSv1.2
    security.inter.broker.protocol=SSL

  5. Start the target Kafka cluster's server:
    ./bin/kafka-server-start.sh ./config/server.properties

  6. Create the topic in the target cluster:
    ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
    ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --config 'retention.ms=3600000' --topic mirrored_topic

  7. Start listening to the topic in the target cluster using the console consumer:
    ./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic mirrored_topic
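
The console consumer above uses the PLAINTEXT listener on port 9092. If you also want to verify the SSL listener on 9093, the client needs its own SSL settings, and because the broker config sets ssl.client.auth=required it needs a key store as well as a trust store. The following is only a sketch: the file name client-ssl.properties and the client.keystore.jks / client.truststore.jks paths are assumptions, not files created anywhere in this post.

```shell
# Write a client-side SSL config for the console consumer.
# ASSUMPTION: file name and key/trust store paths are hypothetical; adjust to your setup.
cat > client-ssl.properties <<'EOF'
security.protocol=SSL
ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/client.truststore.jks
ssl.truststore.password=changeit
# Needed because the broker sets ssl.client.auth=required
ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
EOF

# With the broker running, point the console consumer at the SSL listener:
# ./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9093 \
#   --topic mirrored_topic --consumer.config client-ssl.properties
```

The consumer command is left commented out because it only works against a running broker with matching certificates.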
    

Kafka MirrorMaker

Configure Kafka MirrorMaker

  1. Source cluster consumer config (config/sourceClusterConsumer.conf):
    # Use the PLAINTEXT port if the source Kafka cluster is not using SSL
    bootstrap.servers=sourceKafka01:9093,sourceKafka02:9093
    client.id=mirror-maker-client-01
    group.id=dp-mirror-maker-group-01
    exclude.internal.topics=true
    auto.offset.reset=earliest
    enable.auto.commit=true
    # Omit security.protocol and the trust store settings if the source Kafka cluster is not using SSL
    security.protocol=SSL
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterTrustStore.jks
    ssl.truststore.password=changeit
    # The location of the key store file. Optional for clients; used for two-way authentication.
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterKeyStore.jks
    # The store password for the key store file. Only needed if ssl.keystore.location is configured.
    ssl.keystore.password=changeit
    # The password of the private key in the key store file. Optional for clients.
    ssl.key.password=changeit

  2. Target cluster producer config (config/targetClusterProducer.conf):
    bootstrap.servers=localhost:9092
    acks=all
    batch.size=1024
    linger.ms=500
    client.id=mirror_maker_producer

  3. Run Kafka MirrorMaker:
    ./bin/kafka-run-class.sh kafka.tools.MirrorMaker --new.consumer --consumer.config config/sourceClusterConsumer.conf --num.streams 2 --producer.config config/targetClusterProducer.conf --whitelist="mirrored_topic"

  4. Check progress:
    # List all consumer groups
    ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --list
    # Describe the MirrorMaker consumer group (the group.id from the consumer config) and list its offset lag at the source Kafka cluster
    ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --describe --group dp-mirror-maker-group-01
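
One thing worth checking before starting MirrorMaker: Java properties files only treat # as a comment at the start of a line, so a trailing "# ..." after a value becomes part of that value (for example, part of bootstrap.servers or a password). Below is a rough check, assuming the two config file names used in the run command; the function name check_no_inline_comments is made up for this sketch, and the pattern is a heuristic that only looks at key=value lines.

```shell
# Heuristic check for trailing "# ..." comments in key=value lines of a properties file.
# ASSUMPTION: check_no_inline_comments is a hypothetical helper, not part of Kafka.
check_no_inline_comments() {
  # Print offending lines with line numbers and fail if any are found.
  if grep -nE '^[[:space:]]*[^#![:space:]][^=]*=.*[[:space:]]#' "$1"; then
    echo "inline comments found in $1" >&2
    return 1
  fi
  return 0
}

# Check the MirrorMaker config files if they exist in the current Kafka directory.
for f in config/sourceClusterConsumer.conf config/targetClusterProducer.conf; do
  if [ -f "$f" ]; then
    check_no_inline_comments "$f"
  fi
done
```

A value that legitimately contains a space followed by # would also be flagged, so treat any output as a hint to inspect the line rather than as a definite error.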
    

Comments

Unknown said…
Hi Rahul, I have tried MirrorMaker with SSL enabled on all Kafka brokers in DC1 and DC2. With SSL it is not working for me, but without SSL it works fine. I am using Kafka version 2.1.0. Please help me get it working with SSL.
