Check MirrorMaker.scala for more details.
Target cluster setup
- Download and install Kafka for the target cluster. Select the appropriate version and download its tgz from the Kafka Downloads page.
- tar -zxf kafka_2.11-0.10.0.1.tgz
- cd kafka_2.11-0.10.0.1
- Configure Target Kafka cluster's ZooKeeper
- vim ./config/zookeeper.properties
- # the directory where the snapshot is stored.
- dataDir=/work/kafka_2.11-0.10.0.1/zookeeper-data
- # the port at which the clients will connect
- clientPort=2181
- # disable the per-ip limit on the number of connections since this is a non-production config
- maxClientCnxns=0
- Start Target Kafka cluster's ZooKeeper
- ./bin/zookeeper-server-start.sh config/zookeeper.properties
- Configure Target Kafka cluster's Server
- vim ./config/server.properties
- # The id of the broker. This must be set to a unique integer for each broker.
- broker.id=0
- # The number of threads handling network requests
- num.network.threads=3
- # The number of threads doing disk I/O
- num.io.threads=8
- # The send buffer (SO_SNDBUF) used by the socket server
- socket.send.buffer.bytes=102400
- # The receive buffer (SO_RCVBUF) used by the socket server
- socket.receive.buffer.bytes=102400
- # The maximum size of a request that the socket server will accept (protection against OOM)
- socket.request.max.bytes=104857600
- # A comma separated list of directories under which to store log files
- log.dirs=/work/kafka_2.11-0.10.0.1/kafka-logs
- num.partitions=1
- num.recovery.threads.per.data.dir=1
- # The minimum age of a log file to be eligible for deletion
- log.retention.hours=2
- log.retention.bytes=1073741824
- # The maximum size of a log segment file. When this size is reached a new log segment will be created.
- log.segment.bytes=1073741824
- # The interval at which log segments are checked to see if they can be deleted according to the retention policies
- log.retention.check.interval.ms=300000
- ############################# Zookeeper #############################
- zookeeper.connect=localhost:2181
- # Timeout in ms for connecting to zookeeper
- zookeeper.connection.timeout.ms=6000
- log.cleaner.enable=true
- delete.topic.enable=true
- controlled.shutdown.enable=true
- fetch.message.max.bytes=20000000
- replica.fetch.max.bytes=20000000
- message.max.bytes=20000000
- unclean.leader.election.enable=true
- ########################## SSL settings (optional) ###############################
- listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
- ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/server.keystore.jks
- ssl.keystore.password=changeit
- ssl.key.password=changeit
- ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/server.truststore.jks
- ssl.truststore.password=changeit
- ssl.client.auth=required
- ssl.keystore.type=JKS
- ssl.truststore.type=JKS
- ssl.secure.random.implementation=SHA1PRNG
- ssl.enabled.protocols=TLSv1.2
- security.inter.broker.protocol=SSL
- Start Target Kafka cluster's Server
- ./bin/kafka-server-start.sh ./config/server.properties
- Create topic in target cluster
- ./bin/kafka-topics.sh --zookeeper localhost:2181 --list
- ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --config 'retention.ms=3600000' --topic mirrored_topic
- Start listening to the topic in the target cluster using the console consumer
- ./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic mirrored_topic
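The topic and consumer commands above only succeed once ZooKeeper (port 2181) and the broker's PLAINTEXT listener (port 9092) are accepting connections. A minimal sketch of a readiness check follows; `wait_for_port` is a hypothetical helper, not part of the Kafka distribution, and it relies on bash's `/dev/tcp` feature, so it assumes bash rather than plain `sh`.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not shipped with Kafka): poll a TCP port until it
# accepts a connection or the timeout (in seconds, default 30) expires.
wait_for_port() {
  local host="$1" port="$2" timeout_s="${3:-30}"
  local deadline=$(( SECONDS + timeout_s ))
  while (( SECONDS < deadline )); do
    # The subshell opens the connection and closes it again on exit.
    if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
      return 0
    fi
    sleep 1
  done
  return 1
}

# Example with the ports configured above (ZooKeeper clientPort, PLAINTEXT listener):
# wait_for_port localhost 2181 30 && wait_for_port localhost 9092 60
```

The function returns 0 as soon as a connection succeeds and 1 on timeout, so it can gate the `kafka-topics.sh` call in a setup script.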
Kafka MirrorMaker
Configure Kafka MirrorMaker
- Source cluster consumer config (config/sourceClusterConsumer.conf)
- # use the plaintext port if the source Kafka cluster is not using SSL
- bootstrap.servers=sourceKafka01:9093,sourceKafka02:9093
- client.id=mirror-maker-client-01
- group.id=dp-mirror-maker-group-01
- exclude.internal.topics=true
- auto.offset.reset=earliest
- enable.auto.commit=true
- # SSL settings below: omit them if the source Kafka cluster is not using SSL
- security.protocol=SSL
- ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterTrustStore.jks
- ssl.truststore.password=changeit
- # Location of the key store file. Optional for clients; used for two-way (client) authentication.
- ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterKeyStore.jks
- # Store password for the key store file. Only needed if ssl.keystore.location is configured.
- ssl.keystore.password=changeit
- # Password of the private key in the key store file. Optional for clients.
- ssl.key.password=changeit
- Target cluster producer config (config/targetClusterProducer.conf)
- bootstrap.servers=localhost:9092
- acks=all
- batch.size=1024
- linger.ms=500
- client.id=mirror_maker_producer
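Java properties files treat `#` as a comment marker only at the start of a line, so any explanatory text placed after a value becomes part of that value. The sketch below writes both files with every comment on its own line. The file names are the ones passed to MirrorMaker in the run command; the `CONFIG_DIR` variable is an assumption added here for convenience.

```shell
#!/usr/bin/env bash
set -eu

# Assumed convenience variable: where to write the files (defaults to ./config).
CONFIG_DIR="${CONFIG_DIR:-./config}"
mkdir -p "$CONFIG_DIR"

# Consumer side: reads from the source cluster.
cat > "$CONFIG_DIR/sourceClusterConsumer.conf" <<'EOF'
# Use the plaintext port if the source Kafka cluster is not using SSL
bootstrap.servers=sourceKafka01:9093,sourceKafka02:9093
client.id=mirror-maker-client-01
group.id=dp-mirror-maker-group-01
exclude.internal.topics=true
auto.offset.reset=earliest
enable.auto.commit=true
# SSL settings: omit if the source Kafka cluster is not using SSL
security.protocol=SSL
ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterTrustStore.jks
ssl.truststore.password=changeit
# Keystore settings: optional, used for two-way (client) authentication
ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterKeyStore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
EOF

# Producer side: writes to the target cluster.
cat > "$CONFIG_DIR/targetClusterProducer.conf" <<'EOF'
bootstrap.servers=localhost:9092
acks=all
batch.size=1024
linger.ms=500
client.id=mirror_maker_producer
EOF
```

Run it from the Kafka installation directory so the files land in `config/`, or set `CONFIG_DIR` to another location and adjust the paths given to MirrorMaker accordingly.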
- Run Kafka MirrorMaker
- ./bin/kafka-run-class.sh kafka.tools.MirrorMaker --new.consumer --consumer.config config/sourceClusterConsumer.conf --num.streams 2 --producer.config config/targetClusterProducer.conf --whitelist="mirrored_topic"
- Check progress
- # List all consumer groups
- ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --list
- # Describe consumer group and list offset lag related to given group (at source Kafka cluster):
- ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --describe --group dp-mirror-maker-group-01
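To track progress as a single number, the per-partition lag from the describe output can be summed. The sketch below is a hypothetical helper, `sum_lag`, and it rests on an assumption: that the output is a whitespace-separated table whose header row contains a column titled `LAG`. Check the header printed by your Kafka version before relying on it.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a describe table on stdin and print the total lag.
# Assumes whitespace-separated columns and a header row with a column named LAG.
sum_lag() {
  awk '
    # Until the header is seen, look for the LAG column and skip the line.
    lag_col == 0 { for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i; next }
    # Data rows: add the value only when it is a plain integer.
    $lag_col ~ /^[0-9]+$/ { total += $lag_col }
    END { print total + 0 }
  '
}

# Example, using the group.id from the source cluster consumer config:
# ./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer \
#   --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 \
#   --describe --group dp-mirror-maker-group-01 | sum_lag
```

A total that shrinks toward zero over repeated runs suggests MirrorMaker is catching up with the source topic. Rows whose lag is not numeric are ignored, and the helper prints 0 when no `LAG` header is found.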