npstreams: streaming NumPy functions

npstreams is an open-source Python package for streaming NumPy array operations. The goal is to provide tested, (almost) drop-in replacements for NumPy functions (where possible) that operate on streams of arrays instead of dense arrays.

npstreams also provides some utilities for parallelization. These parallelization generators can be combined with the streaming functions to drastically improve performance in some cases.

The code presented herein has been in use at some point by the Siwick research group.

Example

Consider the following snippet to combine 50 images from an iterable source:

import numpy as np

images = np.empty( shape = (2048, 2048, 50) )
from index, im in enumerate(source):
    images[:,:,index] = im

avg = np.average(images, axis = 2)

If the source iterable provided 10000 images, the above routine would not work on most machines. Moreover, what if we want to transform the images one by one before averaging them? What about looking at the average while it is being computed? Let’s look at an example:

import numpy as np
from npstreams import iaverage
from scipy.misc import imread

stream = map(imread, list_of_filenames)
averaged = iaverage(stream)

At this point, the generators map() and iaverage() are ‘wired’ but will not compute anything until it is requested. We can look at the average evolve:

import matplotlib.pyplot as plt
for avg in average:
    plt.imshow(avg); plt.show()

We can also use last() to get at the final average:

from npstreams import last

total = last(averaged) # average of the entire stream. See also npstreams.average

Benchmark

npstreams provides a function for benchmarking common use cases.

To run the benchmark with default parameters, from the interpreter:

from npstreams import benchmark
benchmark()

From a command-line terminal:

python -c 'import npstreams; npstreams.benchmark()'

The results will be printed to the screen.

General Documentation

Installation

Requirements

npstreams works on Linux, Mac OS X and Windows. It requires Python 3.6+ as well as numpy. scipy is an optional dependency that is only used in tests; however, if SciPy cannot be imported, tests will not fail.

To get access to the npstreams.cuda module, which contains CUDA-enabled routines, PyCUDA must be installed as well.

Install npstreams

npstreams is available on PyPI; it can be installed with pip:

python -m pip install npstreams

npstreams can also be installed with the conda package manager, from the conda-forge channel:

conda config --add channels conda-forge
conda install npstreams

You can install the latest developer version of npstreams by cloning the git repository:

git clone https://github.com/LaurentRDC/npstreams.git

…then installing the package with:

cd npstreams
python setup.py install

Testing

If you want to check that all the tests are running correctly with your Python configuration, type:

python setup.py test

Conventions

Stream Conventions

Most (all?) functions in npstreams are designed to work on streams, or iterables of NumPy arrays. These iterables can be infinite. The quintessential example is a stream of images progressively read from disk. These streams of arrays must contain arrays that all have the same shape and data-type, unless specified otherwise.

An example of a function that operates on a stream of arrays of different shapes is ieinsum()

A single NumPy array can be passed where a stream is expected; the array will be repackaged into a stream of a single array.

Naming Conventions

In order to facilitate documentation, functions in npstreams follow the following conventions:

  • Routines are named after their closest equivalent in numpy and scipy.
  • Routines with names starting with ‘i’ (e.g. iprod()) are generator functions; they yield running results as they are being computer. Usually, these functions have a non-generator equivalent that consumes the entire stream (e.g. iaverage() vs. average()).
  • Routines with names starting with ‘c’ (e.g. csum()) are CUDA-enabled (requires pycuda)
  • Routines with names starting with ‘p’ (e.g. pmap()) can be parallelized. The default behavior is always to not use multiple cores. For example, the default behavior of pmap() is to behave like map().

Axis Conventions

NumPy arrays provide operations along axes. Similarly, npstreams also exposes the axis keyword in some (most?) reduction functions like isum() and iprod().

The convention for specification of the axis parameter is as follows:

  • If axis = None, arrays are flattened before being combined. The result will be a scalar of a 0d array.
  • The default (axis = -1) always corresponds to combining arrays along a new axis. For example, summing images together along axis = -1 is equivalent to stacking images along a new axis, then averaging along this new axis
  • if axis is an int, then arrays are reduced according to this axis, and then combined.

