fiqs

fiqs helps you make queries against Elasticsearch, and more easily consume the results. It is built on top of the official Python Elasticsearch client and the great Elasticsearch DSL library.

You can still dive closer to the Elasticsearch JSON DSL by accessing the Elasticsearch DSL client or even the Elasticsearch python client.

fiqs can help you in the following ways:

  • A helper function can flatten the result dictionary returned by Elasticsearch

  • A model class, a la Django:

    • Automatically generate a mapping
    • Less verbose aggregations and metrics
    • Less verbose filtering (soon)
    • Automatically add missing buckets (soon)

Compatibility

fiqs is compatible with Elasticsearch 5.X and works with both Python 2.7 and Python 3.3

Contributing

The fiqs project is hosted on GitHub

To run the tests on your machine use this command: python setup.py test Some tests are used to generate results output from Elasticsearch. To run them you will need to run a docker container on your machine: docker run -d -p 8200:9200 -p 8300:9300 elasticsearch:5.0.2 and then run py.test -k docker.

Contents

Flatten result

Consuming the results from an Elasticsearch query can be troublesome. fiqs exposes a flatten_result function that transforms an elasticsearch-dsl Result, or a dictionary, into the list of its nodes. You will lose access to some data (doc_count_error_upper_bound, sum_other_doc_count, the hits etc.) so beware.

Here is a basic example with an aggregation and a metric:

print(flatten_result({
    "_shards": {
        ...
    },
    "hits": {
        ...
    },
    "aggregations": {
        "shop": {
            "buckets": [
                {
                    "doc_count": 30,
                    "key": 1,
                    "total_sales": {
                        "value": 12345.0
                    },
                },
                {
                    "doc_count": 20,
                    "key": 2,
                    "total_sales": {
                        "value": 23456.0
                    },
                },
                {
                    "doc_count": 10,
                    "key": 3,
                    "total_sales": {
                        "value": 34567.0
                    },
                },
            ],
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
        },
    },
}))
# [
#     {
#         "shop": 1,
#         "doc_count": 30,
#         "total_sales": 12345.0,
#     },
#     {
#         "shop": 2,
#         "doc_count": 20,
#         "total_sales": 23456.0,
#     },
#     {
#         "shop": 3,
#         "doc_count": 10,
#         "total_sales": 34567.0,
#     },
# ]

flatten_result can handle multiple aggregations on the same level, and nested aggregations. It can also handled nested fields:

print(flatten_result({
    ...
    "aggregations": {
        "products": {
            "doc_count": 1540,
            "product_type": {
                "buckets": [
                    {
                        "avg_product_price": {
                            "value": 179.53889943074003,
                        },
                        "doc_count": 527,
                        "key": "product_type_3",
                    },
                    {
                        "avg_product_price": {
                            "value": 159.18296529968455,
                        },
                        "doc_count": 317,
                        "key": "product_type_2",
                    },
                    {
                        "avg_product_price": {
                            "value": 152.76785714285714,
                        },
                        "doc_count": 280,
                        "key": "product_type_1",
                    },
                ],
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
            },
        },
    }
}))
# [
#     {
#         "avg_product_price": 179.53889943074003,
#         "product_type": "product_type_3",
#         "doc_count": 527,
#     },
#     {
#         "avg_product_price": 159.18296529968455,
#         "product_type": "product_type_2",
#         "doc_count": 317,
#     },
#     {
#         "avg_product_price": 152.76785714285714,
#         "product_type": "product_type_1",
#         "doc_count": 280,
#     },
# ]

A word on reverse nested aggregations

flatten_result cannot distinguish between a nested bucket and a reverse nested aggregation. If you want to flatten an Elasticsearch result with reverse nested aggregations, make sure these aggregations’ names start with reverse_nested:

