Logstash: Looping through nested JSON in ruby filter

Suppose we have a JSON payload (perhaps a stream coming from Kafka) that looks like this:
{"regionName":"asia","nodes":[{"node":"router.dc1.singhaiuklimited.com","weight":"0"},{"node":"router.dc2.singhaiuklimited.com","weight":"100"}]}
{"regionName":"asia","nodes":[{"node":"router.dc1.singhaiuklimited.com","weight":"0"},{"node":"router.dc1.singhaiuklimited.com","weight":"0"},{"node":"router.dc2.singhaiuklimited.com","weight":"100"},{"node":"router.dc2.singhaiuklimited.com","weight":"100"}]}
To loop through the nested fields and derive extra fields from calculations in Logstash, we can use a ruby filter like this:
input {
  kafka {
    bootstrap_servers => "kafka.singhaiuklimited.com:9181"
    topics => ["routerLogs"]
    group_id => "logstashConsumerGroup"
    auto_offset_reset => "earliest"
    codec => "json"
    client_id => "logstashConsumerGroupClient"
  }
}

filter {
  ruby {
    code => '
      dc1NodesTraversal = 0
      dc2NodesTraversal = 0
      totalWeight = 0
      arrayLength = 0

      # Walk the nodes array, counting per-DC nodes with a non-zero weight
      for currObj in event.get("nodes") do
        arrayLength += 1
        node = currObj["node"]
        nodeWeight = currObj["weight"].to_i

        totalWeight += nodeWeight
        if nodeWeight > 0 && node =~ /dc1/
          dc1NodesTraversal += 1
        elsif nodeWeight > 0 && node =~ /dc2/
          dc2NodesTraversal += 1
        end
      end

      # Below is the same loop written with Logstash field-reference syntax, which also works:
      # event.get("nodes").each_index { |i|
      #   arrayLength += 1
      #   node = event.get("[nodes]["+i.to_s+"][node]")
      #   nodeWeight = event.get("[nodes]["+i.to_s+"][weight]").to_i
      # 
      #   totalWeight += nodeWeight
      #   if nodeWeight > 0 && node =~ /dc1/
      #       dc1NodesTraversal += 1
      #   elsif nodeWeight > 0 && node =~ /dc2/
      #       dc2NodesTraversal += 1
      #   end
      # }

      event.set("dc1NodesTraversal", dc1NodesTraversal)
      event.set("dc2NodesTraversal", dc2NodesTraversal)
      event.set("totalWeight", totalWeight)
      event.set("arrayLength", arrayLength)
    '
  }
  mutate {
    remove_field => ["message"]
  }
}

output {
  stdout { codec => rubydebug }
}
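
For the first sample event above, the rubydebug output would contain the derived fields alongside the original ones, roughly like this (Logstash metadata such as @timestamp and @version omitted):

{
        "regionName" => "asia",
             "nodes" => [ ... ],
 "dc1NodesTraversal" => 0,
 "dc2NodesTraversal" => 1,
       "totalWeight" => 100,
       "arrayLength" => 2
}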

Comments

Domen Jesenovec said…
Just wanted to thank you for this post. I spent almost two days trying to figure out how to work with nested documents in Logstash using Ruby filter. Thanks to this post I got a working solution.
Based on your data, what I expected would work: event.set('[nodes][i][weight]', 7)
What actually works: event.set('[nodes]['+i.to_s+'][weight]', 7)

Thank You!
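
The gotcha the commenter ran into is worth spelling out: Logstash field references are plain strings, so a literal i inside the brackets is read as a field named "i" rather than being interpolated. The index has to be built into the string itself, as in this minimal sketch:

ruby {
  code => '
    i = 0
    # Does NOT work: "i" is treated as a literal field name
    # event.set("[nodes][i][weight]", 7)

    # Works: concatenate the numeric index into the field reference
    event.set("[nodes][" + i.to_s + "][weight]", 7)
    # Equivalently, with Ruby string interpolation:
    event.set("[nodes][#{i}][weight]", 7)
  '
}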
Anonymous said…
Hi, I am trying to extract some fields and rename them from a JSON message.

this is the message
"facets":[{"name":"xrbcppor02d_10.73.29.72_PRD_DALLAS","results":[{"average":10.380978588330544}]},{"name":"xrbcppor02h_10.170.187.11_PRD_WASHING","results":[{"average":9.200648820400238}]},{"name":"xrbcpdb201_10.73.29.68_PRD_DALLAS","results":[{"average":9.16922921245381}]},{"name":"xrbcppor01d_10.73.29.71_PRD_DALLAS","results":[{"average":8.910986583111649}]},{"name":"xrbcppor01h_10.170.187.10_PRD_WASHING","results":[{"average":7.534192990448515}]},{"name":"xrbcpihs04h_10.170.194.134_PRD_WASHING","results":[{"average":3.249048608844563}]},{"name":"xrbcpihs01h_10.170.194.131_PRD_WASHING","results":[{"average":3.240313716864182}]},{"name":"xrbcpihs01d_10.73.73.201_PRD_DALLAS","results":[{"average":3.0964625934759775}]},{"name":"xrbcpihs02h_10.170.194.132_PRD_WASHING","results":[{"average":3.045999195616124}]},{"name":"xrbcpihs02d_10.73.73.202_PRD_DALLAS","results":[{"average":2.9824970687850048}]},{"name":"xrbcpihs04d_10.73.73.204_PRD_DALLAS","results":[{"average":2.9326073820308105}]},{"name":"xrbcpihs03h_10.170.194.133_PRD_WASHING","results":[{"average":2.9221162684893205}]},{"name":"xrbcpihs03d_10.73.73.203_PRD_DALLAS","results":[{"average":2.103803092241287}]}],"totalResult":{"results":[{"average":5.2880065943513594}]},"unknownGroup":{"results":[{"average":null}]},"performanceStats":{"inspectedCount":6817,"omittedCount":0,"matchCount":770,"wallClockTime":88},"metadata":{"eventTypes":["SystemSample"],"eventType":"SystemSample","openEnded":true,"beginTime":"2020-12-30T07:30:00Z","endTime":"2020-12-30T07:35:00Z","beginTimeMillis":1609313400933,"endTimeMillis":1609313700933,"rawSince":"5 MINUTES AGO","rawUntil":"NOW","rawCompareWith":"","facetExpression":"`entityName`","guid":"4cef1910-5c16-9727-ce2f-0a0e4e78b457","routerGuid":"82aa2ec3-0b90-9323-6900-71069b9f1811","messages":[],"facet":"entityName","offset":0,"limit":100,"contents":{"messages":[],"contents":[{"function":"average","attribute":"cpuUserPercent","simple":true}]}}}


I want to get each name with its average using Logstash.
Thank you
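
Applying the same pattern to that payload: assuming the message has already been parsed as JSON (for example with a json codec or json filter) so that facets is an array on the event, a ruby filter along these lines is one way to sketch it, emitting one field per facet name holding its average:

filter {
  ruby {
    code => '
      # Walk the facets array; each entry has a "name" and a
      # "results" array whose first element holds the "average".
      (event.get("facets") || []).each do |facet|
        event.set(facet["name"], facet["results"][0]["average"])
      end
    '
  }
}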
