Welcome to WeaveQ!¶
WeaveQ is a program and module for pivoting and joining across collections of data, with special support for pivoting and joining across JSON files, CSV files and Elasticsearch resultsets.
Using WeaveQ from the command line consists of passing a WeaveQ query string that specifies how to join or pivot across records from multiple data sources based on relationships between fields. Using WeaveQ from its Python API consists of specifying join and pivot behaviour programmatically and using custom callbacks to fetch data from data sources, optionally transform field values and output results. The WeaveQ query parser API can even be used to provide a customised textual query interface.
Getting Started¶
Installing¶
Install WeaveQ using pip (you may need to sudo):
$ pip install weaveq
Note
WeaveQ only officially supports Linux for now.
Running¶
Pivot from a CSV file to a JSON file to find bikes and cars of the same colour, writing the output to stdout:
weaveq -q '#from "csv:bikes.csv" #as b #pivot-to "js:cars.json" #as c #where b.color = c.color'
Run the same query, but write the output to another file:
weaveq -o /path/to/out/file.jsonlines -q '#from "csv:bikes.csv" #as b #pivot-to "js:cars.json" #as c #where b.color = c.color'
Supply a configuration file and use Elasticsearch results as part of the query to join Honda bikes to cars of the same colour:
weaveq -c config.json -q '#from "el:bikes" #as b #filter |make:honda| #join-to "jsl:cars.jsonlines" #as c #where b.color = c.color'
For more details, see Running Queries.
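If you drive WeaveQ from a script, the invocations above can be assembled as an argument list. The following is a minimal sketch, not part of WeaveQ: build_weaveq_command and run_weaveq are hypothetical helpers, they use only the -q, -o and -c options shown above, and they assume the weaveq executable is on your PATH. The output file name is a placeholder.

```python
import subprocess


def build_weaveq_command(query, output_path=None, config_path=None):
    """Build an argument list for the weaveq CLI (hypothetical helper)."""
    command = ["weaveq"]
    if config_path is not None:
        command += ["-c", config_path]
    if output_path is not None:
        command += ["-o", output_path]
    command += ["-q", query]
    return command


def run_weaveq(query, output_path=None, config_path=None):
    """Run WeaveQ; raises CalledProcessError on a non-zero exit status."""
    subprocess.run(build_weaveq_command(query, output_path, config_path), check=True)


query = '#from "csv:bikes.csv" #as b #pivot-to "js:cars.json" #as c #where b.color = c.color'
command = build_weaveq_command(query, output_path="matches.jsonlines")
```

Passing a list rather than a single shell string means the double quotes and # characters inside the query need no extra shell quoting.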
The Basics¶
WeaveQ reads data from a set of data sources and uses information you provide about the relationships between these data sources to perform ‘pivot’ or ‘join’ operations.
A pivot operation selects records from one data source based on there being related records in a second data source, and then discards the records from the second data source. A join operation merges records from one data source into related records from a second data source. For a more detailed explanation, see Running Queries.
As the name suggests, a data source is a WeaveQ component that retrieves data from an external source for use in join and pivot operations. WeaveQ currently supports 4 data sources for use from within command line queries: JSON lines, JSON, CSV and Elasticsearch.
Note
You can write custom data source components for WeaveQ using the WeaveQ API. For more details, see Querying from Code.
WeaveQ always outputs line-separated JSON, either to stdout or to a file you specify using the -o command line option.
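Because the output is line-separated JSON, it can be consumed one record at a time. The sketch below shows one way to do that with the standard json module; read_json_lines is a hypothetical helper and the sample records are made up for illustration.

```python
import json


def read_json_lines(lines):
    """Yield one parsed object per non-blank line of line-separated JSON."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


# Made-up sample standing in for WeaveQ output.
sample_output = '{"make": "honda", "color": "red"}\n{"make": "ford", "color": "blue"}\n'
records = list(read_json_lines(sample_output.splitlines()))
```

The same function accepts an open file object, so a file written with -o can be read without loading it into memory all at once.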
Configuring¶
WeaveQ uses a configuration file to control data source settings. Currently, only the Elasticsearch and CSV data sources have settings that can be configured.
As a result, you only need to supply WeaveQ a configuration file if either:
- You want to use Elasticsearch as a data source.
- You don’t want WeaveQ to use the first row of CSV files to determine field names (in the absence of a configuration file, WeaveQ defaults to doing this).
You must specify the configuration as a JSON file and pass it to WeaveQ using the -c option. For example:
$ weaveq -c /path/to/config.json -q ...
An example configuration file is shown below:
{
    "data_sources" :
    {
        "elasticsearch" :
        {
            "hosts" : ["10.1.1.2:9200","10.1.1.3:9200"],
            "timeout" : 10,
            "use_ssl" : false,
            "verify_certs" : false,
            "ca_certs" : "/path/to/ca/certs",
            "client_cert" : "/path/to/client/cert",
            "client_key" : "/path/to/client/key"
        },
        "csv" :
        {
            "first_row_contains_field_names" : true
        }
    }
}
Configuration Item | Description | Required? |
---|---|---|
elasticsearch/hosts | An array of the host names/addresses and ports of Elasticsearch nodes. | Only if using the Elasticsearch data source within a query |
elasticsearch/timeout | Global Elasticsearch timeout. Default = 10 | No |
elasticsearch/use_ssl | Whether or not to use SSL in Elasticsearch communication. Default = false | No |
elasticsearch/verify_certs | Whether or not to verify SSL certificates. Default = false | No |
elasticsearch/ca_certs | Path to CA (certificate authority) certificate files. Default = none | No |
elasticsearch/client_cert | Path to a PEM-formatted SSL client certificate file. Default = none | No |
elasticsearch/client_key | Path to a PEM-formatted SSL client key. Default = none | No |
csv/first_row_names | Whether or not the first row of CSV files should be used to define field names. If not, fields will be named column_n, where n is the index (starting at 0) of the CSV column from which the field was read. Default = true | No |
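Since a malformed configuration file is an easy mistake to make, it can help to check one before passing it to WeaveQ. The sketch below parses a configuration and checks the only conditionally required item from the table above (elasticsearch/hosts). check_config is a hypothetical helper and is not how WeaveQ itself validates configuration.

```python
import json


def check_config(config_text, uses_elasticsearch):
    """Parse a WeaveQ-style configuration and check elasticsearch/hosts."""
    # json.loads raises ValueError (json.JSONDecodeError) on malformed JSON,
    # such as a missing comma between two objects.
    config = json.loads(config_text)
    sources = config.get("data_sources", {})
    hosts = sources.get("elasticsearch", {}).get("hosts")
    if uses_elasticsearch and not hosts:
        raise ValueError("elasticsearch/hosts is required when a query uses Elasticsearch")
    return config


config_text = '{"data_sources": {"elasticsearch": {"hosts": ["10.1.1.2:9200"], "timeout": 10}}}'
config = check_config(config_text, uses_elasticsearch=True)
```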
Running Queries¶
Query Syntax¶
Primer¶
The example queries under Running, above, illustrate the basic structure of a WeaveQ query.
Queries always begin with the #from keyword, denoting the start of the first or “seed” step that retrieves an initial set of data. This is followed by one or more subsequent “pivot” or “join” steps.
All steps must specify a data source in the form of a string enclosed in quotes. The format of this string is as follows:
<resource type>:<resource name>
Where <resource type> is one of the following:
Resource Type | Description | Resource Name |
---|---|---|
csv | A comma-separated values file in UTF-8 encoding. Each row will be treated as a separate record | Path to a CSV file |
js | A JSON file containing an array of objects at its root. Each object in the array will be treated as a separate record | Path to a JSON file |
jsl | A “JSON lines” file containing line-separated JSON objects | Path to a JSON lines file |
el | Elasticsearch query | Elasticsearch index name |
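The data source string format can be illustrated with a short parsing sketch. split_source is a hypothetical helper, not WeaveQ's own parser; it splits on the first colon only, so that any later colons stay part of the resource name.

```python
KNOWN_TYPES = {"csv", "js", "jsl", "el"}


def split_source(source):
    """Split '<resource type>:<resource name>' into its two parts."""
    resource_type, separator, resource_name = source.partition(":")
    if not separator or resource_type not in KNOWN_TYPES or not resource_name:
        raise ValueError("expected '<resource type>:<resource name>', got %r" % source)
    return resource_type, resource_name


csv_source = split_source("csv:bikes.csv")
es_source = split_source("el:bikes")
```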
After the data source string, the compulsory #as keyword is used to assign an alias to the data source so that it can be referred to later in the query.
The final part of the step is the #where clause. This is where the relationships between fields in the current and previous step’s data sources are defined so that WeaveQ can determine how to perform the pivot or join operation.
Field relationships can currently only be expressed in terms of equality (using the = operator) or inequality (using the != operator).
Multiple field relationships can be combined into complex expressions using the logical operators “and” and “or”. For example:
... #where (b.color = c.color and b.make != c.make) or b.speed = c.speed
You can refer to fields in sub-objects by chaining names together, separated by dots:
c.roof.color
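To make the dotted notation concrete, the sketch below resolves a dotted field name against a nested record. get_field is a hypothetical helper, and returning None for a missing field is an assumption made for this illustration rather than documented WeaveQ behaviour. The alias (c in the example above) is left out because it names the data source, not a field.

```python
def get_field(record, dotted_name):
    """Follow a dotted field name through nested dicts; None if any part is missing."""
    value = record
    for part in dotted_name.split("."):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value


# Made-up record for illustration.
car = {"make": "ford", "roof": {"color": "black"}}
roof_color = get_field(car, "roof.color")
```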
Filters¶
For data sources that support it (currently only the Elasticsearch data source), you can also specify a filter string using the #filter keyword after the data source alias.
How the filter string is used is specific to the data source. The Elasticsearch data source treats it as an Elasticsearch Query String Query, passing it to an Elasticsearch instance for controlling which documents are returned to WeaveQ. For example, the following will cause Elasticsearch to find all cars made by Honda:
#from "csv:bikes.csv" #as b #pivot-to "el:cars" #as c #filter |make:"honda"| #where b.color = c.color
The filter string is enclosed by pipe (“|”) characters. If you need to include a pipe in the filter string, escape it with a backslash (i.e. \|).
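When query strings are assembled in code, the escaping rule above is easy to apply automatically. The sketch below uses a hypothetical helper, quote_filter, that covers only the pipe escaping described here; whether any other characters need escaping is not stated, so none are handled.

```python
def quote_filter(filter_string):
    """Wrap a filter string in pipes, escaping any pipes it contains."""
    return "|" + filter_string.replace("|", "\\|") + "|"


# "||" is the OR operator in Elasticsearch query string syntax.
clause = "#filter " + quote_filter('make:"honda" || make:"ford"')
```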
Step Options¶
The join step accepts several options that can be used to modify its default behaviour:
Option | Description | Arguments |
---|---|---|
#exclude-empty | Don’t output records from the step’s data source that haven’t had a record joined to them | None |
#field-name | Specifies the name of the field to store joined records in. If this option is not specified, a default field name of “joined_data” is used | A name for the field (not enclosed in quotes) |
#array | Specifies that the joined field must be an array. If multiple records are eligible to be joined to the same record, specifying this option will result in them being appended to the array. If this option is not specified, only the first record eligible to be joined to another will successfully be joined - subsequent records will be discarded | None |
Options are specified after the #where clause. For example:
... #where b.color = c.color #field-name car #exclude-empty #array
Query EBNF¶
The EBNF grammar below describes the WeaveQ query syntax, with some minor approximations for the sake of simplicity.
identifier = { alphanum | "_" | "." | "@" | "$" | "?" } ;
literal = '"', { anychar - '"' }, '"' ;
comparison-ops = "=" | "!=" ;
field-relation = identifier, comparison-ops, identifier ;
logical-ops = "and" | "or" ;
field-expr = field-relation, { logical-ops, field-relation } ;
where-clause = "#where", field-expr ;
filter-expr = '|', { anychar - '|' }, '|' ;
source-spec = literal, "#as", identifier, ["#filter", filter-expr] ;
pivot-clause = "#pivot-to", source-spec, where-clause ;
join-options = ["#field-name", identifier], ["#exclude-empty"], ["#array"] ;
join-clause = "#join-to", source-spec, where-clause, join-options ;
process-clause = pivot-clause | join-clause ;
seed-clause = "#from", source-spec ;
query = seed-clause, {process-clause} ;
How Queries Work¶
A query is made up of a set of steps that run one after the other from left to right.
The first step is called a “seed” step. Its only purpose is to retrieve an initial set of records the next step will use as one of its inputs.
All subsequent steps are either “pivot” or “join” steps. They take 2 inputs: the output of the previous step and the output of the step’s data source. They produce 1 output: the result of processing the two inputs according to relationships between record fields as defined in the query. The nature of this output depends on the type of step, the field relationships defined in the query and any options specified for the step.
Field Relationships¶
Relationships between record fields are expressed as logical statements associating fields from the step’s data source with fields from the previous step’s output.
Consider, for example, a step whose data source contains network information about observed IP addresses that is to be correlated with another data source containing IP addresses known to send spam.
The field relationship expression for this step might be as follows:
spam.ip = network.src_ip or spam.ip = network.dest_ip
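Written as plain Python over two made-up record sets, the expression above corresponds to the predicate below. This is only an illustration of what the relationship means, not of how WeaveQ evaluates it; the IP addresses are placeholders.

```python
def is_related(spam, network):
    # spam.ip = network.src_ip or spam.ip = network.dest_ip
    return spam["ip"] == network["src_ip"] or spam["ip"] == network["dest_ip"]


spam_records = [{"ip": "203.0.113.7"}]
network_records = [
    {"src_ip": "203.0.113.7", "dest_ip": "10.0.0.5"},
    {"src_ip": "10.0.0.8", "dest_ip": "10.0.0.9"},
    {"src_ip": "10.0.0.5", "dest_ip": "203.0.113.7"},
]

# Network records that involve a known spam address as source or destination.
matches = [n for n in network_records
           if any(is_related(s, n) for s in spam_records)]
```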
Currently, WeaveQ only supports relating fields by equality (=) and inequality (!=).
Pivot Steps¶
A pivot step outputs the records from its data source that are related to the records in the output of the previous step according to the field relationships defined.
A pivot step’s output is always a subset of the records provided by its data source. Records in a pivot step’s results never include records from the previous step’s output.
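The pivot behaviour described above can be sketched in a few lines of plain Python. The pivot function below is a naive illustration that compares every pair of records; it is not WeaveQ's implementation, and the sample records are made up.

```python
def pivot(previous_output, source_records, related):
    """Return the source records related to at least one previous-step record."""
    previous_output = list(previous_output)
    return [record for record in source_records
            if any(related(prev, record) for prev in previous_output)]


bikes = [{"make": "honda", "color": "red"}, {"make": "bmw", "color": "black"}]
cars = [{"make": "ford", "color": "red"}, {"make": "fiat", "color": "white"}]


def same_color(bike, car):
    return bike["color"] == car["color"]


# Only cars are returned; the bike records are used for matching and then dropped.
result = pivot(bikes, cars, same_color)
```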
Join Steps¶
A join step merges the records from the output of the previous step with the records from its data source according to the field relationships defined.
By default, join steps will output all records from their data source, even ones that haven’t had anything joined to them. You can cause the step to only output records that have resulted in a join by specifying the #exclude-empty switch in the query.
WeaveQ will not overwrite existing fields when performing a join. Instead, the data to be joined will be discarded. As a result, you should make sure that the name of the field used to contain joined data is either not in use (using the #field-name option) or it is treated as an array (using the #array option).
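The join rules above, together with the step options, can be sketched in plain Python. The join function below is a naive illustration under this section's description, not WeaveQ's implementation: its keyword arguments mirror #field-name, #array and #exclude-empty, and the sample records are made up.

```python
def join(previous_output, source_records, related,
         field="joined_data", array=False, exclude_empty_joins=False):
    """Merge related previous-step records into copies of the source records."""
    previous_output = list(previous_output)
    results = []
    for record in source_records:
        merged = dict(record)  # shallow copy of the source record
        joined = False
        for prev in previous_output:
            if not related(prev, record):
                continue
            if array:
                target = merged.setdefault(field, [])
                if isinstance(target, list):
                    target.append(prev)
                    joined = True
            elif field not in merged:
                merged[field] = prev
                joined = True
            # Otherwise the field is already in use, so prev is discarded.
        if joined or not exclude_empty_joins:
            results.append(merged)
    return results


bikes = [{"make": "honda", "color": "red"},
         {"make": "suzuki", "color": "red"},
         {"make": "bmw", "color": "black"}]
cars = [{"make": "ford", "color": "red"}, {"make": "fiat", "color": "white"}]


def same_color(bike, car):
    return bike["color"] == car["color"]


first_only = join(bikes, cars, same_color, field="bike")
all_matches = join(bikes, cars, same_color, field="bikes",
                   array=True, exclude_empty_joins=True)
```

In first_only, the red car keeps only the first matching bike and the white car is still output without a joined field. In all_matches, both red bikes are collected in an array and the white car is dropped.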
Querying from Code¶
Overview¶
There are 2 ways you can interact with WeaveQ programmatically:
- Query API: Construct and execute queries directly from code, supplying query logic and data by using the Query API (module weaveq.query).
- Parser API: Run WeaveQ textual queries - stored as strings in memory - with data from custom sources by using the Parser API (module weaveq.parser).
The WeaveQ command line application is a simple wrapper on top of these APIs. It may be helpful to look at its source code on GitHub for a complete example of how to use them. You can also find in-code documentation in the repository that can be built to HTML using Doxygen.
Data Sources¶
As the name suggests, a data source is a WeaveQ component that retrieves data from an external source for use in join and pivot operations. Data sources are represented by objects exposing the interface defined by weaveq.query.DataSource and are invoked when WeaveQ is ready to retrieve data for the associated query steps. This interface is compatible with both the Query API and Parser API.
A DataSource object is passed a URI string and filter string on initialisation (uri and filter_string parameters to __init__). How this information is used and the form it must take is for each particular DataSource to define. Generally:
- URIs should indicate the type and location of the data to be retrieved by the data source (for example, a web URL or filename).
- Filter strings should specify how to selectively include/exclude data from the data source before passing it to WeaveQ.
Your data sources should be prepared to handle filter_string arguments that are None, and raise an exception of the type weaveq.wqexception.DataSourceBuildError if they can’t fall back to default behaviour. Similarly, if your data sources can’t use filter strings, they should raise an exception if passed a filter_string that is not None.
WeaveQ calls one of two methods to retrieve data from the data source: batch() or stream(). The method used is determined by the stream argument passed to weaveq.query.WeaveQ.execute() - if True, the stream() method is called; otherwise, the batch() method is called.
Both must return an iterable (such as a list or a generator iterator) of records, each of which provides the __getitem__ interface (such as a collections.OrderedDict).
Data sources should indicate failure by raising a weaveq.wqexception.DataSourceError exception.
The difference in behaviour of the two methods - if any - is defined by individual data source objects.
An example toy DataSource class is illustrated below:
from collections import OrderedDict

from weaveq.query import DataSource


class ExampleDataSource(DataSource):
    """Toy data source: each line of a text file becomes a one-field record."""

    def __init__(self, uri, filter_string):
        super(ExampleDataSource, self).__init__(uri, filter_string)
        self.uri = uri
        self.filter_string = filter_string

    def _load_record(self, line):
        # Keep every line when no filter is given; otherwise keep only
        # the lines that contain the filter string.
        if self.filter_string is None or self.filter_string in line:
            record = OrderedDict()
            record["field"] = line
            return record
        return None

    def batch(self):
        # Collect the streamed records into a list.
        return list(self.stream())

    def stream(self):
        # Open in text mode so that lines can be compared with filter_string.
        with open(self.uri, "r") as source:
            for line in source:
                record = self._load_record(line)
                if record is not None:
                    yield record
Query API¶
A query is a weaveq.query.WeaveQ object. Calls to its pivot_to and join_to methods are chained to build the query steps, specifying weaveq.query.DataSource objects and field relationships as arguments.
A result handler - a callback object exposing the weaveq.query.ResultHandler interface - is assigned to the query using the result_handler() method. This object will be passed all records resulting from the query’s execution.
The query is executed by calling the execute() method.
For example:
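from weaveq.query import WeaveQ
from weaveq.relations import F

# Placeholder file names; a filter string of None means no lines are excluded.
d1 = ExampleDataSource("bikes.txt", None)
d2 = ExampleDataSource("cars.txt", None)
# Placeholder for any object exposing the weaveq.query.ResultHandler interface.
r = ExampleResultHandler()

q = WeaveQ(d1).pivot_to(d2, ((F("make") == F("make")) & (F("model") == F("model"))) | (F("color") == F("color")))
q.result_handler(r)
q.execute(stream=True)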
The first argument to the pivot_to() method is a data source object exposing the weaveq.query.DataSource interface.
The second argument to the pivot_to() method specifies the field relationships. Fields are denoted using weaveq.relations.F objects, passing a string containing the field name to the constructor. The == and != operators can be used to associate F objects by equality and inequality, respectively. Similarly, the & and | operators can be used to AND and OR field relationship expressions - either individual relationships or sub-expressions - together.
F objects can also optionally be provided with a “field proxy”. This is a callback object to which WeaveQ passes the related field name and value in records retrieved from the data source. The proxy object can return a different value that WeaveQ will use as if it were the original. This can be useful for normalising data or mapping it to discrete values to make comparison easier.
from weaveq.relations import F, FieldProxy

class LowerCaserProxy(FieldProxy):
    def __call__(self, name, value):
        # Normalise the field value to lower case before it is compared.
        return value.lower()

field = F("make", proxy=LowerCaserProxy())
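The effect of a normalising proxy can be seen without WeaveQ installed. The sketch below uses a stand-in class with the same call signature as the proxy above; it deliberately does not subclass FieldProxy, it passes non-string values through unchanged (an added precaution, not documented behaviour), and the records are made up.

```python
class LowerCaser(object):
    """Stand-in with the same (name, value) call signature as a field proxy."""

    def __call__(self, name, value):
        # Leave non-string values (numbers, None, ...) untouched.
        return value.lower() if isinstance(value, str) else value


normalise = LowerCaser()
bike = {"make": "Honda"}
car = {"make": "HONDA"}

raw_match = bike["make"] == car["make"]
proxied_match = normalise("make", bike["make"]) == normalise("make", car["make"])
```

Here raw_match is False because the two values differ in case, while proxied_match is True because both are compared in lower case.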
There are some syntactic restrictions when writing field expressions:
- Always enclose them in curved brackets, i.e. (F("a") == F("b")) instead of F("a") == F("b").
- The order of F objects being compared defines which data source they are associated with. An object on the left-hand side of the operator denotes a field in the previous step’s data source; an object on the right-hand side denotes a field in the current step’s data source.
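The reason for the bracket rule is not stated here, but a likely one is Python's operator precedence: & and | bind more tightly than == and !=. The sketch below demonstrates the effect with plain integers instead of F objects.

```python
# Without brackets, & is applied before the comparisons, and the remaining
# comparisons are chained: 1 == (1 & 2) == 2.
unbracketed = 1 == 1 & 2 == 2

# With brackets, each comparison is evaluated first and the results are ANDed.
bracketed = (1 == 1) & (2 == 2)
```

Here unbracketed is False (1 & 2 is 0, so the chain starts with 1 == 0) while bracketed is True, so omitting the brackets silently changes the meaning of an expression.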
The join_to() method works in the same way as pivot_to(), with the exception that it accepts the following additional parameters to control join step options: field (string, defining the name of the field to join to), array (boolean, specifying whether or not the field to join to should be a list) and exclude_empty_joins (boolean, specifying whether or not records that have not resulted in a join should be excluded from results).
pivot_to() and join_to() methods can be chained repeatedly:
q.pivot_to(d2, ...).join_to(d3, ...).pivot_to(d4, ...).join_to(d5, ...)
Parser API¶
The parser API, encapsulated in weaveq.parser.TextQuery, converts a string containing a WeaveQ text query to a weaveq.query.WeaveQ object. This eliminates the need to build the query - for example, by expressing field relationships - in code.
However, DataSource objects are still required to retrieve the data for each query step and a result handler is required to receive the query results. So that a query step’s data source can be specified from within the text query itself, you provide the API with a data source builder rather than pre-built DataSource objects. This is an object exposing the weaveq.parser.DataSourceBuilder interface - a callback that is invoked by the text query compiler to convert a data source URI and filter string into a DataSource object.
from weaveq.parser import DataSourceBuilder, TextQuery
from weaveq.wqexception import DataSourceBuildError

# ExampleAvroFileDataSource, ExampleSqliteFileDataSource and
# ExampleResultHandler are placeholders for your own classes.

class ExampleDataSourceBuilder(DataSourceBuilder):
    def __call__(self, source_uri, filter_string):
        # Choose a data source class based on the file extension.
        if source_uri.endswith(".avro"):
            return ExampleAvroFileDataSource(source_uri, filter_string)
        elif source_uri.endswith(".sqlite"):
            return ExampleSqliteFileDataSource(source_uri, filter_string)
        else:
            raise DataSourceBuildError("Unknown file type")

def run_query(query_string):
    builder = ExampleDataSourceBuilder()
    query_compiler = TextQuery(builder)
    compiled_query = query_compiler.compile_query(query_string)
    result_handler = ExampleResultHandler()
    compiled_query.result_handler(result_handler)
    compiled_query.execute()
The WeaveQ command line interface uses this API and so the same query syntax is supported, with one exception: data source URIs are not separated into a data type and a location. This is because this functionality is in WeaveQ’s own DataSourceBuilder implementation. You can reproduce it easily enough if you want to - take a look at the datasource.py source in the WeaveQ repository to see how it works.