{
    'aggs': {
        'products': {
            'aggs': {
                'product_id': {
                    'aggs': {
                        'reverse_nested_root': {  # This aggregation starts with `reverse_nested`
                            'aggs': {
                                'avg_price': {
                                    'avg': {
                                        'field': 'price',
                                    },
                                },
                            },
                            'reverse_nested': {},
                        },
                    },
                    'terms': {
                        'field': 'products.product_id',
                    },
                },
            },
            'nested': {
                'path': 'products',
            }
        },
    },
}

Models

fiqs lets you create Model classes, a la Django, which automatically generate an elasticsearch mapping, and allows you to write cleaner queries.

A model is a class inheriting from fiqs.models.Model. It needs to define a doc_type, an index and its fields:

from fiqs import fields, models

class Sale(models.Model):
    index = 'sale_data'
    doc_type = 'sale'

    id = fields.IntegerField()
    shop_id = fields.IntegerField()
    client_id = fields.KeywordField()

    timestamp = fields.DateField()
    price = fields.IntegerField()
    payment_type = fields.KeywordField(choices=['wire_transfer', 'cash', 'store_credit'])

The doc_type will be used for the mapping, the index for the queries. Instead of defining these values as class attributes, you can override the class methods get_index and get_doc_type:

@classmethod
def get_index(cls, *args, **kwargs):
    if not cls.index:
        raise NotImplementedError('Model class should define an index')

    return cls.index

@classmethod
def get_doc_type(cls, *args, **kwargs):
    if not cls.doc_type:
        raise NotImplementedError('Model class should define a doc_type')

    return cls.doc_type

Model fields

This section contains all the API references for fields, including the field options and the field types.

Field options

The following arguments are available to all field types. All are optional, except type.

type

This is a string that tells fiqs which field datatype will be used in Elasticsearch. This option is mandatory.

choices

A list of possible values for the field. fiqs will use it to fill the missing buckets. It can also contains a list of tuples, where the first element is the key, and the second is a ‘pretty key’:

payment_type = fields.KeywordField(choices=[
    ('wire_transfer', _('Wire transfer')),
    ('cash', _('Cash')),
    ('store_credit', _('Store credit')),
])
data

A dictionary containing data used in the aggregations. For the time being, only size is used.

parent

Used for nested documents to define the name of the parent document. For example:

from fiqs import models

class Sale(models.Model):
    ...

    products = fields.NestedField()
    product_id = fields.KeywordField(parent='products')
    ...
    parts = fields.NestedField(parent='products')
    part_id = fields.KeywordField(parent='paths')
storage_field

The name of the field in your Elasticsearch cluster. By default fiqs will use the field’s name. In the case of nested fields, fiqs will use the storage_field as the path.

unit

Not yet used.

verbose_name

A human-readable name for the field. If the verbose name isn’t given, fiqs will use the field’s name in the model. Not yet used.

Field types
TextField

A field with the text Elasticsearch data type.

KeywordField

A field with the keyword Elasticsearch data type.

DateField

A field with the date Elasticsearch data type.

LongField

A field with the long Elasticsearch data type.

IntegerField

A field with the integer Elasticsearch data type.

ShortField

A field with the short Elasticsearch data type.

ByteField

A field with the byte Elasticsearch data type.

DoubleField

A field with the double Elasticsearch data type.

FloatField

A field with the float Elasticsearch data type.

DayOfWeekField

A field inheriting from ByteField. It accepts iso as a keyword argument. Depending on the value of iso, this field will have data and choices matching weekdays or isoweekdays.

HourOfDayField

A field inheriting from ByteField. By default, it will be able to contain values betweek 0 and 23.

BooleanField

A field with the boolean Elasticsearch data type.

NestedField

A field with the nested Elasticsearch data type.

Mapping

Model classes expose a get_mapping class method, that returns a strict and dynamic elasticsearch-dsl Mapping object. You can use it to create or update the mapping in your Elasticsearch cluster:

from elasticsearch import Elasticsearch

client = Elasticsearch(['http://my.cluster.com'])
mapping = MyModel.get_mapping()
client.indices.create(index='my_index', body={'mappings': mapping.to_dict()})

Making queries with fiqs

The FQuery object

