Dask¶
Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.
On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.
Users interact with dask either by making graphs directly or through the dask collections which provide larger-than-memory counterparts to existing popular libraries:
- dask.array = numpy + threading
- dask.bag = map, filter, toolz + multiprocessing
- dask.dataframe = pandas + threading
Dask primarily targets parallel computations that run on a single machine. It integrates nicely with the existing PyData ecosystem and is trivial to set up and use:
conda install dask
or
pip install dask
Operations on dask collections (array, bag, dataframe) produce task graphs that encode blocked algorithms. Task schedulers execute these task graphs in parallel in a variety of contexts.
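As a minimal illustrative sketch (shapes and chunk sizes chosen arbitrarily) of how an operation on a dask collection builds a graph that a scheduler then executes:
>>> import dask.array as da
>>> x = da.ones((4, 4), chunks=(2, 2))   # blocked 4 by 4 array of ones
>>> y = (x + 1).sum()                    # builds a task graph; nothing computed yet
>>> y.compute()                          # a scheduler executes the graph in parallel
32.0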

Collections:
Dask collections are the main interaction point for users. They look like NumPy and pandas but generate dask graphs internally. If you are a dask user then you should start here.
Array¶
Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using dask graphs.
Overview¶
Dask arrays implement a subset of the NumPy interface on large arrays using blocked algorithms and task scheduling.
Scope¶
The dask.array library supports the following interface from numpy.
- Arithmetic and scalar mathematics, +, *, exp, log, ...
- Reductions along axes, sum(), mean(), std(), sum(axis=0), ...
- Tensor contractions / dot products / matrix multiply, tensordot
- Axis reordering / transpose, transpose
- Slicing, x[:100, 500:100:-2]
- Fancy indexing along single axes with lists or numpy arrays, x[:, [10, 1, 5]]
- The array protocol __array__
These operations should match the NumPy interface precisely.
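A brief sketch exercising a few of these operations on a small dask array (shapes are illustrative only):
>>> import dask.array as da
>>> x = da.ones((1000, 1000), chunks=(100, 100))
>>> y = da.exp(x) + x.T                  # elementwise math and transpose
>>> z = y.sum(axis=0)                    # reduction along an axis
>>> w = da.tensordot(x, x, axes=1)       # tensor contraction / matrix multiply
>>> z[:100]                              # slicing  # doctest: +SKIP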
Construct¶
We can construct dask array objects from other array objects that support numpy-style slicing. Here we wrap a dask array around an HDF5 dataset, chunking that dataset into blocks of size (1000, 1000).
>>> import h5py
>>> f = h5py.File('myfile.hdf5')
>>> dset = f['/data/path']
>>> import dask.array as da
>>> x = da.from_array(dset, chunks=(1000, 1000))
Often we have many such datasets. We can use the stack or concatenate functions to bind many dask arrays into one.
>>> from glob import glob
>>> dsets = [h5py.File(fn)['/data'] for fn in sorted(glob('myfiles.*.hdf5'))]
>>> arrays = [da.from_array(dset, chunks=(1000, 1000))
...           for dset in dsets]
>>> x = da.stack(arrays, axis=0) # Stack along a new first axis
Interact¶
Dask copies the NumPy API for an important subset of operations, including arithmetic operators, ufuncs, slicing, dot products, and reductions.
>>> y = da.log(x + 1)[:5].sum(axis=1)
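These operations are lazy and only build a task graph; assuming x and y from above, a minimal way to trigger execution is:
>>> result = y.compute()                 # runs the graph and returns an in-memory NumPy array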
Store¶
In Memory¶
If your data is small you can call np.array on your dask array to turn it into a normal NumPy array.
>>> import numpy as np
>>> x = da.arange(6, chunks=3)
>>> y = x**2
>>> np.array(y)
array([0, 1, 4, 9, 16, 25])
HDF5¶
Use the to_hdf5 function to store data into HDF5 using h5py.
>>> da.to_hdf5('myfile.hdf5', '/y', y) # doctest: +SKIP
Store several arrays in one computation with the function da.to_hdf5 by passing in a dict.
>>> da.to_hdf5('myfile.hdf5', {'/x': x, '/y': y}) # doctest: +SKIP
Other On-Disk Storage¶
Alternatively you can store dask arrays in any object that supports numpy-style slice assignment, such as h5py.Dataset or bcolz.carray.
>>> import bcolz # doctest: +SKIP
>>> out = bcolz.zeros(shape=y.shape, rootdir='myfile.bcolz') # doctest: +SKIP
>>> da.store(y, out) # doctest: +SKIP
You can store several arrays in one computation by passing lists of sources and destinations.
>>> da.store([array1, array2], [output1, output2])
On-Disk Storage¶
In the example above we used h5py but dask.array works equally well with pytables, bcolz, or any library that provides an array object from which we can slice out numpy arrays.
>>> x = dataset[1000:2000, :2000] # pull out numpy array from on-disk object
This API has become a standard in Scientific Python. Dask works with any object that supports this operation and the equivalent assignment syntax.
>>> dataset[1000:2000, :2000] = x # Store numpy array in on-disk object
Limitations¶
Dask.array does not implement the entire numpy interface. Users expecting this will be disappointed. Notably dask.array has the following failings:
- Dask does not implement all of np.linalg. This has been done by a number of excellent BLAS/LAPACK implementations and is the focus of numerous ongoing academic research projects.
- Dask.array does not support any operation where the resulting shape depends on the values of the array. In order to form the dask graph we must be able to infer the shape of the array before actually executing the operation. This precludes operations like indexing one dask array with another or operations like np.where.
- Dask.array does not attempt operations like sort which are notoriously difficult to do in parallel and are of somewhat diminished value on very large data (you rarely actually need a full sort). Often we include parallel-friendly alternatives like topk.
- Dask development is driven by immediate need, and so many lesser used functions have not been implemented. Community contributions are encouraged.
Internal Design¶

Dask arrays define a large array with a grid of blocks of smaller arrays. These arrays may be concrete, or functions that produce arrays. We define a dask array with the following components:
- A dask with a special set of keys designating blocks e.g. ('x', 0, 0), ('x', 0, 1), ...
- A sequence of chunk sizes along each dimension called chunks e.g. ((5, 5, 5, 5), (8, 8, 8))
- A name to identify which keys in the dask refer to this array, e.g. 'x'
Keys of the dask graph¶
By special convention we refer to each block of the array with a tuple of the form (name, i, j, k) for i, j, k being the indices of the block, ranging from 0 to the number of blocks in that dimension. The dask must hold key-value pairs referring to these keys. It likely also holds other key-value pairs required to eventually compute the desired values, e.g.
{
('x', 0, 0): (add, 1, ('y', 0, 0)),
('x', 0, 1): (add, 1, ('y', 0, 1)),
...
('y', 0, 0): (getitem, dataset, (slice(0, 1000), slice(0, 1000))),
('y', 0, 1): (getitem, dataset, (slice(0, 1000), slice(1000, 2000)))
...
}
The name of an Array object can be found in the name attribute. One can get a nested list of keys with the ._keys() method. One can flatten down this list with dask.array.core.flatten(); this is sometimes useful when building new dictionaries.
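A small sketch of inspecting these internals (illustrative only; _keys is an internal method and the exact name string will differ):
>>> import dask.array as da
>>> from dask.array.core import flatten
>>> x = da.ones((20, 24), chunks=(5, 8))
>>> x.name                               # identifies this array's keys  # doctest: +SKIP
>>> x._keys()                            # nested list of block keys  # doctest: +SKIP
>>> list(flatten(x._keys()))             # flat list of keys  # doctest: +SKIP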
Chunks¶
We also store the size of each block along each axis. This is a tuple of tuples such that the length of the outer tuple is equal to the dimension and the lengths of the inner tuples are equal to the number of blocks along each dimension. In the example illustrated above this value is as follows:
chunks = ((5, 5, 5, 5), (8, 8, 8))
Note that these numbers do not necessarily need to be regular. We often create regularly sized grids but blocks change shape after complex slicing. Beware that some operations do expect certain symmetries in the block shapes. For example, matrix multiplication requires that the chunk sizes along the contracted dimension of one array line up with those of the other.
Some ways in which chunks reflects properties of our array:
- len(x.chunks) == x.ndim: the length of chunks is the number of dimensions
- tuple(map(sum, x.chunks)) == x.shape: the sum of each inner chunk tuple is the length of that dimension
The length of each inner chunk tuple is the number of blocks (and keys) along that dimension, e.g. for chunks == ((a, b), (d, e, f)) and name == 'x' our array has tasks with the following keys:
('x', 0, 0), ('x', 0, 1), ('x', 0, 2), ('x', 1, 0), ('x', 1, 1), ('x', 1, 2)
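A short sketch verifying these relationships on an example array:
>>> import dask.array as da
>>> x = da.ones((20, 24), chunks=(5, 8))
>>> x.chunks
((5, 5, 5, 5), (8, 8, 8))
>>> len(x.chunks) == x.ndim
True
>>> tuple(map(sum, x.chunks)) == x.shape
True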
Create an Array Object¶
So to create a da.Array object we need a dictionary with these special keys
dsk = {('x', 0, 0): ...}
a name specifying to which keys this array refers
name = 'x'
and a chunks tuple:
chunks = ((5, 5, 5, 5), (8, 8, 8))
Then one can construct an array:
x = da.Array(dsk, name, chunks)
So dask.array operations update dask dictionaries and track chunk shapes.
Example - eye function¶
As an example, let's build the np.eye function for dask.array to make the identity matrix.
import numpy as np
from dask.array.core import Array, tokens

def eye(n, blocksize):
    chunks = ((blocksize,) * (n // blocksize),
              (blocksize,) * (n // blocksize))
    name = 'eye' + next(tokens)  # unique identifier
    dsk = {(name, i, j): (np.eye, blocksize)
                         if i == j else
                         (np.zeros, (blocksize, blocksize))
           for i in range(n // blocksize)
           for j in range(n // blocksize)}
    dtype = np.eye(0).dtype  # take dtype default from numpy
    return Array(dsk, name, chunks, dtype)
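Assuming the sketch above, the resulting function behaves like any other dask array constructor (the block size here is illustrative):
>>> x = eye(8, blocksize=4)
>>> x.chunks
((4, 4), (4, 4))
>>> np.array(x)                          # 8 by 8 identity matrix  # doctest: +SKIP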
Create Dask Arrays¶
We store and manipulate large arrays in a wide variety of ways. There are some standards like HDF5 and NetCDF but just as often people use custom storage solutions. This page talks about how to build dask graphs to interact with your array.
In principle we need functions that return NumPy arrays. These functions and their arrangement can be as simple or as complex as the situation dictates.
Simple case - Format Supports NumPy Slicing¶
Many formats have Python projects that expose storage using NumPy slicing syntax. For example the HDF5 file format has the h5py project, which provides a Dataset object into which we can slice in NumPy fashion
>>> import h5py
>>> f = h5py.File('myfile.hdf5') # HDF5 file
>>> d = f['/data/path'] # Pointer to on-disk array
>>> d.shape # d can be very large
(1000000, 1000000)
>>> x = d[:5, :5] # We slice to get numpy arrays
It is common for Python wrappers of on-disk array formats to present a NumPy slicing syntax. The full dataset looks like a NumPy array with .shape and .dtype attributes even though the data hasn't yet been loaded in and still lives on disk. Slicing into this array-like object fetches the appropriate data from disk and returns that region as an in-memory NumPy array.
For this common case dask.array presents the convenience function da.from_array
>>> import dask.array as da
>>> x = da.from_array(d, chunks=(1000, 1000))
Concatenation and Stacking¶
Often we store data in several different locations and want to stitch them together. For this case please see docs on concatenation and stacking.
Complex case¶
If your format does not provide a convenient slicing solution you will need to dive down one layer to interact with dask dictionaries. Your goal is to create a dictionary with tasks that create NumPy arrays, see docs on array design before continuing with this subsection.
To construct a dask array manually you need a dict with tasks that form numpy arrays
dsk = {('x', 0): (f, ...),
       ('x', 1): (f, ...),
       ('x', 2): (f, ...)}
And a chunks tuple that defines the shapes of your blocks along each dimension
chunks = [(1000, 1000, 1000)]
For the tasks (f, ...) your choice of function f and arguments ... is up to you. You have the full freedom of the Python language here as long as your function, when run with those arguments, produces the appropriate NumPy array.
Chunks¶
We always specify a chunks argument to tell dask.array how to break up the underlying array into chunks. This strongly impacts performance. We can specify chunks in one of three ways
- a blocksize like 1000
- a blockshape like (1000, 1000)
- explicit sizes of all blocks along all dimensions, like ((1000, 1000, 500), (400, 400))
Your chunks input will be normalized and stored in the third and most explicit form.
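The following calls sketch the three ways of specifying chunks on the same hypothetical in-memory array; after normalization each is stored in the fully explicit form:
>>> import dask.array as da
>>> import numpy as np
>>> data = np.ones((2500, 800))
>>> a = da.from_array(data, chunks=1000)                             # a blocksize
>>> b = da.from_array(data, chunks=(1000, 400))                      # a blockshape
>>> c = da.from_array(data, chunks=((1000, 1000, 500), (400, 400)))  # fully explicit
>>> b.chunks == c.chunks
True
>>> a.chunks
((1000, 1000, 500), (800,))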
Example¶
As an example we might load a grid of pickle files known to contain 1000 by 1000 NumPy arrays
import pickle

def load(fn):
    with open(fn, 'rb') as f:
        result = pickle.load(f)
    return result
dsk = {('x', 0, 0): (load, 'block-0-0.pkl'),
('x', 0, 1): (load, 'block-0-1.pkl'),
('x', 0, 2): (load, 'block-0-2.pkl'),
('x', 1, 0): (load, 'block-1-0.pkl'),
('x', 1, 1): (load, 'block-1-1.pkl'),
('x', 1, 2): (load, 'block-1-2.pkl')}
chunks = ((1000, 1000), (1000, 1000, 1000))
x = da.Array(dsk, 'x', chunks)
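Once constructed this way the result behaves like any other dask array; a short sketch, assuming the pickle files above actually exist:
>>> x.shape
(2000, 3000)
>>> y = x.mean(axis=0)      # operate as usual
>>> y.compute()             # doctest: +SKIP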
API¶
Create and Store Arrays¶
- from_array(x, chunks[, name, lock]): Create dask array from something that looks like an array
- store(sources, targets, **kwargs): Store dask arrays in array-like objects, overwrite data in target
- Array.to_hdf5(filename, datapath, **kwargs): Store array in HDF5 file
Specialized Functions for Dask.Array¶
- Array.map_blocks(func[, chunks, dtype, name]): Map a function across all blocks of a dask array
- Array.map_overlap(func, depth[, boundary, trim]): Map a function over blocks of the array with some overlap
- topk(k, x): The top k elements of an array
- coarsen(reduction, x, axes[, trim_excess]): Coarsen array by applying reduction to fixed size neighborhoods
- stack(seq[, axis]): Stack arrays along a new axis
- concatenate(seq[, axis]): Concatenate arrays along an existing axis
Array Methods¶
- class dask.array.core.Array(dask, name, chunks, dtype=None, shape=None)¶
Parallel Array
Parameters: dask : dict
Task dependency graph
name : string
Name of array in dask
shape : tuple of ints
Shape of the entire array
chunks: iterable of tuples
block sizes along each dimension
Attributes
- T
- chunks
- dtype
- imag
- nbytes: Number of bytes in array
- ndim
- numblocks
- real
- shape
- size: Number of elements in array
- vindex
Methods
- all([axis, keepdims]): Test whether all array elements along a given axis evaluate to True.
- any([axis, keepdims]): Test whether any array element along a given axis evaluates to True.
- argmax([axis]): Indices of the maximum values along an axis.
- argmin([axis]): Return the indices of the minimum values along an axis.
- astype(dtype, **kwargs): Copy of the array, cast to a specified type
- cache([store]): Evaluate and cache array
- compute(**kwargs)
- conj()
- dot(a, b[, out]): Dot product of two arrays.
- map_blocks(func[, chunks, dtype, name]): Map a function across all blocks of a dask array
- map_overlap(func, depth[, boundary, trim]): Map a function over blocks of the array with some overlap
- max([axis, keepdims]): Return the maximum of an array or maximum along an axis.
- mean([axis, dtype, keepdims]): Compute the arithmetic mean along the specified axis.
- min([axis, keepdims]): Return the minimum of an array or minimum along an axis.
- moment(order[, axis, dtype, keepdims, ddof]): Calculate the nth centralized moment.
- prod([axis, dtype, keepdims]): Return the product of array elements over a given axis.
- rechunk(chunks)
- squeeze(): Remove single-dimensional entries from the shape of an array.
- std([axis, dtype, keepdims, ddof]): Compute the standard deviation along the specified axis.
- store(target, **kwargs): Store dask arrays in array-like objects, overwrite data in target
- sum([axis, dtype, keepdims]): Sum of array elements over a given axis.
- to_hdf5(filename, datapath, **kwargs): Store array in HDF5 file
- topk(k): The top k elements of an array
- transpose([axes]): Permute the dimensions of an array.
- var([axis, dtype, keepdims, ddof]): Compute the variance along the specified axis.
- visualize([filename, optimize_graph])
- vnorm([ord, axis, keepdims]): Vector norm
- all(axis=None, keepdims=False)¶
Test whether all array elements along a given axis evaluate to True.
Parameters: a : array_like
Input array or object that can be converted to an array.
axis : None or int or tuple of ints, optional
Axis or axes along which a logical AND reduction is performed. The default (axis = None) is to perform a logical AND over all the dimensions of the input array. axis may be negative, in which case it counts from the last to the first axis.
New in version 1.7.0.
If this is a tuple of ints, a reduction is performed on multiple axes, instead of a single axis or all the axes as before.
out : ndarray, optional
Alternate output array in which to place the result. It must have the same shape as the expected output and its type is preserved (e.g., if dtype(out) is float, the result will consist of 0.0’s and 1.0’s). See doc.ufuncs (Section “Output arguments”) for more details.
keepdims : bool, optional
If this is set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the original arr.
Returns: all : ndarray, bool
A new boolean or array is returned unless out is specified, in which case a reference to out is returned.
See also
- ndarray.all
- equivalent method
- any
- Test whether any element along a given axis evaluates to True.
Notes
Not a Number (NaN), positive infinity and negative infinity evaluate to True because these are not equal to zero.
Examples
>>> np.all([[True,False],[True,True]])
False
>>> np.all([[True,False],[True,True]], axis=0)
array([ True, False], dtype=bool)
>>> np.all([-1, 4, 5])
True
>>> np.all([1.0, np.nan])
True
>>> o=np.array([False])
>>> z=np.all([-1, 4, 5], out=o)
>>> id(z), id(o), z
(28293632, 28293632, array([ True], dtype=bool))
- any(axis=None, keepdims=False)¶
Test whether any array element along a given axis evaluates to True.
Returns single boolean unless axis is not None
Parameters: a : array_like
Input array or object that can be converted to an array.
axis : None or int or tuple of ints, optional
Axis or axes along which a logical OR reduction is performed. The default (axis = None) is to perform a logical OR over all the dimensions of the input array. axis may be negative, in which case it counts from the last to the first axis.
New in version 1.7.0.
If this is a tuple of ints, a reduction is performed on multiple axes, instead of a single axis or all the axes as before.
out : ndarray, optional
Alternate output array in which to place the result. It must have the same shape as the expected output and its type is preserved (e.g., if it is of type float, then it will remain so, returning 1.0 for True and 0.0 for False, regardless of the type of a). See doc.ufuncs (Section “Output arguments”) for details.
keepdims : bool, optional
If this is set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the original arr.
Returns: any : bool or ndarray
A new boolean or ndarray is returned unless out is specified, in which case a reference to out is returned.
See also
- ndarray.any
- equivalent method
- all
- Test whether all elements along a given axis evaluate to True.
Notes
Not a Number (NaN), positive infinity and negative infinity evaluate to True because these are not equal to zero.
Examples
>>> np.any([[True, False], [True, True]])
True
>>> np.any([[True, False], [False, False]], axis=0)
array([ True, False], dtype=bool)
>>> np.any([-1, 0, 5])
True
>>> np.any(np.nan)
True
>>> o=np.array([False])
>>> z=np.any([-1, 4, 5], out=o)
>>> z, o
(array([ True], dtype=bool), array([ True], dtype=bool))
>>> # Check now that z is a reference to o
>>> z is o
True
>>> id(z), id(o) # identity of z and o
(191614240, 191614240)
- argmax(axis=None)¶
Indices of the maximum values along an axis.
Parameters: a : array_like
Input array.
axis : int, optional
By default, the index is into the flattened array, otherwise along the specified axis.
Returns: index_array : ndarray of ints
Array of indices into the array. It has the same shape as a.shape with the dimension along axis removed.
See also
ndarray.argmax, argmin
- amax
- The maximum value along a given axis.
- unravel_index
- Convert a flat index into an index tuple.
Notes
In case of multiple occurrences of the maximum values, the indices corresponding to the first occurrence are returned.
Examples
>>> a = np.arange(6).reshape(2,3)
>>> a
array([[0, 1, 2],
       [3, 4, 5]])
>>> np.argmax(a)
5
>>> np.argmax(a, axis=0)
array([1, 1, 1])
>>> np.argmax(a, axis=1)
array([2, 2])
>>> b = np.arange(6)
>>> b[1] = 5
>>> b
array([0, 5, 2, 3, 4, 5])
>>> np.argmax(b) # Only the first occurrence is returned.
1
- argmin(axis=None)¶
Return the indices of the minimum values along an axis.
See also
- argmax
- Similar function. Please refer to numpy.argmax for detailed documentation.
- astype(dtype, **kwargs)¶
Copy of the array, cast to a specified type
- cache(store=None, **kwargs)¶
Evaluate and cache array
Parameters: store: MutableMapping or ndarray-like
Place to put computed and cached chunks
kwargs:
Keyword arguments to pass on to get function for scheduling
This triggers evaluation and stores the result in either
1. An ndarray object supporting setitem (see da.store)
2. A MutableMapping like a dict or chest
It then returns a new dask array that points to this store.
This returns a semantically equivalent dask array.
>>> import dask.array as da
>>> x = da.arange(5, chunks=2)
>>> y = 2*x + 1
>>> z = y.cache() # triggers computation
>>> y.compute() # Does entire computation
array([1, 3, 5, 7, 9])
>>> z.compute() # Just pulls from store
array([1, 3, 5, 7, 9])
You might base a cache off of an array like a numpy array or
h5py.Dataset.
>>> cache = np.empty(5, dtype=x.dtype)
>>> z = y.cache(store=cache)
>>> cache
array([1, 3, 5, 7, 9])
Or one might use a MutableMapping like a dict or chest
>>> cache = dict()
>>> z = y.cache(store=cache)
>>> cache # doctest: +SKIP
{('x', 0): array([1, 3]),
 ('x', 1): array([5, 7]),
 ('x', 2): array([9])}
- dot(a, b, out=None)¶
Dot product of two arrays.
For 2-D arrays it is equivalent to matrix multiplication, and for 1-D arrays to inner product of vectors (without complex conjugation). For N dimensions it is a sum product over the last axis of a and the second-to-last of b:
dot(a, b)[i,j,k,m] = sum(a[i,j,:] * b[k,:,m])
Parameters: a : array_like
First argument.
b : array_like
Second argument.
out : ndarray, optional
Output argument. This must have the exact kind that would be returned if it was not used. In particular, it must have the right type, must be C-contiguous, and its dtype must be the dtype that would be returned for dot(a,b). This is a performance feature. Therefore, if these conditions are not met, an exception is raised, instead of attempting to be flexible.
Returns: output : ndarray
Returns the dot product of a and b. If a and b are both scalars or both 1-D arrays then a scalar is returned; otherwise an array is returned. If out is given, then it is returned.
Raises: ValueError
If the last dimension of a is not the same size as the second-to-last dimension of b.
See also
- vdot
- Complex-conjugating dot product.
- tensordot
- Sum products over arbitrary axes.
- einsum
- Einstein summation convention.
Examples
>>> np.dot(3, 4)
12
Neither argument is complex-conjugated:
>>> np.dot([2j, 3j], [2j, 3j])
(-13+0j)
For 2-D arrays it's the matrix product:
>>> a = [[1, 0], [0, 1]]
>>> b = [[4, 1], [2, 2]]
>>> np.dot(a, b)
array([[4, 1],
       [2, 2]])
>>> a = np.arange(3*4*5*6).reshape((3,4,5,6))
>>> b = np.arange(3*4*5*6)[::-1].reshape((5,4,6,3))
>>> np.dot(a, b)[2,3,2,1,2,2]
499128
>>> sum(a[2,3,2,:] * b[1,2,:,2])
499128
- map_blocks(func, chunks=None, dtype=None, name=None)¶
Map a function across all blocks of a dask array
You may specify the chunks of the resulting array. If you don't then we assume that the resulting array has the same block structure as the input.
>>> import dask.array as da
>>> x = da.arange(6, chunks=3)
>>> x.map_blocks(lambda x: x * 2).compute()
array([ 0,  2,  4,  6,  8, 10])
The da.map_blocks function can also accept multiple arrays
>>> d = da.arange(5, chunks=2)
>>> e = da.arange(5, chunks=2)
>>> f = da.map_blocks(lambda a, b: a + b**2, d, e)
>>> f.compute()
array([ 0,  2,  6, 12, 20])
If the function changes the shape of the blocks then please provide chunks explicitly.
>>> y = x.map_blocks(lambda x: x[::2], chunks=((2, 2),))
Your block function can learn where in the array it is if it supports a block_id keyword argument. This will receive entries like (2, 0, 1), the position of the block in the dask array.
>>> def func(block, block_id=None):
...     pass
You may specify the name of the resulting task in the graph with the optional name keyword argument.
>>> y = x.map_blocks(lambda x: x + 1, name='increment')
- map_overlap(func, depth, boundary=None, trim=True, **kwargs)¶
Map a function over blocks of the array with some overlap
We share neighboring zones between blocks of the array, then map a function, then trim away the neighboring strips.
Parameters: func: function
The function to apply to each extended block
depth: int, tuple, or dict
The number of cells that each block should share with its neighbors. If a tuple or dict then this can be different per axis.
boundary: str, tuple, dict
how to handle the boundaries. Values include ‘reflect’, ‘periodic’, ‘nearest’, ‘none’, or any constant value like 0 or np.nan
trim: bool
Whether or not to trim the excess after the map function. Set this to false if your mapping function does this for you.
**kwargs:
Other keyword arguments valid in map_blocks
Examples
>>> x = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1])
>>> x = from_array(x, chunks=5)
>>> def derivative(x):
...     return x - np.roll(x, 1)
>>> y = x.map_overlap(derivative, depth=1, boundary=0)
>>> y.compute()
array([ 1,  0,  1,  1,  0,  0, -1, -1,  0])
>>> import dask.array as da
>>> x = np.arange(16).reshape((4, 4))
>>> d = da.from_array(x, chunks=(2, 2))
>>> d.map_overlap(lambda x: x + x.size, depth=1).compute()
array([[16, 17, 18, 19],
       [20, 21, 22, 23],
       [24, 25, 26, 27],
       [28, 29, 30, 31]])
>>> func = lambda x: x + x.size
>>> depth = {0: 1, 1: 1}
>>> boundary = {0: 'reflect', 1: 'none'}
>>> d.map_overlap(func, depth, boundary).compute()
array([[ 12.,  13.,  14.,  15.],
       [ 16.,  17.,  18.,  19.],
       [ 20.,  21.,  22.,  23.],
       [ 24.,  25.,  26.,  27.]])
- max(axis=None, keepdims=False)¶
Return the maximum of an array or maximum along an axis.
Parameters: a : array_like
Input data.
axis : int, optional
Axis along which to operate. By default, flattened input is used.
out : ndarray, optional
Alternative output array in which to place the result. Must be of the same shape and buffer length as the expected output. See doc.ufuncs (Section “Output arguments”) for more details.
keepdims : bool, optional
If this is set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the original arr.
Returns: amax : ndarray or scalar
Maximum of a. If axis is None, the result is a scalar value. If axis is given, the result is an array of dimension a.ndim - 1.
See also
- amin
- The minimum value of an array along a given axis, propagating any NaNs.
- nanmax
- The maximum value of an array along a given axis, ignoring any NaNs.
- maximum
- Element-wise maximum of two arrays, propagating any NaNs.
- fmax
- Element-wise maximum of two arrays, ignoring any NaNs.
- argmax
- Return the indices of the maximum values.
nanmin, minimum, fmin
Notes
NaN values are propagated, that is if at least one item is NaN, the corresponding max value will be NaN as well. To ignore NaN values (MATLAB behavior), please use nanmax.
Don’t use amax for element-wise comparison of 2 arrays; when a.shape[0] is 2, maximum(a[0], a[1]) is faster than amax(a, axis=0).
Examples
>>> a = np.arange(4).reshape((2,2))
>>> a
array([[0, 1],
       [2, 3]])
>>> np.amax(a)           # Maximum of the flattened array
3
>>> np.amax(a, axis=0)   # Maxima along the first axis
array([2, 3])
>>> np.amax(a, axis=1)   # Maxima along the second axis
array([1, 3])
>>> b = np.arange(5, dtype=np.float)
>>> b[2] = np.NaN
>>> np.amax(b)
nan
>>> np.nanmax(b)
4.0
- mean(axis=None, dtype=None, keepdims=False)¶
Compute the arithmetic mean along the specified axis.
Returns the average of the array elements. The average is taken over the flattened array by default, otherwise over the specified axis. float64 intermediate and return values are used for integer inputs.
Parameters: a : array_like
Array containing numbers whose mean is desired. If a is not an array, a conversion is attempted.
axis : int, optional
Axis along which the means are computed. The default is to compute the mean of the flattened array.
dtype : data-type, optional
Type to use in computing the mean. For integer inputs, the default is float64; for floating point inputs, it is the same as the input dtype.
out : ndarray, optional
Alternate output array in which to place the result. The default is None; if provided, it must have the same shape as the expected output, but the type will be cast if necessary. See doc.ufuncs for details.
keepdims : bool, optional
If this is set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the original arr.
Returns: m : ndarray, see dtype parameter above
If out=None, returns a new array containing the mean values, otherwise a reference to the output array is returned.
Notes
The arithmetic mean is the sum of the elements along the axis divided by the number of elements.
Note that for floating-point input, the mean is computed using the same precision the input has. Depending on the input data, this can cause the results to be inaccurate, especially for float32 (see example below). Specifying a higher-precision accumulator using the dtype keyword can alleviate this issue.
Examples
>>> a = np.array([[1, 2], [3, 4]])
>>> np.mean(a)
2.5
>>> np.mean(a, axis=0)
array([ 2.,  3.])
>>> np.mean(a, axis=1)
array([ 1.5,  3.5])
In single precision, mean can be inaccurate:
>>> a = np.zeros((2, 512*512), dtype=np.float32)
>>> a[0, :] = 1.0
>>> a[1, :] = 0.1
>>> np.mean(a)
0.546875
Computing the mean in float64 is more accurate:
>>> np.mean(a, dtype=np.float64)
0.55000000074505806
- min(axis=None, keepdims=False)¶
Return the minimum of an array or minimum along an axis.
Parameters: a : array_like
Input data.
axis : int, optional
Axis along which to operate. By default, flattened input is used.
out : ndarray, optional
Alternative output array in which to place the result. Must be of the same shape and buffer length as the expected output. See doc.ufuncs (Section “Output arguments”) for more details.
keepdims : bool, optional
If this is set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the original arr.
Returns: amin : ndarray or scalar
Minimum of a. If axis is None, the result is a scalar value. If axis is given, the result is an array of dimension a.ndim - 1.
See also
- amax
- The maximum value of an array along a given axis, propagating any NaNs.
- nanmin
- The minimum value of an array along a given axis, ignoring any NaNs.
- minimum
- Element-wise minimum of two arrays, propagating any NaNs.
- fmin
- Element-wise minimum of two arrays, ignoring any NaNs.
- argmin
- Return the indices of the minimum values.
nanmax, maximum, fmax
Notes
NaN values are propagated, that is if at least one item is NaN, the corresponding min value will be NaN as well. To ignore NaN values (MATLAB behavior), please use nanmin.
Don’t use amin for element-wise comparison of 2 arrays; when a.shape[0] is 2, minimum(a[0], a[1]) is faster than amin(a, axis=0).
Examples
>>> a = np.arange(4).reshape((2,2))
>>> a
array([[0, 1],
       [2, 3]])
>>> np.amin(a)           # Minimum of the flattened array
0
>>> np.amin(a, axis=0)   # Minima along the first axis
array([0, 1])
>>> np.amin(a, axis=1)   # Minima along the second axis
array([0, 2])
>>> b = np.arange(5, dtype=np.float)
>>> b[2] = np.NaN
>>> np.amin(b)
nan
>>> np.nanmin(b)
0.0
- moment(order, axis=None, dtype=None, keepdims=False, ddof=0)¶
Calculate the nth centralized moment.
Parameters: order : int
Order of the moment that is returned, must be >= 2.
axis : int, optional
Axis along which the central moment is computed. The default is to compute the moment of the flattened array.
dtype : data-type, optional
Type to use in computing the moment. For arrays of integer type the default is float64; for arrays of float types it is the same as the array type.
keepdims : bool, optional
If this is set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the original array.
ddof : int, optional
“Delta Degrees of Freedom”: the divisor used in the calculation is N - ddof, where N represents the number of elements. By default ddof is zero.
Returns: moment : ndarray
References
[R1] Pebay, Philippe (2008), “Formulas for Robust, One-Pass Parallel Computation of Covariances and Arbitrary-Order Statistical Moments” (PDF), Technical Report SAND2008-6212, Sandia National Laboratories
- nbytes¶
Number of bytes in array
- prod(axis=None, dtype=None, keepdims=False)¶
Return the product of array elements over a given axis.
Parameters: a : array_like
Input data.
axis : None or int or tuple of ints, optional
Axis or axes along which a product is performed. The default (axis = None) is to perform a product over all the dimensions of the input array. axis may be negative, in which case it counts from the last to the first axis.
New in version 1.7.0.
If this is a tuple of ints, a product is performed on multiple axes, instead of a single axis or all the axes as before.
dtype : data-type, optional
The data-type of the returned array, as well as of the accumulator in which the elements are multiplied. By default, if a is of integer type, dtype is the default platform integer. (Note: if the type of a is unsigned, then so is dtype.) Otherwise, the dtype is the same as that of a.
out : ndarray, optional
Alternative output array in which to place the result. It must have the same shape as the expected output, but the type of the output values will be cast if necessary.
keepdims : bool, optional
If this is set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the original arr.
Returns: product_along_axis : ndarray, see dtype parameter above.
An array shaped as a but with the specified axis removed. Returns a reference to out if specified.
See also
- ndarray.prod
- equivalent method
- numpy.doc.ufuncs
- Section “Output arguments”
Notes
Arithmetic is modular when using integer types, and no error is raised on overflow. That means that, on a 32-bit platform:
>>> x = np.array([536870910, 536870910, 536870910, 536870910])
>>> np.prod(x)  # random
16
Examples
By default, calculate the product of all elements:
>>> np.prod([1.,2.])
2.0
Even when the input array is two-dimensional:
>>> np.prod([[1.,2.],[3.,4.]])
24.0
But we can also specify the axis over which to multiply:
>>> np.prod([[1.,2.],[3.,4.]], axis=1)
array([  2.,  12.])
If the type of x is unsigned, then the output type is the unsigned platform integer:
>>> x = np.array([1, 2, 3], dtype=np.uint8)
>>> np.prod(x).dtype == np.uint
True
If x is of a signed integer type, then the output type is the default platform integer:
>>> x = np.array([1, 2, 3], dtype=np.int8)
>>> np.prod(x).dtype == np.int
True
- size¶
Number of elements in array
- squeeze()¶
Remove single-dimensional entries from the shape of an array.
Parameters: a : array_like
Input data.
axis : None or int or tuple of ints, optional
New in version 1.7.0.
Selects a subset of the single-dimensional entries in the shape. If an axis is selected with shape entry greater than one, an error is raised.
Returns: squeezed : ndarray
The input array, but with all or a subset of the dimensions of length 1 removed. This is always a itself or a view into a.
Examples
>>> x = np.array([[[0], [1], [2]]])
>>> x.shape
(1, 3, 1)
>>> np.squeeze(x).shape
(3,)
>>> np.squeeze(x, axis=(2,)).shape
(1, 3)
- std(axis=None, dtype=None, keepdims=False, ddof=0)¶
Compute the standard deviation along the specified axis.
Returns the standard deviation, a measure of the spread of a distribution, of the array elements. The standard deviation is computed for the flattened array by default, otherwise over the specified axis.
Parameters: a : array_like
Calculate the standard deviation of these values.
axis : int, optional
Axis along which the standard deviation is computed. The default is to compute the standard deviation of the flattened array.
dtype : dtype, optional
Type to use in computing the standard deviation. For arrays of integer type the default is float64, for arrays of float types it is the same as the array type.
out : ndarray, optional
Alternative output array in which to place the result. It must have the same shape as the expected output but the type (of the calculated values) will be cast if necessary.
ddof : int, optional
Means Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements. By default ddof is zero.
keepdims : bool, optional
If this is set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the original arr.
Returns: standard_deviation : ndarray, see dtype parameter above.
If out is None, return a new array containing the standard deviation, otherwise return a reference to the output array.
Notes
The standard deviation is the square root of the average of the squared deviations from the mean, i.e., std = sqrt(mean(abs(x - x.mean())**2)).
The average squared deviation is normally calculated as x.sum() / N, where N = len(x). If, however, ddof is specified, the divisor N - ddof is used instead. In standard statistical practice, ddof=1 provides an unbiased estimator of the variance of the infinite population. ddof=0 provides a maximum likelihood estimate of the variance for normally distributed variables. The standard deviation computed in this function is the square root of the estimated variance, so even with ddof=1, it will not be an unbiased estimate of the standard deviation per se.
Note that, for complex numbers, std takes the absolute value before squaring, so that the result is always real and nonnegative.
For floating-point input, the std is computed using the same precision the input has. Depending on the input data, this can cause the results to be inaccurate, especially for float32 (see example below). Specifying a higher-accuracy accumulator using the dtype keyword can alleviate this issue.
Examples
>>> a = np.array([[1, 2], [3, 4]])
>>> np.std(a)
1.1180339887498949
>>> np.std(a, axis=0)
array([ 1.,  1.])
>>> np.std(a, axis=1)
array([ 0.5,  0.5])
In single precision, std() can be inaccurate:
>>> a = np.zeros((2,512*512), dtype=np.float32)
>>> a[0,:] = 1.0
>>> a[1,:] = 0.1
>>> np.std(a)
0.45172946707416706
Computing the standard deviation in float64 is more accurate:
>>> np.std(a, dtype=np.float64)
0.44999999925552653
- store(target, **kwargs)¶
Store dask arrays in array-like objects, overwrite data in target
This stores dask arrays into objects that support numpy-style setitem indexing. It stores values chunk by chunk so that it does not have to fill up memory. For best performance you can align the block size of the storage target with the block size of your array.
If your data fits in memory then you may prefer calling np.array(myarray) instead.
Parameters: sources: Array or iterable of Arrays
targets: array-like or iterable of array-likes
These should support setitem syntax target[10:20] = ...
Examples
>>> x = ...
>>> import h5py
>>> f = h5py.File('myfile.hdf5')
>>> dset = f.create_dataset('/data', shape=x.shape,
...                         chunks=x.chunks,
...                         dtype='f8')
>>> store(x, dset)
Alternatively store many arrays at the same time
>>> store([x, y, z], [dset1, dset2, dset3])
- sum(axis=None, dtype=None, keepdims=False)¶
Sum of array elements over a given axis.
Parameters: a : array_like
Elements to sum.
axis : None or int or tuple of ints, optional
Axis or axes along which a sum is performed. The default (axis = None) is to perform a sum over all the dimensions of the input array. axis may be negative, in which case it counts from the last to the first axis.
New in version 1.7.0.
If this is a tuple of ints, a sum is performed on multiple axes, instead of a single axis or all the axes as before.
dtype : dtype, optional
The type of the returned array and of the accumulator in which the elements are summed. By default, the dtype of a is used. An exception is when a has an integer type with less precision than the default platform integer. In that case, the default platform integer is used instead.
out : ndarray, optional
Array into which the output is placed. By default, a new array is created. If out is given, it must be of the appropriate shape (the shape of a with axis removed, i.e., numpy.delete(a.shape, axis)). Its type is preserved. See doc.ufuncs (Section “Output arguments”) for more details.
keepdims : bool, optional
If this is set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the original arr.
Returns: sum_along_axis : ndarray
An array with the same shape as a, with the specified axis removed. If a is a 0-d array, or if axis is None, a scalar is returned. If an output array is specified, a reference to out is returned.
See also
- ndarray.sum
- Equivalent method.
- cumsum
- Cumulative sum of array elements.
- trapz
- Integration of array values using the composite trapezoidal rule.
mean, average
Notes
Arithmetic is modular when using integer types, and no error is raised on overflow.
Examples
>>> np.sum([0.5, 1.5])
2.0
>>> np.sum([0.5, 0.7, 0.2, 1.5], dtype=np.int32)
1
>>> np.sum([[0, 1], [0, 5]])
6
>>> np.sum([[0, 1], [0, 5]], axis=0)
array([0, 6])
>>> np.sum([[0, 1], [0, 5]], axis=1)
array([1, 5])
If the accumulator is too small, overflow occurs:
>>> np.ones(128, dtype=np.int8).sum(dtype=np.int8)
-128
- to_hdf5(filename, datapath, **kwargs)¶
Store array in HDF5 file
>>> x.to_hdf5('myfile.hdf5', '/x')
Optionally provide arguments as though to h5py.File.create_dataset
>>> x.to_hdf5('myfile.hdf5', '/x', compression='lzf', shuffle=True)
See also
da.store, h5py.File.create_dataset
- topk(k)¶
The top k elements of an array
Returns the k greatest elements of the array in sorted order. Only works on arrays of a single dimension.
>>> x = np.array([5, 1, 3, 6])
>>> d = from_array(x, chunks=2)
>>> d.topk(2).compute()
array([6, 5])
Runs in near-linear time and returns all results in a single chunk, so all k elements must fit in memory.
- transpose(axes=None)¶
Permute the dimensions of an array.
Parameters: a : array_like
Input array.
axes : list of ints, optional
By default, reverse the dimensions, otherwise permute the axes according to the values given.
Returns: p : ndarray
a with its axes permuted. A view is returned whenever possible.
See also
rollaxis
Examples
>>> x = np.arange(4).reshape((2,2))
>>> x
array([[0, 1],
       [2, 3]])
>>> np.transpose(x)
array([[0, 2],
       [1, 3]])
>>> x = np.ones((1, 2, 3))
>>> np.transpose(x, (1, 0, 2)).shape
(2, 1, 3)
- var(axis=None, dtype=None, keepdims=False, ddof=0)¶
Compute the variance along the specified axis.
Returns the variance of the array elements, a measure of the spread of a distribution. The variance is computed for the flattened array by default, otherwise over the specified axis.
Parameters: a : array_like
Array containing numbers whose variance is desired. If a is not an array, a conversion is attempted.
axis : int, optional
Axis along which the variance is computed. The default is to compute the variance of the flattened array.
dtype : data-type, optional
Type to use in computing the variance. For arrays of integer type the default is float32; for arrays of float types it is the same as the array type.
out : ndarray, optional
Alternate output array in which to place the result. It must have the same shape as the expected output, but the type is cast if necessary.
ddof : int, optional
“Delta Degrees of Freedom”: the divisor used in the calculation is N - ddof, where N represents the number of elements. By default ddof is zero.
keepdims : bool, optional
If this is set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the original arr.
Returns: variance : ndarray, see dtype parameter above
If out=None, returns a new array containing the variance; otherwise, a reference to the output array is returned.
Notes
The variance is the average of the squared deviations from the mean, i.e., var = mean(abs(x - x.mean())**2).
The mean is normally calculated as x.sum() / N, where N = len(x). If, however, ddof is specified, the divisor N - ddof is used instead. In standard statistical practice, ddof=1 provides an unbiased estimator of the variance of a hypothetical infinite population. ddof=0 provides a maximum likelihood estimate of the variance for normally distributed variables.
Note that for complex numbers, the absolute value is taken before squaring, so that the result is always real and nonnegative.
For floating-point input, the variance is computed using the same precision the input has. Depending on the input data, this can cause the results to be inaccurate, especially for float32 (see example below). Specifying a higher-accuracy accumulator using the dtype keyword can alleviate this issue.
Examples
>>> a = np.array([[1,2],[3,4]])
>>> np.var(a)
1.25
>>> np.var(a, axis=0)
array([ 1.,  1.])
>>> np.var(a, axis=1)
array([ 0.25,  0.25])
In single precision, var() can be inaccurate:
>>> a = np.zeros((2,512*512), dtype=np.float32)
>>> a[0,:] = 1.0
>>> a[1,:] = 0.1
>>> np.var(a)
0.20405951142311096
Computing the variance in float64 is more accurate:
>>> np.var(a, dtype=np.float64)
0.20249999932997387
>>> ((1-0.55)**2 + (0.1-0.55)**2)/2
0.20250000000000001
- vnorm(ord=None, axis=None, keepdims=False)¶
Vector norm
Other functions¶
- dask.array.core.from_array(x, chunks, name=None, lock=False)¶
Create dask array from something that looks like an array
Input must have a .shape and support numpy-style slicing.
The chunks argument must be one of the following forms:
- a blocksize like 1000
- a blockshape like (1000, 1000)
- explicit sizes of all blocks along all dimensions like ((1000, 1000, 500), (400, 400)).
Examples
>>> x = h5py.File('...')['/data/path']
>>> a = da.from_array(x, chunks=(1000, 1000))
If your underlying datastore does not support concurrent reads then include the lock=True keyword argument or lock=mylock if you want multiple arrays to coordinate around the same lock.
>>> a = da.from_array(x, chunks=(1000, 1000), lock=True)
- dask.array.core.store(sources, targets, **kwargs)¶
Store dask arrays in array-like objects, overwrite data in target
This stores dask arrays into objects that support numpy-style setitem indexing. It stores values chunk by chunk so that it does not have to fill up memory. For best performance you can align the block size of the storage target with the block size of your array.
If your data fits in memory then you may prefer calling np.array(myarray) instead.
Parameters: sources: Array or iterable of Arrays
targets: array-like or iterable of array-likes
These should support setitem syntax target[10:20] = ...
Examples
>>> x = ...
>>> import h5py
>>> f = h5py.File('myfile.hdf5')
>>> dset = f.create_dataset('/data', shape=x.shape,
...                         chunks=x.chunks,
...                         dtype='f8')
>>> store(x, dset)
Alternatively store many arrays at the same time
>>> store([x, y, z], [dset1, dset2, dset3])
- dask.array.core.topk(k, x)¶
The top k elements of an array
Returns the k greatest elements of the array in sorted order. Only works on arrays of a single dimension.
>>> x = np.array([5, 1, 3, 6])
>>> d = from_array(x, chunks=2)
>>> d.topk(2).compute()
array([6, 5])
Runs in near-linear time and returns all results in a single chunk, so all k elements must fit in memory.
- dask.array.core.coarsen(reduction, x, axes, trim_excess=False)¶
Coarsen array by applying reduction to fixed size neighborhoods
Parameters: reduction: function
Function like np.sum, np.mean, etc...
x: np.ndarray
Array to be coarsened
axes: dict
Mapping of axis to coarsening factor
- dask.array.core.stack(seq, axis=0)¶
Stack arrays along a new axis
Given a sequence of dask Arrays form a new dask Array by stacking them along a new dimension (axis=0 by default)
See also
concatenate
Examples
Create slices
>>> import dask.array as da
>>> import numpy as np
>>> data = [from_array(np.ones((4, 4)), chunks=(2, 2))
...         for i in range(3)]
>>> x = da.stack(data, axis=0)
>>> x.shape
(3, 4, 4)
>>> da.stack(data, axis=1).shape
(4, 3, 4)
>>> da.stack(data, axis=-1).shape
(4, 4, 3)
Result is a new dask Array
- dask.array.core.concatenate(seq, axis=0)¶
Concatenate arrays along an existing axis
Given a sequence of dask Arrays form a new dask Array by concatenating them along an existing dimension (axis=0 by default)
See also
stack
Examples
Create slices
>>> import dask.array as da
>>> import numpy as np
>>> data = [from_array(np.ones((4, 4)), chunks=(2, 2))
...         for i in range(3)]
>>> x = da.concatenate(data, axis=0)
>>> x.shape
(12, 4)
>>> da.concatenate(data, axis=1).shape
(4, 12)
Result is a new dask Array
Slicing¶
Dask.array supports most of the NumPy slicing syntax. In particular it supports the following:
- Slicing by integers and slices x[0, :5]
- Slicing by lists/arrays of integers x[[1, 2, 4]]
- Slicing by lists/arrays of booleans x[[False, True, True, False, True]]
It does not currently support the following:
- Slicing one dask.array with another x[x > 0]
- Slicing with lists in multiple axes x[[1, 2, 3], [3, 2, 1]]
Both of these are straightforward to add though. If you have a use case then raise an issue.
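A quick sketch of the supported forms on a small illustrative array:
>>> import dask.array as da
>>> x = da.ones((8, 8), chunks=(4, 4))
>>> x[0, :5]                                                  # integers and slices
>>> x[[1, 2, 4]]                                              # a list of integers
>>> x[[False, True, True, False, True, False, False, True]]   # a list of booleans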
Efficiency¶
The normal dask schedulers are smart enough to compute only those blocks that are necessary to achieve the desired slicing. So large operations may be cheap if only a small output is desired.
In the example below we create a trillion element dask array in million element blocks. We then operate on the entire array and finally slice out only a portion of the output.
>>> # Trillion element array of ones, in 1000 by 1000 blocks
>>> x = da.ones((1000000, 1000000), chunks=(1000, 1000))
>>> da.exp(x)[:1500, :1500]
...
This only needs to compute the top-left four blocks to achieve the result. We are still slightly wasteful on those blocks where we need only partial results. We are also a bit wasteful in that we still need to manipulate the dask-graph with a million or so tasks in it. This can cause an interactive overhead of a second or two.
But generally, slicing works well.
Stack and Concatenate¶
Often we have many arrays stored on disk that we want to stack together and think of as one large array. This is common with geospatial data in which we might have many HDF5/NetCDF files on disk, one for every day, but we want to do operations that span multiple days.
To solve this problem we use the functions da.stack and da.concatenate.
Stack¶
We stack many existing dask Arrays into a new array, creating a new dimension as we go.
>>> import dask.array as da
>>> import numpy as np
>>> data = [da.from_array(np.ones((4, 4)), chunks=(2, 2))
...         for i in range(3)]  # A small stack of dask arrays
>>> x = da.stack(data, axis=0)
>>> x.shape
(3, 4, 4)
>>> da.stack(data, axis=1).shape
(4, 3, 4)
>>> da.stack(data, axis=-1).shape
(4, 4, 3)
This creates a new dimension with length equal to the number of slices
Concatenate¶
We concatenate existing arrays into a new array, extending them along an existing dimension
>>> import dask.array as da
>>> import numpy as np
>>> data = [da.from_array(np.ones((4, 4)), chunks=(2, 2))
...         for i in range(3)]  # small stack of dask arrays
>>> x = da.concatenate(data, axis=0)
>>> x.shape
(12, 4)
>>> da.concatenate(data, axis=1).shape
(4, 12)
dask.array.random¶
dask.array copies the numpy.random module for all univariate distributions. The interface to each function is identical except for the addition of a new chunks= keyword argument.
import numpy as np
x = np.random.normal(10, 0.1, size=(10, 10))
import dask.array as da
x = da.random.normal(10, 0.1, size=(10, 10), chunks=(5, 5))
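Other univariate distributions follow the same pattern; a sketch assuming the usual numpy.random names and parameters:
x = da.random.uniform(low=0, high=1, size=(10000, 10000), chunks=(1000, 1000))
y = da.random.poisson(lam=5, size=(10000,), chunks=(1000,))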
Overlapping Blocks with Ghost Cells¶
Some array operations require communication of borders between neighboring blocks. Example operations include the following:
- Convolve a filter across an image
- Sliding sum/mean/max, ...
- Search for image motifs like a Gaussian blob that might span the border of a block
- Evaluate a partial derivative
- Play the game of Life
Dask.array supports these operations by creating a new dask.array where each block is slightly expanded by the borders of its neighbors. This costs an excess copy and the communication of many small chunks but allows localized functions to be evaluated in an embarrassingly parallel manner. We call this process ghosting.
Ghosting¶
Consider two neighboring blocks in a dask array.

We extend each block by trading thin nearby slices between arrays

We do this in all directions, including also diagonal interactions with the ghost function:

>>> import dask.array as da
>>> import numpy as np
>>> x = np.arange(64).reshape((8, 8))
>>> d = da.from_array(x, chunks=(4, 4))
>>> d.chunks
((4, 4), (4, 4))
>>> g = da.ghost.ghost(d, depth={0: 2, 1: 1},
... boundary={0: 100, 1: 'reflect'})
>>> g.chunks
((8, 8), (6, 6))
>>> np.array(g)
array([[100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
[100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
[ 0, 0, 1, 2, 3, 4, 3, 4, 5, 6, 7, 7],
[ 8, 8, 9, 10, 11, 12, 11, 12, 13, 14, 15, 15],
[ 16, 16, 17, 18, 19, 20, 19, 20, 21, 22, 23, 23],
[ 24, 24, 25, 26, 27, 28, 27, 28, 29, 30, 31, 31],
[ 32, 32, 33, 34, 35, 36, 35, 36, 37, 38, 39, 39],
[ 40, 40, 41, 42, 43, 44, 43, 44, 45, 46, 47, 47],
[ 16, 16, 17, 18, 19, 20, 19, 20, 21, 22, 23, 23],
[ 24, 24, 25, 26, 27, 28, 27, 28, 29, 30, 31, 31],
[ 32, 32, 33, 34, 35, 36, 35, 36, 37, 38, 39, 39],
[ 40, 40, 41, 42, 43, 44, 43, 44, 45, 46, 47, 47],
[ 48, 48, 49, 50, 51, 52, 51, 52, 53, 54, 55, 55],
[ 56, 56, 57, 58, 59, 60, 59, 60, 61, 62, 63, 63],
[100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
[100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100]])
Boundaries¶
While ghosting you can specify how to handle the boundaries. Current policies include the following:
- periodic - wrap borders around to the other side
- reflect - reflect each border outwards
- any-constant - pad the border with this value
So an example boundary kind argument might look like the following
{0: 'periodic',
1: 'reflect',
2: np.nan}
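Passing such a boundary specification to the ghost function might look like the following sketch, assuming a 3-dimensional dask array d:
>>> g = da.ghost.ghost(d, depth={0: 2, 1: 1, 2: 1},
...                    boundary={0: 'periodic', 1: 'reflect', 2: np.nan})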
Alternatively you can use functions like da.fromfunction and da.concatenate to pad arbitrarily.
Map a function across blocks¶
Ghosting goes hand-in-hand with mapping a function across blocks. This function can now use the additional information copied over from the neighbors that is not stored locally in each block
>>> from scipy.ndimage.filters import gaussian_filter
>>> def func(block):
... return gaussian_filter(block, sigma=1)
>>> filt = g.map_blocks(func)
While we used a SciPy function in this case, it could have been any arbitrary function. This is a good interaction point with Numba.
If your function does not preserve the shape of the block then you will need to provide a chunks keyword argument. If your block sizes are regular then this can be a blockshape, e.g. (1000, 1000) or if your blocks are irregular then this must be a full chunks tuple, e.g. ((1000, 700, 1000), (200, 300)).
>>> g.map_blocks(myfunc, chunks=(5, 5))
If your function needs to know the location of the block on which it operates you can give your function a keyword argument block_id
def func(block, block_id=None):
...
This extra keyword argument will be given a tuple that provides the block location, like (0, 0) for the upper-left block or (0, 1) for the block just to the right of that block.
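For example, a sketch of a block function that treats one block specially based on its location (the behavior chosen here is purely illustrative):
>>> def func(block, block_id=None):
...     if block_id == (0, 0):          # the upper-left block
...         return block - block.mean()
...     return block
>>> result = g.map_blocks(func)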
Trim Excess¶
After mapping a blocked function you may want to trim off the borders from each block by the same amount by which they were expanded. The function trim_internal is useful here and takes the same depth argument given to ghost.
>>> x.chunks
((10, 10, 10, 10), (10, 10, 10, 10))
>>> y = da.ghost.trim_internal(x, {0: 2, 1: 1})
>>> y.chunks
((6, 6, 6, 6), (8, 8, 8, 8))
Note: at the moment trim cuts indiscriminately from the boundaries as well. If you don't specify a boundary kind then this may not be desired.
Full Workflow¶
And so a pretty typical ghosting workflow includes ghost, map_blocks, and trim_internal
>>> x = ...
>>> g = da.ghost.ghost(x, depth={0: 2, 1: 2},
... boundary={0: 'periodic', 1: 'periodic'})
>>> g2 = g.map_blocks(myfunc)
>>> result = da.ghost.trim_internal(g2, {0: 2, 1: 2})
Blaze¶
Difference between Dask Arrays and Blaze¶
Blaze and Dask.array both provide array abstractions over biggish data, so what is the difference?
In short, Blaze is one level more abstract than Dask. Blaze is an expression system and thinks about syntax trees while Dask is a scheduling system and thinks about blocked algorithms and directed acyclic graphs.
Blaze reasons about and optimizes the expressions that a user types in, optimizing order of execution, operator fusion, checking type errors, etc.. Blaze applies these optimizations and then translates to a variety of computational systems, passing work off to them. One such computational system is dask.array.
Dask.arrays are fundamentally a way to create task schedules that execute blocked matrix algorithms. Dask.array does not think or optimize at the expression level like Blaze. Instead each operation on dask.arrays produces a new dask.array with its own task directed acyclic graph. Dask.arrays then optimize this graph in ways very different from how Blaze might act on an expression tree.
Finally, Blaze is general while Dask.array is specific to blocked NumPy algorithms.
Example¶
Consider the following scalar expression on the array x.
>>> (((x + 1) * 2) ** 3)
If x is a dask.array with 1000 blocks then each binary operation adds 1000 new tasks to the task graph. Dask is unable to reason effectively about the expression that the user has typed in.
However if x is a Blaze symbol then this graph only has a few nodes (x, 1, x + 1, ...) and so Blaze is able to wrap this tree up into a fused scalar operation. If we then decide to execute the expression against dask.array then Blaze can intelligently craft Numba ufuncs for dask.array to use.
Why use Blaze?¶
Blaze and Dask have orthogonal sets of optimizations. When we use them together we can optimize first the expression tree and then translate to dask and optimize the task dependency graph.
Currently Blaze offers the following concrete benefits:
- Smoother immediate feedback for type and shape errors
- dtype tracking
- Numba integration for element-wise operations
- Integration with other Blaze projects like Blaze Server
However this comes at a cost of indirection and potential confusion.
These different projects (Blaze -> dask.array -> NumPy -> Numba) act as different stages in a compiler. They start at abstract syntax trees, move to task DAGs, then to in-core computations, finally to LLVM and beyond. For simple problems you may only need to think about the middle of this chain (NumPy, dask.array) but as you require more performance optimizations you extend your interest to the outer edges (Blaze, Numba).
How to use Blaze with Dask¶
We can drive dask arrays with Blaze.
>>> x = da.from_array(...) # Make a dask array
>>> from blaze import Data, log, compute
>>> d = Data(x) # Wrap with Blaze
>>> y = log(d + 1)[:5].sum(axis=1) # Do work as usual
>>> result = compute(y) # Fall back to dask
If you’re comfortable using Blaze and the into function you can jump directly from the Blaze expression to storage, leaving into to handle dataset creation.
>>> from blaze import Data, log, into
>>> d = Data(x)
>>> y = log(d + 1)[:5].sum(axis=1)
>>> into('myfile.hdf5::/data', y)
Extend Dask Array¶
As discussed in the array design document to create a dask Array object you need the following:
- A dask graph
- A name specifying a set of keys within that graph
- A chunks tuple giving chunk shape information
- A dtype
Often dask.array functions take other Array objects as inputs along with parameters, add tasks to a new dask dictionary, create a new chunks tuple, and then construct and return a new Array object. The hard parts are invariably creating the right tasks and creating a new chunks tuple. Careful review of the array design document is suggested.
Example eye¶
Consider this simple example with the eye function.
import numpy as np
from dask.array.core import Array, tokens

def eye(n, blocksize):
    chunks = ((blocksize,) * (n // blocksize),
              (blocksize,) * (n // blocksize))

    name = 'eye' + next(tokens)  # unique identifier

    dsk = {(name, i, j): (np.eye, blocksize)
                         if i == j else
                         (np.zeros, (blocksize, blocksize))
           for i in range(n // blocksize)
           for j in range(n // blocksize)}

    dtype = np.eye(0).dtype  # take dtype default from numpy

    return Array(dsk, name, chunks, dtype)
This example is particularly simple because it doesn’t take any Array objects as input.
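A quick usage sketch, assuming the definition above and its imports:
>>> x = eye(8, blocksize=4)
>>> x.chunks
((4, 4), (4, 4))
>>> x.compute().shape
(8, 8)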
Example diag¶
Consider the function diag that takes a 1d vector and produces a 2d matrix with the values of the vector along the diagonal. Consider the case where the input is a 1d array with chunk sizes (2, 3, 4) in the first dimension like this:
[x_0, x_1], [x_2, x_3, x_4], [x_5, x_6, x_7, x_8]
We need to create a 2d matrix with chunks equal to ((2, 3, 4), (2, 3, 4)) where the ith block along the diagonal of the output is the result of calling np.diag on the ith block of the input and all other blocks are zero.
def diag(v):
    """Construct a diagonal array, with ``v`` on the diagonal."""
    chunks = (v.chunks[0], v.chunks[0])  # repeat chunks twice

    name = 'diag' + next(tokens)  # unique identifier

    dsk = {(name, i, j): (np.diag, (v.name, i))
                         if i == j else
                         (np.zeros, (v.chunks[0][i], v.chunks[0][j]))
           for i in range(len(v.chunks[0]))
           for j in range(len(v.chunks[0]))}

    dsk.update(v.dask)  # include dask graph of the input

    dtype = v.dtype  # output has the same dtype as the input

    return Array(dsk, name, chunks, dtype)
>>> x = da.arange(9, chunks=((2, 3, 4),))
>>> x
dask.array<arange-1, shape=(9,), chunks=((2, 3, 4)), dtype=int64>
>>> M = diag(x)
>>> M
dask.array<diag-2, shape=(9, 9), chunks=((2, 3, 4), (2, 3, 4)), dtype=int64>
>>> M.compute()
array([[0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 2, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 3, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 4, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 5, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 6, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 7, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 8]])
Bag¶
Dask.Bag parallelizes computations across a large collection of generic Python objects. It is particularly useful when dealing with large quantities of semi-structured data like JSON blobs or log files.
Name¶
Bag is an abstract collection, like list or set. It is a friendly synonym to multiset.
- list: ordered collection with repeats, [1, 2, 3, 2]
- set: unordered collection without repeats, {1, 2, 3}
- bag: unordered collection with repeats, {1, 2, 2, 3}
So a bag is like a list but doesn’t guarantee an ordering among elements. There can be repeated elements but you can’t ask for a particular element.
Example¶
We commonly use dask.bag to process unstructured or semi-structured data.
>>> import dask.bag as db
>>> import json
>>> js = db.from_filenames('logs/2015-*.json.gz').map(json.loads)
>>> js.take(2)
({'name': 'Alice', 'location': {'city': 'LA', 'state': 'CA'}},
{'name': 'Bob', 'location': {'city': 'NYC', 'state': 'NY'}})
>>> result = js.pluck('name').frequencies() # just another Bag
>>> dict(result) # Evaluate Result
{'Alice': 10000, 'Bob': 5555, 'Charlie': ...}
Create Bags¶
There are several ways to create dask.bags around your data
db.from_sequence¶
You can create a bag from an existing Python sequence.
>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])
You can control the number of partitions into which this data is binned.
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
This controls the granularity of the parallelism that you expose. By default dask will try to partition your data into about 100 partitions.
Warning: you should not load your data into Python and then load that data into dask.bag. Instead, you should use dask.bag to load your data. This parallelizes the loading step and reduces inter-worker communication.
>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)
db.from_filenames¶
Dask.bag can load data from textfiles directly. You can pass either a single filename, a list of filenames, or a globstring. The resulting bag will have one item per line, one file per partition.
>>> b = db.from_filenames('myfile.json')
>>> b = db.from_filenames(['myfile.1.json', 'myfile.2.json', ...])
>>> b = db.from_filenames('myfile.*.json')
Dask.bag handles standard compression libraries, notably gzip and bz2, based on the filename extension.
>>> b = db.from_filenames('myfile.*.json.gz')
The resulting items in the bag are strings. You may want to parse them using functions like json.loads
>>> import json
>>> b = db.from_filenames('myfile.*.json.gz').map(json.loads)
Or do string munging tasks. For convenience there is a string namespace attached directly to bags with .str.methodname.
>>> b = db.from_filenames('myfile.*.csv.gz').str.strip().str.split(',')
db.from_hdfs¶
Dask.bag can use WebHDFS to load text data from HDFS
>>> from pywebhdfs.webhdfs import PyWebHdfsClient
>>> hdfs = PyWebHdfsClient(host='hostname', user_name='hdfs')
>>> b = db.from_hdfs('/user/username/data/2015/06/', hdfs=hdfs)
If the input is a directory then we return all data underneath that directory and all subdirectories.
This uses WebHDFS to pull data from HDFS and so only works if that is enabled. It does not require your computer to actually be on HDFS, merely that you have network access. Data will be downloaded to memory, decompressed, used, and cleaned up as necessary.
Notably, this function does not tightly integrate dask.bag with a Hadoop cluster. Computation is not guaranteed (or likely) to be local to the node that has the data. This functionality is not the same as what you would get with Hadoop or Spark. No dask scheduler currently integrates nicely with data-local file systems like HDFS.
Execution¶
Execution on bags provides two benefits:
- Streaming: data processes lazily, allowing smooth execution of larger-than-memory data
- Parallel: data is split up, allowing multiple cores to execute in parallel
Trigger Evaluation¶
Bags have a .compute() method to trigger computation.
>>> c = b.map(func)
>>> c.compute()
[1, 2, 3, 4, ...]
You must ensure that your result will fit in memory.
Bags also support the __iter__ protocol and so work well with Pythonic collections like list, tuple, set, and dict. Converting your object into a list or dict can look more Pythonic than calling .compute().
>>> list(b.map(lambda x: x + 1))
[1, 2, 3, 4, ...]
>>> dict(b.frequencies())
{'Alice': 100, 'Bob': 200, ...}
Default scheduler¶
By default dask.bag uses dask.multiprocessing for computation. As a benefit dask bypasses the GIL and uses multiple cores on pure Python objects. As a drawback dask.bag doesn’t perform well on computations that include a great deal of inter-worker communication. For common operations this is rarely an issue as most dask.bag workflows are embarrassingly parallel or result in reductions with little data moving between workers.
Additionally, using multiprocessing opens up potential problems with function serialization (see below).
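If serialization does become a problem, one alternative sketch is to run the bag on the thread-based scheduler instead (dask.threaded.get; my_hard_to_pickle_function is a stand-in name), accepting the GIL in exchange for skipping inter-process serialization:
>>> from dask.threaded import get as threaded_get
>>> b.map(my_hard_to_pickle_function).compute(get=threaded_get)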
Shuffle¶
Some operations, like a full groupby and bag-to-bag join, do require substantial inter-worker communication. These are handled specially by shuffle operations that use disk and a central memory server as the point of communication.
Shuffle operations are expensive and better handled by projects like dask.dataframe. It is best to use dask.bag to clean and process data then transform it into an array or dataframe before embarking on the more complex operations that require shuffle steps.
Dask.bag uses partd to perform efficient, parallel, spill-to-disk shuffles.
Function Serialization and Error Handling¶
Dask.bag uses dill to serialize functions to send to worker processes. Dill supports almost any kind of function, including lambdas, closures, partials and functions defined interactively.
When an error occurs in a remote process the dask schedulers record the Exception and the traceback and deliver these to the main process. These tracebacks cannot be navigated (i.e. you can’t use pdb) but they still contain valuable contextual information.
These two features are arguably the most important when comparing dask.bag to direct use of multiprocessing.
If you would like to turn off multiprocessing you can do so by setting the default get function to the synchronous single-core scheduler
>>> from dask.async import get_sync
>>> b.compute(get=get_sync)
or
>>> import dask
>>> dask.set_options(get=get_sync) # set global
>>> list(b) # uses synchronous scheduler
Known Limitations¶
Bags provide very general computation (any Python function). This generality comes at a cost. Bags have the following known limitations:
- By default they rely on the multiprocessing scheduler, which has its own set of known limitations (see shared)
- Bag operations tend to be slower than array/dataframe computations in the same way that Python tends to be slower than NumPy/pandas
- Bag.groupby is slow. You should try to use Bag.foldby if possible. Using Bag.foldby requires more thought.
- The implementation backing Bag.groupby is under heavy churn.
API¶
Create Bags¶
from_sequence(seq[, partition_size, npartitions]) | Create dask from Python sequence |
from_filenames(filenames[, chunkbytes, encoding]) | Create dask by loading in lines from many files |
from_hdfs(path[, hdfs, host, port, user_name]) | Create dask by loading in files from HDFS |
concat(bags) | Concatenate many bags together, unioning all elements |
Turn Bags into other things¶
Bag.to_textfiles(path[, name_function, encoding]) | Write bag to disk, one filename per partition, one line per element |
Bag.to_dataframe([columns]) | Convert Bag to dask.dataframe |
Bag Methods¶
- class dask.bag.core.Bag(dsk, name, npartitions)¶
Parallel collection of Python objects
Methods
all((iterable) -> bool) | Return True if bool(x) is True for all values x in the iterable. |
any((iterable) -> bool) | Return True if bool(x) is True for any x in the iterable. |
compute(**kwargs)
concat() | Concatenate nested lists into one long list |
count() | Count the number of elements |
distinct() | Distinct elements of collection |
filter(predicate) | Filter elements in collection by a predicate function |
fold(binop[, combine, initial]) | Parallelizable reduction |
foldby(key, binop[, initial, combine, ...]) | Combined reduction and groupby |
frequencies() | Count number of occurrences of each distinct element |
from_filenames(*args, **kwargs)
from_sequence(*args, **kwargs)
groupby(grouper[, npartitions, blocksize]) | Group collection by key function |
join(other, on_self[, on_other]) | Join collection with another collection |
map(func) | Map a function across all elements in collection |
map_partitions(func) | Apply function to every partition within collection |
max(iterable[, key]) | max(a, b, c, ...[, key=func]) -> value |
mean() | Arithmetic mean |
min(iterable[, key]) | min(a, b, c, ...[, key=func]) -> value |
pluck(key[, default]) | Select item from all tuples/dicts in collection |
product(other) | Cartesian product between two bags |
reduction(perpartition, aggregate) | Reduce collection with reduction operators |
remove(predicate) | Remove elements in collection that match predicate |
std([ddof]) | Standard deviation |
sum(sequence[, start]) -> value | Return the sum of a sequence of numbers (NOT strings) plus the value of parameter ‘start’ (which defaults to 0). |
take(k[, compute]) | Take the first k elements |
to_dataframe([columns]) | Convert Bag to dask.dataframe |
to_textfiles(path[, name_function, encoding]) | Write bag to disk, one filename per partition, one line per element |
topk(k[, key]) | K largest elements in collection |
var([ddof]) | Variance |
visualize([filename, optimize_graph])
- all(iterable) → bool¶
Return True if bool(x) is True for all values x in the iterable. If the iterable is empty, return True.
- any(iterable) → bool¶
Return True if bool(x) is True for any x in the iterable. If the iterable is empty, return False.
- concat()¶
Concatenate nested lists into one long list
>>> b = from_sequence([[1], [2, 3]])
>>> list(b)
[[1], [2, 3]]
>>> list(b.concat())
[1, 2, 3]
- count()¶
Count the number of elements
- distinct()¶
Distinct elements of collection
Unordered without repeats.
>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> sorted(b.distinct())
['Alice', 'Bob']
- filter(predicate)¶
Filter elements in collection by a predicate function
>>> def iseven(x):
...     return x % 2 == 0
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.filter(iseven))
[0, 2, 4]
- fold(binop, combine=None, initial='__no__default__')¶
Parallelizable reduction
Fold is like the builtin function reduce except that it works in parallel. Fold takes two binary operator functions, one to reduce each partition of our dataset and another to combine results between partitions
- binop: Binary operator to reduce within each partition
- combine: Binary operator to combine results from binop
Sequentially this would look like the following:
>>> intermediates = [reduce(binop, part) for part in partitions]
>>> final = reduce(combine, intermediates)
If only one function is given then it is used for both functions binop and combine as in the following example to compute the sum:
>>> def add(x, y):
...     return x + y
>>> b = from_sequence(range(5))
>>> b.fold(add).compute()
10
In full form we provide both binary operators as well as their default arguments
>>> b.fold(binop=add, combine=add, initial=0).compute()
10
More complex binary operators are also doable
>>> def add_to_set(acc, x):
...     ''' Add new element x to set acc '''
...     return acc | set([x])
>>> b.fold(add_to_set, set.union, initial=set()).compute()
{0, 1, 2, 3, 4}
See also: Bag.foldby
- foldby(key, binop, initial='__no__default__', combine=None, combine_initial='__no__default__')¶
Combined reduction and groupby
Foldby provides a combined groupby and reduce for efficient parallel split-apply-combine tasks.
The computation
>>> b.foldby(key, binop, init)
is equivalent to the following:
>>> def reduction(group):
...     return reduce(binop, group, init)
>>> b.groupby(key).map(lambda (k, v): (k, reduction(v)))
But uses minimal communication and so is much faster.
>>> b = from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> add = lambda x, y: x + y
>>> dict(b.foldby(iseven, add))
{True: 20, False: 25}
See also
toolz.reduceby, pyspark.combineByKey
- frequencies()¶
Count number of occurrences of each distinct element
>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> dict(b.frequencies())
{'Alice': 2, 'Bob': 1}
- groupby(grouper, npartitions=None, blocksize=1048576)¶
Group collection by key function
Note that this requires full dataset read, serialization and shuffle. This is expensive. If possible you should use foldby.
>>> b = from_sequence(range(10))
>>> dict(b.groupby(lambda x: x % 2 == 0))
{True: [0, 2, 4, 6, 8], False: [1, 3, 5, 7, 9]}
See also: Bag.foldby
- join(other, on_self, on_other=None)¶
Join collection with another collection
Other collection must be an Iterable, and not a Bag.
>>> people = from_sequence(['Alice', 'Bob', 'Charlie'])
>>> fruit = ['Apple', 'Apricot', 'Banana']
>>> list(people.join(fruit, lambda x: x[0]))
[('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')]
- map(func)¶
Map a function across all elements in collection
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.map(lambda x: x * 10))
[0, 10, 20, 30, 40]
- map_partitions(func)¶
Apply function to every partition within collection
Note that this requires you to understand how dask.bag partitions your data and so is somewhat internal.
>>> b.map_partitions(myfunc)
- max(iterable[, key=func]) → value¶
max(a, b, c, ...[, key=func]) -> value
With a single iterable argument, return its largest item. With two or more arguments, return the largest argument.
- mean()¶
Arithmetic mean
- min(iterable[, key=func]) → value¶
min(a, b, c, ...[, key=func]) -> value
With a single iterable argument, return its smallest item. With two or more arguments, return the smallest argument.
- pluck(key, default='__no__default__')¶
Select item from all tuples/dicts in collection
>>> b = from_sequence([{'name': 'Alice', 'credits': [1, 2, 3]},
...                    {'name': 'Bob', 'credits': [10, 20]}])
>>> list(b.pluck('name'))
['Alice', 'Bob']
>>> list(b.pluck('credits').pluck(0))
[1, 10]
- product(other)¶
Cartesian product between two bags
- reduction(perpartition, aggregate)¶
Reduce collection with reduction operators
Parameters: perpartition: function
reduction to apply to each partition
aggregate: function
reduction to apply to the results of all partitions
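For example, a small sketch that sums each partition and then sums those partial results, giving a total sum:
>>> b = from_sequence(range(10), npartitions=2)
>>> b.reduction(sum, sum).compute()
45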
- remove(predicate)¶
Remove elements in collection that match predicate
>>> def iseven(x):
...     return x % 2 == 0
>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.remove(iseven))
[1, 3]
- std(ddof=0)¶
Standard deviation
- sum(sequence[, start]) → value¶
Return the sum of a sequence of numbers (NOT strings) plus the value of parameter ‘start’ (which defaults to 0). When the sequence is empty, return start.
- take(k, compute=True)¶
Take the first k elements
Evaluates by default; use compute=False to avoid computation. Only takes elements from the first partition.
>>> b = from_sequence(range(10))
>>> b.take(3)
(0, 1, 2)
- to_dataframe(columns=None)¶
Convert Bag to dask.dataframe
Bag should contain tuple or dict records.
Provide columns= keyword arg to specify column names.
Index will not be particularly meaningful. Use reindex afterwards if necessary.
- to_textfiles(path, name_function=<type 'str'>, encoding='ascii')¶
Write bag to disk, one filename per partition, one line per element
Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.
Use a globstring
>>> b.to_textfiles('/path/to/data/*.json.gz')
The * will be replaced by the increasing sequence 0, 1, ...
/path/to/data/0.json.gz
/path/to/data/1.json.gz
Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string.
>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...
You can also provide an explicit list of paths.
>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]
>>> b.to_textfiles(paths)
Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.
- topk(k, key=None)¶
K largest elements in collection
Optionally ordered by some key function
>>> b = from_sequence([10, 3, 5, 7, 11, 4])
>>> list(b.topk(2))
[11, 10]
>>> list(b.topk(2, lambda x: -x))
[3, 4]
- var(ddof=0)¶
Variance
Other functions¶
- dask.bag.core.from_sequence(seq, partition_size=None, npartitions=None)¶
Create dask from Python sequence
This sequence should be relatively small in memory. Dask Bag works best when it handles loading your data itself. Commonly we load a sequence of filenames into a Bag and then use .map to open them.
Parameters: seq: Iterable
A sequence of elements to put into the dask
partition_size: int (optional)
The length of each partition
npartitions: int (optional)
The number of desired partitions
It is best to provide either ``partition_size`` or ``npartitions`` (though not both).
- dask.bag.core.from_filenames(filenames, chunkbytes=None, encoding='ascii')¶
Create dask by loading in lines from many files
Provide list of filenames
>>> b = from_filenames(['myfile.1.txt', 'myfile.2.txt'])
Or a globstring
>>> b = from_filenames('myfiles.*.txt')
Parallelize a large file by providing the number of uncompressed bytes to load into each partition.
>>> b = from_filenames('largefile.txt', chunkbytes=1e7)
- See also:
- from_sequence: A more generic bag creation function
- dask.bag.core.from_hdfs(path, hdfs=None, host='localhost', port='50070', user_name=None)¶
Create dask by loading in files from HDFS
Provide an hdfs directory and credentials
>>> b = from_hdfs('home/username/data/', host='localhost', user_name='ubuntu')
Alternatively provide an instance of pywebhdfs.webhdfs.PyWebHdfsClient
>>> from pywebhdfs.webhdfs import PyWebHdfsClient
>>> hdfs = PyWebHdfsClient(host='hostname', user_name='username')
>>> b = from_hdfs('home/username/data/', hdfs=hdfs)
- dask.bag.core.concat(bags)¶
Concatenate many bags together, unioning all elements
>>> import dask.bag as db
>>> a = db.from_sequence([1, 2, 3])
>>> b = db.from_sequence([4, 5, 6])
>>> c = db.concat([a, b])
>>> list(c)
[1, 2, 3, 4, 5, 6]
DataFrame¶
Dask dataframes look and feel like pandas dataframes but operate on datasets larger than memory using multiple threads. Dask.dataframe does not implement the complete pandas interface.
The dask.dataframe module implements a blocked parallel DataFrame that mimics a subset of the pandas DataFrame. One dask DataFrame is comprised of several in-memory pandas DataFrames separated along the index. An operation on one dask DataFrame triggers many pandas operations on the constituent pandas DataFrames in a way that is mindful of potential parallelism and memory constraints.
Dask.dataframe copies the pandas API¶
Because the dask.dataframe API is a subset of the pandas API it should be familiar to pandas users. There are some slight alterations due to the parallel nature of dask.
>>> import dask.dataframe as dd
>>> df = dd.read_csv('2014-*.csv.gz', compression='gzip')
>>> df.head()
x y
0 1 a
1 2 b
2 3 c
3 4 a
4 5 b
5 6 c
>>> df2 = df[df.y == 'a'].x + 1
As with all dask collections (e.g. Array, Bag, DataFrame) one triggers computation by calling the .compute() method.
>>> df2.compute()
0 2
3 5
Name: x, dtype: int64
Threaded Scheduling¶
By default dask.dataframe uses the multi-threaded scheduler. This exposes some parallelism when pandas or the underlying numpy operations release the GIL. Generally pandas is more GIL bound than NumPy, so multi-core speedups are not as pronounced for dask.dataframe as they are for dask.array. This is changing and the pandas development team is actively working on releasing the GIL.
What doesn’t work?¶
Dask.dataframe only covers a small but well-used portion of the pandas API. This limitation is for two reasons:
- The pandas API is huge
- Some operations are genuinely hard to do in parallel (e.g. sort)
Additionally, some important operations like set_index work, but are slower than in pandas because they may write out to disk.
What definitely works?¶
- Trivially parallelizable operations (fast):
- Elementwise operations: df.x + df.y, df * df
- Row-wise selections: df[df.x > 0]
- Loc: df.loc[4.0:10.5]
- Common aggregations: df.x.max(), df.max()
- Is in: df[df.x.isin([1, 2, 3])]
- Datetime/string accessors: df.timestamp.month
- Cleverly parallelizable operations (also fast):
- groupby-aggregate (with common aggregations): df.groupby(df.x).y.max(), df.groupby('x').max()
- value_counts: df.x.value_counts()
- Drop duplicates: df.x.drop_duplicates()
- Join on index: dd.merge(df1, df2, left_index=True, right_index=True)
- Operations requiring a shuffle (slow-ish, unless on index)
- Set index: df.set_index(df.x)
- groupby-apply (with anything): df.groupby(df.x).apply(myfunc)
- Join not on the index: pd.merge(df1, df2, on='name')
- Elementwise operations with different partitions / divisions: df1.x + df2.y
- Ingest operations
- CSVs: dd.read_csv
- pandas: dd.from_pandas
- Anything supporting numpy slicing: dd.from_array
- Dask.bag: mybag.to_dataframe(columns=[...])
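A small, hedged sketch exercising a few of the operations listed above, building a dask DataFrame from an in-memory pandas DataFrame with dd.from_pandas:
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': [1, 2, 3, 4], 'y': [1., 2., 3., 4.]})
df = dd.from_pandas(pdf, npartitions=2)

(df.x + df.y).compute()             # elementwise operation
df[df.x > 2].compute()              # row-wise selection
df.groupby('x').y.max().compute()   # groupby-aggregate with a common aggregation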
Partitions¶
Internally a dask dataframe is split into many partitions, and each partition is one pandas dataframe. These dataframes are split along the index. When our index is sorted and we know the values of the divisions of our partitions then we can be clever and efficient.
For example, if we have a time-series index then our partitions might be divided by month. All of January will live in one partition while all of February will live in the next. In these cases operations like loc, groupby, and join/merge along the index can be much more efficient than would otherwise be possible in parallel. You can view the number of partitions and divisions of your dataframe with the following fields
>>> df.npartitions
4
>>> df.divisions
['2015-01-01', '2015-02-01', '2015-03-01', '2015-04-01', '2015-04-30']
Divisions includes the minimum value of every partition’s index and the maximum value of the last partition’s index. In the example above if the user searches for a specific datetime range then we know which partitions we need to inspect and which we can drop.
>>> df.loc['2015-01-20': '2015-02-10'] # Must inspect first two partitions
Often we do not have such information about our partitions. When reading CSV files for example we do not know, without extra user input, how the data is divided. In this case .divisions will be all None.
>>> df.divisions
[None, None, None, None, None]
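If one of the columns is sorted (or can be sorted), a hedged sketch for establishing divisions is to set the index to that column ('timestamp' is an assumed column name); as noted above, set_index requires a shuffle and can be slow:
>>> df2 = df.set_index('timestamp')   # expensive: requires a shuffle
>>> df2.divisions                     # now known, enabling efficient loc/groupby/join along the index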
Graphs:
Dask graphs encode algorithms in a simple format involving Python dicts, tuples, and functions. This graph format can be used in isolation from the dask collections. If you are a developer then you should start here.
Motivation¶
An explanation of dask task graphs.
Motivation¶
Normally humans write programs and then compilers/interpreters interpret them (e.g. python, javac, clang). Sometimes humans disagree with how these compilers/interpreters choose to interpret and execute their programs. In these cases humans often bring the analysis, optimization, and execution of code into the code itself.
Commonly a desire for parallel execution causes this shift of responsibility from compiler to human developer. In these cases we often represent the structure of our program explicitly as data within the program itself.
A common approach to parallel execution in user-space is task scheduling. In task scheduling we break our program into many medium-sized tasks or units of computation (often a function call on a non-trivial amount of data). We represent these tasks as nodes in a graph with edges between nodes if one task depends on data produced by another. We call upon a task scheduler to execute this graph in a way that respects these data dependencies and leverages parallelism where possible (e.g. multiple independent tasks can be run simultaneously.)
Many solutions exist. This is a common approach in parallel execution frameworks. Often task scheduling logic hides within other larger frameworks (Luigi, Storm, Spark, IPython Parallel, etc.) and so is often reinvented.
Dask is a specification that encodes task schedules with minimal incidental complexity using terms common to all Python projects, namely dicts, tuples, and callables. Ideally this minimum solution is easy to adopt and understand by a broad community.
Example¶

Consider the following simple program
def inc(i):
    return i + 1

def add(a, b):
    return a + b

x = 1
y = inc(x)
z = add(y, 10)
We encode this as a dictionary in the following way
d = {'x': 1,
     'y': (inc, 'x'),
     'z': (add, 'y', 10)}
While less pleasant than our original code this representation can be analyzed and executed by other Python code, not just the CPython interpreter. We don’t recommend that users write code in this way, but rather that it is an appropriate target for automated systems. Also, in non-toy examples the execution times are likely much larger than for inc and add, warranting the extra complexity.
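As a sketch of what execution looks like, any dask scheduler can evaluate this dictionary directly; here we use the synchronous single-core scheduler mentioned elsewhere in this document:
>>> from dask.async import get_sync
>>> get_sync(d, 'z')
12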
Schedulers¶
The dask library currently contains a few schedulers to execute these graphs. Each scheduler works differently, providing different performance guarantees and operating in different contexts. These implementations are not special and others can write different schedulers better suited to other applications or architectures easily. Systems that emit dask graphs (like dask.array, dask.bag, etc.) may leverage the appropriate scheduler for the application and hardware.
Specification¶
We represent a computation as a directed acyclic graph of tasks with data dependencies. Dask is a specification to encode such a graph using ordinary Python data structures, namely dicts, tuples, functions, and arbitrary Python values.
Definitions¶
A dask graph is a dictionary mapping data-keys to values or tasks.
{'x': 1,
'y': 2,
'z': (add, 'x', 'y'),
'w': (sum, ['x', 'y', 'z'])}
A key is any hashable value that is not a task.
'x'
('x', 2, 3)
A task is a tuple with a callable first element. Tasks represent atomic units of work meant to be run by a single worker.
(add, 'x', 'y')
We represent a task as a tuple such that the first element is a callable function (like add), and the succeeding elements are arguments for that function.
An argument may be one of the following:
- Any key present in the dask like 'x'
- Any other value like 1, to be interpreted literally
- Other tasks like (inc, 'x')
- List of arguments, like [1, 'x', (inc, 'x')]
So all of the following are valid tasks
(add, 1, 2)
(add, 'x', 2)
(add, (inc, 'x'), 2)
(sum, [1, 2])
(sum, ['x', (inc, 'x')])
(np.dot, np.array([...]), np.array([...]))
To encode keyword arguments we recommend the use of functools.partial or toolz.curry.
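For example, a minimal sketch (the function scale is purely illustrative) of baking a keyword argument into a task with functools.partial:
from functools import partial

def scale(x, factor=1):
    return x * factor

dsk = {'x': 5,
       'y': (partial(scale, factor=10), 'x')}   # encodes scale(x, factor=10)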
What functions should expect¶
In cases like (add, 'x', 'y') functions like add receive concrete values instead of keys. A dask scheduler replaces keys (like 'x' and 'y') with their computed values (like 1, and 2) before calling the add function.
If the argument is a list then a function should expect an Iterator of concrete values.
Entry Point - The get function¶
The get function serves as entry point to computation. This function gets the value associated to the given key. That key may refer to stored data as is the case with 'x' or a task as is the case with 'z'. In the latter case get should perform all necessary computation to retrieve the computed value.
>>> dsk = {'x': 1,
... 'y': 2,
... 'z': (add, 'x', 'y'),
... 'w': (sum, ['x', 'y', 'z'])}
>>> get(dsk, 'x')
1
>>> get(dsk, 'z')
3
>>> get(dsk, 'w')
6
Additionally if given a list get should simultaneously acquire values for multiple keys
>>> get(dsk, ['x', 'y', 'z'])
[1, 2, 3]
Because we accept lists of keys as keys we support nested lists.
>>> get(dsk, [['x', 'y'], ['z', 'w']])
[[1, 2], [3, 6]]
Internally get can be arbitrarily complex, calling out to distributed computing, using caches, etc..
Why use tuples¶
With (add, 'x', 'y') we wish to encode “the result of calling add on the values corresponding to the keys 'x' and 'y'.”
We intend the following meaning:
add('x', 'y') # after x and y have been replaced
But this will err because Python executes the function immediately before we know values for 'x' and 'y'.
We delay the execution by moving the opening parenthesis one term to the left, creating a tuple.
Before: add( 'x', 'y')
After: (add, 'x', 'y')
This lets us store the desired computation as data that we can analyze using other Python code rather than cause immediate execution.
LISP users will identify this as an s-expression or as a rudimentary form of quoting.
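A tiny sketch of the idea: the tuple sits around as ordinary data until something chooses to evaluate it.
def add(a, b):
    return a + b

task = (add, 1, 2)              # stored as data; nothing has run yet
func, args = task[0], task[1:]
func(*args)                     # evaluating it later returns 3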
Custom Graphs¶
Sometimes you want parallel computing but your application doesn’t fit neatly into something like dask.array or dask.bag. In these cases you can interact with the dask schedulers directly. These schedulers operate well as standalone modules.
This separation provides a release valve for complex situations and allows advanced projects additional opportunities for parallel execution, even if those projects have an internal representation for their computations. As dask schedulers improve or expand to distributed memory, code written to use dask schedulers will advance as well.
Example¶

As discussed in the motivation and specification sections, the schedulers take a task graph (a dict of tuples of functions) and a list of desired keys from that graph.
Here is a mocked out example building a graph for a traditional clean and analyze pipeline.
def load(filename):
    ...

def clean(data):
    ...

def analyze(sequence_of_data):
    ...

def store(result):
    with open(..., 'w') as f:
        f.write(result)
dsk = {'load-1': (load, 'myfile.a.data'),
       'load-2': (load, 'myfile.b.data'),
       'load-3': (load, 'myfile.c.data'),
       'clean-1': (clean, 'load-1'),
       'clean-2': (clean, 'load-2'),
       'clean-3': (clean, 'load-3'),
       'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
       'store': (store, 'analyze')}
from dask.multiprocessing import get
get(dsk, 'store') # executes in parallel
Imperative¶
As discussed in the custom graphs section, sometimes problems don’t fit into one of the collections like dask.bag or dask.array. Instead of creating a dask directly using a dictionary, one can use the dask.imperative interface. This allows one to create graphs directly with a light annotation of normal python code.
Example¶
Rebuilding the example from custom graphs:
from dask.imperative import do, value

@do
def load(filename):
    ...

@do
def clean(data):
    ...

@do
def analyze(sequence_of_data):
    ...

@do
def store(result):
    with open(..., 'w') as f:
        f.write(result)
files = ['myfile.a.data', 'myfile.b.data', 'myfile.c.data']
loaded = [load(i) for i in files]
cleaned = [clean(i) for i in loaded]
analyzed = analyze(cleaned)
stored = store(analyzed)
stored.compute()
This builds the same graph as seen before, but using normal Python syntax. In fact, the only difference between Python code that would do this in serial and the parallel version with dask is the do decorators on the functions and the call to compute at the end.
How it works¶
The dask.imperative interface consists of two functions:
do
Wraps functions. Can be used as a decorator, or around function calls directly (i.e. do(foo)(a, b, c)). Outputs from functions wrapped in do are proxy objects of type Value that contain a graph of all operations done to get to this result.
value
Wraps objects. Used to create Value proxies directly.
Value objects can be thought of as representing a key in the dask. A Value supports most python operations, each of which creates another Value representing the result:
- Most operators (*, -, etc...)
- Item access and slicing (a[0])
- Attribute access (a.size)
- Method calls (a.index(0))
Operations that aren’t supported include:
- Mutating operators (a += 1)
- Mutating magics such as __setitem__/__setattr__ (a[0] = 1, a.foo = 1)
- Iteration. (for i in a: ...)
- Use as a predicate (if a: ...)
The last two in particular mean that Value objects cannot be used for control flow, meaning that no Value can appear in a loop or if statement. Even with this limitation, many workflows can be easily parallelized.
Example¶
Here we have a serial blocked computation for computing the mean of all positive elements in a large, on disk array.
x = h5py.File('myfile.hdf5')['/x']        # Trillion element array on disk

sums = []
counts = []
for i in range(1000000):                  # One million times
    chunk = x[1000000*i:1000000*(i + 1)]  # Pull out chunk
    positive = chunk[chunk > 0]           # Filter out negative elements
    sums.append(positive.sum())           # Sum chunk
    counts.append(positive.size)          # Count chunk

result = sum(sums) / sum(counts)          # Aggregate results
Below is the same code, parallelized using dask.imperative
x = value(h5py.File('myfile.hdf5')['/x']) # Trillion element array on disk

sums = []
counts = []
for i in range(1000000):                  # One million times
    chunk = x[1000000*i:1000000*(i + 1)]  # Pull out chunk
    positive = chunk[chunk > 0]           # Filter out negative elements
    sums.append(positive.sum())           # Sum chunk
    counts.append(positive.size)          # Count chunk

result = do(sum)(sums) / do(sum)(counts)  # Aggregate results

result.compute()                          # Perform the computation
Only 3 lines had to change to make this computation parallel instead of serial.
- Wrap the original array in value. This makes all the slices on it return Value objects.
- Wrap both calls to sum with do.
- Call the compute method on the result.
While the for loop above still iterates fully, it’s just building up a graph of the computation that needs to happen, without actually doing any computing.
Definitions¶
- dask.imperative.value(val, name=None)¶
Create a Value from a python object.
Parameters: val : object
Object to be wrapped.
name : string, optional
Name to be used in the resulting dask.
Examples
>>> a = value([1, 2, 3])
>>> a.compute()
[1, 2, 3]
Values can act as a proxy to the underlying object. Many operators are supported:
>>> (a + [1, 2]).compute()
[1, 2, 3, 1, 2]
>>> a[1].compute()
2
Method and attribute access also works:
>>> a.count(2).compute()
1
Note that if a method doesn’t exist, no error will be thrown until runtime:
>>> res = a.not_a_real_method()
>>> res.compute()
AttributeError("'list' object has no attribute 'not_a_real_method'")
- dask.imperative.do()¶
Wraps a function so that it outputs a Value.
Examples
Can be used as a decorator:
>>> @do
... def add(a, b):
...     return a + b
>>> res = add(1, 2)
>>> type(res) == Value
True
>>> res.compute()
3
For other cases, it may be cleaner to call do on a function at call time:
>>> res2 = do(sum)([res, 2, 3])
>>> res2.compute()
8
do also accepts an optional keyword pure. If False (default), then subsequent calls will always produce a different Value. This is useful for non-pure functions (such as time or random).
>>> from random import random
>>> out1 = do(random)()
>>> out2 = do(random)()
>>> out1.key == out2.key
False
If you know a function is pure (output only depends on the input, with no global state), then you can set pure=True. This will attempt to apply a consistent name to the output, but will fallback on the same behavior of pure=False if this fails.
>>> @do(pure=True)
... def add(a, b):
...     return a + b
>>> out1 = add(1, 2)
>>> out2 = add(1, 2)
>>> out1.key == out2.key
True
- dask.imperative.compute(*args, **kwargs)¶
Evaluate several ``Value``s at once.
Note that the only difference between this function and dask.base.compute is that this implicitly converts python objects to ``Value``s, allowing for collections of dask objects to be computed.
Examples
>>> a = value(1)
>>> b = a + 2
>>> c = a + 3
>>> compute(b, c)       # Compute both simultaneously
(3, 4)
>>> compute(a, [b, c])  # Works for lists of Values
(1, [3, 4])
Optimization¶
Small optimizations performed on the dask graph before calling the scheduler can significantly improve performance in different contexts. The dask.optimize module contains several functions to transform graphs in a variety of useful ways. In most cases, users won’t need to interact with these functions directly - specialized subsets of these transforms are done automatically in the dask collections (dask.array, dask.bag, and dask.dataframe). However, users working with custom graphs or computations may find that applying these methods results in substantial speedups.
In general, there are two goals when doing graph optimizations
- Simplify computation
- Improve parallelism
Simplifying computation can be done on a graph level by removing unnecessary tasks (cull), or on a task level by replacing expensive operations with cheaper ones (RewriteRule). Parallelism can be improved by reducing inter-task communication, whether by fusing many tasks into one (fuse), or by inlining cheap operations (inline, inline_functions).
Below, we show an example walking through the use of some of these to optimize a task graph.
Example¶
Suppose you had a custom dask graph for doing a word counting task.
>>> from __future__ import print_function
>>> def print_and_return(string):
...     print(string)
...     return string
>>> format_str = 'word list has {0} occurrences of {1}, out of {2} words'
>>> dsk = {'words': 'apple orange apple pear orange pear pear',
...        'nwords': (len, (str.split, 'words')),
...        'val1': 'orange',
...        'val2': 'apple',
...        'val3': 'pear',
...        'count1': (str.count, 'words', 'val1'),
...        'count2': (str.count, 'words', 'val2'),
...        'count3': (str.count, 'words', 'val3'),
...        'out1': (format_str.format, 'count1', 'val1', 'nwords'),
...        'out2': (format_str.format, 'count2', 'val2', 'nwords'),
...        'out3': (format_str.format, 'count3', 'val3', 'nwords'),
...        'print1': (print_and_return, 'out1'),
...        'print2': (print_and_return, 'out2'),
...        'print3': (print_and_return, 'out3')}

Here we’re counting the occurrence of the words 'orange', 'apple', and 'pear' in the list of words, formatting an output string reporting the results, printing the output, then returning the output string.
To perform the computation, we pass the dask and the desired output keys to a scheduler get function.
>>> from dask.multiprocessing import get
>>> results = get(dsk, ['print1', 'print2'])
word list has 3 occurrences of pear, out of 7 words
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
>>> results
('word list has 2 occurrences of orange, out of 7 words',
'word list has 2 occurrences of apple, out of 7 words')
As can be seen above, the schedulers computed the whole graph before returning just a few of the outputs. This is because the schedulers will always compute all tasks, even if we only requested a few of the output keys. Before we pass the dask to get, we need to remove the unnecessary tasks from the graph. To do this, we can use the cull function.
>>> from dask.optimize import cull
>>> dsk1 = cull(dsk, ['print1', 'print2'])
>>> results = get(dsk1, ['print1', 'print2'])
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

Looking at the task graph above, there are multiple accesses to constants such as 'val1' or 'val2' in the dask. These can be inlined into the tasks to improve efficiency using the inline function.
>>> from dask.optimize import inline
>>> dsk2 = inline(dsk1)
>>> results = get(dsk2, ['print1', 'print2'])
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

Now we have two sets of almost linear task chains. The only link between them is the word counting function. For cheap operations like this, the serialization cost may be larger than the actual computation, so it may be faster to do the computation more than once, rather than passing the results to all nodes. To perform this function inlining, the inline_functions function can be used.
>>> from dask.optimize import inline_functions
>>> dsk3 = inline_functions(dsk2, [len, str.split])
>>> results = get(dsk3, ['print1', 'print2'])
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

Now we have a set of purely linear tasks. We’d like to have the scheduler run all of these on the same worker to reduce data serialization between workers. One option is just to merge these linear chains into one big task using the fuse function.
>>> from dask.optimize import fuse
>>> dsk4 = fuse(dsk3)
>>> results = get(dsk4, ['print1', 'print2'])
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words

Putting it all together:
>>> def optimize_and_get(dsk, keys):
...     dsk1 = cull(dsk, keys)
...     dsk2 = inline(dsk1)
...     dsk3 = inline_functions(dsk2, [len, str.split])
...     dsk4 = fuse(dsk3)
...     return get(dsk4, keys)
>>> optimize_and_get(dsk, ['print1', 'print2'])
word list has 2 occurrences of apple, out of 7 words
word list has 2 occurrences of orange, out of 7 words
In summary, the above operations:
- Removed tasks unnecessary for the desired output using cull
- Inlined constants using inline
- Inlined cheap computations using inline_functions, improving parallelism
- Fused linear tasks together to ensure they run on the same worker, using fuse
These are just a few of the optimizations provided in dask.optimize; for more information see the API below. As stated previously, these optimizations are already performed automatically in the dask collections. Users not working with custom graphs or computations should have little reason to interact with them directly.
Rewrite Rules¶
For context based optimizations, dask.rewrite provides functionality for pattern matching and term rewriting. This is useful for replacing expensive computations with equivalent, cheaper computations. For example, dask.array uses the rewrite functionality to replace series of array slicing operations with a more efficient single slice.
The interface to the rewrite system consists of two classes:
RewriteRule(lhs, rhs, vars)
Given a left-hand-side (lhs), a right-hand-side (rhs), and a set of variables (vars), a rewrite rule declaratively encodes the following operation:
lhs -> rhs if task matches lhs over variables
RuleSet(*rules)
A collection of rewrite rules. The design of RuleSet class allows for efficient “many-to-one” pattern matching, meaning that there is minimal overhead for rewriting with multiple rules in a rule set.
Example¶
Here we create two rewrite rules expressing the following mathematical transformations:
- a + a -> 2*a
- a * a -> a**2
where 'a' is a variable.
>>> from dask.rewrite import RewriteRule, RuleSet
>>> from operator import add, mul, pow
>>> variables = ('a',)
>>> rule1 = RewriteRule((add, 'a', 'a'), (mul, 'a', 2), variables)
>>> rule2 = RewriteRule((mul, 'a', 'a'), (pow, 'a', 2), variables)
>>> rs = RuleSet(rule1, rule2)
The RewriteRule objects describe the desired transformations in a declarative way, and the RuleSet builds an efficient automata for applying that transformation. Rewriting can then be done using the rewrite method.
>>> rs.rewrite((add, 5, 5))
(mul, 5, 2)
>>> rs.rewrite((mul, 5, 5))
(pow, 5, 2)
>>> rs.rewrite((mul, (add, 3, 3), (add, 3, 3)))
(pow, (mul, 3, 2), 2)
The whole task is traversed by default. If you only want to apply a transform to the top-level of the task, you can pass in strategy='top_level'.
# Transforms whole task
>>> rs.rewrite((sum, [(add, 3, 3), (mul, 3, 3)]))
(sum, [(mul, 3, 2), (pow, 3, 2)])
# Only applies to top level, no transform occurs
>>> rs.rewrite((sum, [(add, 3, 3), (mul, 3, 3)]), strategy='top_level')
(sum, [(add, 3, 3), (mul, 3, 3)])
The rewriting system provides a powerful abstraction for transforming computations at a task level, but for many users directly interacting with these transformations will be unnecessary.
API¶
Top level optimizations
cull(dsk, keys) | Return new dask with only the tasks required to calculate keys. |
fuse(dsk[, keys]) | Return new dask with linear sequence of tasks fused together. |
inline(dsk[, keys, inline_constants]) | Return new dask with the given keys inlined with their values. |
inline_functions(dsk[, fast_functions, ...]) | Inline cheap functions into larger operations |
Utility functions
dealias(dsk) | Remove aliases from dask |
dependency_dict(dsk) | Create a dict matching ordered dependencies to keys. |
equivalent(term1, term2[, subs]) | Determine if two terms are equivalent, modulo variable substitution. |
functions_of(task) | Set of functions contained within nested task |
merge_sync(dsk1, dsk2) | Merge two dasks together, combining equivalent tasks. |
sync_keys(dsk1, dsk2) | Return a dict matching keys in dsk2 to equivalent keys in dsk1. |
Rewrite Rules
RewriteRule(lhs, rhs[, vars]) | A rewrite rule. |
RuleSet(*rules) | A set of rewrite rules. |
Definitions¶
- dask.optimize.cull(dsk, keys)¶
Return new dask with only the tasks required to calculate keys.
In other words, remove unnecessary tasks from dask. keys may be a single key or list of keys.
Examples
>>> d = {'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)}
>>> cull(d, 'out')
{'x': 1, 'out': (add, 'x', 10)}
- dask.optimize.fuse(dsk, keys=None)¶
Return new dask with linear sequence of tasks fused together.
If specified, the keys in keys keyword argument are not fused.
This may be used as an optimization step.
Examples
>>> d = {'a': 1, 'b': (inc, 'a'), 'c': (inc, 'b')}
>>> fuse(d)
{'c': (inc, (inc, 1))}
>>> fuse(d, keys=['b'])
{'b': (inc, 1), 'c': (inc, 'b')}
- dask.optimize.inline(dsk, keys=None, inline_constants=True)¶
Return new dask with the given keys inlined with their values.
Inlines all constants if inline_constants keyword is True.
Examples
>>> d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'x', 'y')}
>>> inline(d)
{'y': (inc, 1), 'z': (add, 1, 'y')}
>>> inline(d, keys='y')
{'z': (add, 1, (inc, 1))}
>>> inline(d, keys='y', inline_constants=False)
{'x': 1, 'z': (add, 'x', (inc, 'x'))}
- dask.optimize.inline_functions(dsk, fast_functions=None, inline_constants=False)¶
Inline cheap functions into larger operations
Examples
>>> dsk = {'out': (add, 'i', 'd'),
...        'i': (inc, 'x'),
...        'd': (double, 'y'),
...        'x': 1, 'y': 1}
>>> inline_functions(dsk, [inc])
{'out': (add, (inc, 'x'), 'd'), 'd': (double, 'y'), 'x': 1, 'y': 1}
- dask.optimize.dealias(dsk)¶
Remove aliases from dask
Removes and renames aliases using inline. Keeps aliases at the top of the DAG to ensure entry points stay the same.
Aliases are not expected by schedulers. It’s unclear that this is a legal state.
Examples
>>> dsk = {'a': (range, 5),
...        'b': 'a',
...        'c': 'b',
...        'd': (sum, 'c'),
...        'e': 'd',
...        'f': (inc, 'd')}
>>> dealias(dsk)
{'a': (range, 5), 'd': (sum, 'a'), 'e': (identity, 'd'), 'f': (inc, 'd')}
- dask.optimize.dependency_dict(dsk)¶
Create a dict matching ordered dependencies to keys.
Examples
>>> from operator import add
>>> dsk = {'a': 1, 'b': 2, 'c': (add, 'a', 'a'), 'd': (add, 'b', 'a')}
>>> dependency_dict(dsk)
{(): ['a', 'b'], ('a', 'a'): ['c'], ('b', 'a'): ['d']}
- dask.optimize.equivalent(term1, term2, subs=None)¶
Determine if two terms are equivalent, modulo variable substitution.
Equivalent to applying substitutions in subs to term2, then checking if term1 == term2.
If a subterm doesn’t support comparison (i.e. term1 == term2 errors), returns False.
Parameters: term1, term2 : terms
subs : dict, optional
Mapping of substitutions from term2 to term1
Examples
>>> from operator import add
>>> term1 = (add, 'a', 'b')
>>> term2 = (add, 'x', 'y')
>>> subs = {'x': 'a', 'y': 'b'}
>>> equivalent(term1, term2, subs)
True
>>> subs = {'x': 'a'}
>>> equivalent(term1, term2, subs)
False
- dask.optimize.functions_of(task)¶
Set of functions contained within nested task
Examples
>>> task = (add, (mul, 1, 2), (inc, 3))
>>> functions_of(task)
set([add, mul, inc])
- dask.optimize.merge_sync(dsk1, dsk2)¶
Merge two dasks together, combining equivalent tasks.
If a task in dsk2 exists in dsk1, the task and key from dsk1 is used. If a task in dsk2 has the same key as a task in dsk1 (and they aren’t equivalent tasks), then a new key is created for the task in dsk2. This prevents name conflicts.
Parameters: dsk1, dsk2 : dict
Variable names in dsk2 are replaced with equivalent ones in dsk1 before merging.
Returns: new_dsk : dict
The merged dask.
key_map : dict
A mapping between the keys from dsk2 to their new names in new_dsk.
Examples
>>> from operator import add, mul
>>> dsk1 = {'a': 1, 'b': (add, 'a', 10), 'c': (mul, 'b', 5)}
>>> dsk2 = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> new_dsk, key_map = merge_sync(dsk1, dsk2)
>>> new_dsk
{'a': 1, 'b': (add, 'a', 10), 'c': (mul, 'b', 5), 'z': (mul, 'b', 2)}
>>> key_map
{'x': 'a', 'y': 'b', 'z': 'z'}
Conflicting names are replaced with auto-generated names upon merging.
>>> dsk1 = {'a': 1, 'res': (add, 'a', 1)}
>>> dsk2 = {'x': 1, 'res': (add, 'x', 2)}
>>> new_dsk, key_map = merge_sync(dsk1, dsk2)
>>> new_dsk
{'a': 1, 'res': (add, 'a', 1), 'merge_1': (add, 'a', 2)}
>>> key_map
{'x': 'a', 'res': 'merge_1'}
- dask.optimize.sync_keys(dsk1, dsk2)¶
Return a dict matching keys in dsk2 to equivalent keys in dsk1.
Parameters: dsk1, dsk2 : dict
Examples
>>> from operator import add, mul
>>> dsk1 = {'a': 1, 'b': (add, 'a', 10), 'c': (mul, 'b', 5)}
>>> dsk2 = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> sync_keys(dsk1, dsk2)
{'x': 'a', 'y': 'b'}
- dask.rewrite.RewriteRule(lhs, rhs, vars=())¶
A rewrite rule.
Expresses lhs -> rhs, for variables vars.
Parameters: lhs : task
The left-hand-side of the rewrite rule.
rhs : task or function
The right-hand-side of the rewrite rule. If it’s a task, variables in rhs will be replaced by terms in the subject that match the variables in lhs. If it’s a function, the function will be called with a dict of such matches.
vars: tuple, optional
Tuple of variables found in the lhs. Variables can be represented as any hashable object; a good convention is to use strings. If there are no variables, this can be omitted.
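For example, a hedged sketch (based on the description above) of a rule whose right-hand-side is a function; here it constant-folds an addition of two matched terms:
>>> from operator import add
>>> from dask.rewrite import RewriteRule, RuleSet
>>> fold_add = RewriteRule((add, 'a', 'b'),
...                        lambda matches: matches['a'] + matches['b'],
...                        ('a', 'b'))
>>> RuleSet(fold_add).rewrite((add, 2, 3))
5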
- dask.rewrite.RuleSet(*rules)¶
A set of rewrite rules.
Forms a structure for fast rewriting over a set of rewrite rules. This allows for syntactic matching of terms to patterns for many patterns at the same time.
Examples
>>> def f(*args): pass
>>> def g(*args): pass
>>> def h(*args): pass
>>> from operator import add

>>> rs = RuleSet(                   # Make RuleSet with two Rules
...         RewriteRule((add, 'x', 0), 'x', ('x',)),
...         RewriteRule((f, (g, 'x'), 'y'),
...                     (h, 'x', 'y'),
...                     ('x', 'y')))

>>> rs.rewrite((add, 2, 0))         # Apply ruleset to single task
2
>>> rs.rewrite((f, (g, 'a', 3)))
(h, 'a', 3)

>>> dsk = {'a': (add, 2, 0),        # Apply ruleset to full dask graph
...        'b': (f, (g, 'a', 3))}
>>> from toolz import valmap
>>> valmap(rs.rewrite, dsk)
{'a': 2, 'b': (h, 'a', 3)}
Attributes
rules (list): A list of ``RewriteRule``s included in the ``RuleSet``.
Scheduling:
Schedulers execute task graphs. After a collection produces a graph we execute this graph in parallel, either using all of the cores on a single workstation or using a distributed cluster.
Distributed Scheduling¶
Dask.distributed has not been battle-tested. It is for experimental use only.
Dask includes a scheduler for distributed computation in dask.distributed. This executes dask graphs in parallel on multiple nodes. A centralized scheduler manages distributed workers communicating tasks and results over ZeroMQ sockets. Task management is centralized in a single scheduler but peers communicate data between each other in a distributed fashion.

Pieces¶
Dask.distributed consists of three pieces
- One centralized scheduler
- Many distributed workers
- Potentially many clients
The scheduler manages workers to execute dask graphs. The workers communicate to each other to share intermediate results. The clients communicate to the scheduler periodically when they deliver full dask graphs for scheduling. The clients do not talk directly to the workers.
Setup¶
The scheduler and workers should be close by in order to minimize scheduling overhead from network latency. The client is generally on a user's personal machine and does not need to be as close to the worker nodes.
Scheduler¶
When we build a scheduler we may specify two addresses, one to be given to the workers and one for the clients.
from dask.distributed import Scheduler
s = Scheduler(hostname='scheduler-hostname',
              port_to_workers=4444,
              port_to_clients=5555)
Alternatively ZeroMQ can choose these for you:
>>> s = Scheduler()
>>> s.address_to_workers
'tcp://scheduler-hostname:4444'
>>> s.address_to_clients
'tcp://scheduler-hostname:5555'
Schedulers begin listening for communications as soon as they are created. You can shut down a scheduler with the close method:
s.close()
Workers¶
Workers must be given the address of the scheduler as well as a MutableMapping in which to store data. The mutable mapping can be a dict (the default) or something more sophisticated like an on-disk chest. You can optionally select an address for the worker:
from dask.distributed import Worker
w = Worker(scheduler='tcp://scheduler-hostname:4444',
           hostname='worker-hostname', port_to_workers=1234,
           data={})
Alternatively, the worker can create a dict and choose an address for you:
w = Worker(scheduler='tcp://scheduler-hostname:4444')
Workers register themselves with the scheduler once they start up, so no further configuration is necessary. You may create new workers at any time, even before the scheduler is created, as long as you point them at the correct address.
You can shut down a worker with the close method:
w.close()
You will want to start up several workers.
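Since each worker only needs the scheduler's worker-facing address, starting several is a short loop. A minimal sketch, reusing the address from above:
from dask.distributed import Worker

# Start four workers against the scheduler created above
workers = [Worker(scheduler='tcp://scheduler-hostname:4444')
           for _ in range(4)]

# Shut them all down when finished
for w in workers:
    w.close()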
Client¶
Clients, like workers, must know the address of the scheduler. Note that clients and workers connect to different addresses.
from dask.distributed import Client
c = Client('tcp://scheduler-hostname:5555')
Clients provide a get method to request the computation of a dask graph:
>>> from operator import add
>>> dsk = {'x': 1, 'y': (add, 'x', 2)}
>>> c.get(dsk, 'y') # causes distributed work
3
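The same get function can also drive a dask collection by passing it to compute, the hook described later in the FAQ. A small sketch; the array and its size are illustrative:
>>> import dask.array as da
>>> x = da.ones((5000, 5000), chunks=(1000, 1000))
>>> x.sum().compute(get=c.get)  # graph executes on the distributed workers
25000000.0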
Multiple clients can connect to the same scheduler.
Screencast¶
This screencast demonstrates how to set up and connect dask.distributed Schedulers, Clients, and Workers.
Store Collections¶
A Client can store a dask graph on the Scheduler for future use by others.
import dask.bag as db
b = db.from_sequence(range(5)).map(lambda x: x + 1)
from dask.distributed import Client
c = Client('tcp://scheduler-hostname:5555')
c.set_collection('mybag', b)
Other clients on different machines can retrieve this collection:
from dask.distributed import Client
c = Client('tcp://scheduler-hostname:5555')
b = c.get_collection('mybag')
This only stores the dask graph and not any underlying data that this graph might open. Usually these graphs are small and easy to pass around.
IPython.parallel¶
Users familiar with IPython.parallel can use an IPython.parallel.Client object, connected to a running ipcluster to bootstrap a dask distributed cluster.
# Setup your IPython cluster...
# Create a client.
from IPython.parallel import Client
ipclient = Client()
# Now use IPython parallel to set up dask.distributed
from dask.distributed import dask_client_from_ipclient
dclient = dask_client_from_ipclient(ipclient)
# Dask Client.get method computes dask graphs on the cluster.
dclient.get({'a': 41, 'b': (lambda x: x + 1, 'a')}, 'b')
More info about setting up an IPython cluster can be found at the IPython docs.
Known Limitations¶
- The distributed scheduler is new and buggy
- It is not fault tolerant. The failure of any worker is likely to crash the system.
- It assumes that workers can see each other over the network
- It does not fail gracefully in case of errors
- It does not think about data locality. Linear chains avoid this limitation by fusing into a single task beforehand, but tasks with multiple inputs will run on whichever worker is available first and not necessarily on a worker that already has the data locally.
- It does not integrate natively with data-local file systems like HDFS
- It is a dynamic scheduler and will likely never reach the performance of hand-tuned MPI codes for HPC workloads
Other
Install Dask¶
You can install dask using conda:
conda install dask
You can install dask using pip:
pip install dask
Dask also ships with Anaconda so you may have it already.
Install parts of dask¶
Different components of dask have different dependencies that are only relevant for that component.
- dask.array: numpy
- dask.bag: dill
- dask.dataframe: pandas, bcolz (in development)
- dask.distributed: pyzmq (in development)
The base pip install of dask is fairly minimal. This is to protect lightweight dask.bag users from having to install heavyweight dependencies like bcolz or pandas. You can either install these dependencies separately or, when installing with pip, specify the set of dependencies you would like as a parameter:
pip install dask[array]
pip install dask[bag]
pip install dask[dataframe]
pip install dask[complete]
How to inspect dask objects¶
Dask itself is just a specification on top of normal Python dictionaries. Objects like dask.Array are just thin wrappers around these dicts with a little bit of shape metadata.
Users should only have to interact with the higher-level Array objects. Developers may want to dive more deeply into the dictionaries/task graphs themselves.
dask attribute¶
The first step is to look at the .dask attribute of an array:
>>> import dask.array as da
>>> x = da.ones((5, 15), chunks=(5, 5))
>>> x.dask
{('wrapped_1', 0, 0): (ones, (5, 5)),
('wrapped_1', 0, 1): (ones, (5, 5)),
('wrapped_1', 0, 2): (ones, (5, 5))}
This attribute becomes more interesting as you perform operations on your Array objects:
>>> (x + 1).dask
{('wrapped_1', 0, 0): (ones, (5, 5)),
 ('wrapped_1', 0, 1): (ones, (5, 5)),
 ('wrapped_1', 0, 2): (ones, (5, 5)),
 ('x_1', 0, 0): (add, ('wrapped_1', 0, 0), 1),
 ('x_1', 0, 1): (add, ('wrapped_1', 0, 1), 1),
 ('x_1', 0, 2): (add, ('wrapped_1', 0, 2), 1)}
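Because the graph is just a dict, ordinary dict tools work on it. For example (key names such as wrapped_1 and x_1 vary between sessions):
>>> d = (x + 1).dask
>>> len(d)          # six tasks: three blocks of ones plus three additions
6
>>> sorted(d)[0]    # keys are (name, block-index, block-index) tuples
('wrapped_1', 0, 0)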
Visualize graphs with DOT¶

If you have basic graphviz tools like dot installed then dask can also generate visual graphs from your task graphs.
>>> d = (x + 1).dask
>>> from dask.dot import dot_graph
>>> dot_graph(d)
Writing graph to mydask.pdf
The result is shown to the right.
Diagnostics¶
Profiling parallel code can be tricky. dask.diagnostics provides functionality to aid in profiling and inspecting dask graph execution.
Scheduler Callbacks¶
Schedulers based on dask.async.get_async (currently dask.async.get_sync, dask.threaded.get, and dask.multiprocessing.get) accept four callbacks, allowing for inspection of dask execution. The callbacks are:
start(dask, state)
Run at the beginning of execution, right after the state is initialized. Receives the dask and the scheduler state.
pretask(key, dask, state)
Run every time a new task is started. Receives the key of the task to be run, the dask, and the scheduler state.
posttask(key, dask, state, id)
Run every time a task is finished. Receives the key of the task that just completed, the dask, the scheduler state, and the id of the worker that ran the task.
finish(dask, state, errored)
Run at the end of execution, right before the result is returned. Receives the dask, the scheduler state, and a boolean indicating whether the exit was due to an error or not.
These are internally represented as tuples of length 4, stored in the order presented above. Callbacks for common use cases are provided in dask.diagnostics.
Profiler¶
The Profiler class builds on the scheduler callbacks described above to profile dask execution at the task level. This can be used as a contextmanager around calls to get or compute to profile the computation.
>>> from dask.diagnostics import Profiler
>>> import dask.array as da
>>> a = da.random.random(size=(10000,1000), chunks=(1000,1000))
>>> q, r = da.linalg.qr(a)
>>> a2 = q.dot(r)
>>> with Profiler() as prof:
... out = a2.compute()
During execution the profiler records the following information for each task:
- Key
- Task
- Start time in seconds since the epoch
- Finish time in seconds since the epoch
- Worker id
These results can then be accessed by the results method. This returns a list of namedtuple objects containing the data for each task.
>>> prof.results[0]
TaskData(key=('tsqr_1_QR_st1', 9, 0),
task=(qr, (_apply_random, 'random_sample', 1730327976, (1000, 1000), (), {})),
start_time=1435613641.833878,
end_time=1435613642.336109,
worker_id=4367847424)
These can be analyzed separately, or viewed in a bokeh plot using the provided visualize method.
>>> prof.visualize()
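Because results returns plain namedtuples, they are also easy to analyze by hand; for example, the duration of the slowest task (field names as shown above):
>>> durations = [r.end_time - r.start_time for r in prof.results]
>>> max(durations)  # doctest: +SKIP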
Progress Bar¶
The ProgressBar class displays a progress bar in the terminal or notebook during computation. This provides nice feedback during long-running graph execution.
As with Profiler, this can be used as a contextmanager around calls to compute.
>>> from dask.diagnostics import ProgressBar
>>> a = da.random.normal(size=(10000, 10000), chunks=(1000, 1000))
>>> res = a.dot(a.T).mean(axis=0)
>>> with ProgressBar():
... out = res.compute()
[########################################] | 100% Completed | 17.1 s
Note that multiple diagnostic tools can be used concurrently by using multiple context managers:
>>> with ProgressBar(), Profiler() as prof:
... out = res.compute()
[########################################] | 100% Completed | 17.1 s
>>> prof.visualize()
Custom Callbacks¶
Custom diagnostics can be created using the callback mechanism described above. To add your own, subclass the Callback class and define the methods you need. Below we create a class that prints the name of every key as it's computed.
from dask.callbacks import Callback
class PrintKeys(Callback):
    def _pretask(self, key, dask, state):
        """Print the key of every task as it's started"""
        print("Computing: {0}!".format(repr(key)))
This can now be used as a contextmanager during computation:
>>> from operator import add, mul
>>> from dask.threaded import get
>>> dsk = {'a': (add, 1, 2), 'b': (add, 3, 'a'), 'c': (mul, 'a', 'b')}
>>> with PrintKeys():
...     get(dsk, 'c')
Computing: 'a'!
Computing: 'b'!
Computing: 'c'!
Alternatively, functions can be passed in as keyword arguments to Callback:
>>> def printkeys(key, dask, state):
...     print("Computing: {0}!".format(repr(key)))
>>> with Callback(pretask=printkeys):
...     get(dsk, 'c')
Computing: 'a'!
Computing: 'b'!
Computing: 'c'!
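The other hooks work the same way. As a sketch, assuming the underscore-prefixed method names follow the same convention as _pretask above, here is a callback that counts how many tasks ran:
from dask.callbacks import Callback

class CountTasks(Callback):
    def _start(self, dask, state):
        self.n = 0                      # reset the counter for this run

    def _posttask(self, key, dask, state, id):
        self.n += 1                     # one more task finished

    def _finish(self, dask, state, errored):
        print("Ran {0} tasks".format(self.n))
Used as with CountTasks(): get(dsk, 'c') on the graph above, this would report that three tasks ran.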
Frequently Asked Questions¶
Q: How do I debug my program when using dask?
If you want to inspect the dask graph itself, see the section on inspecting dask objects above.
If you want to step through your code with a Python debugger, a common cause of frustration is the asynchronous schedulers: because they run your code on separate workers they cannot give the Python debugger access to it. Fortunately, you can switch to a synchronous scheduler like dask.get or dask.async.get_sync by providing a get= keyword to the compute method:
my_array.compute(get=dask.async.get_sync)
Both dask.async.get_sync and dask.get will provide traceback traversals. dask.async.get_sync uses the same machinery of the async schedulers but with only one worker. dask.get is dead-simple but does not cache data and so can be slow for some workloads.
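For example, a minimal post-mortem session with the synchronous scheduler might look like the following; the failing function and graph are purely illustrative:
import pdb
from operator import add
from dask.async import get_sync        # the synchronous scheduler named above

def fail(x):
    raise ValueError("boom")            # stand-in for your own buggy code

dsk = {'a': 1, 'b': (fail, 'a'), 'c': (add, 'b', 10)}

try:
    get_sync(dsk, 'c')
except Exception:
    pdb.post_mortem()                   # the traceback now points into fail()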
Q: In ``dask.array`` what is ``chunks``?
Dask.array breaks your large array into lots of little pieces, each of which can fit in memory. chunks determines the size of those pieces.
Users most often interact with chunks when they create an array as in:
>>> x = da.from_array(dataset, chunks=(1000, 1000))
In this case chunks is a tuple defining the shape of each chunk of your array; e.g. “Please break dataset into 1000 by 1000 chunks.”
However, internally dask uses a different representation, a tuple of tuples, to handle the uneven chunk sizes that inevitably occur during computation.
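You can see this normalized form on the chunks attribute. A quick sketch:
>>> x = da.ones((15, 20), chunks=(5, 8))
>>> x.chunks        # one tuple of block sizes per dimension
((5, 5, 5), (8, 8, 4))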
Q: How do I select a good value for ``chunks``?
Choosing good values for chunks can strongly impact performance. Here are some general guidelines. The strongest guide is memory:
- The size of your blocks should fit in memory.
- Actually, several blocks should fit in memory at once (assuming you want to use multiple cores)
- The size of the blocks should be large enough to hide scheduling overhead, which is a couple of milliseconds per task
- Generally we shoot for chunks between 10MB and 100MB in size
Additionally, the computations you perform may also inform your choice of chunks. Some operations, like matrix multiply, require chunks that match along the contracted dimension. Others, like svd and qr, only work on tall-and-skinny matrices with a single chunk across all of the columns. Other operations might work but be faster or slower with different chunk shapes.
Note that you can rechunk() an array if necessary.
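For example, a sketch with an illustrative target chunk shape:
>>> y = x.rechunk((2000, 2000))   # same data, new block shape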
Q: My computation fills memory, how do I spill to disk?
The schedulers endeavor not to use up all of your memory. However for some algorithms filling up memory is unavoidable. In these cases we can swap out the dictionary used to store intermediate results with a dictionary-like object that spills to disk. The Chest project handles this nicely.
>>> from chest import Chest
>>> cache = Chest()  # Uses a temporary file; deletes on garbage collection
or
>>> cache = Chest(path='/path/to/dir', available_memory=8e9) # Use 8GB
This chest object works just like a normal dictionary, but when available memory runs out (default 1GB) it starts pickling data and sending it to disk, retrieving it as necessary.
You can specify your cache when calling compute:
>>> x.dot(x.T).compute(cache=cache)
Alternatively you can set your cache as a global option.
>>> with dask.set_options(cache=cache):  # sets state within the with block
...     y = x.dot(x.T).compute()
or
>>> dask.set_options(cache=cache)  # sets global state
>>> y = x.dot(x.T).compute()
However, while an on-disk cache is a great fallback, performance is always best if we can avoid spilling to disk. You could try one of the following:
- Use a smaller chunk/partition size
- If you are convinced that a smaller chunk size will not help in your case, you could also report your problem on our issue tracker and work with the dask development team to improve our scheduling policies.
Comparison to PySpark¶
Spark is a popular distributed computing tool with a decent Python API, PySpark. Spark is becoming a dominant name in Big Data analysis alongside Hadoop, for which MRJob is possibly the dominant Python layer.
Dask has several elements that appear to intersect this space and as such we often receive the following question:
How does Dask compare with Spark?
Answering such comparison questions in an unbiased and informed way is hard, particularly when the differences can be somewhat technical. This document tries to do this; we welcome any corrections.
Brief Answer¶
Apache Spark is more mature and better integrates with HDFS. It handles resiliency and was originally built to scale up to thousands of workers.
Dask is pip installable, doesn't use the Java Virtual Machine (JVM), and was originally built to handle numeric workloads very efficiently on a single large workstation.
User-Facing Differences¶
Scale¶
Spark began its life aimed at the thousand node cluster case. As such it thinks well about worker failures and integration with data-local file systems like the Hadoop FileSystem (HDFS). That being said, Spark can run in standalone mode on a single machine.
Dask began its life building out parallel algorithms for numerical array computations on a single computer. As such it thinks well about low-latency scheduling, low memory footprints, shared memory, and efficient use of local disk. That being said, dask can run on a distributed cluster.
Java Python Performance¶
Spark is written in Scala, a multi-paradigm language built on top of the Java Virtual Machine (JVM). Since the rise of Hadoop, Java based languages have steadily gained traction on data warehousing tasks and are good at managing large amounts of heterogeneous data such as you might find in JSON blobs. The Spark development team is now focusing more on binary and native data formats with their new effort, Tungsten.
Dask is written in Python, a multi-paradigm language built on top of the C/Fortran native stack. This stack benefits from decades of scientific research optimizing very fast computation on numeric data. As such, dask is already very good at analytic computations on data such as you might find in HDF5 files or analytic databases. It can also handle JSON-blob-style data using Python data structures (which are surprisingly fast) and the cytoolz library in parallel.
Java Python Disconnect¶
Python users on Spark sometimes express frustration at how far removed they are from their computations. Some of this is inevitable; distributed debugging is a hard problem. Some of it, however, is due to having to hop over the JVM. Spark workers spin up JVMs which in turn spin up Python processes. Data moving back and forth makes extra trips both through a distributed cluster and also through extra serialization layers (see py4j) and computation layers. Limitations like the Java heap size and large Java stack traces come as a surprise to users accustomed to native code execution.
Dask has an advantage for Python users because it is itself a Python library, so serialization and debugging when things go wrong happens more smoothly.
However, dask only benefits Python users, while Spark is useful in a variety of JVM languages (Scala, Java, Clojure) and also has limited support in Python and R. New Spark projects like the DataFrame API skip serialization and boxed-execution issues by forgoing the Python process entirely and instead having Python code drive native Scala code. APIs for these libraries tend to lag a bit behind their Scala counterparts.
Scope¶
Spark was originally built around the RDD, an unordered collection allowing repeats. Most Spark add-ons were built on top of this construct, inheriting both its abilities and limitations.
Dask is built on a lower-level and more general construct of a generic task graph with arbitrary data dependencies. This allows more general computations to be built by users within the dask framework. This is probably the largest fundamental difference between the two projects. Dask gives up high-level understanding to allow users to express more complex parallel algorithms. This ended up being essential when writing complex projects like dask.array.
Developer-Facing Differences¶
Graph Granularity¶
Both Spark and Dask represent computations with directed acyclic graphs. These graphs however represent computations at very different granularities.
One operation on a Spark RDD might add nodes like Map and Filter to the graph. These are high-level operations that convey meaning and will eventually be turned into many little tasks to execute on individual workers. This many-little-tasks state is only available internally to the Spark scheduler.
Dask graphs skip this high-level representation and go directly to the many-little-tasks stage. As such one map operation on a dask collection will immediately generate and add possibly thousands of tiny tasks to the dask graph.
This difference in the scale of the underlying graph has implications on the kinds of analysis and optimizations one can do and also on the generality that one exposes to users. Dask is unable to perform some optimizations that Spark can because Dask schedulers do not have a top-down picture of the computation they were asked to perform. However, dask is able to easily represent far more complex algorithms and expose the creation of these algorithms to normal users.
Dask.bag, the equivalent of the Spark RDD, is just one abstraction built on top of dask. Others exist. Alternatively, power users can forgo high-level collections entirely and jump straight to direct low-level task scheduling.
Coding Styles¶
Both Spark and Dask are written in a functional style. Spark will probably be more familiar to those who enjoy algebraic types while dask will probably be more familiar to those who enjoy Lisp and “code as data structures”.
Conclusion¶
If you have petabytes of JSON files, a simple workflow, and a thousand node cluster then you should probably use Spark. If you have 10s-1000s of gigabytes of binary or numeric data, complex algorithms, and a large multi-core workstation then you should probably use dask.
If you have a terabyte or less of CSV or JSON data then you should forget both Spark and Dask and use Postgres or MongoDB.
Contact
- For user questions please tag StackOverflow questions with the #dask tag.
- For bug reports and feature requests please use the GitHub issue tracker.
- For community discussion please use blaze-dev@continuum.io.
Dask is part of the Blaze project, supported by Continuum Analytics.