
A sample Logstash config to connect to ElasticSearch with TLS

Following up on my previous blog post, below is a sample Logstash config that reads from Kafka and sends its output to ElasticSearch over TLS:
cd /work/elk/logstash-5.6.2/
vim ./config/twitter_feeds_consumer/twitter_feeds_consumer.conf

input {
  kafka {
    topics => ["twitter_feeds_kafka_topic_name"]
    bootstrap_servers => "kafka-broker-1.domain.name:9092,kafka-broker-2.domain.name:9092"
    # consumer_threads => 5
    # auto_offset_reset => "earliest"
    group_id => "logstash562_twitter_feeds_consumer_group"
    codec => json { charset => "ISO-8859-1" }
  }
}

output {
# stdout { codec => "rubydebug" }

  elasticsearch {
    hosts => ["https://coord_01:9200"]
    index => "index-name-%{+YYYY.MM.dd}"
    ssl => true
    cacert => '/work/elk/logstash-5.6.2/config/ca.crt'
    user => "logstash_internal"
    password => "logstash_internal_password"
  }
}
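
Before starting Logstash, it can help to confirm that the coordinating node's TLS certificate verifies against the same CA file and that the credentials are accepted. A quick check with curl, reusing the hostname, CA path, and user from the sample config above, might look like:

```sh
# Verify the TLS chain and credentials against the coordinating node.
# Hostname, CA path, and credentials are the sample values from the config above.
curl --cacert /work/elk/logstash-5.6.2/config/ca.crt \
     -u logstash_internal:logstash_internal_password \
     "https://coord_01:9200/_cluster/health?pretty"
```

If this fails with a certificate error, the elasticsearch output's cacert setting will fail in the same way, so it is worth fixing here first.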

From Logstash 5.x onwards, every Logstash process must specify its own data path folder.
To do that, follow the steps below:
mkdir -p /work/elk/data/data-logstash562/twitter_feeds_consumer
mkdir -p /work/elk/data/logs-logstash562/twitter_feeds_consumer/
./bin/logstash -f ./config/twitter_feeds_consumer/twitter_feeds_consumer.conf -w 5 --path.data=/work/elk/data/data-logstash562/twitter_feeds_consumer -l /work/elk/data/logs-logstash562/twitter_feeds_consumer &

Important flags used are:
-f  Path to the Logstash config file.
-w  Number of pipeline workers that execute the filter and output stages of the pipeline in parallel.
--path.data  From Logstash 5.x onwards, each Logstash process needs its own data folder.
-t  Check the configuration for valid syntax and then exit.
-r  Reload the config automatically when it changes.
--config.reload.interval RELOAD_INTERVAL  How frequently to poll the configuration location for changes, in seconds. The default is every 3 seconds.
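
Two common ways to combine these flags, assuming the same config and data paths as above, are a one-off syntax check with -t and a long-running process with automatic config reload:

```sh
# Validate the config syntax and exit without starting the pipeline
./bin/logstash -f ./config/twitter_feeds_consumer/twitter_feeds_consumer.conf -t

# Run with automatic config reload, polling for changes every 10 seconds
./bin/logstash -f ./config/twitter_feeds_consumer/twitter_feeds_consumer.conf \
    -r --config.reload.interval 10 \
    --path.data=/work/elk/data/data-logstash562/twitter_feeds_consumer
```

Note that -t only checks the config and exits, so it is used on its own, not together with -r.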