fiqs exposes a FQuery object which lets you write less verbose simple queries against ElasticSearch. It is built on top of the elasticsearch-dsl Search object. Here is a quick example of what FQuery can do, compared to elasticsearch-dsl:

from elasticsearch_dsl import Search
from fiqs.aggregations import Sum
from fiqs.query import FQuery

from .models import Sale

# The elasticsearch-dsl way
search = Search(...)
search.aggs.bucket(
    'shop_id', 'terms', field='shop_id',
).bucket(
    'client_id', 'terms', field='client_id',
).metric(
    'total_sales', 'sum', field='price',
)
result = search.execute()

# The FQuery way
search = Search(...)
fquery = FQuery(search).values(
    total_sales=Sum(Sale.price),
).group_by(
    Sale.shop_id,
    Sale.client_id,
)
result = fquery.eval()
Loss of expresiveness

Let’s start with a warning :> FQuery may allow you to write cleaner and more re-usable queries, but at the cost of a loss of expresiveness. For example, you will not be able to have metrics at multiple aggregation levels. You may not be able to use FQuery for all your queries, and that’s OK!

FQuery options

A FQuery object only needs an elasticsearch-dsl object to get started. You may also configure the following options:

  • default_size: the size used by default in aggregations built by this object.
eval call

To execute the Elasticsearch query, you need to call eval on the FQuery object. This call accepts the following arguments:

  • flat: If False, will return the elasticsearch-dsl Result object, without flattening the result. Note that you cannot ask for a flat result if you used computed expressions. True by default.
  • fill_missing_buckets: If False, FQuery will not try to fill the missing buckets. For more details see Filling missing buckets. Note that fiqs cannot fill the missing buckets in non flat mode. True by default.

Values

You need to call values on a FQuery object to specify the metrics you want to use in your request. values accepts both arguments and keyword arguments:

from fiqs.aggregation import Sum, Avg

from .models import Sale

FQuery(search).values(
    Avg(Sale.price),
    total_sales=Sum(Sale.price),
)

In this case, the nodes will contain two keys for the metrics: total_sales, and sale__price__avg, a string representation of the Avg(Sale.price) metric. A values call returns the FQuery object, to allow chaining calls.

fiqs contains several classes, which all take a field as argument, to help you make these metric calls:

Avg

Used for the Elasticsearch avg aggregation.

Cardinality

Used for the Elasticsearch cardinality aggregation

Count

Used if you only want to count the documents present in your search. This aggregation does not change the Elasticsearh request, since it always returns the number of documents in the doc_count.

Max

Used for the Elasticsearch max aggregation

Min

Used for the Elasticsearch min aggregation

Sum

Used for the Elasticsearch sum aggregation

Operations

fiqs lets you query computed fields, created with operations on a model’s fields. For example:

from fiqs.aggregation import Sum

from .models import TrafficCount

FQuery(search).values(
    total_traffic=Addition(
        Sum(TrafficCount.in_count),
        Sum(TrafficCount.out_count),
    ),
    in_traffic_ratio=Ratio(
        Sum(TrafficCount.in_count),
        Addition(
            Sum(TrafficCount.in_count),
            Sum(TrafficCount.out_count),
        ),
    ),
)

The three existing operations are Addition, Subtraction and Ratio. Do note that these operations cannot be used in non-flat mode. For example this will not work:

fquery = FQuery(search).values(
    total_traffic=Addition(
        Sum(TrafficCount.in_count),
        Sum(TrafficCount.out_count),
    ),
).group_by(
    TrafficCount.shop_id,
)
results = fquery.eval(flat=False)  # Will raise an exception
ReverseNested

The ReverseNested class lets you make reverse nested aggregation. It takes as a first argument the path for the reverse nested aggregation (it can be empty) and a list of expressions:

class Sale(models.Model):
    price = fields.IntegerField()

    products = fields.NestedField()
    product_id = fields.KeywordField(parent='products')

    parts = fields.NestedField(parent='products')
    part_id = fields.KeywordField(parent='parts')

