npstreams: streaming NumPy functions¶
npstreams
is an open-source Python package for streaming NumPy array operations.
The goal is to provide tested, (almost) drop-in replacements for NumPy functions (where possible)
that operate on streams of arrays instead of dense arrays.
npstreams
also provides some utilities for parallelization. These parallelization
generators can be combined with the streaming functions to drastically improve performance
in some cases.
The code presented herein has been in use at some point by the Siwick research group.
Example¶
Consider the following snippet to combine 50 images
from an iterable source
:
import numpy as np
images = np.empty( shape = (2048, 2048, 50) )
from index, im in enumerate(source):
images[:,:,index] = im
avg = np.average(images, axis = 2)
If the source
iterable provided 10000 images, the above routine would
not work on most machines. Moreover, what if we want to transform the images
one by one before averaging them? What about looking at the average while it
is being computed? Let’s look at an example:
import numpy as np
from npstreams import iaverage
from scipy.misc import imread
stream = map(imread, list_of_filenames)
averaged = iaverage(stream)
At this point, the generators map()
and iaverage()
are ‘wired’
but will not compute anything until it is requested. We can look at the average evolve:
import matplotlib.pyplot as plt
for avg in average:
plt.imshow(avg); plt.show()
We can also use last()
to get at the final average:
from npstreams import last
total = last(averaged) # average of the entire stream. See also npstreams.average
Benchmark¶
npstreams provides a function for benchmarking common use cases.
To run the benchmark with default parameters, from the interpreter:
from npstreams import benchmark
benchmark()
From a command-line terminal:
python -c 'import npstreams; npstreams.benchmark()'
The results will be printed to the screen.
Links¶
General Documentation¶
Installation¶
Requirements¶
npstreams works on Linux, Mac OS X and Windows. It requires Python 3.6+ as well as numpy. scipy is an optional dependency that is only used in tests; however, if SciPy cannot be imported, tests will not fail.
To get access to the npstreams.cuda
module, which contains CUDA-enabled routines,
PyCUDA must be installed as well.
Install npstreams¶
npstreams is available on PyPI; it can be installed with pip:
python -m pip install npstreams
npstreams can also be installed with the conda package manager, from the conda-forge channel:
conda config --add channels conda-forge
conda install npstreams
You can install the latest developer version of npstreams by cloning the git repository:
git clone https://github.com/LaurentRDC/npstreams.git
…then installing the package with:
cd npstreams
python setup.py install
Testing¶
If you want to check that all the tests are running correctly with your Python configuration, type:
python setup.py test
Conventions¶
Stream Conventions¶
Most (all?) functions in npstreams
are designed to work on streams, or
iterables of NumPy arrays. These iterables can be infinite.
The quintessential example is a stream of images progressively read from disk.
These streams of arrays must contain arrays that all have the same shape and data-type,
unless specified otherwise.
An example of a function that operates on a stream of arrays of different shapes is ieinsum()
A single NumPy array can be passed where a stream is expected; the array will be repackaged into a stream of a single array.
Naming Conventions¶
In order to facilitate documentation, functions in npstreams
follow the following conventions:
- Routines are named after their closest equivalent in
numpy
andscipy
.- Routines with names starting with ‘i’ (e.g.
iprod()
) are generator functions; they yield running results as they are being computer. Usually, these functions have a non-generator equivalent that consumes the entire stream (e.g.iaverage()
vs.average()
).- Routines with names starting with ‘c’ (e.g.
csum()
) are CUDA-enabled (requirespycuda
)- Routines with names starting with ‘p’ (e.g.
pmap()
) can be parallelized. The default behavior is always to not use multiple cores. For example, the default behavior ofpmap()
is to behave likemap()
.
Axis Conventions¶
NumPy arrays provide operations along axes. Similarly, npstreams
also
exposes the axis
keyword in some (most?) reduction functions like isum()
and iprod()
.
The convention for specification of the axis
parameter is as follows:
- If
axis = None
, arrays are flattened before being combined. The result will be a scalar of a 0d array.- The default (
axis = -1
) always corresponds to combining arrays along a new axis. For example, summing images together alongaxis = -1
is equivalent to stacking images along a new axis, then averaging along this new axis- if
axis
is anint
, then arrays are reduced according to this axis, and then combined.
CUDA-enabled functions¶
Some functions are implemented using CUDA
Reference/API¶
Click on any function below to see detailed information.
Creation of Streams¶
Decorator for streaming functions which guarantees that the stream elements will be converted to arrays.
array_stream (func) |
Decorates streaming functions to make sure that the stream is a stream of ndarrays. |
The array_stream()
decorator wraps iterables into an ArrayStream
iterator. This is not
required to use the functions defined here, but it provides some nice guarantees.
ArrayStream (stream) |
Iterator of arrays. |
Statistical Functions¶
imean (arrays[, axis, ignore_nan]) |
Streaming mean of arrays. |
iaverage (arrays[, axis, weights, ignore_nan]) |
Streaming (weighted) average of arrays. |
istd (arrays[, axis, ddof, weights, ignore_nan]) |
Streaming standard deviation of arrays. |
ivar (arrays[, axis, ddof, weights, ignore_nan]) |
Streaming variance of arrays. |
isem (arrays[, axis, ddof, weights, ignore_nan]) |
Streaming standard error in the mean (SEM) of arrays. |
ihistogram (arrays, bins) |
Streaming histogram calculation. |
The following functions consume entire streams. By avoiding costly intermediate steps, they can perform much faster than their generator versions.
mean (arrays[, axis, ignore_nan]) |
Mean of a stream of arrays. |
average (arrays[, axis, weights, ignore_nan]) |
Average (weighted) of a stream of arrays. |
std (arrays[, axis, ddof, weights, ignore_nan]) |
Total standard deviation of arrays. |
var (arrays[, axis, ddof, weights, ignore_nan]) |
Total variance of a stream of arrays. |
sem (arrays[, axis, ddof, weights, ignore_nan]) |
Standard error in the mean (SEM) of a stream of arrays. |
Numerics¶
isum (arrays[, axis, dtype, ignore_nan]) |
Streaming sum of array elements. |
iprod (arrays[, axis, dtype, ignore_nan]) |
Streaming product of array elements. |
isub (arrays[, axis, dtype]) |
Subtract elements in a reduction fashion. |
sum (arrays[, axis, dtype, ignore_nan]) |
Sum of arrays in a stream. |
prod (arrays[, axis, dtype, ignore_nan]) |
Product of arrays in a stream. |
Linear Algebra¶
idot (arrays) |
Yields the cumulative array inner product (dot product) of arrays. |
iinner (arrays) |
Cumulative inner product of all arrays in a stream. |
itensordot (arrays[, axes]) |
Yields the cumulative array inner product (dot product) of arrays. |
ieinsum (arrays, subscripts, **kwargs) |
Evaluates the Einstein summation convention on the operands. |
Control Flow¶
ipipe (*args, **kwargs) |
Pipe arrays through a sequence of functions. |
iload (files, load_func, **kwargs) |
Create a stream of arrays from files, which are loaded lazily. |
pload (files, load_func[, processes]) |
Create a stream of arrays from files, which are loaded lazily from multiple processes. |
Comparisons¶
iany (arrays[, axis]) |
Test whether any array elements along a given axis evaluate to True. |
iall (arrays[, axis]) |
Test whether all array elements along a given axis evaluate to True |
imax (arrays, axis[, ignore_nan]) |
Maximum of a stream of arrays along an axis. |
imin (arrays, axis[, ignore_nan]) |
Minimum of a stream of arrays along an axis. |
Parallelization¶
pmap (func, iterable[, args, kwargs, …]) |
Parallel application of a function with keyword arguments. |
pmap_unordered (func, iterable[, args, …]) |
Parallel application of a function with keyword arguments in no particular order. |
preduce (func, iterable[, args, kwargs, …]) |
Parallel application of the reduce function, with keyword arguments. |
Iterator Utilities¶
last (stream) |
Retrieve the last item from a stream/iterator, consuming iterables in the process. |
cyclic (iterable) |
Yields cyclic permutations of an iterable. |
itercopy (iterable[, copies]) |
Split iterable into ‘copies’. |
chunked (iterable, chunksize) |
Generator yielding multiple iterables of length ‘chunksize’. |
linspace (start, stop, num[, endpoint]) |
Generate linear space. |
multilinspace (start, stop, num[, endpoint]) |
Generate multilinear space, for joining the values in two iterables. |
peek (iterable) |
Peek ahead in an iterable. |
primed (gen) |
Decorator that primes a generator function, i.e. |
length_hint (obj[, default]) |
Return an estimate of the number of items in obj . |
Array Utilities¶
nan_to_num (array[, fill_value, copy]) |
Replace NaNs with another fill value. |
CUDA support¶
What is CUDA¶
CUDA is a computing platform taking advantage of Nvidia hardware. It effectively allows for array computations on Graphical Processing Units (GPU).
npstreams
relies on the (optional) PyCUDA library
to access CUDA functionality.
Advantages of CUDA¶
TODO: benchmarks
CUDA in npstreams¶
PyCUDA is an optional dependency. Therefore, the CUDA-enabled functions are located in a separate
module, the npstreams.cuda
submodule.
Importing from npstreams.cuda
submodule¶
Importing anything from the npstreams.cuda
submodule will raise an ImportError
in the following cases:
- PyCUDA is not installed;
- No GPUs are available;
- CUDA compilation backend is not available, possibly due to incomplete installation.
With this in mind, it is wise to wrap import statements from npstreams.cuda
in a try/except
block.
CUDA-enabled routines¶
A limited set of functions implemented in npstreams also have CUDA-enabled equivalents. For performance reasons, all CUDA-enabled routines operate along the ‘stream’ axis, i.e. as if the arrays had been stacked along a new dimension.
Control Flow¶
Streaming array pipelines¶
Before reducing your stream of arrays (e.g. averaging them together), you may want to
transform them. This can be done with the ipipe()
function:
-
npstreams.
ipipe
(*args, **kwargs) Pipe arrays through a sequence of functions. For example:
pipe(f, g, h, stream)
is equivalent tofor arr in stream: yield f(g(h(arr)))
Parameters: - *funcs (callable) – Callable that support Numpy arrays in their first argument. These should NOT be generator functions.
- arrays (iterable) – Stream of arrays to be passed.
- processes (int or None, optional, keyword-only) – Number of processes to use. If None, maximal number of processes is used. Default is one.
- ntotal (int or None, optional, keyword-only) – If the length of arrays is known, but passing arrays as a list
would take too much memory, the total number of arrays ntotal can be specified. This
allows for pmap to chunk better in case of
processes > 1
.
Yields: piped (ndarray)
Imagine we have the following pipeline, in which we want processes images in some iterable arrays
as follows:
- Remove negative pixel intensity values;
- Adjust the gamma value of images (from Scikit-image’s
exposure
module); - Average the result together.
The following lines will do the trick:
from functools import partial
from npstreams import ipipe, iaverage, last
from skimage.exposure import adjust_gamma
def remove_negative(arr):
arr[arr < 0] = 0
return arr
pipeline = ipipe(adjust_gamma, remove_negative, arrays)
avgs = last(iaverage(pipeline))
If the pipeline is computationally intensive, we can also pipe arrays in parallel using the
keyword-only processes
:
pipeline = ipipe(adjust_gamma, remove_negative, arrays, processes = 4) # 4 cores will be used
avgs = last(iaverage(pipeline))
Since ipipe()
uses pmap()
under the hood, we can also use all available cores
by passing processes = None
.
Making your own Streaming Reduction Function¶
The ireduce_ufunc()
generator function¶
You can assemble your own streaming reduction function from a binary NumPy ufunc using the following generator function:
npstreams.
ireduce_ufunc
(arrays, ufunc, axis=-1, dtype=None, ignore_nan=False, **kwargs)¶Streaming reduction generator function from a binary NumPy ufunc. Generator version of reduce_ufunc.
ufunc
must be a NumPy binary Ufunc (i.e. it takes two arguments). Moreover, for performance reasons, ufunc must have the same return types as input types. This precludes the use ofnumpy.greater
, for example.Note that performance is much better for the default
axis = -1
. In such a case, reduction operations can occur in-place. This also allows to operate in constant-memory.
Parameters:
- arrays (iterable) – Arrays to be reduced.
- ufunc (numpy.ufunc) – Binary universal function.
- axis (int or None, optional) – Reduction axis. Default is to reduce the arrays in the stream as if they had been stacked along a new axis, then reduce along this new axis. If None, arrays are flattened before reduction. If axis is an int larger that the number of dimensions in the arrays of the stream, arrays are reduced along the new axis. Note that not all of NumPy Ufuncs support
axis = None
, e.g.numpy.subtract
.- dtype (numpy.dtype or None, optional) – Overrides the dtype of the calculation and output arrays.
- ignore_nan (bool, optional) – If True and ufunc has an identity value (e.g.
numpy.add.identity
is 0), then NaNs are replaced with this identity. An error is raised ifufunc
has no identity (e.g.numpy.maximum.identity
isNone
).- kwargs – Keyword arguments are passed to
ufunc
. Note that some valid ufunc keyword arguments (e.g.keepdims
) are not valid for all streaming functions. Also, contrary to NumPy v. 1.10+,casting = 'unsafe
is the default in npstreams.Yields: reduced (ndarray or scalar)
Raises:
- TypeError : if
ufunc
is not NumPy ufunc.- ValueError : if
ignore_nan
is True butufunc
has no identity- ValueError : if
ufunc
is not a binary ufunc- ValueError : if
ufunc
does not have the same input type as output type
The non-generator version is also available:
npstreams.
reduce_ufunc
(arrays, ufunc, axis=-1, dtype=None, ignore_nan=False, **kwargs)¶Reduce a stream using a binary NumPy ufunc. Function version of
ireduce_ufunc
.
ufunc
must be a NumPy binary Ufunc (i.e. it takes two arguments). Moreover, for performance reasons, ufunc must have the same return types as input types. This precludes the use ofnumpy.greater
, for example.Note that performance is much better for the default
axis = -1
. In such a case, reduction operations can occur in-place. This also allows to operate in constant-memory.
Parameters:
- arrays (iterable) – Arrays to be reduced.
- ufunc (numpy.ufunc) – Binary universal function.
- axis (int or None, optional) – Reduction axis. Default is to reduce the arrays in the stream as if they had been stacked along a new axis, then reduce along this new axis. If None, arrays are flattened before reduction. If axis is an int larger that the number of dimensions in the arrays of the stream, arrays are reduced along the new axis. Note that not all of NumPy Ufuncs support
axis = None
, e.g.numpy.subtract
.- dtype (numpy.dtype or None, optional) – Overrides the dtype of the calculation and output arrays.
- ignore_nan (bool, optional) – If True and ufunc has an identity value (e.g.
numpy.add.identity
is 0), then NaNs are replaced with this identity. An error is raised ifufunc
has no identity (e.g.numpy.maximum.identity
isNone
).- kwargs – Keyword arguments are passed to
ufunc
. Note that some valid ufunc keyword arguments (e.g.keepdims
) are not valid for all streaming functions. Note that contrary to NumPy v. 1.10+,casting = 'unsafe
is the default in npstreams.Returns: reduced
Return type: ndarray or scalar
Raises:
- TypeError : if
ufunc
is not NumPy ufunc.- ValueError : if
ignore_nan
is True butufunc
has no identity- ValueError: if
ufunc
is not a binary ufunc- ValueError: if
ufunc
does not have the same input type as output type
Note that while all NumPy ufuncs have a reduce()
method, not all of them are useful.
This is why ireduce_ufunc()
and reduce_ufunc()
will only work with binary ufuncs,
most of which are listed below. For performance reasons, we further restrict the use of
ireduce_ufunc()
and reduce_ufunc()
to ufuncs that have the same input types
as output types. Therefore, for example, numpy.greater()
cannot be made to work with
ireduce_ufunc()
and reduce_ufunc()
.
NaNs handling¶
NumPy ufuncs can have an identity value, that is, a value such that ufunc(x1, identity)
is always x1
. For such ufuncs,
ireduce_ufunc()
and reduce_ufunc()
can replace NaNs in the stream with the ufunc’s identity value, if ignore_nan = True
.
Note that not all ufuncs have an identity value; for example, how would you define the identity value of numpy.maximum
? There is no answer.
NumPy Binary Ufuncs¶
ireduce_ufunc()
is tested to work on the following binary ufuncs, which are available in NumPy.
Arithmetics¶
numpy.add |
add(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.subtract |
subtract(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.multiply |
multiply(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.divide |
true_divide(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.logaddexp |
logaddexp(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.logaddexp2 |
logaddexp2(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.true_divide |
true_divide(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.floor_divide |
floor_divide(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.power |
power(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.remainder |
remainder(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.mod |
remainder(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.fmod |
fmod(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
Trigonometric functions¶
numpy.arctan2 |
arctan2(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.hypot |
hypot(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
Bit-twiddling functions¶
numpy.bitwise_and |
bitwise_and(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.bitwise_or |
bitwise_or(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.bitwise_xor |
bitwise_xor(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.left_shift |
left_shift(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.right_shift |
right_shift(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
Comparison functions¶
numpy.maximum |
maximum(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.fmax |
fmax(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.minimum |
minimum(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.fmin |
fmin(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
Floating functions¶
numpy.copysign |
copysign(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.nextafter |
nextafter(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
numpy.ldexp |
ldexp(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj]) |
Example: Streaming Maximum¶
Let’s create a streaming maximum function for a stream. First, we have to choose
how to handle NaNs; since numpy.maximum
does not have an identity value, we must find
another way. We can proceed as follows:
- If we want to propagate NaNs, we should use
numpy.maximum()
- If we want to ignore NaNs, we should use
numpy.fmax()
Both of those functions are binary ufuncs, so we can use ireduce_ufunc()
. Note that any function based
on ireduce_ufunc()
or reduce_ufunc()
will automatically work on streams of numbers thanks to the
array_stream()
decorator.
Putting it all together:
from npstreams import ireduce_ufunc
from numpy import maximum, fmax
def imax(arrays, axis = -1, ignore_nan = False, **kwargs):
"""
Streaming cumulative maximum along an axis.
Parameters
----------
arrays : iterable
Stream of arrays to be compared.
axis : int or None, optional
Axis along which to compute the maximum. If None,
arrays are flattened before reduction.
ignore_nan : bool, optional
If True, NaNs are ignored. Default is False.
Yields
------
online_max : ndarray
"""
ufunc = fmax if ignore_nan else maximum
yield from ireduce_ufunc(arrays, ufunc, axis = axis, **kwargs)
This will provide us with a streaming function, meaning that we can look at the progress
as it is being computed. We can also create a function that returns the max of the stream
like numpy.ndarray.max()
using the reduce_ufunc()
function:
from npstreams import reduce_ufunc
def smax(*args, **kwargs): # s for stream
"""
Maximum of a stream along an axis.
Parameters
----------
arrays : iterable
Stream of arrays to be compared.
axis : int or None, optional
Axis along which to compute the maximum. If None,
arrays are flattened before reduction.
ignore_nan : bool, optional
If True, NaNs are ignored. Default is False.
Yields
------
max : ndarray
"""
return reduce_ufunc(*args, **kwargs)
Recipes¶
Single-pass mean and error calculation¶
Here is a snipped for a function that computes a mean and standard error in the mean (SEM) in a single pass:
from npstreams import imean, isem, array_stream, itercopy
# The `array_stream` decorator ensures that the elements of
# the iterable `arrays` will be converted to ndarrays if possible
# This decorator is not required.
@array_stream
def mean_and_error(arrays, axis = -1):
""" Yields (mean, error) pairs from a stream of arrays """
# itercopy creates a copy of the original stream
# The elements are only generated once, and then fed
# to those two copies; much more efficient than
# creating two streams from scratch.
arrays_for_mean, arrays_for_sem = itercopy(arrays)
means = imean(arrays_for_mean, axis = axis)
errors = isem(arrays_for_sem, axis = axis)
yield from zip(means, errors)
Authors¶
- Laurent P. René de Cotret