MetalPipe: Modules for ETL Pipelines¶
MetalPipe is a set of modules for building configuration-driven, efficient ETL pipelines with a minimum amount of custom code.
Overview¶
MetalPipe is a package of classes and functions that help you write consistent, efficient, configuration-driven ETL pipelines in Python. It is open-source and as simple as possible (but not simpler).
This overview tells you why MetalPipe exists, and how it can help you escape from ETL hell.
Why does it exist?¶
Tolstoy said that every happy family is the same, but every unhappy family is unhappy in its own way. ETL pipelines are unhappy families.
Why are they so unhappy? Every engineer who does more than one project involving ETL eventually goes through the same stages of ETL grief. First, they think it’s not so bad. Then they do another project and discover that they have to rewrite very similar code. Then they think, “Surely, I could have just written a few library functions and reused that code, saving lots of time.” But when they try to do this, they discover that although their ETL projects are very similar, they are just different enough that their code isn’t reusable. So they resign themselves to rewriting code over and over again. The code is unreliable, difficult to maintain, and usually poorly tested and documented because it’s such a pain to write in the first place. The task of writing ETL pipelines is so lousy that engineering best practices tend to go out the window because the engineer has better things to do.
What is it?¶
MetalPipe is an ETL framework for the real world. It aims to provide structure and consistency to your ETL pipelines, while still allowing you to write bespoke code for all of the weird little idiosyncratic features of your data. It is opinionated without being bossy.
The overall idea of MetalPipe is simple. On the surface, it looks a lot like streaming frameworks such as Spark or Storm. You hook up various tasks in a directed graph called a “pipeline”. The pipeline ingests data from one or more places, transforms it, and loads the data somewhere else. But it differs from Spark-like systems in important ways:
- It is agnostic between stream and batch. Batches of data can be turned into streams and vice-versa.
- It is lightweight, requiring no specialized infrastructure or network configuration.
- Its built-in functionality is specifically designed for ETL tasks.
- It is meant to accommodate 90% of your ETL needs entirely by writing configuration files.
What isn’t it?¶
There are many things that MetalPipe is not:
- It is not a Big Data(tm) tool. If you’re handling petabytes of data, you do not want to use MetalPipe.
- It is not suitable for large amounts of computation. If you need to use dataframes to calculate lots of complex statistical information in real-time, this is not the tool for you.
Basically, MetalPipe deliberately makes two trade-offs: (1) it gives up Big Data(tm) for simplicity; and (2) it gives up being a general-purpose analytic tool in favor of being very good at ETL.
MetalPipe pipelines¶
An ETL pipeline in MetalPipe is a series of nodes connected by queues. Data is generated or processed in each node, and the output is placed on a queue to be picked up by downstream nodes.
For the sake of convenience, we distinguish between three types of nodes (although there’s no real difference in their use or implementation):
- Source nodes. These are nodes that generate data and send it to the rest of the pipeline. They might, for example, read data from an external data source such as an API endpoint or a database.
- Worker nodes. The workers process data by picking up messages from their incoming queues. Their output is placed onto any number of outgoing queues to be further processed by downstream nodes.
- Sink nodes. These are worker nodes with no outgoing queue. They will typically perform tasks such as inserting data into a database or generating statistics to be sent somewhere outside the pipeline.
All pipelines are implemented in pure Python (version >=3.5). Each node is instantiated from a class that inherits from the MetalNode class. Queues are never instantiated directly by the user; they are created automatically whenever two nodes are linked together.
There is a large (and growing) number of specialized MetalNode subclasses, each geared toward a specific task. Such tasks include:
- Querying a table in a SQL database and sending the results downstream.
- Making a request to a REST API, paging through the responses until there are no more results.
- Ingesting individual messages from an upstream node and batching them together into a single message, or doing the reverse.
- Reading environment variables.
- Watching a directory for new files and sending the names of those files down the pipeline when they appear.
- Filtering messages, letting them through the pipeline only if a particular test is passed.
All results and messages passed among the nodes must be dictionary-like objects. By default, messages retain any keys and values that were created by upstream nodes.
The goal is for MetalPipe to be “batteries included”, with built-in MetalNode subclasses for every common ETL task. But because ETL pipelines generally have something weird going on somewhere, MetalNode makes it easy to roll your own node classes.
Nodes are defined in code by instantiating classes that inherit from MetalNode. Upon instantiation, the constructor takes the same set of keyword arguments as you see in the configuration. Nodes are linked together by the > operator, as in node_1 > node_2. After the pipeline has been built in this way, it is started by calling node.global_start() on any of the nodes in the pipeline.
The code corresponding to the configuration file above would look like this:
# Define the nodes using the various subclasses of MetalNode
get_environment_variables = GetEnvironmentVariables(
    environment_variables=['API_KEY', 'API_USER_ID'])
print_variables = PrinterOfThings(prepend='Environment variables: ')
# The '>' operator can also be chained, as in:
# node_1 > node_2 > node_3 > ...
get_environment_variables > print_variables
# Run the pipeline. This command will not block.
get_environment_variables.global_start()
Rolling your own MetalNode class¶
If there are no built-in MetalNode classes suitable for your ETL pipeline, it is easy to write your own.
For example, suppose you want to create a source node for your pipeline that simply emits a user-defined string every few seconds forever. The user would be able to specify the string and the number of seconds to pause after each message has been sent. The class could be defined like so:
import time

class FooEmitter(MetalNode):  # inherit from MetalNode
    '''
    Sends ``self.output_string`` every ``self.interval`` seconds.
    '''
    def __init__(self, output_string='', interval=1, **kwargs):
        self.output_string = output_string
        self.interval = interval
        super(FooEmitter, self).__init__(**kwargs)  # Must call the `MetalNode` __init__

    def generator(self):
        while True:
            time.sleep(self.interval)
            yield self.output_string  # Output must be yielded, not returned
Let’s look at each part of this class.
The first thing to note is that the class inherits from MetalNode – this is the mix-in class that gives the node all of its functionality within the MetalPipe framework.
The __init__ method should take only keyword arguments, not positional arguments. This restriction guarantees that the configuration files have names for any options that are specified in the pipeline. In the __init__ method, you should also be sure to accept **kwargs, because options that are common to all MetalNode objects are expected to be there.
After any attributes have been defined, the __init__ method must invoke the parent class’s constructor through the use of the super function. Be sure to pass the **kwargs argument into the function as shown in the example.
If the node class is intended to be used as a source node, then you need to define a generator method. This method can be virtually anything, so long as it sends its output via a yield statement.
If you need to define a worker node (that is, a node that accepts input from a queue), you will provide a process_item method instead of a generator. But the structure of that method is the same, with the single exception that you will have access to a __message__ attribute which contains the incoming message data. The structure of a typical process_item method is shown in the example below.
For example, let’s suppose you want to create a node that is passed a string as a message, and returns True if the message has an even number of characters, False otherwise. The class definition would look like this:
class MessageLengthTester(MetalNode):
    def __init__(self, **kwargs):
        # No particular initialization required in this example
        super(MessageLengthTester, self).__init__(**kwargs)

    def process_item(self):
        if len(self.__message__) % 2 == 0:
            yield True
        else:
            yield False
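Because process_item is an ordinary generator method, its logic can be exercised in isolation. The sketch below uses a hypothetical FakeNode stand-in for MetalNode (not part of the library) so that the example runs without the framework:

```python
class FakeNode:
    """Hypothetical minimal stand-in for MetalNode, for testing
    process_item logic outside the MetalPipe framework."""
    def __init__(self, **kwargs):
        pass

class MessageLengthTester(FakeNode):
    def __init__(self, **kwargs):
        # No particular initialization required in this example
        super().__init__(**kwargs)

    def process_item(self):
        # True if the incoming message has an even number of characters
        if len(self.__message__) % 2 == 0:
            yield True
        else:
            yield False

node = MessageLengthTester()
node.__message__ = 'abcd'          # simulate an incoming message
print(list(node.process_item()))   # [True]
```

In the real framework, __message__ would be populated by the node's incoming queue rather than assigned by hand.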
Composing and configuring MetalNode objects¶
Warning
The code described in this section is experimental and very unstable. It would be bad to use it for anything important.
Let’s suppose you’ve worked very hard to create the pipeline from the last example. Now, your boss says that another engineering team wants to use it, but they want to rename parameters and “freeze” the values of certain other parameters. Once that’s done, they want to use it as just one part of a more complicated MetalPipe pipeline.
This can be accomplished using a configuration file. When MetalPipe parses the configuration file, it will dynamically create the desired class, which can be instantiated and used as if it were a single node in another pipeline.
The configuration file is written in YAML, and it would look like this:
name: FooMessageTester
nodes:
  - name: foo_generator
    class: FooEmitter
    frozen_arguments:
      output_string: foobar
    arg_mapping:
      interval: foo_interval
  - name: length_tester
    class: MessageLengthTester
    arg_mapping: null
With this file saved as (e.g.) foo_message.yaml, the following code will create a FooMessageTester class and instantiate it:
import yaml

foo_message_config = yaml.safe_load(open('./foo_message.yaml', 'r').read())
class_factory(foo_message_config)
# At this point, there is now a `FooMessageTester` class
foo_node = FooMessageTester(foo_interval=1)
You can now use foo_node just as you would any other node. So in order to run it, you just do:
foo_node.global_start()
Because foo_node is just another node, you can insert it into a larger pipeline and reuse it. For example, suppose that other engineering team wants to add a PrinterOfThings to the end of the pipeline. They’d do this:
printer = PrinterOfThings()
foo_node > printer
Quickstart¶
This explains how to install MetalPipe, create a simple configuration file, and execute a pipeline.
Install MetalPipe¶
MetalPipe is installed in the usual way, with pip:
pip install metalpipe
To test your installation, try typing
metalpipe --help
If MetalPipe is installed correctly, you should see a help message.
Write a configuration file¶
You use MetalPipe by (1) writing a configuration file that describes your pipeline, and (2) running the metalpipe command, specifying the location of your configuration file. MetalPipe will read the configuration, create the pipeline, and run it.
The configuration file is written in YAML. It has three parts:
- A list of global variables (optional)
- The nodes and their options (required)
- A list of edges connecting those nodes to each other.
This is a simple configuration file. If you want to, you can copy it into a file called sample_config.yaml:
---
pipeline_name: Sample MetalPipe configuration
pipeline_description: Reads some environment variables and prints them
nodes:
  get_environment_variables:
    class: GetEnvironmentVariables
    summary: Gets all the necessary environment variables
    options:
      environment_variables:
        - API_KEY
        - API_USER_ID
  print_variables:
    class: PrinterOfThings
    summary: Prints the environment variables to the terminal
    options:
      prepend: "Environment variables: "
paths:
  -
    - get_environment_variables
    - print_variables
Run the pipeline¶
If you’ve installed MetalPipe and copied this configuration into sample_config.yaml, then you can execute the pipeline:
metalpipe run --filename sample_config.yaml
The output should look like this (you might also see some log messages):
Environment variables:
{'API_USER_ID': None, 'API_KEY': None}
The MetalPipe pipeline has found the values of two environment variables (API_KEY and API_USER_ID) and printed them to the terminal. If those environment variables have not been set, their values will be None. But if you were to set any of them, their values would be printed.
Configuration file structure¶
A configuration file starts with two top-level options, pipeline_name and pipeline_description. These are optional, and are used only for the user’s convenience. Below those are two sections: nodes and paths. The nodes section contains one or more blocks that always have this form:
do_something:
  class: node class
  summary: optional string describing what this node does
  options:
    option_1: value of this option
    option_2: value of another option
Let’s go through this one line at a time.
Each node block describes a single node in the MetalPipe pipeline. A node must be given a name, which can be any arbitrary string, but it should be short and descriptive of the node’s action, such as get_environment_variables or parse_json. We encourage you to stick to a clear naming convention. We like nodes to have names of the form verb_noun (as in print_name).
MetalPipe contains a number of node classes, each of which is designed for a specific type of ETL task. In the sample configuration, we’ve used the built-in classes GetEnvironmentVariables and PrinterOfThings; these are the values following class. You can also roll your own node classes (we’ll describe how to do this later in the documentation).
Next is a set of keys and values for the various options that are supported by that class. Because each node class does something different, the options are different as well. In the sample configuration, the GetEnvironmentVariables node class requires a list of environment variables to retrieve, so as you would expect, we specify that list under the environment_variables option. The various options are explained in the documentation for each class. In addition to the options that are specific to each node, there are also options that are common to every type of node. These will be explained later.
The structure of the pipeline is given in the paths section, which contains a list of lists. Each list is a set of nodes that are to be linked together in order. In our example, the paths value says that get_environment_variables will send its output to print_variables. Paths can be arbitrarily long. If you wanted to send the environment variables down two different execution paths, you would add another list to the paths, like so:
paths:
  -
    - get_environment_variables
    - print_variables
  -
    - get_environment_variables
    - do_something_else
    - and_then_do_this
With this set of paths, the pipeline looks like a very simple tree, with get_environment_variables at the root, which branches to print_variables and do_something_else.
When you have written the configuration file, you’re ready to use the MetalPipe CLI. It accepts a command, followed by some options. As of now, the commands it accepts are run, which executes the pipeline, and draw, which generates a diagram of the pipeline. The relevant command(s) are:
python metalpipe_cli.py [run | draw] --filename my_sample_config.yaml
The metalpipe command can generate a pdf file containing a drawing of the pipeline, showing the flow of data through the various nodes. Just specify draw instead of run to generate the diagram. For our simple little pipeline, we get this:
It is also possible to skip using the configuration file and define your pipelines directly in code. In general, it’s better to use the configuration file for a variety of reasons, but you always have the option of doing this in Python.
Node and Data Lifecycle¶
This section describes what’s happening under the hood in a MetalPipe data pipeline. Most people won’t need to read this section. But if you’re planning on writing custom classes that inherit from MetalNode, this will be helpful.
The Node Lifecycle¶
The MetalNode class is where the crucial work happens in a pipeline. The lifecycle of a MetalNode object comprises several steps.
Instantiating the node and pipeline¶
Recall that when a node is defined in a configuration file, the definition looks like this:
my_node:
  class: MyMetalNodeClass
  options:
    an_option: foo
    another_option: bar
The code for any MetalNode subclass has an __init__ method of the following form:
class MyMetalNodeClass(MetalNode):
    def __init__(self, an_option=None, another_option=None, **kwargs):
        self.an_option = an_option
        self.another_option = another_option
        super(MyMetalNodeClass, self).__init__(**kwargs)
As you can see, the keyword arguments directly correspond to the keys under the options key in the configuration file. When the configuration file is read by the command-line tool, the class is instantiated and the options are converted to keyword arguments to be passed to the constructor. Keyword arguments will typically be a combination of options that are specific to that class and options that are inherited by any subclass of MetalNode.
Instantiating the class does not create any input or output queues. That happens only when two nodes are hooked together. In Python code, you can hook up two or more nodes by using the > operator, as in:
node_1 > node_2 > node_3
In a configuration file, this is accomplished with the paths key, like so:
paths:
  -
    - node_1
    - node_2
    - node_3
Starting the node¶
To do.
Processing data in the pipeline¶
To do.
Shutting down normally¶
To do.
Shutting down due to error¶
To do.
The data journey¶
MetalPipe pipelines are sets of MetalNode objects connected by MetalPipeQueue objects. Think of each MetalNode as a vertex in a directed graph, and each MetalPipeQueue as a directed edge.
There are two types of MetalNode objects. A “source” is a MetalNode that does not accept incoming data from another MetalNode. A “processor” is any MetalNode that is not a “source”. Note that there is nothing in the class definition or object that distinguishes between these two – the only difference is that processors have a process_item method, and sources have a generator method. Other than that, they are identical.
The data journey begins with one or more source nodes. When a source node is started (by calling its start method), a new thread is created and the node’s generator method is executed inside the thread. As results from the generator method are yielded, they are placed on each outgoing MetalPipeQueue to be picked up by one or more processors downstream.
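As a rough sketch of this mechanism (not MetalPipe’s actual implementation; the output_queues attribute and _pump method are names invented for illustration):

```python
import queue
import threading

class SketchSourceNode:
    """Illustrative sketch of a source node: run the generator in a
    worker thread and place each yielded result on every outgoing queue."""
    def __init__(self):
        self.output_queues = []  # hypothetical attribute: one queue per downstream node

    def generator(self):
        for item in ('a', 'b', 'c'):
            yield item

    def start(self):
        # Create a new thread and execute the generator inside it
        thread = threading.Thread(target=self._pump, daemon=True)
        thread.start()
        return thread

    def _pump(self):
        # Each yielded result goes onto every outgoing queue
        for result in self.generator():
            for outgoing in self.output_queues:
                outgoing.put(result)

node = SketchSourceNode()
downstream = queue.Queue()
node.output_queues.append(downstream)
node.start().join()
print([downstream.get() for _ in range(3)])  # ['a', 'b', 'c']
```

The real MetalNode.start does considerably more bookkeeping, but the thread-plus-generator shape is the same.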
The data from the source’s generator is handled by the MetalPipeQueue object. At its heart, the MetalPipeQueue is simply a class which has a Python Queue.queue object as an attribute. The reason we don’t simply use Python Queue objects is because the MetalPipeQueue contains some logic that’s useful. In particular:
- It wraps the data into a MetalPipeMessage object, which also holds useful metadata including a UUID, the ID of the node that generated the data, and a timestamp.
- If the MetalPipeQueue receives data that is simply a None object, then it is skipped.
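The two behaviors above can be sketched as follows; SketchMessage and SketchPipeQueue are hypothetical stand-ins for MetalPipeMessage and MetalPipeQueue, not the library’s actual classes:

```python
import queue
import time
import uuid

class SketchMessage:
    """Stand-in for MetalPipeMessage: a payload plus useful metadata."""
    def __init__(self, payload, source_node_id):
        self.payload = payload
        self.source_node_id = source_node_id  # ID of the node that generated the data
        self.uuid = str(uuid.uuid4())
        self.timestamp = time.time()

class SketchPipeQueue:
    """Sketch of the documented MetalPipeQueue behavior: wrap outgoing
    data in a message object, and silently drop bare None values."""
    def __init__(self):
        self.queue = queue.Queue()  # a plain Python queue as an attribute

    def put(self, data, source_node_id=None):
        if data is None:
            return  # None values are skipped, per the documented behavior
        self.queue.put(SketchMessage(data, source_node_id))

pipe = SketchPipeQueue()
pipe.put({'email': 'alice@gmail.com'}, source_node_id='source_1')
pipe.put(None)  # dropped silently
print(pipe.queue.qsize())  # 1
```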
Monitoring¶
MetalPipe lets you easily monitor your pipeline, identify bottlenecks, and diagnose failures.
Logging table¶
While a pipeline is being executed, a table of information will periodically be logged (at the INFO logging level). Each row provides diagnostic information about a single node in the pipeline. This is a typical example:
We’ll go through each column of the table.
The Node column contains the name of a node. This is the name that was given in the configuration file as a top-level key in the nodes section. If the name is printed in red (as in contacts_epoch_to_timestamp in the example), then the node is a “bottleneck”. In order to identify bottlenecks, MetalPipe periodically polls each node to determine whether (1) its input queue is full and (2) its output queue is not full. If those conditions are frequently met, then the node is identified as a bottleneck.
Note that being a bottleneck is not necessarily a sign of inefficiency. For any sufficiently long-running pipeline, it is very likely that some node will happen to be the slowest, and it will be considered a bottleneck.
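The polling test described above can be sketched with a couple of small functions (the names are invented for illustration; MetalPipe’s actual bookkeeping is more involved):

```python
def is_bottleneck_sample(input_queue_full, output_queue_full):
    """One polling sample of the documented bottleneck test:
    the node's input queue is full while its output queue is not."""
    return input_queue_full and not output_queue_full

def bottleneck_ratio(samples):
    """Fraction of polls in which the node looked like a bottleneck.
    Each sample is a (input_full, output_full) pair of booleans."""
    hits = sum(1 for inp, out in samples if is_bottleneck_sample(inp, out))
    return hits / len(samples)

samples = [(True, False), (True, False), (True, True), (False, False)]
print(bottleneck_ratio(samples))  # 0.5
```

A node whose ratio stays high over many polls would be flagged (printed in red) as a bottleneck.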
The Class column simply gives the class of the MetalNode object, which tells you what function it is performing.
The Received, Sent, and Queued columns tell you how many messages are at various stages of processing. The Received number indicates how many messages have been processed by the node, including any message that is currently being processed. Sent gives how many messages have been output by this node. Finally, Queued is the number of messages that are on that node’s incoming queue(s). If there are several incoming queues, then this number is the sum. Note that for a source node, the value of Received will always be zero, and for any sink node, the value of Sent will be zero.
The Status column has three possible values: running, success, and error. Here, success means that the node has completed its work and has terminated without raising an error. A node is considered to be done with its work when its parent nodes (if any) have completed, its incoming queues are all empty, and it is not processing any messages. An error is indicated whenever a node raises an Exception. When this happens, the entire pipeline is shut down automatically. These status messages are colored yellow, green, and red, respectively.
Finally, Time is the total amount of time the node has spent running. When it is in a non-running state (either success or error), the clock stops.
Treehorn¶
Treehorn is a set of classes for manipulating dictionary- and list-like objects in a declarative style. It is meant to be useful for the sort of tasks required for ETL, such as extracting structured data from JSON objects.
Using Treehorn¶
Treehorn allows you to search for information in a dictionary- or list-like object by specifying conditions. Structures that match those conditions can be returned, or they can be labeled. If they are labeled, you can use those labels to build more complex searches later, or retrieve the data. The style of Treehorn is somewhat like JQuery and similar languages that are good for manipulating tree-like data structures such as web pages.
We’ll explain Treehorn by stepping through an example of how we would extract data from the following JSON blob:
{
"source": "users",
"hash": "Ch8KFgjQj67igOnVto4BELHgwMD7iNfjkQEYlrfjtZAt",
"events": [
{
"appName": "mobileapp",
"browser": {
"name": "Google Chrome",
"version": []
},
"duration": 0,
"created": 1550596005797,
"location": {
"country": "United States",
"state": "Massachusetts",
"city": "Boston"
},
"id": "af6de71b",
"smtpId": null,
"portalId": 537105,
"email": "alice@gmail.com",
"sentBy": {
"id": "befa29c9",
"created": 1550518557458
},
"type": "OPEN",
"filteredEvent": false,
"deviceType": "COMPUTER"
},
{
"appName": "desktopapp",
"browser": {
"name": "Firefox",
"version": []
},
"duration": 0,
"created": 1550596005389,
"location": {
"country": "United States",
"state": "New York",
"city": "New York"
},
"id": "12aadd80",
"smtpId": null,
"portalId": 537105,
"email": "bob@gmail.com",
"sentBy": {
"id": "2cd1e257",
"created": 1550581974777
},
"type": "OPEN",
"filteredEvent": false,
"deviceType": "COMPUTER"
}
]
}
As you can see, this JSON blob is similar to a typical response from a REST API (in fact, this is actually an example from a real REST API, with all personal information deleted).
Let’s suppose you need to extract the email address and corresponding city name for each entry in events
. This example is simple enough that you might not see the usefulness of Treehorn, but it’s complex enough to get a sense of how Treehorn works. Later, we’ll look at circumstances where Treehorn’s declarative style is especially useful.
There are four classes that are important for Treehorn:
- Conditions – These are classes that test a particular location in a tree (e.g. a dictionary) for some condition. Examples of useful conditions are being a dictionary with a certain key, being a non-empty list, having an integer value, and so on.
- Traversal – These classes move throughout a tree, recursively applying tests to each node that they visit. Traversals can be upward (toward the root) or downward (toward the leaves).
- Label – These are nothing more than strings that are attached to particular locations in the tree. Typically, we apply a label to locations in the tree that match particular conditions.
- Relations – Finally, this class represents n-tuples of locations in the tree. For example, if an email address is present in the tree, and the user’s city is present, a Relation can be used to denote that the person with that email address lives in that city.
The workflow for a typical Treehorn query is that we (1) define some conditions (such as being an email address field); (2) traverse the tree, searching for locations that match those conditions; (3) label those locations; and (4) define a relationship from those labels, which we can use to extract the right information. We’ll gradually build up a query by adding each of these steps one at a time.
Condition objects¶
For this example, let’s suppose you’ve loaded the JSON into a dictionary, like so:
import json

with open('./sample_api_response.json', 'r') as infile:
    api_response = json.load(infile)
Let’s extract the email addresses and corresponding cities for each user in the API response. First, we create a couple of Condition objects using the built-in class HasKey:
has_email_key = HasKey('email')
has_city_key = HasKey('city')
The HasKey class is a subclass of MeetsCondition, all of which are callable and return True or False. For example, you could do the following:
d = {'email': 'myemail.com', 'name': 'carol'}
has_email_key(d) # Returns True
has_city_key(d) # Returns False
What if you want to test for two conditions on a single node? MeetsCondition objects can be combined into larger boolean expressions using &, |, and ~, like so:
(has_email_key & has_city_key)(d) # Returns False
(has_email_key & ~ has_city_key)(d) # Returns True
(has_email_key | has_city_key)(d) # Returns True
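A minimal sketch of how such combinable conditions could be implemented (illustrative only; SketchCondition and sketch_has_key are invented names, not Treehorn’s actual MeetsCondition and HasKey code):

```python
class SketchCondition:
    """A callable condition supporting &, |, and ~, mirroring the
    described MeetsCondition behavior."""
    def __init__(self, test):
        self.test = test

    def __call__(self, node):
        return self.test(node)

    def __and__(self, other):
        return SketchCondition(lambda node: self(node) and other(node))

    def __or__(self, other):
        return SketchCondition(lambda node: self(node) or other(node))

    def __invert__(self):
        return SketchCondition(lambda node: not self(node))

def sketch_has_key(key):
    """Sketch of HasKey: true for dictionaries containing the key."""
    return SketchCondition(lambda node: isinstance(node, dict) and key in node)

has_email_key = sketch_has_key('email')
has_city_key = sketch_has_key('city')
d = {'email': 'myemail.com', 'name': 'carol'}
print((has_email_key & ~has_city_key)(d))  # True
```

Overloading `__and__`, `__or__`, and `__invert__` is what makes the `&`, `|`, and `~` syntax shown above possible.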
Traversal objects¶
MeetsCondition objects aren’t very useful unless they’re combined with traversals. There are two types of traversal classes: GoUp and GoDown. Each takes a MeetsCondition object as a parameter. For example, if you want to search from the root of the tree for every location that is a dictionary with the email key, the traversal is:
find_email = GoDown(condition=has_email_key) # or GoDown(condition=HasKey('email'))
Similarly for finding places with a city key:
find_city = GoDown(condition=has_city_key) # or GoDown(condition=HasKey('city'))
If you want to retrieve all of find_email’s matches, you can use its matches method, which will yield each match:
for match in find_email.matches(api_response):
    print(match)
which will yield:
{'id': 'af6de71b', 'portalId': 537105, 'location': {'state': 'Massachusetts', 'city': 'Boston', 'country': 'United States'}, 'type': 'OPEN', 'sentBy': {'id': 'befa29c9', 'created': 1550518557458}, 'appName': 'mobileapp', 'duration': 0, 'smtpId': None, 'deviceType': 'COMPUTER', 'created': 1550596005797, 'email': 'alice@gmail.com', 'browser': {'version': [], 'name': 'Google Chrome'}, 'filteredEvent': False}
{'id': '12aadd80', 'portalId': 537105, 'location': {'state': 'New York', 'city': 'New York', 'country': 'United States'}, 'type': 'OPEN', 'sentBy': {'id': '2cd1e257', 'created': 1550581974777}, 'appName': 'desktopapp', 'duration': 0, 'smtpId': None, 'deviceType': 'COMPUTER', 'created': 1550596005389, 'email': 'bob@gmail.com', 'browser': {'version': [], 'name': 'Firefox'}, 'filteredEvent': False}
Examining each of the dictionaries, we see that they do in fact contain an email key. Note that the traversal does not return the email string itself – we asked only for the dictionary containing the key. This is by design, as we will see soon.
Because we want to retrieve not only the email addresses but also the cities, we need another traversal. Each of the two dictionaries containing the email key also has a subdictionary that contains a city key. So we need a second traversal to get that subdictionary. In other words, retrieving the data we need is a two-step process:
- Starting at the root of the tree, we traverse downward until we find a dictionary with the email key.
- From each of those dictionaries, we go down until we find a dictionary with the city key.
Any non-trivial ETL task involving nested dictionary-like objects will require multi-stage traversals like this one. So Treehorn allows you to chain traversals together using the > operator:
chained_traversal = find_email > find_city
The chained_traversal says, in effect, “Go down into the tree and find every node that has an email key. Then, from each of those, continue to go down until you find a node that contains a city key.” In pseudo-code:
For each node_1 starting at the root:
    if node_1 has ``email`` key:
        for each node_2 starting at node_1:
            if node_2 has ``city`` key:
                return
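The pseudo-code above can be sketched as nested recursive generators. Here, sketch_go_down and has_key are hypothetical stand-ins for Treehorn’s GoDown and HasKey, written out so the chaining logic runs on a small tree:

```python
def sketch_go_down(tree, condition):
    """Sketch of a GoDown traversal: yield every node in the tree
    (depth-first) that satisfies the condition."""
    if condition(tree):
        yield tree
    if isinstance(tree, dict):
        children = tree.values()
    elif isinstance(tree, list):
        children = tree
    else:
        children = []
    for child in children:
        yield from sketch_go_down(child, condition)

def has_key(key):
    # True for dictionaries containing the given key
    return lambda node: isinstance(node, dict) and key in node

tree = {'events': [
    {'email': 'alice@gmail.com', 'location': {'city': 'Boston'}},
    {'email': 'bob@gmail.com', 'location': {'city': 'New York'}},
]}

# Chain the traversals: for each email node, search downward for a city node.
pairs = []
for email_node in sketch_go_down(tree, has_key('email')):
    for city_node in sketch_go_down(email_node, has_key('city')):
        pairs.append({'email': email_node['email'], 'city': city_node['city']})
print(pairs)
```

Because everything is a generator, matches are produced lazily, which is also how Treehorn avoids holding partial results in memory.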
So far, we have set up multi-stage searches for nodes in a tree that satisfy various conditions. Next, we have to extract the right data from those searches. This is where the Label and Relation classes come into play.
Labels¶
When nodes are identified that satisfy certain conditions, we will want to label those nodes so that we can extract data from them later. The mechanism for doing this is to use a “label”.
Continuing the example, let’s use the labels “email” and “city” to mark the respective nodes in the two-stage traversal. We do so by adding a label to the traversal chain. Recall that in the previous section, we wrote:
chained_traversal = find_email > find_city
whereas we now have:
chained_traversal = find_email + 'email' > find_city + 'city'
We use + to add a label, and the label is just a string. Under the hood, Treehorn is instantiating a Label object, but ordinarily, you shouldn’t have to do that directly.
Relations¶
Lastly, we define a Relation object to extract the data from our search. In this example, we might think of the search as returning data about people who live in a certain city. So we might name the Relation “FROM_CITY”. We’ll want to extract the value of the email key from the node labeled email, and similarly with the city node. This is accomplished by adding a little more syntax:
Relation('FROM_CITY') == (
(find_email + 'email')['email'] > (find_city + 'city')['city'])
After executing that statement, Treehorn will create an object named FROM_CITY, which can be called on a dictionary to yield the information we want, like so:
for email_city in FROM_CITY(api_response):
    print(email_city)
which will give us:
{'city': 'Boston', 'email': 'alice@gmail.com'}
{'city': 'New York', 'email': 'bob@gmail.com'}
Voila!
Summing up¶
Normally, ETL pipelines that extract data from dictionary-like objects involve a lot of loops and hard-coded keypaths. To accomplish the simple task of extracting emails and city names from our sample JSON blob, we’d probably hard-code paths for each specific key and value, and then we’d loop over various levels in the dictionary. This has several disadvantages:
- It leads to brittle code. If the JSON blob changes structure in even very small ways, the hard-coded paths become obsolete and have to be rewritten.
- The code is difficult to understand and debug. Given a whole bunch of nested loops and hard-coded keypaths, it’s very difficult to understand the intent of the code. Errors have to be found by painstakingly stepping through the execution.
- It is very difficult to accommodate JSON blobs with variable structure. Some JSON blobs returned from APIs have unpredictable levels of nesting, for example. Therefore, keypaths cannot be hard-coded and recursive searches have to be written, which are inefficient and difficult to debug.
The approach taken by Treehorn alleviates some of this pain. For example, the GoDown
traversal doesn’t care how many levels down in the tree it must search; so it is often able to cope with inconsistent structures (within reason) without any code changes. It’s also much easier to understand. You can tell from glancing at the code that the intention is to search for a dictionary with a key, and then search from there for lower-level dictionaries with another key, and return the results. Treehorn is also more efficient than writing loops and keypaths because all of its evaluations are lazy – it doesn’t hold partial results in memory any longer than necessary because everything is yielded by generators.
API Documentation¶
Node module¶
The node
module contains the MetalNode
class, which is the foundation
for MetalPipe.
-
class
metalpipe.node.
AggregateValues
(values=False, tail_path=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Does that.
-
class
metalpipe.node.
BatchMessages
(batch_size=None, batch_list=None, counter=0, timeout=5, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
CSVReader
(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
CSVToDictionaryList
(**kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
ConstantEmitter
(thing=None, max_loops=5, delay=0.5, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Send a thing every n seconds
-
class
metalpipe.node.
CounterOfThings
(*args, batch=False, get_runtime_attrs=<function no_op>, get_runtime_attrs_args=None, get_runtime_attrs_kwargs=None, runtime_attrs_destinations=None, input_mapping=None, retain_input=True, throttle=0, keep_alive=True, max_errors=0, max_messages_received=None, name=None, input_message_keypath=None, key=None, messages_received_counter=0, prefer_existing_value=False, messages_sent_counter=0, post_process_function=None, post_process_keypath=None, summary='', fixturize=False, post_process_function_kwargs=None, output_key=None, break_test=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
DynamicClassMediator
(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
Filter
(test=None, test_keypath=None, value=True, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Applies tests to each message and filters out messages that don’t pass.
Built-in tests: key_exists, value_is_true, value_is_not_none
Example: {'test': 'key_exists', 'key': 'mykey'}
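The built-in tests can be pictured as simple predicates over the message dictionary. A sketch of the idea (not the node's actual implementation):

```python
def key_exists(message, key):
    return key in message

def value_is_true(message, key):
    return message.get(key) is True

def value_is_not_none(message, key):
    return message.get(key) is not None

def filter_messages(messages, test, key):
    # Let through only the messages for which the test succeeds.
    return [m for m in messages if test(m, key)]

msgs = [{"mykey": 1}, {"other": 2}]
passed = filter_messages(msgs, key_exists, "mykey")
# passed == [{"mykey": 1}]
```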
-
class
metalpipe.node.
FunctionOfMessage
(function_name, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
GetEnvironmentVariables
(mappings=None, environment_variables=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
This node reads environment variables and stores them in the message.
The required keyword argument for this node is environment_variables, which is a list of – you guessed it! – environment variables. By default, they will be read and stored in the outgoing message under keys with the same names as the environment variables. E.g. FOO_VAR will be stored in the message {"FOO_VAR": whatever}.
Optionally, you can provide a dictionary to the mappings keyword argument, which maps environment variable names to new names. E.g. if mappings = {"FOO_VAR": "bar_var"}, then the value of FOO_VAR will be stored in the message {"bar_var": whatever}.
If an environment variable is not defined, its value will be set to None.
Parameters: - mappings (dict) – An optional dictionary mapping environment variable names to new names.
- environment_variables (list) – A list of environment variable names.
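The behavior described above amounts to the following plain-Python logic (a sketch of the semantics, not the node itself):

```python
import os

def read_environment_variables(environment_variables, mappings=None):
    """Read each variable, optionally renaming it via `mappings`;
    undefined variables come back as None."""
    mappings = mappings or {}
    return {
        mappings.get(var, var): os.environ.get(var)
        for var in environment_variables
    }

os.environ["FOO_VAR"] = "whatever"
message = read_environment_variables(
    ["FOO_VAR", "MISSING_VAR"], mappings={"FOO_VAR": "bar_var"})
# message == {"bar_var": "whatever", "MISSING_VAR": None}
```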
-
class
metalpipe.node.
InsertData
(overwrite=True, overwrite_if_null=True, value_dict=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
LocalDirectoryWatchdog
(directory='.', check_interval=3, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
LocalFileReader
(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
MetalNode
(*args, batch=False, get_runtime_attrs=<function no_op>, get_runtime_attrs_args=None, get_runtime_attrs_kwargs=None, runtime_attrs_destinations=None, input_mapping=None, retain_input=True, throttle=0, keep_alive=True, max_errors=0, max_messages_received=None, name=None, input_message_keypath=None, key=None, messages_received_counter=0, prefer_existing_value=False, messages_sent_counter=0, post_process_function=None, post_process_keypath=None, summary='', fixturize=False, post_process_function_kwargs=None, output_key=None, break_test=None, **kwargs)[source]¶ Bases:
object
The foundational class of MetalPipe. This class is inherited by all nodes in a computation graph.
Order of operations:
1. Child class __init__ function
2. MetalNode __init__ function
3. preflight_function (specified in initialization params)
4. setup
5. start
These methods have the following intended uses:
- __init__ – Sets attribute values and calls the MetalNode __init__ method.
- get_runtime_attrs – Sets any attribute values that are to be determined at runtime, e.g. by checking environment variables or reading values from a database. The get_runtime_attrs function should return a dictionary of attributes -> values, or else None.
- setup – Sets the state of the MetalNode and/or creates any attributes that require information available only at runtime.
Parameters:
- send_batch_markers – If True, then a BatchStart marker will be sent when a new input is received, and a BatchEnd will be sent after the input has been processed. The intention is that a number of items will be emitted for each input received; for example, we might emit a table row-by-row for each input.
- get_runtime_attrs – A function that returns a dictionary-like object. The keys and values will be saved to this MetalNode object’s attributes. The function is executed one time, upon starting the node.
- get_runtime_attrs_args – A tuple of arguments to be passed to the get_runtime_attrs function upon starting the node.
- get_runtime_attrs_kwargs – A dictionary of kwargs passed to the get_runtime_attrs function.
- runtime_attrs_destinations – If set, this is a dictionary mapping the keys returned from the get_runtime_attrs function to the names of the attributes to which the values will be saved.
- throttle – For each input received, a delay of throttle seconds will be added.
- keep_alive – If True, keep the node’s thread alive after everything has been processed.
- name – The name of the node. Defaults to a randomly generated hash. Note that this hash is not consistent from one run to the next.
- input_mapping – When the node receives a dictionary-like object, this dictionary will cause the keys of the dictionary to be remapped to new keys.
- retain_input – If True, then combine the dictionary-like input with the output. If keys clash, the output value will be kept.
- input_message_keypath – Read the value in this keypath as the content of the incoming message.
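The retain_input behavior amounts to a dictionary merge in which the output's values win on key clashes. A minimal sketch of that semantics:

```python
def combine_retained_input(input_message, output_message):
    """Keep the input's keys, but let the output's values win on clashes."""
    combined = dict(input_message)
    combined.update(output_message)
    return combined

result = combine_retained_input({"a": 1, "b": 2}, {"b": 99, "c": 3})
# result == {"a": 1, "b": 99, "c": 3}
```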
-
add_edge(target, **kwargs)[source]¶ Create an edge connecting self to target.
This method instantiates the MetalPipeQueue object that connects the nodes. Connecting the nodes together consists in (1) adding the queue to the nodes’ input_queue_list or output_queue_list and (2) setting the queue’s source_node and target_node attributes.
Parameters: target (MetalNode) – The node to which self will be connected.
-
all_connected(seen=None)[source]¶ Returns all the nodes connected (directly or indirectly) to self. This allows us to loop over all the nodes in a pipeline even if we have a handle on only one. This is used by global_start, for example.
Parameters: seen (set) – A set of all the nodes that have been identified as connected to self.
Returns: All the nodes connected to self, including self.
Return type: (set of MetalNode)
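The reachability computation can be sketched with a hypothetical Node class that keeps a simple neighbors list (MetalPipe's real nodes track queues rather than a neighbors attribute):

```python
class Node:
    def __init__(self, name):
        self.name = name
        self.neighbors = []

    def add_edge(self, other):
        # Undirected for illustration: each node sees the other.
        self.neighbors.append(other)
        other.neighbors.append(self)

def all_connected(node, seen=None):
    """Collect every node reachable from `node`, including `node` itself."""
    seen = seen if seen is not None else set()
    if node in seen:
        return seen
    seen.add(node)
    for neighbor in node.neighbors:
        all_connected(neighbor, seen)
    return seen

a, b, c = Node("a"), Node("b"), Node("c")
a.add_edge(b)
b.add_edge(c)
connected = all_connected(a)
# connected == {a, b, c}
```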
-
broadcast
(broadcast_message)[source]¶ Puts the message into all the input queues for all connected nodes.
-
cleanup()[source]¶ If any cleanup (closing files, shutting down database connections) is necessary when the node is stopped, the node’s class should provide a cleanup method. By default, the method is just a logging statement.
-
global_start(prometheus=False, pipeline_name=None, max_time=None, fixturize=False)[source]¶ Starts every node connected to self. Mainly, it:
1. calls start() on each node
2. sets some global variables
3. optionally starts some experimental code for monitoring
-
input_queue_size
¶ Return the total number of items in all of the queues that are inputs to this node.
-
is_sink
¶ Tests whether the node is a sink or not, i.e. whether there are no outputs from the node.
Returns: True
if the node has no output nodes,False
otherwise.Return type: (bool)
-
is_source
¶ Tests whether the node is a source or not, i.e. whether there are no inputs to the node.
Returns: True
if the node has no inputs,False
otherwise.Return type: (bool)
-
logjam¶ Returns the logjam score, which measures the degree to which the node is holding up progress in downstream nodes.
We define a logjam as a node whose input queue is full, but whose output queue(s) is not. More specifically, we poll each node in the monitor_thread, and increment a counter if the node is a logjam at that time. This property returns the percentage of samples in which the node is a logjam. Our intention is that if this score exceeds a threshold, the user is alerted, or the load is rebalanced somehow (not yet implemented).
Returns: Logjam score
Return type: (float)
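The sampling-based metric can be sketched as a small counter class (an illustration of the formula, not the monitor's actual code):

```python
class LogjamMonitor:
    """Logjam score: fraction of polls in which the input queue was full
    while the output queue(s) were not."""

    def __init__(self):
        self.samples = 0
        self.logjams = 0

    def poll(self, input_full, output_full):
        self.samples += 1
        if input_full and not output_full:
            self.logjams += 1

    @property
    def logjam(self):
        return 0.0 if self.samples == 0 else self.logjams / self.samples

monitor = LogjamMonitor()
for input_full, output_full in [(True, False), (True, False),
                                (False, False), (True, True)]:
    monitor.poll(input_full, output_full)
# monitor.logjam == 0.5  (2 logjam samples out of 4)
```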
-
pipeline_finished
¶
-
setup()[source]¶ For classes that require initialization at runtime, which can’t be done when the class’s __init__ function is called. The MetalNode base class’s setup function is just a logging call.
It should be unusual to have to make use of setup because, in practice, initialization can be done in the __init__ function.
-
start()[source]¶ Starts the node. This is called by MetalNode.global_start().
The node’s main loop is contained in this method. The main loop does the following:
- records the timestamp to the node’s started_at attribute
- calls get_runtime_attrs (TODO: check if we can deprecate this)
- calls the setup method for the class (which is a no-op by default)
- if the node is a source, successively yields all the results of the node’s generator method, then exits
- if the node is not a source, loops over the input queues, getting the next message. Note that when the message is pulled from the queue, the MetalPipeQueue yields it as a dictionary.
- gets either the content of the entire message if the node has no key attribute, or the value of message[self.key]
- remaps the message content if a remapping dictionary has been given in the node’s configuration
- calls the node’s process_item method, yielding back the results. (Note that a single input message may cause the node to yield zero, one, or more than one output message.)
- places the results into each of the node’s output queues
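The core of that loop can be sketched as follows. Everything here is hypothetical scaffolding (the incoming, emit, and generator names are illustrative, not MetalPipe's API), meant only to show the source/non-source split and the zero-or-more-outputs behavior:

```python
def run_node(node):
    """Sketch of the main loop: sources drain their generator; other
    nodes process each incoming message, which may yield any number
    of outputs."""
    if not node.input_queues:            # a source node
        for item in node.generator():
            node.emit(item)
    else:                                # a processing node
        for message in node.incoming():
            content = message if node.key is None else message[node.key]
            for result in node.process_item(content):
                node.emit(result)

class Doubler:
    """Toy stand-in for a processing node."""
    key = None
    input_queues = [["stub"]]            # non-empty: not a source

    def __init__(self, messages):
        self._messages = messages
        self.out = []

    def incoming(self):
        yield from self._messages

    def process_item(self, content):
        yield content * 2

    def emit(self, result):
        self.out.append(result)

node = Doubler([1, 2, 3])
run_node(node)
# node.out == [2, 4, 6]
```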
-
terminate_pipeline
(error=False)[source]¶ This method can be called on any node in a pipeline, and it will cause all of the nodes to terminate if they haven’t stopped already.
Parameters: error (bool) – Not yet implemented.
-
thread_monitor(max_time=None)[source]¶ This method loops over all of the threads in the pipeline, checking that each is either finished or running. If any thread has had an abnormal exit, the entire pipeline is terminated.
-
time_running
¶ Return the number of wall-clock seconds elapsed since the node was started.
-
class
metalpipe.node.
NothingToSeeHere
[source]¶ Bases:
object
Vacuous class used as a no-op message type.
-
class
metalpipe.node.
PrinterOfThings
(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
RandomSample
(sample=0.1)[source]¶ Bases:
metalpipe.node.MetalNode
Lets through only a random sample of incoming messages. Might be useful for testing, or when only approximate results are necessary.
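The sampling behavior is essentially a coin flip per message; a sketch:

```python
import random

def random_sample(messages, sample=0.1):
    """Let through roughly a `sample` fraction of the incoming messages."""
    for message in messages:
        if random.random() < sample:
            yield message

random.seed(0)
kept = list(random_sample(range(1000), sample=0.1))
# roughly 100 of the 1000 messages survive
```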
-
class
metalpipe.node.
Remapper
(mapping=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
SequenceEmitter
(sequence, *args, max_sequences=1, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Emits sequence max_sequences times, or forever if max_sequences is None.
-
class
metalpipe.node.
Serializer
(values=False, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Takes an iterable thing as input, and successively yields its items.
-
class
metalpipe.node.
SimpleTransforms
(missing_keypath_action='ignore', starting_path=None, transform_mapping=None, target_value=None, keypath=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
StreamMySQLTable
(*args, host='localhost', user=None, table=None, password=None, database=None, port=3306, to_row_obj=False, send_batch_markers=True, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
setup()[source]¶ For classes that require initialization at runtime, which can’t be done when the class’s __init__ function is called. The MetalNode base class’s setup function is just a logging call.
It should be unusual to have to make use of setup because, in practice, initialization can be done in the __init__ function.
-
-
class
metalpipe.node.
StreamingJoin
(window=30, streams=None, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Joins two streams on a key, using exact match only. MVP.
-
class
metalpipe.node.
SubstituteRegex
(match_regex=None, substitute_string=None, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
TimeWindowAccumulator
(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Every N seconds, put the latest M seconds data on the queue.
-
class
metalpipe.node.
bcolors
[source]¶ Bases:
object
This class holds the values for the various colors that are used in the tables that monitor the status of the nodes.
-
BOLD
= '\x1b[1m'¶
-
ENDC
= '\x1b[0m'¶
-
FAIL
= '\x1b[91m'¶
-
HEADER
= '\x1b[95m'¶
-
OKBLUE
= '\x1b[94m'¶
-
OKGREEN
= '\x1b[92m'¶
-
UNDERLINE
= '\x1b[4m'¶
-
WARNING
= '\x1b[93m'¶
-
Civis-specific node types¶
This is where any classes specific to the Civis API live.
-
class
metalpipe.node_classes.civis_nodes.
CivisSQLExecute
(*args, sql=None, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, dummy_run=False, query_dict=None, returned_columns=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Execute a SQL statement and return the results.
-
class
metalpipe.node_classes.civis_nodes.
CivisToCSV
(*args, sql=None, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, dummy_run=False, query_dict=None, returned_columns=None, include_headers=True, delimiter=', ', **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Execute a SQL statement and return the results via a CSV file.
-
class
metalpipe.node_classes.civis_nodes.
EnsureCivisRedshiftTableExists
(on_failure='exit', table=None, schema=None, database=None, columns=None, block=True, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node_classes.civis_nodes.
FindValueInRedshiftColumn
(on_failure='exit', table=None, database=None, schema=None, column=None, choice='max', **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node_classes.civis_nodes.
SendToCivis
(*args, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, schema=None, existing_table_rows='append', include_columns=None, dummy_run=False, block=False, max_errors=0, table=None, via_staging_table=False, columns=None, staging_table=None, remap=None, recorded_tables={}, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
cleanup
()[source]¶ Check if we’re using staging tables. If so, copy the staging table into the production table. TODO: options for merge, upsert, append, drop
-
full_table_name
¶
-
Data structures module¶
Data types (e.g. Rows, Records) for ETL.
-
class
metalpipe.utils.data_structures.
BOOL
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.IntermediateTypeSystem
-
python_cast_function
¶ alias of
builtins.bool
-
-
class
metalpipe.utils.data_structures.
DATETIME
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.IntermediateTypeSystem
-
python_cast_function
()¶
-
-
class
metalpipe.utils.data_structures.
DataSourceTypeSystem
[source]¶ Bases:
object
Information about mapping one type system onto another contained in the children of this class.
-
class
metalpipe.utils.data_structures.
DataType
(value, original_type=None, name=None)[source]¶ Bases:
object
Each
DataType
gets apython_cast_function
, which is a function.-
intermediate_type
= None¶
-
python_cast_function
= None¶
-
to_intermediate_type
()[source]¶ Convert the
DataType
to anIntermediateDataType
using its class’sintermediate_type
attribute.
-
type_system
¶ Just for convenience to make the type system an attribute.
-
-
class
metalpipe.utils.data_structures.
FLOAT
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.IntermediateTypeSystem
-
python_cast_function
¶ alias of
builtins.float
-
-
class
metalpipe.utils.data_structures.
INTEGER
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.IntermediateTypeSystem
-
python_cast_function
¶ alias of
builtins.int
-
-
class
metalpipe.utils.data_structures.
IntermediateTypeSystem
[source]¶ Bases:
metalpipe.utils.data_structures.DataSourceTypeSystem
Never instantiate this by hand.
-
class
metalpipe.utils.data_structures.
MYSQL_BOOL
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.MySQLTypeSystem
-
python_cast_function
¶ alias of
builtins.bool
-
-
class
metalpipe.utils.data_structures.
MYSQL_DATE
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.MySQLTypeSystem
-
python_cast_function
()¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_ENUM
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.MySQLTypeSystem
-
python_cast_function
¶ alias of
builtins.str
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGERn
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
One such class is generated per supported column width n, each differing only in its max_length class attribute (max_length = n; e.g. MYSQL_INTEGER11 has max_length = 11). The supported widths are 0 through 32 and the powers of two 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, and 32768.
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER_BASE
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.MySQLTypeSystem
-
python_cast_function
¶ alias of
builtins.int
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHARn
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
One such class is generated per supported column width n, each differing only in its max_length class attribute (max_length = n; e.g. MYSQL_VARCHAR64 has max_length = 64). The supported widths are 0 through 32 and the powers of two 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, and 32768.
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR9
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 9¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR_BASE
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.MySQLTypeSystem
-
python_cast_function
¶ alias of
builtins.str
-
-
class metalpipe.utils.data_structures.MySQLTypeSystem[source]¶
Bases: metalpipe.utils.data_structures.DataSourceTypeSystem
Each TypeSystem gets a type_mapping static method that takes a string and returns the class in the type system named by that string. For example, int(8) in a MySQL schema should return the MYSQL_INTEGER8 class.
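The type_mapping lookup described above can be sketched as follows. This is a hypothetical illustration, not MetalPipe's actual implementation; the registry, the two stand-in classes, and the regular expression are assumptions made only to keep the sketch self-contained.

```python
import re

# Stand-in classes representing entries in the MySQL type system.
class MYSQL_INTEGER8:
    pass

class MYSQL_VARCHAR32:
    pass

# Hypothetical registry mapping (base type, size) pairs to type classes.
TYPE_REGISTRY = {
    ("int", 8): MYSQL_INTEGER8,
    ("varchar", 32): MYSQL_VARCHAR32,
}

def type_mapping(type_string):
    """Map a MySQL schema string such as 'int(8)' to its type class."""
    match = re.match(r"(\w+)\((\d+)\)", type_string.strip())
    if match is None:
        raise ValueError("Unrecognized type string: " + type_string)
    base, size = match.group(1).lower(), int(match.group(2))
    return TYPE_REGISTRY[(base, size)]
```

Under this sketch, `type_mapping("int(8)")` returns the `MYSQL_INTEGER8` class, as the docstring above describes.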
class metalpipe.utils.data_structures.Row(*records, type_system=None)[source]¶
Bases: object
A collection of DataType objects (typed values). A Row is a dictionary mapping the names of the values to the DataType objects.
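The name-to-DataType mapping described above can be sketched like this. Both classes here are simplified stand-ins, not the real metalpipe.utils.data_structures implementations; the dict subclass and the str cast are assumptions for illustration.

```python
class DataType:
    """Minimal stand-in for a DataType: a named value cast to a Python type."""
    python_cast_function = str  # mirrors the 'alias of builtins.str' entries

    def __init__(self, value, name=None):
        self.name = name
        self.value = self.python_cast_function(value)

class Row(dict):
    """A dictionary mapping the names of typed values to DataType objects."""
    def __init__(self, *records):
        super().__init__({record.name: record for record in records})

# Build a row from two typed values:
row = Row(DataType("alice", name="username"), DataType(42, name="user_id"))
```

Note that because `python_cast_function` is `str`, the integer 42 is stored as the string "42" in this sketch.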
class metalpipe.utils.data_structures.STRING(value, original_type=None, name=None)[source]¶
Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.IntermediateTypeSystem
python_cast_function¶
alias of builtins.str
Network nodes module¶
Classes that deal with sending and receiving data over the network.
class metalpipe.node_classes.network_nodes.HttpGetRequest(endpoint_template=None, endpoint_dict=None, protocol='http', retries=5, json=True, **kwargs)[source]¶
Bases: metalpipe.node.MetalNode
Node class for making simple GET requests.
process_item()[source]¶
The input to this function will be a dictionary-like object with parameters to be substituted into the endpoint string, and a dictionary with keys and values to be passed in the GET request.
Three use-cases:
1. Endpoint and parameters set initially and never changed.
2. Endpoint and parameters set once at runtime.
3. Endpoint and parameters set by upstream messages.
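The substitute-then-fetch behavior described above can be sketched roughly as follows. This is a hedged simplification, not the real MetalNode method: the standalone function signature and the injected `fetch` callable (standing in for the real HTTP call, e.g. requests.get) are assumptions made so the sketch runs without network access.

```python
def process_item(message, endpoint_template, fetch, retries=5):
    """Substitute message parameters into the endpoint template and GET it."""
    url = endpoint_template.format(**message)
    last_error = None
    for _ in range(retries):
        try:
            return fetch(url)
        except Exception as error:  # retry on transient failures
            last_error = error
    raise last_error  # all retries exhausted

# Illustration with a fake fetch function that just echoes the URL:
response = process_item(
    {"user_id": 42},
    "http://www.someapi.com/users/{user_id}",
    fetch=lambda url: {"url": url, "status": 200},
)
```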
class metalpipe.node_classes.network_nodes.HttpGetRequestPaginator(endpoint_dict=None, json=True, pagination_get_request_key=None, endpoint_template=None, additional_data_key=None, pagination_key=None, pagination_template_key=None, default_offset_value='', **kwargs)[source]¶
Bases: metalpipe.node.MetalNode
Node class for HTTP API requests that require paging through sets of results.
This class handles making HTTP GET requests, determining whether there are additional results, and making additional calls if necessary. A typical case is an HTTP request something like this:
http://www.someapi.com/endpoint_name?resultpage=0
with a response like:
{"data": "something", "additional_pages": true, "next_page": 1}
The response contains some data, a flag additional_pages for determining whether there are additional results, and a parameter that gets passed to the next request for retrieving the right page of results (next_page). So the next GET request would be:
http://www.someapi.com/endpoint_name?resultpage=1
This process repeats until additional_pages is false.
In order to use this node class, you'll need to provide arguments that tell the node where to look for the equivalent of additional_pages and next_page:
endpoint_template: The parameterized URL for the API.
additional_data_key: The keypath to the value in the API response that determines whether there are additional pages to request.
pagination_key: The keypath to the value in the API response that contains the value to be passed to the API to retrieve the next set of values.
pagination_get_request_key: The key in the endpoint_template that will contain the value of the pagination_key.
For our simple example, the arguments would be:
endpoint_template: http://www.someapi.com/endpoint_name?resultpage={result_page}
additional_data_key: ["additional_pages"]
pagination_key: ["next_page"]
pagination_get_request_key: result_page
In addition to those mandatory arguments, you can also optionally specify an endpoint_dict, which contains other values to be substituted into the endpoint_template. For example, these APIs often have an option that controls the number of results to provide in each response, like so:
http://www.someapi.com/endpoint_name?results={num_results}&resultpage={result_page}
For cases like this, the value of endpoint_dict is a dictionary mapping keys from the endpoint_template to their values. So if you wanted to have ten results per page, you would specify:
endpoint_dict = {"num_results": 10}
There can be any number of other parameters specified in the endpoint_dict.
If there are other keys in the endpoint_template that are not provided in the endpoint_dict, the node will try to find them in the current message that's being processed. For example, it is common to have some kind of security token that might be given in an environment variable. If the value of that environment variable has been provided by some upstream node and placed in the key token, then it will be substituted into the URL, provided that the endpoint_template has a place for it, such as:
http://www.someapi.com/endpoint_name?auth_token={token}&resultpage={result_page}
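The pagination loop described above can be sketched roughly as follows. This is a simplified stand-in, not MetalNode's actual implementation; `get_path`, `paginate`, and the fake two-page API are illustrative assumptions.

```python
def get_path(dictionary, keypath):
    """Follow a keypath (a list of keys) into a nested dictionary."""
    for key in keypath:
        dictionary = dictionary[key]
    return dictionary

def paginate(fetch, endpoint_template, endpoint_dict,
             additional_data_key, pagination_key,
             pagination_get_request_key, default_offset_value=""):
    """Yield one API response per page until the 'more pages' flag is false."""
    offset = default_offset_value
    while True:
        # Merge endpoint_dict values with the current pagination offset.
        url = endpoint_template.format(
            **{**endpoint_dict, pagination_get_request_key: offset})
        response = fetch(url)
        yield response
        if not get_path(response, additional_data_key):
            break
        offset = get_path(response, pagination_key)

# A fake two-page API standing in for real HTTP calls:
PAGES = {
    "": {"data": "something", "additional_pages": True, "next_page": 1},
    1: {"data": "more", "additional_pages": False, "next_page": None},
}

def fake_fetch(url):
    page = url.rsplit("=", 1)[-1]
    return PAGES["" if page == "" else int(page)]

results = list(paginate(
    fake_fetch,
    "http://www.someapi.com/endpoint_name?resultpage={result_page}",
    {},
    ["additional_pages"],
    ["next_page"],
    "result_page",
))
```

The keypath arguments are lists because, as described above, the flag and the next-page value may be nested inside the response.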
class metalpipe.node_classes.network_nodes.PaginatedHttpGetRequest(endpoint_template=None, additional_data_key=None, pagination_key=None, pagination_get_request_key=None, protocol='http', retries=5, default_offset_value='', additional_data_test=<class 'bool'>, calling_node=None)[source]¶
Bases: object
For handling, in a semi-general way, requests that require paging through lists of results and repeatedly making GET requests.
MetalPipeMessage module¶
The MetalPipeMessage encapsulates the content of each piece of data, along with some useful metadata.
Trigger module¶
A simple class containing no data, intended merely as a trigger signaling that the downstream node should do something.
Batch module¶
Markers are used to delimit batches of things, such as serialized files.
MetalPipeQueue module¶
These are queues that form the directed edges between nodes.
class metalpipe.node_queue.queue.MetalPipeQueue(max_queue_size, name=None)[source]¶
Bases: object
empty¶
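The idea of a bounded queue acting as a directed edge between two nodes can be sketched with the standard library's queue module. This is a hedged illustration of the concept, not MetalPipeQueue's real implementation; the class name, the put/get methods, and the example edge name are assumptions.

```python
import queue

class MetalPipeQueueSketch:
    """A bounded queue forming a directed edge between two pipeline nodes."""

    def __init__(self, max_queue_size, name=None):
        self.name = name
        self.queue = queue.Queue(maxsize=max_queue_size)

    @property
    def empty(self):
        """True when no messages are waiting on this edge."""
        return self.queue.empty()

    def put(self, message):
        self.queue.put(message)  # blocks when the queue is full

    def get(self):
        return self.queue.get()

# An upstream node puts a message on the edge; a downstream node takes it off.
edge = MetalPipeQueueSketch(max_queue_size=128, name="source_to_transform")
edge.put({"payload": "hello"})
```

Bounding the queue size gives natural backpressure: a fast upstream node blocks on put until the downstream node catches up.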
License¶
Copyright (C) 2016 Zachary Ernst zac.ernst@gmail.com
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.