Pythonflow: Dataflow programming for Python¶
Pythonflow is a simple implementation of dataflow programming for Python. Users of TensorFlow will find the syntax immediately familiar.
At Spotify, we use Pythonflow in data preprocessing pipelines for machine learning models because
- it automatically caches computationally expensive operations,
- any part of the computational graph can be easily evaluated for debugging purposes,
- it allows us to distribute data preprocessing across multiple machines.
Installation of Pythonflow¶
This section guides you through the installation of Pythonflow, which only supports Python 3.
Installation for users¶
You can install Pythonflow from the command line like so.
$ pip install pythonflow
If you don’t have pip installed, have a look at this guide.
Installation for developers¶
If you want to contribute to Pythonflow, we recommend you fork the GitHub repository of Pythonflow and clone your fork like so.
$ git clone git@github.com:<your username>/pythonflow.git
Next, create a virtual environment or set up a Conda environment and install Pythonflow in development mode like so.
$ pip install -e /path/to/your/clone/of/pythonflow
Testing your installation¶
To make sure your local installation of Pythonflow works as expected (and to make sure your changes don’t break a test when you submit a pull request), you can run Pythonflow’s unit tests. First, install the requirements needed for tests like so.
$ pip install -r requirements/test.txt
Then run the tests like so.
$ pylint pythonflow
$ py.test --cov --cov-fail-under=100 --cov-report=term-missing
We have also provided a Makefile so you don’t have to remember the commands for testing; you can run the following instead.
$ make tests
For more details on how to contribute, have a look at GitHub’s developer guide and make sure you have read our guidelines for contributing.
Building the documentation¶
If you want to see what the documentation of your local changes looks like, first install the requirements needed for compiling it like so.
$ pip install -r dev-requirements.txt
Then compile the documentation like so.
$ make docs
Using Pythonflow¶
Pythonflow is a pure python library for dataflow programming. In contrast to the usual control flow paradigm of python, dataflow programming requires two steps. First, we set up a directed acyclic graph (DAG) of operations that represents the computation we want to perform. Second, we evaluate the operations of the DAG by providing input values for some of the nodes of the graph. But learning by example is usually easier than theory. So here we go.
import pythonflow as pf

with pf.Graph() as graph:
    a = pf.constant(4)
    b = pf.constant(38)
    x = a + b
The code above creates a simple graph which adds two numbers. Importantly, none of the computation has been performed yet, and x is an operation that must be evaluated to obtain the result of the addition:
>>> x
<pf.func_op '...' target=<built-in function add> args=<2 items> kwargs=<0 items>>
Although a little cryptic, the output tells us that x is an operation that wraps the built-in function add, has two positional arguments, and no keyword arguments. We can evaluate the operation to perform the computation by calling the graph like so.
>>> graph(x)
42
Pythonflow supports all operations of the standard Python data model that do not change the state of the operation (think const member functions in C++; see https://isocpp.org/wiki/faq/const-correctness#const-member-fns) because operations are assumed to be stateless. For example, b = a + 1 is allowed whereas a += 1 is not. However, we do not believe this imposes significant restrictions.
Some background¶
Computations are encoded as a DAG where operations are represented by nodes and dependencies between operations are represented by edges. Operations are usually stateless, i.e. they provide the same output given the same input (see Writing new operations using pf.Operation for an exception). All state information is provided by the context, which is a mapping from nodes to values.
When Pythonflow evaluates an operation, it will check whether the current context provides a value for the operation and return it immediately if possible. If the current context does not provide a value for the operation, Pythonflow will evaluate the dependencies of the operation, evaluate the operation of interest, store the computed value in the context, and return the value.
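Because the context is consulted before any computation happens, you can provide a value for any operation, not just placeholders, to short-circuit its evaluation. For example, reusing the graph from the introduction, where a = pf.constant(4) and x = a + b:

>>> graph(x, {a: 2})
40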
Referring to operations¶
Operations can be referred to using their Python instances or their unique name attribute. By default, name is set to a random but unique identifier. The name of an operation can be specified when it is created, e.g.
with pf.Graph() as graph:
    a = pf.constant(4)
    b = pf.constant(38)
    x = pf.add(a, b, name='my_addition')
>>> x.name
'my_addition'
Or the name of an operation can be changed after it has been created (as long as the name is unique within each graph), e.g.
with pf.Graph() as graph:
    a = pf.constant(4)
    b = pf.constant(38)
    x = a + b
    x.name = 'my_addition'
Finally, you can change an operation’s name using its set_name method, e.g.
with pf.Graph() as graph:
    a = pf.constant(4)
    b = pf.constant(38)
    x = (a + b).set_name('my_addition')
Once a name has been set, an operation can be evaluated like so.
>>> graph('my_addition')
42
Pythonflow will enforce that names are indeed unique.
with pf.Graph() as graph:
    a = pf.constant(4, name='constant1')
    b = pf.constant(38, name='constant1')
Traceback (most recent call last):
ValueError: duplicate name 'constant1'
Providing inputs¶
Inputs for the dataflow graph can be provided using placeholders.
with pf.Graph() as graph:
    a = pf.placeholder(name='first_input')
    b = pf.constant(4)
    x = a + b
>>> graph(x, {a: 5})
9
>>> graph(x, {'first_input': 8})
12
>>> graph(x, first_input=7)
11
The latter two options are only available if the operation has been given a sensible name. Pythonflow will make sure that you do not provide inconsistent inputs:
>>> graph(x, {a: 5}, first_input=7)
Traceback (most recent call last):
ValueError: duplicate value for operation '<pf.placeholder 'first_input'>'
And that all necessary placeholders have been specified:
>>> graph(x)
Traceback (most recent call last):
ValueError: missing value for placeholder 'first_input'
Handling sequences¶
Unfortunately, Pythonflow does not support list comprehensions, but the same results can be achieved using pf.map_, pf.list_, pf.tuple_, pf.zip_, pf.sum_, pf.filter_, and other operations. Suppose we want to find the surnames of all artists whose first name begins with an A.
with pf.Graph() as graph:
    artists = pf.placeholder(name='artists')
    filtered = pf.filter_(lambda artist: artist['first'].startswith('A'), artists)
    surnames = pf.map_(lambda artist: artist['last'], filtered)
    # Convert to a list to evaluate the map call
    surnames = pf.list_(surnames)
>>> graph(surnames, artists=[
... {
... 'first': 'Ariana',
... 'last': 'Grande'
... },
... {
... 'first': 'Justin',
... 'last': 'Bieber'
... }
... ])
['Grande']
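The other sequence operations compose in the same way. For instance, here is a minimal sketch of summing squares with pf.map_ and pf.sum_ (assuming pf.sum_ behaves like the built-in sum):

with pf.Graph() as graph:
    numbers = pf.placeholder(name='numbers')
    # Square each number, then reduce the mapped values to a single sum
    sum_of_squares = pf.sum_(pf.map_(lambda number: number * number, numbers))

>>> graph(sum_of_squares, numbers=[1, 2, 3])
14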
Adding your own operations¶
Sometimes the operations that come out of the box aren’t enough for your needs and you want to build something more sophisticated. There are three different options for adding new operations and we will cover each in turn.
Turning functions into operations¶
You can use the pf.func_op class to create an operation from a callable. The syntax is identical to that of partial functions except that the arguments are operations rather than values.
import random

random.seed(1)

with pf.Graph() as graph:
    uniform = pf.func_op(random.uniform, 0, 1)
    scaled_uniform = 10 * uniform
>>> graph([uniform, scaled_uniform])
(0.13436424411240122, 1.3436424411240122)
The example above not only shows how to use existing functions as operations but also illustrates that each operation is evaluated at most once when you call graph. Consequently, any computationally expensive operations are automatically cached.
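The cache lives in the context of a single call, however. If you pass a context explicitly, you can inspect the cached intermediate values afterwards because the graph stores every computed value in the context (a minimal sketch reusing the graph above):

context = {}
graph(scaled_uniform, context)
# Both the requested operation and its dependency are now cached in the context
print(context[uniform], context[scaled_uniform])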
Writing new operations using decorators¶
If you only ever intend to use a callable as an operation, you can implement the operation using a decorator:
@pf.opmethod(length=2)
def split_in_two(x):
    num = len(x)
    return x[:num // 2], x[num // 2:]

with pf.Graph() as graph:
    x = pf.placeholder()
    y, z = split_in_two(x)
>>> graph([y, z], {x: 'Hello World!'})
('Hello ', 'World!')
You may use the opmethod decorator with or without parameters. Specifying the length parameter enables unpacking of operations as illustrated above. However, this means that your operations must not have a parameter called length that you set using a keyword argument (positional arguments are fine). If you are wrapping an existing function that takes a length argument in a func_op, use a lambda function to rename the parameter like so.
def existing_function_you_cannot_change(length):
    return 'a' * length

with pf.Graph() as graph:
    length = pf.placeholder()
    # Rename the keyword argument using a lambda function
    y = pf.func_op(lambda length_: existing_function_you_cannot_change(length_), length_=length)
    # Positional arguments don't cause any trouble
    z = pf.func_op(existing_function_you_cannot_change, length)
>>> graph([y, z], {length: 3})
('aaa', 'aaa')
Writing new operations using pf.Operation¶
If you want to create stateful operations, you need to dig a bit deeper into Pythonflow. For example, stateful operations may be useful when you need to access a database but don’t want to open a new connection every time you send a request. Stateful operations are implemented by inheriting from pf.Operation and implementing the _evaluate method like so.
import sqlite3

class SqliteOperation(pf.Operation):
    def __init__(self, database, query):
        # Pass on the query as an operation
        super().__init__(query)
        # Open a new database connection
        self.database = database
        self.connection = sqlite3.connect(self.database)

    def _evaluate(self, query):
        # The `_evaluate` method takes the same arguments as the `__init__`
        # method of the superclass. Whereas `__init__` receives operations
        # as arguments, `_evaluate` receives the evaluated operations.
        return self.connection.execute(query)

with pf.Graph() as graph:
    query = pf.placeholder(name='query')
    response = SqliteOperation(':memory:', query)
>>> graph(response, query='CREATE TABLE Companies (name VARCHAR)')
<sqlite3.Cursor object at ...>
>>> graph(response, query="INSERT INTO Companies (name) VALUES ('Spotify')")
<sqlite3.Cursor object at ...>
>>> graph(response, query="SELECT * FROM Companies").fetchall()
[('Spotify',)]
Conditional operations¶
Sometimes you may want to evaluate different parts of the DAG depending on a condition. For example, you may want to apply the same operations to data but switch between training and validation data like so.
with pf.Graph() as graph:
    training_data = pf.placeholder("training")
    validation_data = pf.placeholder("validation")
    condition = pf.placeholder("condition")
    data = pf.conditional(condition, training_data, validation_data)
>>> graph(data, condition=True, training=4)
4
Note that the pf.conditional operation only evaluates the part of the DAG it requires to return a value. If it evaluated the entire graph, the evaluation above would have raised a ValueError because we did not provide a value for the placeholder validation_data.
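Conversely, evaluating the validation branch only requires the validation placeholder:

>>> graph(data, condition=False, validation=5)
5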
Try-except-finally operations¶
The try_ operation allows you to evaluate an operation and fall back to alternatives if the operation fails. For example, you may want to handle divisions by zero like so.
with pf.Graph() as graph:
    a = pf.placeholder('a')
    b = pf.placeholder('b')
    c = pf.try_(a / b, [(ZeroDivisionError, "check your inputs")])
>>> graph(c, a=3, b=2)
1.5
>>> graph(c, a=3, b=0)
'check your inputs'
You can also use the finally_ argument to ensure an operation is evaluated irrespective of whether another operation succeeds or fails.
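For example, a sketch that extends the division graph above with a finally_ operation (assuming finally_ accepts any operation, such as pf.print_):

with pf.Graph() as graph:
    a = pf.placeholder('a')
    b = pf.placeholder('b')
    # The finally_ operation is evaluated whether or not a / b fails
    c = pf.try_(
        a / b,
        [(ZeroDivisionError, "check your inputs")],
        finally_=pf.print_('done trying'),
    )

>>> graph(c, a=3, b=0)  # prints 'done trying' before returning the fallback
'check your inputs'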
Explicitly controlling dependencies¶
Pythonflow automatically determines the operations it needs to evaluate to return the desired output. But sometimes it is desirable to explicitly specify operations that should be evaluated. For example, you may want to print a value for debugging purposes like so.
with pf.Graph() as graph:
    x = pf.placeholder('x')
    y = pf.mul(2, x, dependencies=[pf.print_(pf.str_format("placeholder value: {}", x))])
>>> graph(y, x=4)
placeholder value: 4
8
You may also use the context manager control_dependencies to specify explicit dependencies like so.
with pf.Graph() as graph:
    x = pf.placeholder('x')
    with pf.control_dependencies([pf.print_(pf.str_format("placeholder value: {}", x))]):
        y = 2 * x
>>> graph(y, x=9)
placeholder value: 9
18
Assertions¶
When you’re developing your graphs, you probably want to make sure that everything is behaving as you expect. You can check that values conform to your expectations like so.
with pf.Graph() as graph:
    mass = pf.placeholder('mass')
    height = pf.placeholder('height')
    assertions = [
        pf.assert_(mass > 0, "mass must be positive but got %f", mass),
        pf.assert_(height > 0, "height must be positive but got %f", height)
    ]
    with pf.control_dependencies(assertions):
        bmi = mass / height ** 2
>>> graph(bmi, mass=72, height=-1.8)
Traceback (most recent call last):
AssertionError: height must be positive but got -1.800000
To make the definition of graphs less verbose, you can also use the value keyword argument to specify the value an assertion returns if it succeeds, like so.
with pf.Graph() as graph:
    mass = pf.placeholder('mass')
    height = pf.placeholder('height')
    mass = pf.assert_(mass > 0, "mass must be positive but got %f", mass, value=mass)
    height = pf.assert_(height > 0, "height must be positive but got %f", height, value=height)
    bmi = mass / height ** 2
>>> graph(bmi, mass=72, height=-1.8)
Traceback (most recent call last):
AssertionError: height must be positive but got -1.800000
Profiling and callbacks¶
If your graph doesn’t perform as well as you would like, you can gain some insight into where it’s spending its time by attaching a profiler. For example, the following graph loads an image and applies a few transformations.
import pythonflow as pf

with pf.Graph() as graph:
    # Only load the libraries when necessary
    imageio = pf.import_('imageio')
    ndimage = pf.import_('scipy.ndimage')
    np = pf.import_('numpy')
    filename = pf.placeholder('filename')
    image = (imageio.imread(filename).set_name('imread')[..., :3] / 255.0).set_name('image')
    noise_scale = pf.constant(.25, name='noise_scale')
    noise = (1 - np.random.uniform(0, noise_scale, image.shape)).set_name('noise')
    noisy_image = (image * noise).set_name('noisy_image')
    angle = np.random.uniform(-45, 45)
    rotated_image = ndimage.rotate(noisy_image, angle, reshape=False).set_name('rotated_image')

profiler = pf.Profiler()
graph('rotated_image', filename='docs/spotify.png', callback=profiler)
Printing the profiler shows the ten most expensive operations. More detailed information can be retrieved by accessing the times attribute or calling get_slow_operations.
>>> print(profiler)
<pf.func_op 'rotated_image' target=<function call at 0x7fa6fff3abf8> args=<3 items> kwargs=<1 items>>: 0.014486551284790039
<pf.func_op 'imread' target=<function call at 0x7fa6fff3abf8> args=<2 items> kwargs=<0 items>>: 0.003396749496459961
<pf.func_op '214e8b1e1db94dfa9840d9cc4e510c25' target=<function call at 0x7fa6fff3abf8> args=<4 items> kwargs=<0 items>>: 0.0013699531555175781
<pf.func_op 'image' target=<built-in function truediv> args=<2 items> kwargs=<0 items>>: 0.0005080699920654297
<pf.func_op 'noisy_image' target=<built-in function mul> args=<2 items> kwargs=<0 items>>: 0.00014448165893554688
<pf.func_op 'noise' target=<built-in function sub> args=<2 items> kwargs=<0 items>>: 0.000125885009765625
<pf.func_op '2acc029ccdb34007a826a3af3230a483' target=<function import_module at 0x7fa71a86f488> args=<1 items> kwargs=<0 items>>: 3.838539123535156e-05
<pf.func_op '5b342c50ea0c48d0b136cb6d93fdc579' target=<function import_module at 0x7fa71a86f488> args=<1 items> kwargs=<0 items>>: 2.8133392333984375e-05
<pf.func_op '583f053d792245c4bad181e2430eb8d2' target=<built-in function getitem> args=<2 items> kwargs=<0 items>>: 2.0265579223632812e-05
<pf.func_op '9f39f2b5e0d74fe695099cd78d8ecd11' target=<function import_module at 0x7fa71a86f488> args=<1 items> kwargs=<0 items>>: 1.9788742065429688e-05
Rotating and reading the image from disk are the two most expensive operations.
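For example, the following one-liner retrieves only the two slowest operations as a mapping from operations to execution times:

slowest = profiler.get_slow_operations(2)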
The profiler is implemented as a callback that is passed to the graph when fetches are evaluated. Callbacks are context managers whose context is entered before an operation is evaluated and exited after the operation has been evaluated. Callbacks must accept exactly two arguments: the operation under consideration and the context. Here is an example that prints some information about the evaluation of the fetches.
import contextlib

@contextlib.contextmanager
def print_summary(operation, context):
    print("About to evaluate '%s', %d values in the context." % (operation.name, len(context)))
    yield
    print("Evaluated '%s', %d values in the context." % (operation.name, len(context)))
>>> graph('rotated_image', filename='docs/spotify.png', callback=print_summary)
About to evaluate '2acc029ccdb34007a826a3af3230a483', 1 values in the context.
Evaluated '2acc029ccdb34007a826a3af3230a483', 2 values in the context.
About to evaluate '60f266f8dd69441eb25dea412a92dbb0', 2 values in the context.
Evaluated '60f266f8dd69441eb25dea412a92dbb0', 3 values in the context.
About to evaluate '9f39f2b5e0d74fe695099cd78d8ecd11', 3 values in the context.
Evaluated '9f39f2b5e0d74fe695099cd78d8ecd11', 4 values in the context.
About to evaluate '7e6579c9d3784500a1dd555e3ea81bde', 4 values in the context.
Evaluated '7e6579c9d3784500a1dd555e3ea81bde', 5 values in the context.
About to evaluate 'imread', 5 values in the context.
Evaluated 'imread', 6 values in the context.
About to evaluate '583f053d792245c4bad181e2430eb8d2', 6 values in the context.
Evaluated '583f053d792245c4bad181e2430eb8d2', 7 values in the context.
About to evaluate 'image', 7 values in the context.
Evaluated 'image', 8 values in the context.
About to evaluate '5b342c50ea0c48d0b136cb6d93fdc579', 8 values in the context.
Evaluated '5b342c50ea0c48d0b136cb6d93fdc579', 9 values in the context.
About to evaluate '748ad66b9338417594015a71486b324f', 9 values in the context.
Evaluated '748ad66b9338417594015a71486b324f', 10 values in the context.
About to evaluate '240dcda966e34cfb957d4b91e4e31a4f', 10 values in the context.
Evaluated '240dcda966e34cfb957d4b91e4e31a4f', 11 values in the context.
About to evaluate 'noise_scale', 11 values in the context.
Evaluated 'noise_scale', 12 values in the context.
About to evaluate 'c44468bd3aac44ddb8b3e8bd2b9eb832', 12 values in the context.
Evaluated 'c44468bd3aac44ddb8b3e8bd2b9eb832', 13 values in the context.
About to evaluate '214e8b1e1db94dfa9840d9cc4e510c25', 13 values in the context.
Evaluated '214e8b1e1db94dfa9840d9cc4e510c25', 14 values in the context.
About to evaluate 'noise', 14 values in the context.
Evaluated 'noise', 15 values in the context.
About to evaluate 'noisy_image', 15 values in the context.
Evaluated 'noisy_image', 16 values in the context.
About to evaluate '08bd356d6d464837a6a367c34591c961', 16 values in the context.
Evaluated '08bd356d6d464837a6a367c34591c961', 17 values in the context.
About to evaluate '8fa2493c78174b2f9d9849afa4e7e1f5', 17 values in the context.
Evaluated '8fa2493c78174b2f9d9849afa4e7e1f5', 18 values in the context.
About to evaluate '6c1d0576cadc418b8f037455fec5c749', 18 values in the context.
Evaluated '6c1d0576cadc418b8f037455fec5c749', 19 values in the context.
About to evaluate 'rotated_image', 19 values in the context.
Evaluated 'rotated_image', 20 values in the context.
Distributed data preparation¶
Training data for machine learning models is often transformed before the learning process starts. But sometimes it is more convenient to generate the training data online rather than precomputing it: maybe the data we want to present to the model depends on the current state of the model, or maybe we want to experiment with different data preparation techniques without having to rerun pipelines. This section is concerned with helping you speed up data preparation by distributing it across multiple cores or machines.
Data preprocessing can generally be expressed as sink = map(transformation, source), where source is a generator of input data, transformation is the transformation to be applied, and sink is an iterable over the transformed data. We use ZeroMQ to distribute the data processing using a load-balancing message broker, illustrated below. Each task (also known as a client) sends one or more messages to the broker from a source. The broker distributes the messages amongst a set of workers that apply the desired transformation. Finally, the broker collects the results and forwards them to the relevant task, which acts as a sink.
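In other words, the broker and workers parallelise an ordinary map. A serial sketch of the same pattern, with a hypothetical transform function standing in for the work a worker performs:

def transform(item):
    # Stand-in for the transformation applied by a worker
    return item.upper()

source = iter(['a', 'b', 'c'])   # generator of input data
sink = map(transform, source)    # iterable over the transformed data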
Building an image transformation graph¶
For example, you may want to speed up the process of applying a transformation to an image using the following graph.
import pythonflow as pf

with pf.Graph() as graph:
    # Only load the libraries when necessary
    imageio = pf.import_('imageio')
    ndimage = pf.import_('scipy.ndimage')
    np = pf.import_('numpy')
    filename = pf.placeholder('filename')
    image = (imageio.imread(filename).set_name('imread')[..., :3] / 255.0).set_name('image')
    noise_scale = pf.constant(.25, name='noise_scale')
    noise = (1 - np.random.uniform(0, noise_scale, image.shape)).set_name('noise')
    noisy_image = (image * noise).set_name('noisy_image')
    angle = np.random.uniform(-45, 45)
    rotated_image = ndimage.rotate(noisy_image, angle, reshape=False).set_name('rotated_image')
Let’s run the default pipeline using the Spotify logo.
context = {'filename': 'docs/spotify.png'}
graph('rotated_image', context)
Because the context keeps track of all computation steps, we have access to the original image, the noisy image, and the rotated image:
(Figure: the original, noisy, and rotated versions of the Spotify logo.)
Distributing the workload¶
Pythonflow provides a straightforward interface to turn your graph into a processor for distributed data preparation:
from pythonflow import pfmq
backend_address = 'tcp://address-that-workers-should-connect-to'
worker = pfmq.Worker.from_graph(graph, backend_address)
worker.run()
Running the processors is up to you; you can use your favourite framework such as ipyparallel or Foreman.
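For example, a minimal sketch that launches several workers locally using the standard library (a process manager or cluster scheduler works just as well):

from multiprocessing import Process

def start_worker():
    # Each worker gets its own connection to the broker's backend
    worker = pfmq.Worker.from_graph(graph, backend_address)
    worker.run()

processes = [Process(target=start_worker) for _ in range(4)]
for process in processes:
    process.start()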
Consuming the data¶
Once you have started one or more processors, you can create a message broker to facilitate communication between tasks and workers.
broker = pfmq.MessageBroker(backend_address)
broker.run_async()

request = {
    'fetches': 'rotated_image',
    'context': {'filename': 'docs/spotify.png'}
}
rotated_image = broker.apply(request)
The call to run_async starts the message broker in a background thread. To avoid serialization overhead, only the explicitly requested fetches are sent over the wire, and the context is not updated as in the example above.
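Presumably you can also request several fetches in one message, mirroring the fetches signature of Graph.apply documented in the API reference below; this is an assumption rather than documented broker behaviour:

request = {
    # Assumption: a list of fetches is forwarded to the graph unchanged
    'fetches': ['image', 'rotated_image'],
    'context': {'filename': 'docs/spotify.png'}
}
image, rotated_image = broker.apply(request)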
Calling the consumer directly is useful for debugging, but in most applications you probably want to process more than one example as illustrated below.
iterable = broker.imap(requests)
Using imap rather than applying the built-in map to broker.apply has significant performance benefits: the message broker will dispatch as many messages as there are connected workers, whereas the built-in map will only use one worker at a time.
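For example, a sketch that prepares one request per file and consumes the results as they arrive (the second filename is hypothetical):

requests = [
    {'fetches': 'rotated_image', 'context': {'filename': filename}}
    for filename in ['docs/spotify.png', 'docs/another.png']
]
for rotated_image in broker.imap(requests):
    ...  # process each transformed image as it is returned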
Note
By default, the consumer and processors will use pickle to (de)serialize all messages. You may want to consider your own serialization format or use msgpack.
Note
Pythonflow does not resend lost messages and your program will not recover if a message is lost.
pythonflow package¶
exception pythonflow.core.EvaluationError¶
Bases: RuntimeError
Failed to evaluate an operation.

class pythonflow.core.Graph¶
Bases: object
Data flow graph constituting a directed acyclic graph of operations.
apply(fetches, context=None, *, callback=None, **kwargs)¶
Evaluate one or more operations given a context.
Note: This function modifies the context in place. Use context=context.copy() to avoid the context being modified.
Parameters:
- fetches (list[str or Operation] or str or Operation) – One or more Operation instances or names to evaluate.
- context (dict or None) – Context in which to evaluate the operations.
- callback (callable or None) – Callback to be evaluated when an operation is evaluated.
- kwargs (dict) – Additional context information keyed by variable name.
Returns: values – Output of the operations given the context.
Raises:
- ValueError – If fetches is not an Operation instance, operation name, or a sequence thereof.
- ValueError – If context is not a mapping.
static get_active_graph(graph=None)¶
Obtain the currently active graph instance by returning the explicitly given graph or using the default graph.
Parameters: graph (Graph or None) – Graph to return or None to use the default graph.
Raises: ValueError – If no Graph instance can be obtained.
normalize_context(context, **kwargs)¶
Normalize a context by replacing all operation names with operation instances.
Note: This function modifies the context in place. Use context=context.copy() to avoid the context being modified.
Returns: normalized_context – Normalized context whose keys are operation instances.
Raises:
- ValueError – If the context specifies more than one value for any operation.
- ValueError – If context is not a mapping.
normalize_operation(operation)¶
Normalize an operation by resolving its name if necessary.
Parameters: operation (Operation or str) – Operation instance or name of an operation.
Returns: normalized_operation – Operation instance.
Raises:
- ValueError – If operation is not an Operation instance or an operation name.
- RuntimeError – If operation is an Operation instance but does not belong to this graph.
- KeyError – If operation is an operation name that does not match any operation of this graph.
class pythonflow.core.Operation(*args, length=None, graph=None, name=None, dependencies=None, **kwargs)¶
Bases: object
Base class for operations.
Parameters:
- args (tuple) – Positional arguments passed to the _evaluate method.
- kwargs (dict) – Keyword arguments passed to the _evaluate method.
- length (int or None) – Optional number of values returned by the operation. The length only needs to be specified if the operation should support iterable unpacking (see https://www.python.org/dev/peps/pep-3132/).
- graph (Graph or None) – Data flow graph for this operation or None to use the default graph.
- name (str or None) – Name of the operation or None to use a random, unique identifier.
- dependencies (list) – Explicit sequence of operations to evaluate before evaluating this operation.
evaluate(context, callback=None)¶
Evaluate the operation given a context.
Returns: value – Output of the operation given the context.

evaluate_dependencies(context, callback=None)¶
Evaluate the dependencies of this operation and discard the values.

classmethod evaluate_operation(operation, context, **kwargs)¶
Evaluate an operation or constant given a context.
set_name(name)¶
Set the name of the operation and update the graph.
Parameters: value (str) – Unique name of the operation.
Returns: self – This operation.
Raises:
- ValueError – If an operation with value already exists in the associated graph.
- KeyError – If the current name of the operation cannot be found in the associated graph.
pythonflow.core.call(func, *args, **kwargs)¶
Call func with positional arguments args and keyword arguments kwargs.

pythonflow.core.control_dependencies(dependencies, graph=None)¶
Ensure that all dependencies are executed before any operations in this scope.
Parameters: dependencies (list) – Sequence of operations to be evaluated before evaluating any operations defined in this scope.
class pythonflow.core.func_op(target, *args, **kwargs)¶
Bases: pythonflow.core.Operation
Operation wrapper for stateless functions.

pythonflow.core.opmethod(target=None, **kwargs)¶
Decorator for creating operations from functions.
pythonflow.operations.assert_(condition, message=None, *args, value=None)¶
Return value if the condition is satisfied and raise an AssertionError with the specified message and args if not.
pythonflow.operations.cache(operation, get, put, key=None)¶
Cache the values of operation.
Parameters:
- operation (Operation) – Operation to cache.
- get (callable(object)) – Callable to retrieve an item from the cache. Should throw KeyError or FileNotFoundError if the item is not in the cache.
- put (callable(object, object)) – Callable that adds an item to the cache. The first argument is the key, the second the value.
- key (Operation) – Key for looking up an item in the cache. Defaults to a simple hash of the arguments of operation.
Returns: cached_operation – Cached operation.

pythonflow.operations.cache_file(operation, filename_template, load=None, dump=None, key=None)¶
Cache the values of operation in a file.
Parameters:
- operation (Operation) – Operation to cache.
- filename_template (str) – Template for the filename taking a single key parameter.
- load (callable(str)) – Callable to retrieve an item from a given file. Should throw FileNotFoundError if the file does not exist.
- dump (callable(object, str)) – Callable to save the item to a file. The order of arguments differs from the put argument of cache to be compatible with pickle.dump, numpy.save, etc.
- key (Operation) – Key for looking up an item in the cache. Defaults to a simple hash of the arguments of operation.
Returns: cached_operation – Cached operation.
class pythonflow.operations.conditional(predicate, x, y=None, *, length=None, name=None, dependencies=None)¶
Bases: pythonflow.core.Operation
Return x if predicate is True and y otherwise.
Note: The conditional operation will only execute one branch of the computation graph depending on predicate.

pythonflow.operations.constant(value)¶
Operation returning the input value.
class pythonflow.operations.lazy_constant(target, **kwargs)¶
Bases: pythonflow.core.Operation
Operation that returns the output of target lazily.
Parameters:
- target (callable) – Function to evaluate when the operation is evaluated.
- kwargs (dict) – Keyword arguments passed to the constructor of Operation.

class pythonflow.operations.placeholder(name=None, **kwargs)¶
Bases: pythonflow.core.Operation
Placeholder that needs to be given in the context to be evaluated.
pythonflow.operations.str_format(format_string, *args, **kwargs)¶
Use Python’s advanced string formatting to convert the format string and arguments.

class pythonflow.operations.try_(operation, except_=None, finally_=None, **kwargs)¶
Bases: pythonflow.core.Operation
Try to evaluate operation, fall back to alternative operations in except_, and ensure that finally_ is evaluated.
Note: The alternative operations will only be executed if the target operation fails.
class pythonflow.util.Profiler¶
Bases: object
Callback for profiling computational graphs.

get_slow_operations(num_operations=None)¶
Get the slowest operations.
Parameters: num_operations (int or None) – Maximum number of operations to return or None.
Returns: times – Mapping of execution times keyed by operations.
Return type: collections.OrderedDict