CUDA-enabled functions

Some functions are implemented using CUDA

Reference/API

Click on any function below to see detailed information.

Creation of Streams

Decorator for streaming functions which guarantees that the stream elements will be converted to arrays.

array_stream(func) Decorates streaming functions to make sure that the stream is a stream of ndarrays.

The array_stream() decorator wraps iterables into an ArrayStream iterator. This is not required to use the functions defined here, but it provides some nice guarantees.

ArrayStream(stream) Iterator of arrays.

Statistical Functions

imean(arrays[, axis, ignore_nan]) Streaming mean of arrays.
iaverage(arrays[, axis, weights, ignore_nan]) Streaming (weighted) average of arrays.
istd(arrays[, axis, ddof, weights, ignore_nan]) Streaming standard deviation of arrays.
ivar(arrays[, axis, ddof, weights, ignore_nan]) Streaming variance of arrays.
isem(arrays[, axis, ddof, weights, ignore_nan]) Streaming standard error in the mean (SEM) of arrays.
ihistogram(arrays, bins) Streaming histogram calculation.

The following functions consume entire streams. By avoiding costly intermediate steps, they can perform much faster than their generator versions.

mean(arrays[, axis, ignore_nan]) Mean of a stream of arrays.
average(arrays[, axis, weights, ignore_nan]) Average (weighted) of a stream of arrays.
std(arrays[, axis, ddof, weights, ignore_nan]) Total standard deviation of arrays.
var(arrays[, axis, ddof, weights, ignore_nan]) Total variance of a stream of arrays.
sem(arrays[, axis, ddof, weights, ignore_nan]) Standard error in the mean (SEM) of a stream of arrays.

Numerics

isum(arrays[, axis, dtype, ignore_nan]) Streaming sum of array elements.
iprod(arrays[, axis, dtype, ignore_nan]) Streaming product of array elements.
isub(arrays[, axis, dtype]) Subtract elements in a reduction fashion.
sum(arrays[, axis, dtype, ignore_nan]) Sum of arrays in a stream.
prod(arrays[, axis, dtype, ignore_nan]) Product of arrays in a stream.

Linear Algebra

idot(arrays) Yields the cumulative array inner product (dot product) of arrays.
iinner(arrays) Cumulative inner product of all arrays in a stream.
itensordot(arrays[, axes]) Yields the cumulative array inner product (dot product) of arrays.
ieinsum(arrays, subscripts, **kwargs) Evaluates the Einstein summation convention on the operands.

Control Flow

ipipe(*args, **kwargs) Pipe arrays through a sequence of functions.
iload(files, load_func, **kwargs) Create a stream of arrays from files, which are loaded lazily.
pload(files, load_func[, processes]) Create a stream of arrays from files, which are loaded lazily from multiple processes.

Comparisons

iany(arrays[, axis]) Test whether any array elements along a given axis evaluate to True.
iall(arrays[, axis]) Test whether all array elements along a given axis evaluate to True
imax(arrays, axis[, ignore_nan]) Maximum of a stream of arrays along an axis.
imin(arrays, axis[, ignore_nan]) Minimum of a stream of arrays along an axis.

Parallelization

pmap(func, iterable[, args, kwargs, …]) Parallel application of a function with keyword arguments.
pmap_unordered(func, iterable[, args, …]) Parallel application of a function with keyword arguments in no particular order.
preduce(func, iterable[, args, kwargs, …]) Parallel application of the reduce function, with keyword arguments.

Stacking

stack(arrays[, axis]) Stack of all arrays from a stream.

Iterator Utilities

last(stream) Retrieve the last item from a stream/iterator, consuming iterables in the process.
cyclic(iterable) Yields cyclic permutations of an iterable.
itercopy(iterable[, copies]) Split iterable into ‘copies’.
chunked(iterable, chunksize) Generator yielding multiple iterables of length ‘chunksize’.
linspace(start, stop, num[, endpoint]) Generate linear space.
multilinspace(start, stop, num[, endpoint]) Generate multilinear space, for joining the values in two iterables.
peek(iterable) Peek ahead in an iterable.
primed(gen) Decorator that primes a generator function, i.e.
length_hint(obj[, default]) Return an estimate of the number of items in obj.

