
Kafka MirrorMaker in Kafka 0.10.0.1+

MirrorMaker is essentially a consumer and producer pair running in one process: it consumes messages from a source cluster and republishes them to a target cluster. Check MirrorMaker.scala in the Kafka source tree for implementation details.

Target cluster setup

  1. Download and install Kafka on the target cluster. Pick the appropriate version from the Kafka downloads page, then unpack it:
    tar -zxf kafka_2.11-0.10.0.1.tgz
    cd kafka_2.11-0.10.0.1
  2. Configure the target Kafka cluster's ZooKeeper:
    vim ./config/zookeeper.properties

    # the directory where the snapshot is stored
    dataDir=/work/kafka_2.11-0.10.0.1/zookeeper-data
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
  3. Start the target Kafka cluster's ZooKeeper:
    ./bin/zookeeper-server-start.sh config/zookeeper.properties
  4. Configure the target Kafka cluster's broker:
    vim ./config/server.properties

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    # The number of threads handling network requests
    num.network.threads=3
    # The number of threads doing disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    # A comma-separated list of directories under which to store log files
    log.dirs=/work/kafka_2.11-0.10.0.1/kafka-logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=2
    log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according to the retention policies
    log.retention.check.interval.ms=300000

    ############################# Zookeeper #############################
    zookeeper.connect=localhost:2181
    # Timeout in ms for connecting to ZooKeeper
    zookeeper.connection.timeout.ms=6000

    log.cleaner.enable=true
    delete.topic.enable=true
    controlled.shutdown.enable=true
    fetch.message.max.bytes=20000000
    replica.fetch.max.bytes=20000000
    message.max.bytes=20000000
    unclean.leader.election.enable=true

    ########################## SSL settings (optional) ###############################
    # See the keytool sketch after this list for one way to create these key and trust stores.
    listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/server.keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=changeit
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/server.truststore.jks
    ssl.truststore.password=changeit
    ssl.client.auth=required
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    ssl.secure.random.implementation=SHA1PRNG
    ssl.enabled.protocols=TLSv1.2
    security.inter.broker.protocol=SSL
  5. Start the target Kafka cluster's broker:
    ./bin/kafka-server-start.sh ./config/server.properties
  6. Create the topic in the target cluster:
    # list existing topics
    ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
    # create the mirrored topic with a 1-hour retention
    ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --config 'retention.ms=3600000' --topic mirrored_topic
  7. Start listening to the topic in the target cluster using the console consumer:
    ./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic mirrored_topic
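The SSL settings in step 4 reference a broker keystore and truststore. Below is a minimal sketch of one way to create them with a self-signed CA, following the standard Kafka SSL setup; the file names, the CN values, and the changeit passwords are assumptions chosen to match the example config above, not requirements.

    # Generate a self-signed CA key and certificate (assumed CN, for illustration only)
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -subj "/CN=kafka-ca" -passout pass:changeit
    # Create the broker keystore with an RSA key pair
    keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storepass changeit -keypass changeit -dname "CN=localhost"
    # Export a CSR from the keystore and sign it with the CA
    keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file -storepass changeit
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:changeit
    # Import the CA certificate and the signed broker certificate into the keystore
    keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert -storepass changeit -noprompt
    keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed -storepass changeit -noprompt
    # Put the CA certificate in the truststore so brokers and clients trust certificates it signed
    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert -storepass changeit -noprompt

Because ssl.client.auth=required is set, any client connecting on port 9093 needs its own keystore with a certificate signed by a CA present in the broker truststore.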

Kafka MirrorMaker

Configure Kafka MirrorMaker

  1. Source cluster consumer config. Save it as config/sourceClusterConsumer.conf (referenced by the run command below). Java properties files do not support trailing comments, so the comments go on their own lines:
    # use the plaintext listener here if the source Kafka cluster is not using SSL
    bootstrap.servers=sourceKafka01:9093,sourceKafka02:9093
    client.id=mirror-maker-client-01
    group.id=dp-mirror-maker-group-01
    exclude.internal.topics=true
    auto.offset.reset=earliest
    enable.auto.commit=true
    # omit the remaining settings if the source Kafka cluster is not using SSL
    security.protocol=SSL
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterTrustStore.jks
    ssl.truststore.password=changeit
    # The key store is optional for clients; it is used for two-way (client) authentication.
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterKeyStore.jks
    # The store password for the key store file; only needed if ssl.keystore.location is configured.
    ssl.keystore.password=changeit
    # The password of the private key in the key store file.
    ssl.key.password=changeit
  2. Target cluster producer config. Save it as config/targetClusterProducer.conf:
    bootstrap.servers=localhost:9092
    acks=all
    batch.size=1024
    linger.ms=500
    client.id=mirror_maker_producer
  3. Run Kafka MirrorMaker:
    ./bin/kafka-run-class.sh kafka.tools.MirrorMaker --new.consumer --consumer.config config/sourceClusterConsumer.conf --num.streams 2 --producer.config config/targetClusterProducer.conf --whitelist="mirrored_topic"
  4. Check progress (at the source Kafka cluster):
    # List all consumer groups
    ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --list
    # Describe the consumer group and show its offset lag per partition
    ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --describe --group dp-mirror-maker-group-01
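To verify mirroring end to end, a quick sketch: produce a test message to the source cluster and watch it arrive in the console consumer started against the target cluster in step 7 of the setup. The plaintext source broker address below is an assumption matching the hosts used in the progress checks above.

    # produce one test message to the mirrored topic on the source cluster
    echo "mirror-test-$(date +%s)" | ./bin/kafka-console-producer.sh --broker-list sourceKafka01:9092 --topic mirrored_topic

If the source cluster only accepts SSL, pass --producer.config with the same ssl.* settings used in the MirrorMaker consumer config. Once the message shows up on the target side, the lag column in the --describe output should drop back towards zero.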