# Number of sales by product_id
FQuery(search).values(
    ReverseNested(
        '',
        Count(Sale),
    ),
).group_by(
    Sale.product_id,
)
# Number of products by part_id
FQuery(search).values(
    ReverseNested(
        Sale.products,  # You can give a field instead of a string
        Count(Sale.products),  # Or `Count(Sale)`, both work
    ),
).group_by(
    Sale.product_id,
    Sale.part_id,
)
# Total and average price by product id
FQuery(search).values(
    ReverseNested(
        Sale,  # Or `''`, both work
        avg_sale_price=Avg(Sale.price),
        total_sale_price=Sum(Sale.price),
    ),
).group_by(
    Sale.product_id,
)

Group by

You can call group_by on a FQuery object to add aggregations. Like values, group_by returns the FQuery object, to allow chaining. fiqs lets you build only one aggregation, which can be as deep as you need it to be. In a group_by call, you can use any fiqs Field, or Field subclass, object. fiqs also offers Field subclasses that help you configure your aggregation:

FieldWithChoices

A FieldWithChoices takes as argument an existing field, and a list of choice:

FieldWithChoices(Sale.shop_id, choices=(['Atlanta', 'Phoenix', 'NYC']))

This field is useful if you want to tune the capacity of FQuery to fill the missing buckets.

FieldWithRanges

A FieldWithRanges takes as argument an existing field, with a list of ranges. Ranges can either be a list of dictionaries forming an Elasticsearch range aggregation, or a list of tuples:

ranges = [
    {
        'from': 1,
        'to': 5,
        'key': '1 - 5',
    },
    {
        'from': 5,
        'to': 11,
        'key': '5 - 11',
    },
]
# Equivalent to :
ranges = [
    (1, 5),
    (5, 11),
]
FieldWithRanges(Sale.shop_id, ranges=ranges)

Do note that the from value (or the first tuple value) is included, and the to value (or the second tuple value) is excluded.

DataExtendedField

A DataExtendedField takes as argument an existing field, and a data dictionary:

DataExtendedField(Sale.shop_id, size=5)

This field is useful if you want to to fine tune the aggregation. In the example we changed the size parameter that will be used in the Elasticsearch aggregation.

GroupedField

A GroupedField aims to replicate the behavior of a filters aggregation. It takes as argument an existing field and a dictionary used to build the buckets:

shop_groups = {
    'group_a': [1, 2, 3, ],
    'group_b': [4, 5, 6, ],
}
# Number of Sale objects, grouped according to the `groups` argument
# One bucket will contain the Sale objects with shop_id in [1, 2, 3, ]
# The other bucket will contain the Sale objects with shop_id in [4, 5, 6, ]
fquery = FQuery(search).values(
    Count(Sale),
).group_by(
    GroupedField(Sale.shop_id, groups=groups),
)

Order by

You can call order_by on a FQuery object, to order the Elasticsearch result as you want. order_by returns the FQuery object, to allow chaining. order_by expects a dictionary that will be directly used in the aggregation as a sort:

FQuery(search).values(
    total_sales=Sum(Sale.price),
).group_by(
    Sale.shop_id,
).order_by(
    {'total_sales': 'desc'},
)

In this example, the Elasticsearch result will be ordered by total sales, in descending order.

Executing the query

Calling eval on the Fquery object will execute the Elasticsearch query and return the result.

Form of the result

FQuery will automatically flatten the result returned by Elasticsearch, as detailed here. It will also cast the value, depending on your model’s fields.

Each field may implement a get_casted_value method. FQuery will use this method to cast values returned by Elasticsearch. For example:

class IntegerField(Field):
    def __init__(self, **kwargs):
        super(IntegerField, self).__init__('integer', **kwargs)

    def get_casted_value(self, v):
        return int(v) if v is not None else v

As of today, only the following fields implement this method:

  • LongField, IntegerField, ShortField, ByteField and field inheriting from them cast values as int
  • DoubleField and FloatField cast values as float
  • DateField cast values as datetime, ignoring the milliseconds