Array Utilities

nan_to_num(array[, fill_value, copy]) Replace NaNs with another fill value.

Benchmarking

benchmark([funcs, ufuncs, shapes, file]) Benchmark npstreams against numpy and print the results.

CUDA support

What is CUDA

CUDA is a computing platform taking advantage of Nvidia hardware. It effectively allows for array computations on Graphical Processing Units (GPU).

npstreams relies on the (optional) PyCUDA library to access CUDA functionality.

Advantages of CUDA

TODO: benchmarks

CUDA in npstreams

PyCUDA is an optional dependency. Therefore, the CUDA-enabled functions are located in a separate module, the npstreams.cuda submodule.

Importing from npstreams.cuda submodule

Importing anything from the npstreams.cuda submodule will raise an ImportError in the following cases:

  • PyCUDA is not installed;
  • No GPUs are available;
  • CUDA compilation backend is not available, possibly due to incomplete installation.

With this in mind, it is wise to wrap import statements from npstreams.cuda in a try/except block.

CUDA-enabled routines

A limited set of functions implemented in npstreams also have CUDA-enabled equivalents. For performance reasons, all CUDA-enabled routines operate along the ‘stream’ axis, i.e. as if the arrays had been stacked along a new dimension.

Control Flow

Streaming array pipelines

Before reducing your stream of arrays (e.g. averaging them together), you may want to transform them. This can be done with the ipipe() function:

npstreams.ipipe(*args, **kwargs)

Pipe arrays through a sequence of functions. For example:

pipe(f, g, h, stream) is equivalent to

for arr in stream:
    yield f(g(h(arr)))
Parameters:
  • *funcs (callable) – Callable that support Numpy arrays in their first argument. These should NOT be generator functions.
  • arrays (iterable) – Stream of arrays to be passed.
  • processes (int or None, optional, keyword-only) – Number of processes to use. If None, maximal number of processes is used. Default is one.
  • ntotal (int or None, optional, keyword-only) – If the length of arrays is known, but passing arrays as a list would take too much memory, the total number of arrays ntotal can be specified. This allows for pmap to chunk better in case of processes > 1.
Yields:

piped (ndarray)

Imagine we have the following pipeline, in which we want processes images in some iterable arrays as follows:

  • Remove negative pixel intensity values;
  • Adjust the gamma value of images (from Scikit-image’s exposure module);
  • Average the result together.

The following lines will do the trick:

from functools import partial
from npstreams import ipipe, iaverage, last
from skimage.exposure import adjust_gamma

def remove_negative(arr):
    arr[arr < 0] = 0
    return arr

pipeline = ipipe(adjust_gamma, remove_negative, arrays)
avgs = last(iaverage(pipeline))

If the pipeline is computationally intensive, we can also pipe arrays in parallel using the keyword-only processes:

pipeline = ipipe(adjust_gamma, remove_negative, arrays, processes = 4)  # 4 cores will be used
avgs = last(iaverage(pipeline))

Since ipipe() uses pmap() under the hood, we can also use all available cores by passing processes = None.

Making your own Streaming Reduction Function

The ireduce_ufunc() generator function

You can assemble your own streaming reduction function from a binary NumPy ufunc using the following generator function:

npstreams.ireduce_ufunc(arrays, ufunc, axis=-1, dtype=None, ignore_nan=False, **kwargs)

Streaming reduction generator function from a binary NumPy ufunc. Generator version of reduce_ufunc.

ufunc must be a NumPy binary Ufunc (i.e. it takes two arguments). Moreover, for performance reasons, ufunc must have the same return types as input types. This precludes the use of numpy.greater, for example.

Note that performance is much better for the default axis = -1. In such a case, reduction operations can occur in-place. This also allows to operate in constant-memory.

