
ElasticSearch pipeline bucket selector aggregation

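ElasticSearch provides a bucket_selector pipeline aggregation, which filters the buckets generated by a parent multi-bucket aggregation.
This works as a pipeline: first an aggregation generates buckets, and then the bucket selector removes every bucket for which its script condition evaluates to false.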

We have an ElasticSearch index 'daily_reports', where each document (row) represents a particular version of a report.
When a report is created, a new row is inserted into the index with a new 'reportId' value and a 'publishDate' field holding the UNIX timestamp in milliseconds.
Each report/row has multiple other fields describing the report, e.g. 'title', 'activity', 'reportStatus', 'threatLevel', etc.
When the report is edited or deleted, another row is inserted into the index with the same 'reportId' but a different '_id' and 'publishDate', and possibly different values for 'threatLevel', 'activity', etc.
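The publishDate values in the results further down (e.g. 1500475062123) are epoch milliseconds rather than seconds. A minimal Python sketch, assuming UTC, that renders them in the same form Elasticsearch prints as value_as_string; the helper name `publish_date_to_iso` is hypothetical:

```python
from datetime import datetime, timedelta, timezone

# Assumption: publishDate holds epoch milliseconds in UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def publish_date_to_iso(publish_date_ms: int) -> str:
    """Hypothetical helper: render an epoch-millisecond publishDate as ISO-8601 UTC."""
    # Integer millisecond arithmetic avoids float rounding of ms / 1000.
    dt = EPOCH + timedelta(milliseconds=publish_date_ms)
    # Keep exactly three fractional digits and use the "Z" suffix.
    return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")


print(publish_date_to_iso(1500475062123))  # 2017-07-19T14:37:42.123Z
```

The printed value matches the value_as_string Elasticsearch reports for the same publishDate in the response.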

Now, if a user wants the latest version of each report, but only for reports whose latest version matches a particular filter criterion (threatLevel = Monitoring AND reportStatus = 1), we can get it using the following query:

POST daily_reports/reports/_search?filter_path=aggregations
{
   "size": 0,
   "aggs": {
      "groupedByReportId": {
         "terms": {
            "field": "reportId",
            "size": 9999,
            "order": {
               "latest_date": "desc"
            }
         },
         "aggs": {
            "latest_date": {
               "max": {
                  "field": "publishDate"
               }
            },
            "latest_report": {
               "top_hits": {
                  "size": 1,
                  "sort": [
                     {
                        "publishDate": {
                           "order": "desc"
                        }
                     }
                  ]
               }
            },
            "filters": {
               "filter": {
                  "bool": {
                     "must": [
                        {
                           "term": {
                              "threatLevel": "Monitoring"
                           }
                        },
                        {
                           "term": {
                              "reportStatus": 1
                           }
                        }
                     ],
                     "should": []
                  }
               },
               "aggs": {
                  "latest_publish_date": {
                     "max": {
                        "field": "publishDate"
                     }
                  }
               }
            },
            "should_we_consider": {
               "bucket_selector": {
                  "buckets_path": {
                     "latest_publish_date_report": "latest_date",
                     "latest_publish_date_filtered": "filters>latest_publish_date"
                  },
                  "script": "latest_publish_date_report == latest_publish_date_filtered"
               }
            }
         }
      }
   }
}
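Why not put the criterion in the top-level query and simply take the newest hit per reportId? Because that answers a different question: it returns the newest version that matched, even when a later version of the same report no longer matches. A minimal Python sketch over in-memory documents contrasts the two orderings; the sample reports, the 'Elevated' level and all function names are hypothetical, not taken from the index:

```python
from typing import Dict, List

# Hypothetical sample versions: report "r1" was Monitoring, then edited to Elevated.
DOCS: List[dict] = [
    {"reportId": "r1", "publishDate": 100, "threatLevel": "Monitoring", "reportStatus": 1},
    {"reportId": "r1", "publishDate": 200, "threatLevel": "Elevated", "reportStatus": 1},
    {"reportId": "r2", "publishDate": 150, "threatLevel": "Monitoring", "reportStatus": 1},
]


def matches(doc: dict) -> bool:
    """The filter criterion: threatLevel = Monitoring AND reportStatus = 1."""
    return doc["threatLevel"] == "Monitoring" and doc["reportStatus"] == 1


def latest_per_report(docs: List[dict]) -> Dict[str, dict]:
    """Keep only the newest version (highest publishDate) of each reportId."""
    latest: Dict[str, dict] = {}
    for doc in docs:
        current = latest.get(doc["reportId"])
        if current is None or doc["publishDate"] > current["publishDate"]:
            latest[doc["reportId"]] = doc
    return latest


def filter_then_latest(docs: List[dict]) -> Dict[str, dict]:
    """Naive: filter first, then pick the newest match (can return stale versions)."""
    return latest_per_report([d for d in docs if matches(d)])


def latest_then_filter(docs: List[dict]) -> Dict[str, dict]:
    """What the aggregation effectively computes: newest version first, then test it."""
    return {rid: d for rid, d in latest_per_report(docs).items() if matches(d)}


print(sorted(filter_then_latest(DOCS)))  # ['r1', 'r2']  (r1 is a stale version)
print(sorted(latest_then_filter(DOCS)))  # ['r2']
```

The aggregation reaches the second result without a second round trip, by comparing two max values inside each reportId bucket.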

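We restrict the result to aggregations only, with no search hits, using the following (the filter_path=aggregations URL parameter additionally strips everything else, such as took and _shards, from the response):
"size": 0,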

First, a terms aggregation groups all versions of a report into one bucket per 'reportId'.
The 'size' parameter defines how many term buckets are returned out of the overall list of terms. By default, the node coordinating the search requests each shard's own top term buckets (controlled by shard_size) and, once all shards respond, reduces them to the final list returned to the client. This means that if the number of unique terms is greater than size, the returned list may not be exactly accurate: some term counts may be slightly off, and a term that should have been among the top size buckets may even be missing.
It also orders the buckets in descending order of the 'latest_date' sub-aggregation (i.e. the most recently published report is shown at the top):
"terms": {
   "field": "reportId",
   "size": 9999,
   "order": {
     "latest_date": "desc"
   }
},
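The bucketing and ordering performed by this terms aggregation can be sketched in plain Python as below. This is a single-process approximation that ignores the per-shard collection described above; the function name and the sample data are hypothetical:

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def terms_by_latest_date(docs: List[dict], size: int) -> List[Tuple[str, int, int]]:
    """Group docs by reportId, order buckets by max publishDate (desc), keep `size`.

    Returns (key, doc_count, latest_date) tuples, loosely mirroring the
    terms aggregation with "order": {"latest_date": "desc"}.
    """
    buckets: Dict[str, List[dict]] = defaultdict(list)
    for doc in docs:
        buckets[doc["reportId"]].append(doc)

    rows = [
        (key, len(versions), max(v["publishDate"] for v in versions))
        for key, versions in buckets.items()
    ]
    # Most recently published report first, as in the "order" clause.
    rows.sort(key=lambda row: row[2], reverse=True)
    return rows[:size]


docs = [
    {"reportId": "a", "publishDate": 10},
    {"reportId": "b", "publishDate": 30},
    {"reportId": "a", "publishDate": 40},
    {"reportId": "c", "publishDate": 20},
]
print(terms_by_latest_date(docs, size=2))  # [('a', 2, 40), ('b', 1, 30)]
```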

Next, the 'max' aggregation returns the maximum 'publishDate' value among the documents in each bucket. This is required because buckets_path can't reference a value directly from the 'top_hits' aggregation:
"latest_date": {
  "max": {
    "field": "publishDate"
  }
}

The 'top_hits' aggregation sorts the documents in each bucket by 'publishDate' and returns only the newest one:
"latest_report": {
  "top_hits": {
    "size": 1,
    "sort": [
      {
        "publishDate": {
          "order": "desc"
        }
      }
    ]
  }
}
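The relationship between the two sub-aggregations can be sketched in Python: top_hits with size 1 and a descending publishDate sort picks the newest document, whose publishDate is the same number the max aggregation reports, and that single number is what buckets_path can reference. The helper names and the sample bucket are hypothetical (the earlier "Created" timestamp is made up):

```python
from typing import List


def top_hit(bucket_docs: List[dict]) -> dict:
    """Like top_hits with size 1 sorted by publishDate desc: the newest document."""
    return sorted(bucket_docs, key=lambda d: d["publishDate"], reverse=True)[0]


def max_publish_date(bucket_docs: List[dict]) -> int:
    """Like the 'latest_date' max aggregation: a single numeric value."""
    return max(d["publishDate"] for d in bucket_docs)


# Hypothetical bucket holding two versions of one report.
bucket = [
    {"title": "Report 1", "activity": "Created", "publishDate": 1500000000000},
    {"title": "Report 1", "activity": "Edited", "publishDate": 1500475062123},
]
newest = top_hit(bucket)
print(newest["activity"])  # Edited
# For a non-empty bucket, the max equals the publishDate of the top hit.
assert newest["publishDate"] == max_publish_date(bucket)
```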

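The 'filters' sub-aggregation (a single-bucket filter aggregation that happens to be named 'filters') keeps only the documents in the bucket that match the filter criterion, and its 'max' sub-aggregation then returns the maximum 'publishDate' among them (which is referenced directly as a buckets_path variable). The term query assumes 'threatLevel' is indexed as an exact value (keyword, or a not_analyzed string in older versions); against an analyzed text field it would not match "Monitoring":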
"filters": {
  "filter": {
    "bool": {
      "must": [
        {
          "term": {
            "threatLevel": "Monitoring"
          }
        },
        {
          "term": {
            "reportStatus": 1
          }
        }
      ],
      "should": []
    }
  },
  "aggs": {
    "latest_publish_date": {
      "max": {
        "field": "publishDate"
      }
    }
  }
}
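The per-bucket effect of this filter sub-aggregation and its max can be sketched in Python as follows. Here None stands in for the case where no document in the bucket matches, so the max has nothing to aggregate; the function name and sample versions are hypothetical:

```python
from typing import List, Optional


def filtered_latest_publish_date(bucket_docs: List[dict]) -> Optional[int]:
    """Max publishDate among docs matching threatLevel=Monitoring AND reportStatus=1."""
    matching = [
        d["publishDate"]
        for d in bucket_docs
        if d.get("threatLevel") == "Monitoring" and d.get("reportStatus") == 1
    ]
    # No matching document: return None instead of a max value.
    return max(matching) if matching else None


versions = [
    {"threatLevel": "Monitoring", "reportStatus": 1, "publishDate": 100},
    {"threatLevel": "Monitoring", "reportStatus": 0, "publishDate": 200},
]
print(filtered_latest_publish_date(versions))      # 100
print(filtered_latest_publish_date(versions[1:]))  # None
```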

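Finally, the 'bucket_selector' pipeline aggregation keeps only those buckets (reports) for which the 'latest_date' value is the same as the 'filters>latest_publish_date' value, i.e. reports whose latest version matches the filter criterion; buckets for which the script returns false are removed from the response. On versions where Painless is the default script language (Elasticsearch 5.0 and later), buckets_path variables are read through params, so the script would be written as params.latest_publish_date_report == params.latest_publish_date_filtered: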
"should_we_consider": {
  "bucket_selector": {
    "buckets_path": {
      "latest_publish_date_report": "latest_date",
      "latest_publish_date_filtered": "filters>latest_publish_date"
    },
    "script": "latest_publish_date_report == latest_publish_date_filtered"
  }
}
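The selection rule can be sketched in Python as a predicate over each bucket's two values. The bucket dictionaries are hypothetical and carry only the two numbers the buckets_path refers to; dropping a bucket whose filtered value is missing is an assumption of this sketch, chosen because such a report has no matching version at all:

```python
from typing import List, Optional


def should_we_consider(latest_date: float, filtered_latest: Optional[float]) -> bool:
    """Keep a bucket only if its newest version is also its newest matching version."""
    if filtered_latest is None:
        # No version matched the filter, so the newest version cannot match.
        return False
    return latest_date == filtered_latest


def bucket_selector(buckets: List[dict]) -> List[dict]:
    """Drop buckets for which the predicate is false, keeping the original order."""
    return [
        b for b in buckets
        if should_we_consider(b["latest_date"], b["filtered_latest"])
    ]


buckets = [
    {"key": "r1", "latest_date": 200, "filtered_latest": 100},   # edited out of the filter
    {"key": "r2", "latest_date": 150, "filtered_latest": 150},   # newest version matches
    {"key": "r3", "latest_date": 120, "filtered_latest": None},  # never matched
]
print([b["key"] for b in bucket_selector(buckets)])  # ['r2']
```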

The result looks like:
{
    "aggregations": {
      "groupedByReportId": {
         "doc_count_error_upper_bound": 0,
         "sum_other_doc_count": 0,
         "buckets": [
            {
               "key": "1d9590fbc8248",
               "doc_count": 3,
               "latest_date": {
                  "value": 1500475062123,
                  "value_as_string": "2017-07-19T14:37:42.123Z"
               },
               "filters": {
                  "doc_count": 3,
                  "latest_publish_date": {
                     "value": 1500475062123,
                     "value_as_string": "2017-07-19T14:37:42.123Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 3,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AV1bSHqEtARK6ulVqCq9",
                           "_score": null,
                           "_source": {
                              "title": "Report 1",
                              "type": "Heads-Up",
                              "threatLevel": "Monitoring",
                              "reportStatus": 1,
                              "activity": "Edited",
                              "publishDate": 1500475062123,
                              "reportId": "1d9590fbc8248",
                              "_id": "AVzPf2EIgkVDCoUgGLmL"
                           },
                           "sort": [
                              1500475062123
                           ]
                        }
                     ]
                  }
               }
            },
            {
               "key": "eb1bacc2-05f1-49f6-a5e9-b2a40cfbe991",
               "doc_count": 1,
               "latest_date": {
                  "value": 1499259615983,
                  "value_as_string": "2017-07-05T13:00:15.983Z"
               },
               "filters": {
                  "doc_count": 1,
                  "latest_publish_date": {
                     "value": 1499259615983,
                     "value_as_string": "2017-07-05T13:00:15.983Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 1,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AV0S1kFpfsv-z4NmmeKK",
                           "_score": null,
                           "_source": {
                              "title": "Report 2",
                              "type": "Incident Closure",
                              "threatLevel": "Monitoring",
                              "reportStatus": 1,
                              "activity": "Created",
                              "publishDate": 1499259615983,
                              "reportId": "eb1bacc2-05f1-49f6-a5e9-b2a40cfbe991"
                           },
                           "sort": [
                              1499259615983
                           ]
                        }
                     ]
                  }
               }
            },
            {
               "key": "0d75fad5-d865-4456-8d3a-303e3261eb28",
               "doc_count": 1,
               "latest_date": {
                  "value": 1498124863900,
                  "value_as_string": "2017-06-22T09:47:43.900Z"
               },
               "filters": {
                  "doc_count": 1,
                  "latest_publish_date": {
                     "value": 1498124863900,
                     "value_as_string": "2017-06-22T09:47:43.900Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 1,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AVzPM03FgkVDCoUgF9Cf",
                           "_score": null,
                           "_source": {
                              "title": "Test report 3",
                              "type": "Daily Situation",
                              "threatLevel": "Monitoring",
                              "reportStatus": 1,
                              "activity": "Created",
                              "publishDate": 1498124863900,
                              "reportId": "0d75fad5-d865-4456-8d3a-303e3261eb28"
                           },
                           "sort": [
                              1498124863900
                           ]
                        }
                     ]
                  }
               }
            },
            {
               "key": "184204f0-d0fb-4755-9573-2332940d0ac9",
               "doc_count": 5,
               "latest_date": {
                  "value": 1497540223817,
                  "value_as_string": "2017-06-15T15:23:43.817Z"
               },
               "filters": {
                  "doc_count": 5,
                  "latest_publish_date": {
                     "value": 1497540223817,
                     "value_as_string": "2017-06-15T15:23:43.817Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 5,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AVysWmbCgkVDCoUg-8nQ",
                           "_score": null,
                           "_source": {
                              "title": "Test 5",
                              "type": "Daily Situation",
                              "threatLevel": "Monitoring",
                              "reportId": "184204f0-d0fb-4755-9573-2332940d0ac9",
                              "reportStatus": 1,
                              "activity": "Edited",
                              "publishDate": 1497540223817,
                              "_id": "AVysUj08gkVDCoUg-8nP"
                           },
                           "sort": [
                              1497540223817
                           ]
                        }
                     ]
                  }
               }
            }
         ]
      }
   }
}
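On the client side, the latest version of each selected report sits in latest_report.hits.hits[0]._source of its bucket. A minimal Python sketch that flattens a response of the shape shown above; the function name is hypothetical, and the sample is a trimmed copy of the first bucket from that response:

```python
import json
from typing import List, Tuple

# Trimmed copy of the first bucket from the response above.
RESPONSE = json.loads("""
{
  "aggregations": {
    "groupedByReportId": {
      "buckets": [
        {
          "key": "1d9590fbc8248",
          "latest_report": {
            "hits": {
              "hits": [
                {"_source": {"title": "Report 1", "activity": "Edited",
                             "publishDate": 1500475062123}}
              ]
            }
          }
        }
      ]
    }
  }
}
""")


def latest_reports(response: dict) -> List[Tuple[str, str, int]]:
    """Return (reportId, title, publishDate) for each bucket that survived the selector."""
    rows = []
    for bucket in response["aggregations"]["groupedByReportId"]["buckets"]:
        hits = bucket["latest_report"]["hits"]["hits"]
        if not hits:
            continue  # defensive: top_hits with size 1 should return one hit here
        source = hits[0]["_source"]
        rows.append((bucket["key"], source["title"], source["publishDate"]))
    return rows


print(latest_reports(RESPONSE))  # [('1d9590fbc8248', 'Report 1', 1500475062123)]
```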
