
ElasticSearch pipeline bucket selector aggregation

ElasticSearch supports bucket selection on the buckets generated by an aggregation.
This works as a pipeline: a parent aggregation first generates the buckets, and the bucket selector then filters out the buckets that do not satisfy a script condition.

We have an ElasticSearch index 'daily_reports', where each row (document) represents a particular version of a report.
When a report is created, a new row is inserted into the index with a new 'reportId' field value and a 'publishDate' field holding the UNIX timestamp.
Each report/row has several other fields representing properties of the report, e.g. 'title', 'activity', 'reportStatus', 'reportLevel', etc.
When the report is edited/deleted, a new row is inserted into the index with the same 'reportId', but different '_id', 'publishDate', 'reportLevel', etc.

Now, if a user wants the latest version of each report, restricted to reports whose latest version matches a particular filter criterion (reportLevel = Monitoring AND reportStatus = 1), we can get it using the following query:

POST daily_reports/reports/_search?filter_path=aggregations
{
   "size": 0,
   "aggs": {
      "groupedByReportId": {
         "terms": {
            "field": "reportId",
            "size": 9999,
            "order": {
               "latest_date": "desc"
            }
         },
         "aggs": {
            "latest_date": {
               "max": {
                  "field": "publishDate"
               }
            },
            "latest_report": {
               "top_hits": {
                  "size": 1,
                  "sort": [
                     {
                        "publishDate": {
                           "order": "desc"
                        }
                     }
                  ]
               }
            },
            "filters": {
               "filter": {
                  "bool": {
                     "must": [
                        {
                           "term": {
                              "reportLevel": "Monitoring"
                           }
                        },
                        {
                           "term": {
                              "reportStatus": 1
                           }
                        }
                     ],
                     "should": []
                  }
               },
               "aggs": {
                  "latest_publish_date": {
                     "max": {
                        "field": "publishDate"
                     }
                  }
               }
            },
            "should_we_consider": {
               "bucket_selector": {
                  "buckets_path": {
                     "latest_publish_date_report": "latest_date",
                     "latest_publish_date_filtered": "filters>latest_publish_date"
                  },
                  "script": "latest_publish_date_report == latest_publish_date_filtered"
               }
            }
         }
      }
   }
}
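If the filter criteria change from request to request, the same body can be assembled programmatically. The following is a minimal sketch in Python, not part of the original setup: the function name build_latest_version_query and its parameters are hypothetical, and it only builds the dictionary without talking to a cluster.

```python
import json


def build_latest_version_query(filter_terms, size=9999):
    """Build the request body shown above for arbitrary term filters.

    filter_terms maps a field name to the exact value the latest version
    must have, e.g. {"reportLevel": "Monitoring", "reportStatus": 1}.
    (Hypothetical helper, named here for illustration only.)
    """
    return {
        "size": 0,  # no search hits, aggregations only
        "aggs": {
            "groupedByReportId": {
                "terms": {
                    "field": "reportId",
                    "size": size,
                    "order": {"latest_date": "desc"},
                },
                "aggs": {
                    # Newest publishDate among all versions of the report.
                    "latest_date": {"max": {"field": "publishDate"}},
                    # The newest version itself.
                    "latest_report": {
                        "top_hits": {
                            "size": 1,
                            "sort": [{"publishDate": {"order": "desc"}}],
                        }
                    },
                    # Newest publishDate among versions matching the filter.
                    "filters": {
                        "filter": {
                            "bool": {
                                "must": [
                                    {"term": {field: value}}
                                    for field, value in filter_terms.items()
                                ]
                            }
                        },
                        "aggs": {
                            "latest_publish_date": {"max": {"field": "publishDate"}}
                        },
                    },
                    # Keep the bucket only when both dates agree.
                    "should_we_consider": {
                        "bucket_selector": {
                            "buckets_path": {
                                "latest_publish_date_report": "latest_date",
                                "latest_publish_date_filtered": "filters>latest_publish_date",
                            },
                            "script": "latest_publish_date_report == latest_publish_date_filtered",
                        }
                    },
                },
            }
        },
    }


body = build_latest_version_query({"reportLevel": "Monitoring", "reportStatus": 1})
print(json.dumps(body["aggs"]["groupedByReportId"]["aggs"]["filters"], indent=3))
```

The resulting dictionary can be serialised with json.dumps and sent as the request body with whatever HTTP client is already in use.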

We restrict the result to contain only aggregations and no search hits (the 'filter_path=aggregations' URL parameter additionally strips the remaining response metadata) using:
"size": 0,

First, a terms aggregation groups all the rows belonging to the same 'reportId' into one bucket.
The 'size' parameter defines how many term buckets are returned out of the overall terms list. By default, the node coordinating the search asks each shard for its own top 'size' term buckets and, once all shards respond, reduces the results to the final list returned to the client. If the number of unique terms is greater than 'size', the returned list can therefore be inaccurate: term counts may be slightly off, and a term that belongs in the top 'size' buckets may be missing altogether. A large value such as 9999 keeps this from mattering as long as there are fewer distinct 'reportId' values than that.
The terms aggregation also orders the buckets in descending order of the 'latest_date' sub-aggregation result (i.e. the most recently published report is shown first):
"terms": {
   "field": "reportId",
   "size": 9999,
   "order": {
     "latest_date": "desc"
   }
},

Next, the 'max' aggregation returns the maximum 'publishDate' value among the rows in the bucket. This is required because buckets_path can't access a value directly from the 'top_hits' aggregation:
"latest_date": {
  "max": {
    "field": "publishDate"
  }
}

The 'top_hits' aggregation sorts the rows in the bucket by 'publishDate' in descending order and returns the top-most row, i.e. the latest version:
"latest_report": {
  "top_hits": {
    "size": 1,
    "sort": [
      {
        "publishDate": {
          "order": "desc"
        }
      }
    ]
  }
},

The 'filter' aggregation (named 'filters' here) keeps only the rows matching the filter criteria, and its 'max' sub-aggregation returns the maximum 'publishDate' value among them (which is then used directly as a buckets_path variable):
"filters": {
  "filter": {
    "bool": {
      "must": [
        {
          "term": {
            "reportLevel": "Monitoring"
          }
        },
        {
          "term": {
            "reportStatus": 1
          }
        }
      ],
      "should": []
    }
  },
  "aggs": {
    "latest_publish_date": {
      "max": {
        "field": "publishDate"
      }
    }
  }
}

Finally, the 'bucket_selector' pipeline aggregation keeps only those buckets for which the 'latest_date' aggregation value is the same as the 'filters>latest_publish_date' aggregation value, i.e. reports whose latest version satisfies the filter:
"should_we_consider": {
  "bucket_selector": {
    "buckets_path": {
      "latest_publish_date_report": "latest_date",
      "latest_publish_date_filtered": "filters>latest_publish_date"
    },
    "script": "latest_publish_date_report == latest_publish_date_filtered"
  }
}
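To see why comparing the two maxima selects the right reports, the whole pipeline can be emulated on a few in-memory rows. This is a rough sketch in plain Python, not how Elasticsearch executes the query: the function name, the sample rows and the 'Critical' level are invented for illustration, and a report with no matching version at all is simply dropped here.

```python
from collections import defaultdict


def latest_matching_versions(docs, criteria):
    """Return the latest version of every report whose latest version
    matches all field/value pairs in `criteria`, newest first."""
    # terms aggregation: one bucket per reportId
    buckets = defaultdict(list)
    for doc in docs:
        buckets[doc["reportId"]].append(doc)

    selected = []
    for versions in buckets.values():
        # max aggregation over the whole bucket ('latest_date')
        latest_date = max(d["publishDate"] for d in versions)
        # filter aggregation plus its max sub-aggregation
        matching = [
            d for d in versions
            if all(d.get(field) == value for field, value in criteria.items())
        ]
        if not matching:
            continue  # no version matches, so there is nothing to compare
        latest_filtered = max(d["publishDate"] for d in matching)
        # bucket_selector: keep the bucket only when both maxima agree
        if latest_date == latest_filtered:
            # top_hits with size 1, sorted by publishDate descending
            selected.append(max(versions, key=lambda d: d["publishDate"]))
    # terms ordering: latest_date descending
    return sorted(selected, key=lambda d: d["publishDate"], reverse=True)


# Hypothetical rows: r1 stays 'Monitoring', r2 was edited to another level,
# r3 never had reportStatus 1.
rows = [
    {"reportId": "r1", "publishDate": 100, "reportLevel": "Monitoring", "reportStatus": 1},
    {"reportId": "r1", "publishDate": 200, "reportLevel": "Monitoring", "reportStatus": 1},
    {"reportId": "r2", "publishDate": 150, "reportLevel": "Monitoring", "reportStatus": 1},
    {"reportId": "r2", "publishDate": 300, "reportLevel": "Critical", "reportStatus": 1},
    {"reportId": "r3", "publishDate": 250, "reportLevel": "Monitoring", "reportStatus": 0},
]

result = latest_matching_versions(rows, {"reportLevel": "Monitoring", "reportStatus": 1})
print([(d["reportId"], d["publishDate"]) for d in result])  # [('r1', 200)]
```

Report r2 has an older version that matches the filter, but its newest version does not, so the two maxima differ (300 versus 150) and the bucket is discarded. Filtering the rows before grouping would have wrongly returned the outdated version instead.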

The result looks like:
{
    "aggregations": {
      "groupedByReportId": {
         "doc_count_error_upper_bound": 0,
         "sum_other_doc_count": 0,
         "buckets": [
            {
               "key": "1d9590fbc8248",
               "doc_count": 3,
               "latest_date": {
                  "value": 1500475062123,
                  "value_as_string": "2017-07-19T14:37:42.123Z"
               },
               "filters": {
                  "doc_count": 3,
                  "latest_publish_date": {
                     "value": 1500475062123,
                     "value_as_string": "2017-07-19T14:37:42.123Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 3,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AV1bSHqEtARK6ulVqCq9",
                           "_score": null,
                           "_source": {
                              "title": "Report 1",
                              "type": "Heads-Up",
                              "threatLevel": "Monitoring",
                              "reportStatus": 1,
                              "activity": "Edited",
                              "publishDate": 1500475062123,
                              "reportId": "1d9590fbc8248",
                              "_id": "AVzPf2EIgkVDCoUgGLmL"
                           },
                           "sort": [
                              1500475062123
                           ]
                        }
                     ]
                  }
               }
            },
            {
               "key": "eb1bacc2-05f1-49f6-a5e9-b2a40cfbe991",
               "doc_count": 1,
               "latest_date": {
                  "value": 1499259615983,
                  "value_as_string": "2017-07-05T13:00:15.983Z"
               },
               "filters": {
                  "doc_count": 1,
                  "latest_publish_date": {
                     "value": 1499259615983,
                     "value_as_string": "2017-07-05T13:00:15.983Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 1,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AV0S1kFpfsv-z4NmmeKK",
                           "_score": null,
                           "_source": {
                              "title": "Report 2",
                              "type": "Incident Closure",
                              "threatLevel": "Monitoring",
                              "reportStatus": 1,
                              "activity": "Created",
                              "publishDate": 1499259615983,
                              "reportId": "eb1bacc2-05f1-49f6-a5e9-b2a40cfbe991"
                           },
                           "sort": [
                              1499259615983
                           ]
                        }
                     ]
                  }
               }
            },
            {
               "key": "0d75fad5-d865-4456-8d3a-303e3261eb28",
               "doc_count": 1,
               "latest_date": {
                  "value": 1498124863900,
                  "value_as_string": "2017-06-22T09:47:43.900Z"
               },
               "filters": {
                  "doc_count": 1,
                  "latest_publish_date": {
                     "value": 1498124863900,
                     "value_as_string": "2017-06-22T09:47:43.900Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 1,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AVzPM03FgkVDCoUgF9Cf",
                           "_score": null,
                           "_source": {
                              "title": "Test report 3",
                              "type": "Daily Situation",
                              "threatLevel": "Monitoring",
                              "reportStatus": 1,
                              "activity": "Created",
                              "publishDate": 1498124863900,
                              "reportId": "0d75fad5-d865-4456-8d3a-303e3261eb28"
                           },
                           "sort": [
                              1498124863900
                           ]
                        }
                     ]
                  }
               }
            },
            {
               "key": "184204f0-d0fb-4755-9573-2332940d0ac9",
               "doc_count": 5,
               "latest_date": {
                  "value": 1497540223817,
                  "value_as_string": "2017-06-15T15:23:43.817Z"
               },
               "filters": {
                  "doc_count": 5,
                  "latest_publish_date": {
                     "value": 1497540223817,
                     "value_as_string": "2017-06-15T15:23:43.817Z"
                  }
               },
               "latest_report": {
                  "hits": {
                     "total": 5,
                     "max_score": null,
                     "hits": [
                        {
                           "_index": "daily_report",
                           "_type": "reports",
                           "_id": "AVysWmbCgkVDCoUg-8nQ",
                           "_score": null,
                           "_source": {
                              "title": "Test 5",
                              "type": "Daily Situation",
                              "threatLevel": "Monitoring",
                              "reportId": "184204f0-d0fb-4755-9573-2332940d0ac9",
                              "reportStatus": 1,
                              "activity": "Edited",
                              "publishDate": 1497540223817,
                              "_id": "AVysUj08gkVDCoUg-8nP"
                           },
                           "sort": [
                              1497540223817
                           ]
                        }
                     ]
                  }
               }
            }
         ]
      }
   }
}
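On the client side, the latest version of each selected report sits in the single 'top_hits' hit of its bucket. A minimal sketch of pulling those out in Python follows; the function name latest_reports is hypothetical, and the sample response is a trimmed-down copy of the first bucket above rather than real cluster output.

```python
def latest_reports(response):
    """Return the _source of the newest version from every bucket."""
    buckets = response["aggregations"]["groupedByReportId"]["buckets"]
    return [
        bucket["latest_report"]["hits"]["hits"][0]["_source"]
        for bucket in buckets
    ]


# Trimmed-down response containing only the keys the function reads.
sample_response = {
    "aggregations": {
        "groupedByReportId": {
            "buckets": [
                {
                    "key": "1d9590fbc8248",
                    "doc_count": 3,
                    "latest_report": {
                        "hits": {
                            "total": 3,
                            "hits": [
                                {
                                    "_id": "AV1bSHqEtARK6ulVqCq9",
                                    "_source": {
                                        "title": "Report 1",
                                        "reportStatus": 1,
                                        "publishDate": 1500475062123,
                                        "reportId": "1d9590fbc8248",
                                    },
                                }
                            ],
                        }
                    },
                }
            ]
        }
    }
}

for report in latest_reports(sample_response):
    print(report["reportId"], report["title"], report["publishDate"])
```

Because the buckets are already ordered by 'latest_date' descending, the returned list is ordered from the most recently published report to the oldest.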