Parameters:
  • arrays (iterable) – Arrays to be reduced.
  • ufunc (numpy.ufunc) – Binary universal function.
  • axis (int or None, optional) – Reduction axis. Default is to reduce the arrays in the stream as if they had been stacked along a new axis, then reduce along this new axis. If None, arrays are flattened before reduction. If axis is an int larger that the number of dimensions in the arrays of the stream, arrays are reduced along the new axis. Note that not all of NumPy Ufuncs support axis = None, e.g. numpy.subtract.
  • dtype (numpy.dtype or None, optional) – Overrides the dtype of the calculation and output arrays.
  • ignore_nan (bool, optional) – If True and ufunc has an identity value (e.g. numpy.add.identity is 0), then NaNs are replaced with this identity. An error is raised if ufunc has no identity (e.g. numpy.maximum.identity is None).
  • kwargs – Keyword arguments are passed to ufunc. Note that some valid ufunc keyword arguments (e.g. keepdims) are not valid for all streaming functions. Also, contrary to NumPy v. 1.10+, casting = 'unsafe is the default in npstreams.
Yields:

reduced (ndarray or scalar)

Raises:
  • TypeError : if ufunc is not NumPy ufunc.
  • ValueError : if ignore_nan is True but ufunc has no identity
  • ValueError : if ufunc is not a binary ufunc
  • ValueError : if ufunc does not have the same input type as output type

The non-generator version is also available:

npstreams.reduce_ufunc(arrays, ufunc, axis=-1, dtype=None, ignore_nan=False, **kwargs)

Reduce a stream using a binary NumPy ufunc. Function version of ireduce_ufunc.

ufunc must be a NumPy binary Ufunc (i.e. it takes two arguments). Moreover, for performance reasons, ufunc must have the same return types as input types. This precludes the use of numpy.greater, for example.

Note that performance is much better for the default axis = -1. In such a case, reduction operations can occur in-place. This also allows to operate in constant-memory.

Parameters:
  • arrays (iterable) – Arrays to be reduced.
  • ufunc (numpy.ufunc) – Binary universal function.
  • axis (int or None, optional) – Reduction axis. Default is to reduce the arrays in the stream as if they had been stacked along a new axis, then reduce along this new axis. If None, arrays are flattened before reduction. If axis is an int larger that the number of dimensions in the arrays of the stream, arrays are reduced along the new axis. Note that not all of NumPy Ufuncs support axis = None, e.g. numpy.subtract.
  • dtype (numpy.dtype or None, optional) – Overrides the dtype of the calculation and output arrays.
  • ignore_nan (bool, optional) – If True and ufunc has an identity value (e.g. numpy.add.identity is 0), then NaNs are replaced with this identity. An error is raised if ufunc has no identity (e.g. numpy.maximum.identity is None).
  • kwargs – Keyword arguments are passed to ufunc. Note that some valid ufunc keyword arguments (e.g. keepdims) are not valid for all streaming functions. Note that contrary to NumPy v. 1.10+, casting = 'unsafe is the default in npstreams.
Returns:

reduced

Return type:

ndarray or scalar

Raises:
  • TypeError : if ufunc is not NumPy ufunc.
  • ValueError : if ignore_nan is True but ufunc has no identity
  • ValueError: if ufunc is not a binary ufunc
  • ValueError: if ufunc does not have the same input type as output type

Note that while all NumPy ufuncs have a reduce() method, not all of them are useful. This is why ireduce_ufunc() and reduce_ufunc() will only work with binary ufuncs, most of which are listed below. For performance reasons, we further restrict the use of ireduce_ufunc() and reduce_ufunc() to ufuncs that have the same input types as output types. Therefore, for example, numpy.greater() cannot be made to work with ireduce_ufunc() and reduce_ufunc().

NaNs handling

NumPy ufuncs can have an identity value, that is, a value such that ufunc(x1, identity) is always x1. For such ufuncs, ireduce_ufunc() and reduce_ufunc() can replace NaNs in the stream with the ufunc’s identity value, if ignore_nan = True. Note that not all ufuncs have an identity value; for example, how would you define the identity value of numpy.maximum? There is no answer.

