
Kafka MirrorMaker in Kafka 0.10.0.1+

See MirrorMaker.scala in the Kafka source for implementation details.

Target cluster setup

  1. Download and install Kafka on the target cluster. Pick the appropriate version and download its tgz from the Kafka downloads page.
  2. tar -zxf kafka_2.11-0.10.0.1.tgz
    cd kafka_2.11-0.10.0.1
    
  3. Configure Target Kafka cluster's ZooKeeper
  4. vim ./config/zookeeper.properties
    
    # the directory where the snapshot is stored.
    dataDir=/work/kafka_2.11-0.10.0.1/zookeeper-data
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    
  5. Start Target Kafka cluster's ZooKeeper
  6. ./bin/zookeeper-server-start.sh config/zookeeper.properties
    
  7. Configure Target Kafka cluster's Server
  8. vim ./config/server.properties
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    # The number of threads handling network requests
    num.network.threads=3
    # The number of threads doing disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    # A comma separated list of directories under which to store log files
    log.dirs=/work/kafka_2.11-0.10.0.1/kafka-logs
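    # The default number of log partitions per topic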
    num.partitions=1
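    # The number of threads per data directory used for log recovery at startup and flushing at shutdown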
    num.recovery.threads.per.data.dir=1
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=2
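    # A size-based retention policy; a partition's log is pruned once it grows beyond this size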
    log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according to the retention policies
    log.retention.check.interval.ms=300000
    ############################# Zookeeper #############################
    zookeeper.connect=localhost:2181
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    # Enable log compaction, topic deletion, and controlled (clean) broker shutdown
    log.cleaner.enable=true
    delete.topic.enable=true
    controlled.shutdown.enable=true
    # Allow messages up to ~20 MB; replica.fetch.max.bytes must be at least message.max.bytes
    fetch.message.max.bytes=20000000
    replica.fetch.max.bytes=20000000
    message.max.bytes=20000000
    # Let an out-of-sync replica become leader (risks data loss; acceptable for a test setup)
    unclean.leader.election.enable=true
    
    ########################## SSL settings (optional; see the keystore generation sketch after step 14) ###############################
    listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/server.keystore.jks
    ssl.keystore.password=changeit
    ssl.key.password=changeit
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/server.truststore.jks
    ssl.truststore.password=changeit
    ssl.client.auth=required
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    ssl.secure.random.implementation=SHA1PRNG
    ssl.enabled.protocols=TLSv1.2
    security.inter.broker.protocol=SSL
    
  9. Start Target Kafka cluster's Server
  10. ./bin/kafka-server-start.sh ./config/server.properties
    
  11. Create topic in target cluster
  12. ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
    ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --config 'retention.ms=3600000' --topic mirrored_topic
    
  13. Start listening to the topic in target cluster using Console Consumer
  14. ./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic mirrored_topic
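
If the optional SSL listener is enabled in step 8, the broker needs the keystore and truststore referenced in server.properties. Below is a minimal sketch for generating them with a self-signed CA using keytool and openssl; the paths, the changeit passwords, CN=localhost, and the 365-day validity are assumptions carried over from the config above, not requirements. Since ssl.client.auth=required, any client connecting on port 9093 needs its own keystore signed by the same CA.

    mkdir -p /work/kafka_2.11-0.10.0.1/ssl && cd /work/kafka_2.11-0.10.0.1/ssl
    # Generate the broker key pair in the keystore
    keytool -keystore server.keystore.jks -alias localhost -validity 365 \
      -genkey -keyalg RSA -storepass changeit -keypass changeit -dname "CN=localhost"
    # Create a self-signed CA
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
      -passout pass:changeit -subj "/CN=test-kafka-ca"
    # Import the CA certificate into the broker truststore
    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert \
      -storepass changeit -noprompt
    # Sign the broker certificate with the CA, then import the CA cert and the signed cert into the keystore
    keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file -storepass changeit
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed \
      -days 365 -CAcreateserial -passin pass:changeit
    keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert -storepass changeit -noprompt
    keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed -storepass changeit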
    

Kafka MirrorMaker

Configure Kafka MirrorMaker

  1. Source cluster consumer config
  2. bootstrap.servers=sourceKafka01:9093,sourceKafka02:9093
    # Use the source cluster's PLAINTEXT port (e.g. 9092) above if it is not using SSL.
    # Note: properties files treat # only at the start of a line; a trailing comment would
    # become part of the value, so all comments here are on their own lines.
    client.id=mirror-maker-client-01
    group.id=dp-mirror-maker-group-01
    exclude.internal.topics=true
    auto.offset.reset=earliest
    enable.auto.commit=true
    # SSL settings: omit everything below if the source Kafka cluster is not using SSL.
    security.protocol=SSL
    ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterTrustStore.jks
    ssl.truststore.password=changeit
    # The key store is optional for clients; it is needed only for two-way (mutual) authentication.
    ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterKeyStore.jks
    # The key store password; needed only if ssl.keystore.location is configured.
    ssl.keystore.password=changeit
    # The password of the private key in the key store file; optional for clients.
    ssl.key.password=changeit
    
  3. Target cluster producer config
  4. bootstrap.servers=localhost:9092
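    # Require acknowledgement from all in-sync replicas so mirrored messages survive a broker failure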
    acks=all
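    # batch.size is in bytes; tiny 1 KB batches flushed after at most 500 ms favor latency over throughput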
    batch.size=1024
    linger.ms=500
    client.id=mirror_maker_producer
    
  5. Run Kafka MirrorMaker
  6. ./bin/kafka-run-class.sh kafka.tools.MirrorMaker --new.consumer --consumer.config config/sourceClusterConsumer.conf --num.streams 2 --producer.config config/targetClusterProducer.conf --whitelist="mirrored_topic"
    
  7. Check progress
  8. # List all consumer groups
    ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --list
    # Describe consumer group and list offset lag related to given group (at source Kafka cluster):
    ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --describe --group dp-mirror-maker-group-01
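
To verify the pipeline end to end, publish a test message to the source cluster and confirm that it shows up in the console consumer attached to the target cluster in step 14 of the target cluster setup. A minimal sketch, assuming the source cluster also exposes a PLAINTEXT listener on port 9092 (hostnames and topic name as used above):

    # Publish one uniquely tagged test message to the mirrored topic on the source cluster
    echo "mirror-test-$(date +%s)" | ./bin/kafka-console-producer.sh \
      --broker-list sourceKafka01:9092 --topic mirrored_topic
    # Within a few seconds the same message should be printed by the target cluster's
    # console consumer, and the lag reported by ConsumerGroupCommand for
    # dp-mirror-maker-group-01 should return to 0.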
    
