Check MirrorMaker.scala for more details.
Target cluster setup
- Download and install Kafka (target cluster). Select the appropriate version and download its tgz from the Kafka Downloads page.
- Configure Target Kafka cluster's ZooKeeper
- Start Target Kafka cluster's ZooKeeper
- Configure Target Kafka cluster's Server
- Start Target Kafka cluster's Server
- Create the topic in the target cluster
- Start listening to the topic in the target cluster using the Console Consumer
tar -zxf kafka_2.11-0.10.0.1.tgz
cd kafka_2.11-0.10.0.1
vim ./config/zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/work/kafka_2.11-0.10.0.1/zookeeper-data
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
./bin/zookeeper-server-start.sh config/zookeeper.properties
vim ./config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

# A comma separated list of directories under which to store log files
log.dirs=/work/kafka_2.11-0.10.0.1/kafka-logs

num.partitions=1
num.recovery.threads.per.data.dir=1

# The minimum age of a log file to be eligible for deletion
log.retention.hours=2
log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

log.cleaner.enable=true
delete.topic.enable=true
controlled.shutdown.enable=true
fetch.message.max.bytes=20000000
replica.fetch.max.bytes=20000000
message.max.bytes=20000000
unclean.leader.election.enable=true

########################## SSL settings (optional) ###############################
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/server.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/server.truststore.jks
ssl.truststore.password=changeit
ssl.client.auth=required
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
ssl.enabled.protocols=TLSv1.2
security.inter.broker.protocol=SSL
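The SSL block above assumes the keystore and truststore already exist at those paths; the broker will not start the SSL listener otherwise. For a single-broker test setup, a self-signed keystore and matching truststore can be generated roughly as follows (the alias, validity, and exported certificate filename are placeholder choices here, not values required by Kafka; production clusters should use CA-signed certificates as described in the Kafka security documentation):

# Generate a self-signed key pair for the broker (the CN should match the broker hostname)
keytool -genkey -alias localhost -keyalg RSA -validity 365 -dname "CN=localhost" -keystore /work/kafka_2.11-0.10.0.1/ssl/server.keystore.jks -storepass changeit -keypass changeit

# Export the broker certificate and import it into the truststore so peers presenting it are trusted
keytool -export -alias localhost -keystore /work/kafka_2.11-0.10.0.1/ssl/server.keystore.jks -storepass changeit -file /work/kafka_2.11-0.10.0.1/ssl/server-cert.cer
keytool -import -noprompt -alias localhost -file /work/kafka_2.11-0.10.0.1/ssl/server-cert.cer -keystore /work/kafka_2.11-0.10.0.1/ssl/server.truststore.jks -storepass changeit

Note that ssl.client.auth=required in the config above means any client connecting over the SSL listener must also present a certificate that this truststore trusts.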
./bin/kafka-server-start.sh ./config/server.properties
./bin/kafka-topics.sh --zookeeper localhost:2181 --list

./bin/kafka-topics.sh --zookeeper localhost:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --config 'retention.ms=3600000' --topic mirrored_topic
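To double-check the partition count and the retention override before wiring up MirrorMaker, the same tool can describe the topic:

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mirrored_topic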
./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic mirrored_topic
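Once MirrorMaker is running (configured in the next section), a quick end-to-end check is to publish a test message to the same topic on the source cluster and watch it show up in this console consumer. A minimal sketch, assuming the topic already exists on the source cluster and the source brokers expose a plaintext listener on port 9092 (pass a --producer.config with the SSL properties otherwise):

echo "mirror test message" | ./bin/kafka-console-producer.sh --broker-list sourceKafka01:9092 --topic mirrored_topic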
Kafka MirrorMaker
Configure Kafka MirrorMaker
- Source cluster consumer config
- Target cluster producer config
- Run Kafka MirrorMaker
- Check progress
# Source cluster brokers (use the plaintext port if the source Kafka cluster is not using SSL)
bootstrap.servers=sourceKafka01:9093,sourceKafka02:9093
client.id=mirror-maker-client-01
group.id=dp-mirror-maker-group-01
exclude.internal.topics=true
auto.offset.reset=earliest
enable.auto.commit=true

# Ignore the SSL settings below if the source Kafka cluster is not using SSL
security.protocol=SSL
ssl.truststore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterTrustStore.jks
ssl.truststore.password=changeit
# The location of the key store file. This is optional for the client and can be used for two-way client authentication.
ssl.keystore.location=/work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterKeyStore.jks
# The store password for the key store file. Only needed if ssl.keystore.location is configured.
ssl.keystore.password=changeit
# The password of the private key in the key store file. This is optional for the client.
ssl.key.password=changeit
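The truststore and keystore referenced above are client-side files for authenticating against the source cluster; they are not the target broker's stores from the previous section. Assuming you have been given the source cluster's CA certificate as a PEM file (ca-cert.pem is a placeholder name here), the client truststore could be built along these lines:

keytool -import -noprompt -alias sourceKafkaCA -file ca-cert.pem -keystore /work/kafka_2.11-0.10.0.1/ssl/sourceKafkaClusterTrustStore.jks -storepass changeit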
bootstrap.servers=localhost:9092
acks=all
batch.size=1024
linger.ms=500
client.id=mirror_maker_producer
./bin/kafka-run-class.sh kafka.tools.MirrorMaker --new.consumer --consumer.config config/sourceClusterConsumer.conf --num.streams 2 --producer.config config/targetClusterProducer.conf --whitelist="mirrored_topic"
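The distribution also ships bin/kafka-mirror-maker.sh, a thin wrapper around the same kafka.tools.MirrorMaker class that accepts the same options. One way to keep it running after the shell session ends (nohup and the log file name are just one choice, not a Kafka requirement) is:

nohup ./bin/kafka-mirror-maker.sh --new.consumer --consumer.config config/sourceClusterConsumer.conf --producer.config config/targetClusterProducer.conf --num.streams 2 --whitelist="mirrored_topic" > mirror-maker.log 2>&1 &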
# List all consumer groups
./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --list

# Describe the consumer group and list its offset lag (at the source Kafka cluster)
./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer --bootstrap-server sourceKafka01:9092,sourceKafka02:9092 --describe --group dp-mirror-maker-group-01
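Consumer-group lag only shows how far behind MirrorMaker is on the source side. To confirm that data is actually landing in the target cluster, the mirrored topic's latest offsets on the target brokers can be checked as well (a sketch using the GetOffsetShell tool; --time -1 asks for the log-end offsets):

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mirrored_topic --time -1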