Filling missing buckets

By default, FQuery will try to add buckets missing from the Elasticsearch result. FQuery uses several heuristics to determine which buckets are missing, as we will see below. FQuery will fill the group_by values with the missing keys, and the metric values with None.

  • If a field in the group_by defines the choices attribute, FQuery will expect all the choices’ keys to be present as keys in the Elasticsearch buckets:

    # Our model
    class Sale(Model):
        shop_id = fields.IntegerField(choices=(1, 2, 3, ))
        price = fields.IntegerField()
    
    # Our query
    results = FQuery(search).values(
        total_sales=Sum(Sale.price),
    ).group_by(
        Sale.shop_id,
    ).eval()
    
    # Elasticsearch result, notice there is no bucket with shop_id 1
    # {
    #     [...],
    #     "aggregations": {
    #         "shop": {
    #             "buckets": [
    #                 {
    #                     "doc_count": 20,
    #                     "key": 2,
    #                     "total_sales": {
    #                         "value": 123,
    #                     },
    #                 },
    #                 {
    #                     "doc_count": 10,
    #                     "key": 3,
    #                     "total_sales": {
    #                         "value": 456,
    #                     },
    #                 },
    #             ],
    #             [...],
    #         },
    #     },
    # }
    
    # FQuery result, with the empty line added
    # [
    #     {
    #         'shop_id': 2,
    #         'doc_count': 20,
    #         'total_sales': 123,
    #     },
    #     {
    #         'shop_id': 3,
    #         'doc_count': 10,
    #         'total_sales': 456,
    #     },
    #     {
    #         'shop_id': 1,
    #         'doc_count': 0,
    #         'total_sales': None,
    #     },
    # ]
    
  • If an aggregate in the group_by returns a value when calling choice_keys, FQuery will expect all the keys to be present in the Elasticsearch buckets. Only available with daily DateHistogram for the time being.

  • Finally, FQuery will look at all the values each key takes in the result buckets, and will expect all keys to be present in all buckets:

    # Our model
    class Sale(Model):
        shop_id = fields.IntegerField()
        price = fields.IntegerField()
        payment_type = fields.KeywordField(choices=('wire_transfer', 'cash', ))
    
    # Our query
    results = FQuery(search).values(
        total_sales=Sum(Sale.price),
    ).group_by(
        Sale.payment_type,
        Sale.shop_id,
    ).eval()
    
    # Elasticsearch result
    # {
    #     [...],
    #     "aggregations": {
    #         "payment_type": {
    #             "buckets": [
    #                 {
    #                     "key": "wire_transfer",
    #                     "shop_id": {
    #                         "buckets": [
    #                             {
    #                                 doc_count: 10,
    #                                 "key": 1,
    #                                 "total_sales": {
    #                                     "value": 123,
    #                                 },
    #                             },
    #                         ],
    #                     },
    #                 },
    #                 {
    #                     "key": "cash",
    #                     "shop_id": {
    #                         "buckets": [
    #                             {
    #                                 doc_count: 20,
    #                                 "key": 2,
    #                                 "total_sales": {
    #                                     "value": 456,
    #                                 },
    #                             },
    #                         ],
    #                     },
    #                 },
    #             ],
    #         },
    #     },
    # }
    
    # FQuery result, with two empty lines added
    # [
    #     {
    #         'shop_id': 1,
    #         'doc_count': 10,
    #         'total_sales': 123,
    #         'payment_type': 'wire_transfer',
    #     },
    #     {
    #         'shop_id': 2,
    #         'doc_count': 0,
    #         'total_sales': None,
    #         'payment_type': 'wire_transfer',
    #     },
    #     {
    #         'shop_id': 2,
    #         'doc_count': 20,
    #         'total_sales': 456,
    #         'payment_type': 'cash',
    #     },
    #     {
    #         'shop_id': 1,
    #         'doc_count': 0,
    #         'total_sales': None,
    #         'payment_type': 'cash',
    #     },
    # ]