ElasticSearch pipeline bucket selector aggregation

ElasticSearch supports bucket selection on the output of an aggregation.
It works as a pipeline: an aggregation first generates buckets, and a bucket selector then filters out the buckets that do not satisfy a condition.

We have an ElasticSearch index 'daily_reports', in which each row represents a particular version of a report.
When a report is created, a new row is inserted into the index with a new 'reportId' value and a 'publishDate' field holding the UNIX timestamp (in milliseconds).
Each row has several other fields representing properties of the report, e.g. 'title', 'activity', 'reportStatus', 'reportLevel', etc.
When the report is edited or deleted, another row is inserted into the index with the same 'reportId' but a different '_id', 'publishDate', 'reportLevel', etc.

Now, if a user wants the latest version of each report, but only for reports whose latest version matches a particular filter criterion (reportLevel = Monitoring AND reportStatus = 1), we can get it with the following query:

POST daily_reports/reports/_search?filter_path=aggregations
{
   "size": 0,
   "aggs": {
      "groupedByReportId": {
         "terms": {
            "field": "reportId",
            "size": 9999,
            "order": {
               "latest_date": "desc"
            }
         },
         "aggs": {
            "latest_date": {
               "max": {
                  "field": "publishDate"
               }
            },
            "latest_report": {
               "top_hits": {
                  "size": 1,
                  "sort": [
                     {
                        "publishDate": {
                           "order": "desc"
                        }
                     }
                  ]
               }
            },
            "filters": {
               "filter": {
                  "bool": {
                     "must": [
                        {
                           "term": {
                              "reportLevel": "Monitoring"
                           }
                        },
                        {
                           "term": {
                              "reportStatus": 1
                           }
                        }
                     ],
                     "should": []
                  }
               },
               "aggs": {
                  "latest_publish_date": {
                     "max": {
                        "field": "publishDate"
                     }
                  }
               }
            },
            "should_we_consider": {
               "bucket_selector": {
                  "buckets_path": {
                     "latest_publish_date_report": "latest_date",
                     "latest_publish_date_filtered": "filters>latest_publish_date"
                  },
                  "script": "latest_publish_date_report == latest_publish_date_filtered"
               }
            }
         }
      }
   }
}

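We restrict the result to aggregations only, leaving out the search hits, by setting 'size' to 0 (the filter_path=aggregations parameter in the URL additionally strips everything except the aggregations from the response):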
"size": 0,

First, a 'terms' aggregation groups all the rows that share the same 'reportId'.
The 'size' parameter defines how many term buckets are returned out of the overall list of terms. By default, the node coordinating the search asks each shard for its own top 'size' term buckets and, once all shards have responded, reduces the results to the final list returned to the client. This means that if the number of unique terms is greater than 'size', the returned list can be inaccurate: the term counts may be slightly off, and a term that should have been in the top 'size' buckets may be missing altogether.
The aggregation also orders the buckets in descending order of the 'latest_date' sub-aggregation (i.e. the most recently published report comes first):
"terms": {
   "field": "reportId",
   "size": 9999,
   "order": {
     "latest_date": "desc"
   }
},
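
The accuracy caveat about 'size' can be illustrated with a toy simulation in plain Python. This is only a sketch of the idea, not ElasticSearch code: the two "shards" and their term values are made up, and terms are ranked by document count for simplicity (the query above ranks by 'latest_date' instead).

```python
from collections import Counter

def shard_top_terms(shard_terms, size):
    """Return one shard's local top `size` terms as (term, count) pairs."""
    return Counter(shard_terms).most_common(size)

def coordinator_merge(shards, size):
    """Sum the per-shard top lists and keep the overall top `size` terms."""
    merged = Counter()
    for shard_terms in shards:
        for term, count in shard_top_terms(shard_terms, size):
            merged[term] += count
    return merged.most_common(size)

def exact_top_terms(shards, size):
    """Ground truth: count every term on every shard before ranking."""
    total = Counter()
    for shard_terms in shards:
        total.update(shard_terms)
    return total.most_common(size)

# Hypothetical term values held by two shards (made up for illustration).
shards = [
    ["a"] * 6 + ["b"] * 4,
    ["c"] * 5 + ["b"] * 4,
]

approximate = coordinator_merge(shards, size=1)  # no shard ever reports "b"
exact = exact_top_terms(shards, size=1)          # "b" has the most documents overall
print(approximate, exact)  # [('a', 6)] [('b', 8)]
```

Once 'size' is at least the number of unique terms (3 in this toy data), both functions return the same list, which is presumably why the query uses a large value such as 9999.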

Next, the 'max' aggregation returns the maximum 'publishDate' value among the rows in each bucket. This is required because buckets_path can't read a value directly from the 'top_hits' aggregation:
"latest_date": {
  "max": {
    "field": "publishDate"
  }
}

The 'top_hits' aggregation sorts the rows in each bucket by 'publishDate' in descending order and returns the top-most row, i.e. the latest version of the report:
"latest_report": {
  "top_hits": {
    "size": 1,
    "sort": [
      {
        "publishDate": {
          "order": "desc"
        }
      }
    ]
  }
},

The 'filter' aggregation (named 'filters' in the query) keeps only the rows matching the filter criterion, and its 'max' sub-aggregation returns the maximum 'publishDate' value among them (which is then used directly as a buckets_path variable):
"filters": {
  "filter": {
    "bool": {
      "must": [
        {
          "term": {
            "reportLevel": "Monitoring"
          }
        },
        {
          "term": {
            "reportStatus": 1
          }
        }
      ],
      "should": []
    }
  },
  "aggs": {
    "latest_publish_date": {
      "max": {
        "field": "publishDate"
      }
    }
  }
}

Finally, the 'bucket_selector' pipeline aggregation keeps only those buckets whose 'latest_date' value equals the 'filters>latest_publish_date' value, i.e. the reports whose latest version satisfies the filter:
"should_we_consider": {
  "bucket_selector": {
    "buckets_path": {
      "latest_publish_date_report": "latest_date",
      "latest_publish_date_filtered": "filters>latest_publish_date"
    },
    "script": "latest_publish_date_report == latest_publish_date_filtered"
  }
}
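
To make the selection logic easier to follow, here is a rough in-memory emulation of the whole pipeline in plain Python. It is a sketch under assumptions, not what ElasticSearch executes: the function name and the sample rows are hypothetical, and a report with no matching version at all is simply dropped.

```python
from collections import defaultdict

def latest_matching_reports(rows, report_level="Monitoring", report_status=1):
    """Keep the latest version of each report, but only if that version matches."""
    buckets = defaultdict(list)
    for row in rows:  # 'terms' aggregation on 'reportId'
        buckets[row["reportId"]].append(row)

    selected = []
    for versions in buckets.values():
        latest_date = max(v["publishDate"] for v in versions)  # 'latest_date'
        matching = [  # 'filters'
            v for v in versions
            if v["reportLevel"] == report_level and v["reportStatus"] == report_status
        ]
        if not matching:
            # Assumption: a bucket without any matching version is dropped.
            continue
        latest_filtered = max(v["publishDate"] for v in matching)  # 'latest_publish_date'
        if latest_date == latest_filtered:  # 'should_we_consider'
            selected.append(max(versions, key=lambda v: v["publishDate"]))  # 'latest_report'

    # Buckets are ordered by 'latest_date', newest first.
    return sorted(selected, key=lambda v: v["publishDate"], reverse=True)

# Hypothetical rows: r1 and r4 end on a matching version, r2 and r3 do not.
rows = [
    {"reportId": "r1", "publishDate": 100, "reportLevel": "Monitoring", "reportStatus": 1},
    {"reportId": "r1", "publishDate": 200, "reportLevel": "Monitoring", "reportStatus": 1},
    {"reportId": "r2", "publishDate": 150, "reportLevel": "Monitoring", "reportStatus": 1},
    {"reportId": "r2", "publishDate": 300, "reportLevel": "Critical", "reportStatus": 1},
    {"reportId": "r3", "publishDate": 50, "reportLevel": "Critical", "reportStatus": 1},
    {"reportId": "r4", "publishDate": 250, "reportLevel": "Monitoring", "reportStatus": 1},
]

result = latest_matching_reports(rows)
print([(r["reportId"], r["publishDate"]) for r in result])  # [('r4', 250), ('r1', 200)]
```

Report r2 is excluded even though an older version of it matches the filter, because its latest version does not; this is exactly what the equality check in the script achieves.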

The result looks like:
{
    "aggregations": {
      "groupedByReportId": {
         "doc_count_error_upper_bound": 0,
         "sum_other_doc_count": 0,
         "buckets": [
            {
               "key": "1d9590fbc8248",
               "doc_count": 3,
               "latest_date": {
                  "value": 1500475062123,
                  "value_as_string": "2017-07-19T14:37:42.123Z"
               },
               "filters": {
                  "doc_count": 3,
                  "latest_publish_date": {
                     "value": 1500475062123,
                     "value_as_string": "2017-07-19T14:37:42.123Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 3,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AV1bSHqEtARK6ulVqCq9",
                           "_score": null,
                           "_source": {
                              "title": "Report 1",
                              "type": "Heads-Up",
                              "threatLevel": "Monitoring",
                              "reportStatus": 1,
                              "activity": "Edited",
                              "publishDate": 1500475062123,
                              "reportId": "1d9590fbc8248",
                              "_id": "AVzPf2EIgkVDCoUgGLmL"
                           },
                           "sort": [
                              1500475062123
                           ]
                        }
                     ]
                  }
               }
            },
            {
               "key": "eb1bacc2-05f1-49f6-a5e9-b2a40cfbe991",
               "doc_count": 1,
               "latest_date": {
                  "value": 1499259615983,
                  "value_as_string": "2017-07-05T13:00:15.983Z"
               },
               "filters": {
                  "doc_count": 1,
                  "latest_publish_date": {
                     "value": 1499259615983,
                     "value_as_string": "2017-07-05T13:00:15.983Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 1,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AV0S1kFpfsv-z4NmmeKK",
                           "_score": null,
                           "_source": {
                              "title": "Report 2",
                              "type": "Incident Closure",
                              "threatLevel": "Monitoring",
                              "reportStatus": 1,
                              "activity": "Created",
                              "publishDate": 1499259615983,
                              "reportId": "eb1bacc2-05f1-49f6-a5e9-b2a40cfbe991"
                           },
                           "sort": [
                              1499259615983
                           ]
                        }
                     ]
                  }
               }
            },
            {
               "key": "0d75fad5-d865-4456-8d3a-303e3261eb28",
               "doc_count": 1,
               "latest_date": {
                  "value": 1498124863900,
                  "value_as_string": "2017-06-22T09:47:43.900Z"
               },
               "filters": {
                  "doc_count": 1,
                  "latest_publish_date": {
                     "value": 1498124863900,
                     "value_as_string": "2017-06-22T09:47:43.900Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 1,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AVzPM03FgkVDCoUgF9Cf",
                           "_score": null,
                           "_source": {
                              "title": "Test report 3",
                              "type": "Daily Situation",
                              "threatLevel": "Monitoring",
                              "reportStatus": 1,
                              "activity": "Created",
                              "publishDate": 1498124863900,
                              "reportId": "0d75fad5-d865-4456-8d3a-303e3261eb28"
                           },
                           "sort": [
                              1498124863900
                           ]
                        }
                     ]
                  }
               }
            },
            {
               "key": "184204f0-d0fb-4755-9573-2332940d0ac9",
               "doc_count": 5,
               "latest_date": {
                  "value": 1497540223817,
                  "value_as_string": "2017-06-15T15:23:43.817Z"
               },
               "filters": {
                  "doc_count": 5,
                  "latest_publish_date": {
                     "value": 1497540223817,
                     "value_as_string": "2017-06-15T15:23:43.817Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 5,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AVysWmbCgkVDCoUg-8nQ",
                           "_score": null,
                           "_source": {
                              "title": "Test 5",
                              "type": "Daily Situation",
                              "threatLevel": "Monitoring",
                              "reportId": "184204f0-d0fb-4755-9573-2332940d0ac9",
                              "reportStatus": 1,
                              "activity": "Edited",
                              "publishDate": 1497540223817,
                              "_id": "AVysUj08gkVDCoUg-8nP"
                           },
                           "sort": [
                              1497540223817
                           ]
                        }
                     ]
                  }
               }
            }
         ]
      }
   }
}
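
On the client side, the latest version of each selected report sits in the single 'top_hits' hit of every bucket. A minimal Python sketch for pulling those documents out of the parsed response could look like this; the function name is hypothetical, and the sample response is a trimmed copy of the result above.

```python
def latest_reports_from_response(response):
    """Collect the '_source' of the single 'latest_report' hit of every bucket."""
    buckets = response["aggregations"]["groupedByReportId"]["buckets"]
    reports = []
    for bucket in buckets:
        hits = bucket["latest_report"]["hits"]["hits"]
        if hits:  # 'top_hits' was requested with size 1, so at most one hit
            reports.append(hits[0]["_source"])
    return reports

# Trimmed copy of the response shown above (only the fields used here).
response = {
    "aggregations": {
        "groupedByReportId": {
            "buckets": [
                {
                    "key": "1d9590fbc8248",
                    "latest_report": {"hits": {"hits": [
                        {"_source": {"title": "Report 1", "publishDate": 1500475062123}}
                    ]}},
                },
                {
                    "key": "eb1bacc2-05f1-49f6-a5e9-b2a40cfbe991",
                    "latest_report": {"hits": {"hits": [
                        {"_source": {"title": "Report 2", "publishDate": 1499259615983}}
                    ]}},
                },
            ]
        }
    }
}

titles = [report["title"] for report in latest_reports_from_response(response)]
print(titles)  # ['Report 1', 'Report 2']
```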
