ElasticSearch has a 'bucket_selector' pipeline aggregation, which selects a subset of the buckets generated by a parent aggregation.
This works as a pipeline: first an aggregation generates buckets, and then the bucket selector filters out the buckets that do not satisfy its script.
We have an ElasticSearch index 'daily_reports', where a row represents a particular version of report.
When a report is created a new row is inserted in the index with a new 'reportId' field value and 'publishDate' field representing the UNIX timestamp.
Each report/row has multiple other fields representing properties of the report, e.g., 'title', 'activity', 'reportStatus', 'reportLevel', etc.
When the report is edited/deleted, a new row is inserted into the index, with same 'reportId', but different '_id', 'publishDate', 'reportLevel' etc.
Now, if a user wants to get the latest version of each report, but only when that latest version matches a particular filter criterion (reportLevel = Monitoring AND reportStatus = 1), we can get it using the following query:
We restrict the result to contain only aggregations, and not search hits, by setting "size": 0 in the request body.
First, a 'terms' aggregation groups all the rows that share the same 'reportId' into one bucket.
The 'size' parameter defines how many term buckets should be returned out of the overall terms list. By default, the node coordinating the search process will request each shard to provide its own top size term buckets and once all shards respond, it will reduce the results to the final list that will then be returned to the client. This means that if the number of unique terms is greater than size, the returned list is slightly off and not accurate (it could be that the term counts are slightly off and it could even be that a term that should have been in the top size buckets was not returned).
It also orders the buckets in descending order of the 'latest_date' aggregation results (i.e. the most recently updated report is shown at the top):
Next, the 'max' aggregation returns the maximum 'publishDate' value among the rows in each bucket. This is required because buckets_path can't access a value directly from the 'top_hits' aggregation:
The 'top_hits' aggregation sorts the rows of each bucket by 'publishDate' in descending order and returns the top-most row:
The 'filters' aggregation keeps only the rows that match the filter criterion, and its 'max' sub-aggregation ('latest_publish_date') returns the maximum 'publishDate' value among them (which is then used directly as a buckets_path variable):
Finally, the 'bucket_selector' pipeline aggregation keeps only those buckets for which the 'latest_date' aggregation value is the same as the 'filters>latest_publish_date' aggregation value, i.e. the buckets whose latest version itself matches the filter criterion:
The result looks like:
This works as a pipeline: first an aggregation generates buckets, and then the bucket selector filters out the buckets that do not satisfy its script.
We have an ElasticSearch index 'daily_reports', where a row represents a particular version of report.
When a report is created a new row is inserted in the index with a new 'reportId' field value and 'publishDate' field representing the UNIX timestamp.
Each report/row has multiple other fields representing properties of the report, e.g., 'title', 'activity', 'reportStatus', 'reportLevel', etc.
When the report is edited/deleted, a new row is inserted into the index, with same 'reportId', but different '_id', 'publishDate', 'reportLevel' etc.
Now, if a user wants to get the latest version of each report, but only when that latest version matches a particular filter criterion (reportLevel = Monitoring AND reportStatus = 1), we can get it using the following query:
POST daily_reports/reports/_search?filter_path=aggregations { "size": 0, "aggs": { "groupedByReportId": { "terms": { "field": "reportId", "size": 9999, "order": { "latest_date": "desc" } }, "aggs": { "latest_date": { "max": { "field": "publishDate" } }, "latest_report": { "top_hits": { "size": 1, "sort": [ { "publishDate": { "order": "desc" } } ] } }, "filters": { "filter": { "bool": { "must": [ { "term": { "reportLevel": "Monitoring" } }, { "term": { "reportStatus": 1 } } ], "should": [] } }, "aggs": { "latest_publish_date": { "max": { "field": "publishDate" } } } }, "should_we_consider": { "bucket_selector": { "buckets_path": { "latest_publish_date_report": "latest_date", "latest_publish_date_filtered": "filters>latest_publish_date" }, "script": "latest_publish_date_report == latest_publish_date_filtered" } } } } } }
We restrict the result to contain only aggregations, and not search hits, using:
"size": 0,
The 'size' parameter of the 'terms' aggregation (not the top-level "size": 0 shown above) defines how many term buckets should be returned out of the overall terms list. By default, the node coordinating the search process will request each shard to provide its own top 'size' term buckets and, once all shards respond, it will reduce the results to the final list that will then be returned to the client. This means that if the number of unique terms is greater than 'size', the returned list may be slightly off and not accurate (it could be that the term counts are slightly off and it could even be that a term that should have been in the top 'size' buckets was not returned).
It also orders the buckets in descending order of the 'latest_date' aggregation results (i.e. the most recently updated report is shown at the top):
"terms": { "field": "reportId", "size": 9999, "order": { "latest_date": "desc" } },
"latest_date": { "max": { "field": "publishDate" } }
"top_hits": { "size": 1, "sort": [ { "publishDate": { "order": "desc" } } ] } },
"filters": { "filter": { "bool": { "must": [ { "term": { "reportLevel": "Monitoring" } }, { "term": { "reportStatus": 1 } } ], "should": [] } }, "aggs": { "latest_publish_date": { "max": { "field": "publishDate" } } } }
"should_we_consider": { "bucket_selector": { "buckets_path": { "latest_publish_date_report": "latest_date", "latest_publish_date_filtered": "filters>latest_publish_date" }, "script": "latest_publish_date_report == latest_publish_date_filtered" } }
{ "aggregations": { "groupedByReportId": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "1d9590fbc8248", "doc_count": 3, "latest_date": { "value": 1500475062123, "value_as_string": "2017-07-19T14:37:42.123Z" }, "filters": { "doc_count": 3, "latest_publish_date": { "value": 1500475062123, "value_as_string": "2017-07-19T14:37:42.123Z" } }, "latest_report": { "hits": { "total": 3, "max_score": null, "hits": [ { "_index": "daily_report", "_type": "reports", "_id": "AV1bSHqEtARK6ulVqCq9", "_score": null, "_source": { "title": "Report 1", "type": "Heads-Up", "threatLevel": "Monitoring", "reportStatus": 1, "activity": "Edited", "publishDate": 1500475062123, "reportId": "1d9590fbc8248", "_id": "AVzPf2EIgkVDCoUgGLmL" }, "sort": [ 1500475062123 ] } ] } } }, { "key": "eb1bacc2-05f1-49f6-a5e9-b2a40cfbe991", "doc_count": 1, "latest_date": { "value": 1499259615983, "value_as_string": "2017-07-05T13:00:15.983Z" }, "filters": { "doc_count": 1, "latest_publish_date": { "value": 1499259615983, "value_as_string": "2017-07-05T13:00:15.983Z" } }, "latest_report": { "hits": { "total": 1, "max_score": null, "hits": [ { "_index": "daily_report", "_type": "reports", "_id": "AV0S1kFpfsv-z4NmmeKK", "_score": null, "_source": { "title": "Report 2", "type": "Incident Closure", "threatLevel": "Monitoring", "reportStatus": 1, "activity": "Created", "publishDate": 1499259615983, "reportId": "eb1bacc2-05f1-49f6-a5e9-b2a40cfbe991" }, "sort": [ 1499259615983 ] } ] } } }, { "key": "0d75fad5-d865-4456-8d3a-303e3261eb28", "doc_count": 1, "latest_date": { "value": 1498124863900, "value_as_string": "2017-06-22T09:47:43.900Z" }, "filters": { "doc_count": 1, "latest_publish_date": { "value": 1498124863900, "value_as_string": "2017-06-22T09:47:43.900Z" } }, "latest_report": { "hits": { "total": 1, "max_score": null, "hits": [ { "_index": "daily_report", "_type": "reports", "_id": "AVzPM03FgkVDCoUgF9Cf", "_score": null, "_source": { "title": "Test report 3", "type": 
"Daily Situation", "threatLevel": "Monitoring", "reportStatus": 1, "activity": "Created", "publishDate": 1498124863900, "reportId": "0d75fad5-d865-4456-8d3a-303e3261eb28" }, "sort": [ 1498124863900 ] } ] } } }, { "key": "184204f0-d0fb-4755-9573-2332940d0ac9", "doc_count": 5, "latest_date": { "value": 1497540223817, "value_as_string": "2017-06-15T15:23:43.817Z" }, "filters": { "doc_count": 5, "latest_publish_date": { "value": 1497540223817, "value_as_string": "2017-06-15T15:23:43.817Z" } }, "latest_report": { "hits": { "total": 5, "max_score": null, "hits": [ { "_index": "daily_report", "_type": "reports", "_id": "AVysWmbCgkVDCoUg-8nQ", "_score": null, "_source": { "title": "Test 5", "type": "Daily Situation", "threatLevel": "Monitoring", "reportId": "184204f0-d0fb-4755-9573-2332940d0ac9", "reportStatus": 1, "activity": "Edited", "publishDate": 1497540223817, "_id": "AVysUj08gkVDCoUg-8nP" }, "sort": [ 1497540223817 ] } ] } } } ] } } }
Comments