
Logstash: Looping through nested JSON in ruby filter

Suppose we have a JSON payload (perhaps a stream coming from Kafka) that looks like this:
{"regionName":"asia","nodes":[{"node":"router.dc1.singhaiuklimited.com","weight":"0"},{"node":"router.dc2.singhaiuklimited.com","weight":"100"}]}
{"regionName":"asia","nodes":[{"node":"router.dc1.singhaiuklimited.com","weight":"0"},{"node":"router.dc1.singhaiuklimited.com","weight":"0"},{"node":"router.dc2.singhaiuklimited.com","weight":"100"},{"node":"router.dc2.singhaiuklimited.com","weight":"100"}]}
To loop through the nested fields and generate extra fields from the calculations in Logstash, we can use a ruby filter like this:
input {
  kafka {
    bootstrap_servers => "kafka.singhaiuklimited.com:9181"
    topics => ["routerLogs"]
    group_id => "logstashConsumerGroup"
    auto_offset_reset => "earliest"
    codec => "json"
    client_id => "logstashConsumerGroupClient"
  }
}

filter {
  ruby {
    code => '
      dc1NodesTraversal = 0
      dc2NodesTraversal = 0
      totalWeight = 0
      arrayLength = 0

      for currObj in event.get("nodes") do
        arrayLength += 1
        node = currObj["node"]
        nodeWeight = currObj["weight"].to_i

        totalWeight += nodeWeight
        if nodeWeight > 0 && node =~ /dc1/
          dc1NodesTraversal += 1
        elsif nodeWeight > 0 && node =~ /dc2/
          dc2NodesTraversal += 1
        end
      end

      # Below is the same loop with a different syntax, which also works:
      # event.get("nodes").each_index { |i|
      #   arrayLength += 1
      #   node = event.get("[nodes]["+i.to_s+"][node]")
      #   nodeWeight = event.get("[nodes]["+i.to_s+"][weight]").to_i
      # 
      #   totalWeight += nodeWeight
      #   if nodeWeight > 0 && node =~ /dc1/
      #       dc1NodesTraversal += 1
      #   elsif nodeWeight > 0 && node =~ /dc2/
      #       dc2NodesTraversal += 1
      #   end
      # }

      event.set("dc1NodesTraversal", dc1NodesTraversal)
      event.set("dc2NodesTraversal", dc2NodesTraversal)
      event.set("totalWeight", totalWeight)
      event.set("arrayLength", arrayLength)
    '
  }
  mutate {
    remove_field => ["message"]
  }
}

output {
  stdout { codec => rubydebug }
}
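
Before deploying the pipeline, the loop logic can be sanity-checked in plain Ruby. Below is a minimal sketch against the first sample payload; the event hash here is just a stand-in for the parsed Logstash event:

require 'json'

event = JSON.parse('{"regionName":"asia","nodes":[{"node":"router.dc1.singhaiuklimited.com","weight":"0"},{"node":"router.dc2.singhaiuklimited.com","weight":"100"}]}')

dc1NodesTraversal = 0
dc2NodesTraversal = 0
totalWeight = 0
arrayLength = 0

event["nodes"].each do |currObj|
  arrayLength += 1
  node = currObj["node"]
  nodeWeight = currObj["weight"].to_i

  totalWeight += nodeWeight
  if nodeWeight > 0 && node =~ /dc1/
    dc1NodesTraversal += 1
  elsif nodeWeight > 0 && node =~ /dc2/
    dc2NodesTraversal += 1
  end
end

# Expected: dc1NodesTraversal=0 (its weight is 0), dc2NodesTraversal=1,
# totalWeight=100, arrayLength=2
puts "dc1NodesTraversal=#{dc1NodesTraversal} dc2NodesTraversal=#{dc2NodesTraversal} totalWeight=#{totalWeight} arrayLength=#{arrayLength}"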

Comments

Domen Jesenovec said…
Just wanted to thank you for this post. I spent almost two days trying to figure out how to work with nested documents in Logstash using the ruby filter. Thanks to this post I got a working solution.
Based on your data, what I expected would work: event.set('[nodes][i][weight]', 7)
What actually works: event.set('[nodes]['+i.to_s+'][weight]', 7)

Thank You!
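
Note: a Logstash field reference is a plain string, so the loop index has to be interpolated into it; '[nodes][i][weight]' would be read literally rather than evaluating the Ruby variable i. A minimal sketch inside a ruby filter's code block:

event.get("nodes").each_index { |i|
  # build the reference string with the numeric index, e.g. "[nodes][0][weight]"
  event.set("[nodes]["+i.to_s+"][weight]", 7)
}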
Anonymous said…
Hi, I am trying to extract some fields and rename them from a JSON message.

This is the message:
"facets":[{"name":"xrbcppor02d_10.73.29.72_PRD_DALLAS","results":[{"average":10.380978588330544}]},{"name":"xrbcppor02h_10.170.187.11_PRD_WASHING","results":[{"average":9.200648820400238}]},{"name":"xrbcpdb201_10.73.29.68_PRD_DALLAS","results":[{"average":9.16922921245381}]},{"name":"xrbcppor01d_10.73.29.71_PRD_DALLAS","results":[{"average":8.910986583111649}]},{"name":"xrbcppor01h_10.170.187.10_PRD_WASHING","results":[{"average":7.534192990448515}]},{"name":"xrbcpihs04h_10.170.194.134_PRD_WASHING","results":[{"average":3.249048608844563}]},{"name":"xrbcpihs01h_10.170.194.131_PRD_WASHING","results":[{"average":3.240313716864182}]},{"name":"xrbcpihs01d_10.73.73.201_PRD_DALLAS","results":[{"average":3.0964625934759775}]},{"name":"xrbcpihs02h_10.170.194.132_PRD_WASHING","results":[{"average":3.045999195616124}]},{"name":"xrbcpihs02d_10.73.73.202_PRD_DALLAS","results":[{"average":2.9824970687850048}]},{"name":"xrbcpihs04d_10.73.73.204_PRD_DALLAS","results":[{"average":2.9326073820308105}]},{"name":"xrbcpihs03h_10.170.194.133_PRD_WASHING","results":[{"average":2.9221162684893205}]},{"name":"xrbcpihs03d_10.73.73.203_PRD_DALLAS","results":[{"average":2.103803092241287}]}],"totalResult":{"results":[{"average":5.2880065943513594}]},"unknownGroup":{"results":[{"average":null}]},"performanceStats":{"inspectedCount":6817,"omittedCount":0,"matchCount":770,"wallClockTime":88},"metadata":{"eventTypes":["SystemSample"],"eventType":"SystemSample","openEnded":true,"beginTime":"2020-12-30T07:30:00Z","endTime":"2020-12-30T07:35:00Z","beginTimeMillis":1609313400933,"endTimeMillis":1609313700933,"rawSince":"5 MINUTES AGO","rawUntil":"NOW","rawCompareWith":"","facetExpression":"`entityName`","guid":"4cef1910-5c16-9727-ce2f-0a0e4e78b457","routerGuid":"82aa2ec3-0b90-9323-6900-71069b9f1811","messages":[],"facet":"entityName","offset":0,"limit":100,"contents":{"messages":[],"contents":[{"function":"average","attribute":"cpuUserPercent","simple":true}]}}}


I want to get each name with its average using Logstash.
Thank you
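
The same looping technique from the post applies to that payload. A minimal, untested sketch, assuming the JSON has already been decoded into the event (e.g. with the json codec or filter) and using a hypothetical target field named averages:

ruby {
  code => '
    averages = []
    for facet in event.get("facets") do
      # each facet looks like {"name" => ..., "results" => [{"average" => ...}]}
      averages << { "name" => facet["name"], "average" => facet["results"][0]["average"] }
    end
    event.set("averages", averages)
  '
}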