NumPy Binary Ufuncs

ireduce_ufunc() is tested to work on the following binary ufuncs, which are available in NumPy.

Arithmetics
numpy.add add(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.subtract subtract(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.multiply multiply(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.divide true_divide(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.logaddexp logaddexp(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.logaddexp2 logaddexp2(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.true_divide true_divide(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.floor_divide floor_divide(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.power power(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.remainder remainder(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.mod remainder(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.fmod fmod(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
Trigonometric functions
numpy.arctan2 arctan2(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.hypot hypot(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
Bit-twiddling functions
numpy.bitwise_and bitwise_and(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.bitwise_or bitwise_or(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.bitwise_xor bitwise_xor(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.left_shift left_shift(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.right_shift right_shift(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
Comparison functions
numpy.maximum maximum(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.fmax fmax(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.minimum minimum(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.fmin fmin(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
Floating functions
numpy.copysign copysign(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.nextafter nextafter(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])
numpy.ldexp ldexp(x1, x2, /, out=None, *, where=True, casting=’same_kind’, order=’K’, dtype=None, subok=True[, signature, extobj])

Example: Streaming Maximum

Let’s create a streaming maximum function for a stream. First, we have to choose how to handle NaNs; since numpy.maximum does not have an identity value, we must find another way. We can proceed as follows:

  • If we want to propagate NaNs, we should use numpy.maximum()
  • If we want to ignore NaNs, we should use numpy.fmax()

Both of those functions are binary ufuncs, so we can use ireduce_ufunc(). Note that any function based on ireduce_ufunc() or reduce_ufunc() will automatically work on streams of numbers thanks to the array_stream() decorator.

Putting it all together:

from npstreams import ireduce_ufunc
from numpy import maximum, fmax

def imax(arrays, axis = -1, ignore_nan = False, **kwargs):
    """
    Streaming cumulative maximum along an axis.

    Parameters
    ----------
    arrays : iterable
        Stream of arrays to be compared.
    axis : int or None, optional
        Axis along which to compute the maximum. If None,
        arrays are flattened before reduction.
    ignore_nan : bool, optional
        If True, NaNs are ignored. Default is False.

    Yields
    ------
    online_max : ndarray
    """
    ufunc = fmax if ignore_nan else maximum
    yield from ireduce_ufunc(arrays, ufunc, axis = axis, **kwargs)

This will provide us with a streaming function, meaning that we can look at the progress as it is being computed. We can also create a function that returns the max of the stream like numpy.ndarray.max() using the reduce_ufunc() function:

from npstreams import reduce_ufunc

def smax(*args, **kwargs):  # s for stream
    """
    Maximum of a stream along an axis.

    Parameters
    ----------
    arrays : iterable
        Stream of arrays to be compared.
    axis : int or None, optional
        Axis along which to compute the maximum. If None,
        arrays are flattened before reduction.
    ignore_nan : bool, optional
        If True, NaNs are ignored. Default is False.

    Yields
    ------
    max : ndarray
    """
    return reduce_ufunc(*args, **kwargs)

Recipes

Single-pass mean and error calculation

Here is a snipped for a function that computes a mean and standard error in the mean (SEM) in a single pass:

from npstreams import imean, isem, array_stream, itercopy

# The `array_stream` decorator ensures that the elements of
# the iterable `arrays` will be converted to ndarrays if possible
# This decorator is not required.
@array_stream
def mean_and_error(arrays, axis = -1):
    """ Yields (mean, error) pairs from a stream of arrays """
    # itercopy creates a copy of the original stream
    # The elements are only generated once, and then fed
    # to those two copies; much more efficient than
    # creating two streams from scratch.
    arrays_for_mean, arrays_for_sem = itercopy(arrays)

    means = imean(arrays_for_mean, axis = axis)
    errors = isem(arrays_for_sem, axis = axis)

    yield from zip(means, errors)

Authors

  • Laurent P. René de Cotret