Non-blocking IO Streams

The utility module fussy.nbio provides a basic mechanism to stream data through a series of processes.

The nbio module makes it fairly easy to build (potentially long) chains of processes that all run in parallel. Internally, the pipe uses non-blocking IO and generators to avoid the deadlocks that occur when a buffer between two processes fills up.

Stream an HTTP response through gzip:

>>> from fussy.nbio import Process as P
>>> import requests
>>> response = requests.get( 'http://www.vrplumber.com' )
>>> pipe = response.iter_content | P( 'gzip -c' )
>>> result = pipe()
>>> assert result.startswith( '\x1f\x8b' )
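
To illustrate the kind of deadlock the non-blocking approach avoids, here is a minimal standard-library sketch. It is not fussy's implementation: the function name pump is hypothetical, and the code assumes a POSIX system with a cat binary. If the parent wrote all of its input with blocking writes before reading anything, both sides would stall once the pipe buffers filled; interleaving reads and writes keeps data moving.

```python
import os
import select
import subprocess


def pump(command, chunks, blocksize=4096):
    """Feed byte chunks to command's stdin while draining its stdout."""
    proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    stdin_fd = proc.stdin.fileno()
    stdout_fd = proc.stdout.fileno()
    # Non-blocking descriptors: a full or empty pipe can no longer stall us.
    os.set_blocking(stdin_fd, False)
    os.set_blocking(stdout_fd, False)

    source = iter(chunks)
    pending = b''
    writing = True
    output = []
    while True:
        if writing and not pending:
            chunk = next(source, None)
            if chunk is None:
                # Input exhausted: closing stdin lets the child see EOF.
                proc.stdin.close()
                writing = False
            else:
                pending = chunk
        want_write = [stdin_fd] if writing and pending else []
        readable, writable, _ = select.select([stdout_fd], want_write, [], 1.0)
        if writable:
            try:
                written = os.write(stdin_fd, pending)
            except BlockingIOError:
                written = 0
            # Keep whatever the pipe did not accept for the next cycle.
            pending = pending[written:]
        if readable:
            try:
                data = os.read(stdout_fd, blocksize)
            except BlockingIOError:
                continue
            if not data:
                break  # EOF: the child closed its stdout
            output.append(data)
    proc.stdout.close()
    proc.wait()
    return b''.join(output)
```

Calling pump(['cat'], chunks) with a couple of megabytes of input completes, whereas writing the same input in one blocking call before reading would be expected to hang.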

Create a process and read output:

>>> from fussy.nbio import Process as P
>>> pipe = P( 'echo "Hello world"' )
>>> pipe()
'Hello world\n'

Pipe the output through another process:

>>> from fussy.nbio import Process as P
>>> pipe = P( 'echo "Hello world"' ) | P( 'cat' )
>>> pipe()
'Hello world\n'

Pipe a generator into a pipe:

>>> from fussy.nbio import Process as P
>>> def gen():
...     for i in range( 20 ):
...         yield str(i)
...
>>> pipe = gen() | P( 'cat' )
>>> pipe()
'012345678910111213141516171819'

Pipe a pipe into a function (note: the output also gets added to the pipe’s output):

>>> from fussy.nbio import Process as P
>>> result = []
>>> pipe = P( 'echo "Hello world"' ) | result.append
>>> pipe()
'Hello world\n'
>>> result
['Hello world\n']

Pipe a pipe to our current stdout:

>>> from fussy.nbio import Process as P
>>> pipe = P( 'echo "Hello world"' ) | '-'
>>> pipe()
Hello world
''

Module: fussy.nbio

Wraps subprocess with pipe semantics and generator-based multiplexing:

pipe = open( somefile, 'rb' ) | nbio.Process( ['grep','blue'] ) | nbio.Process( ['wc','-l'])

exception fussy.nbio.NBIOError

Base class for nbio errors

__weakref__

list of weak references to the object (if defined)

class fussy.nbio.Pipe(*processes)

Pipe of N processes which all need to process data in parallel

__call__(pause_on_silence=0.01)

Iterate over this pipeline, returning combined results as a string

__getitem__(index)

Retrieve a particular item in the pipe

__gt__(other)

Pipe our output into a file

__iter__()

Iterate over the processes in the pipe

If the stdout/stderr of the processes is not captured, then we will yield the results in whatever chunk-size is yielded from the individual processes.

If all of the processes yield DID_NOTHING in a particular cycle, then the pipe will do a pause() for self.pause_on_silence (normally passed into the __call__) before the next iteration.
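
The cycle described above can be sketched with plain generators. This is an illustration only, not the Pipe's actual code: the DID_NOTHING object below is a local stand-in for the sentinel, and the function name multiplex and its pause parameter are hypothetical.

```python
import time

# Local stand-in for the sentinel a component yields when it made no progress.
DID_NOTHING = object()


def multiplex(components, pause_on_silence=0.01, pause=time.sleep):
    """Advance every component once per cycle, yielding whatever they produce."""
    active = [iter(component) for component in components]
    while active:
        progressed = False
        for component in list(active):
            try:
                item = next(component)
            except StopIteration:
                # A finished component counts as progress and leaves the pipe.
                active.remove(component)
                progressed = True
                continue
            if item is not DID_NOTHING:
                progressed = True
                yield item
        if active and not progressed:
            # Every component was idle this cycle, so back off briefly.
            pause(pause_on_silence)
```

Passing pause as a parameter mirrors the idea that tests can replace sleeping with something cheaper.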

__len__()

Return the number of items in this pipe

__lt__(other)

Pipe input from a named file

__or__(other)

Pipe our output into a process, callable or list

__ror__(other)

Pipe output of other into our first item

__weakref__

list of weak references to the object (if defined)

append(process)

Add the given PipeComponent to the end of this pipe (note: does not connect stdin/stdout)

first

Retrieves the first item in the pipe

get_component(other)

Given a python object other, create a PipeComponent for it

The purpose of this method is to allow for fairly “natural” descriptions of tasks. You can pipe to or from files, to or from the string '-' (stdin/stdout), to the string '' (collect stdout), or from a regular string (which is treated as input). You can also pipe iterables into a pipe, and pipe the result of pipes into callables.
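
The rules above amount to a dispatch on the type of the object. A rough sketch of that idea follows; the function name classify and the labels it returns are hypothetical, and the real method builds PipeComponent objects rather than returning strings.

```python
import io


def classify(other):
    """Return a label for the role an object would play in a pipe."""
    if isinstance(other, str):
        # Strings are checked first because they are also iterable.
        if other == '-':
            return 'stdio'
        if other == '':
            return 'collect'
        return 'literal input'
    if isinstance(other, io.IOBase):
        return 'file'
    if callable(other):
        return 'callable'
    if hasattr(other, '__iter__'):
        return 'iterable'
    raise TypeError('cannot use %r in a pipe' % (other,))
```

The order of the checks matters: a plain string would otherwise be mistaken for a generic iterable.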

last

Retrieves the last item in the pipe

prepend(process)

Add the given PipeComponent to the start of this pipe (note: does not connect stdin/stdout)

class fussy.nbio.Process(command, stderr=False, stdout=True, stdin=True, **named)

A particular process in a Pipe

Processes are the most common entry point when using nbio: you create processes and pipe data into or out of them as appropriate to create Pipes.

Under the covers the Process runs subprocess.Popen, and it accepts most of the arguments subprocess.Popen does. By default it captures stdout and pipes data into stdin. If nothing is connected to stdin, then stdin is closed on the first iteration of the pipe. If nothing is connected to stdout or stderr (if stderr is captured), then the results are returned to the caller joined together with ''.

The implication is that if you do not want to store all of the results in RAM, you need to “sink” the results into a process or file, or not capture the results (pass False for stdout or stderr).
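
The same trade-off exists with plain subprocess, which makes it easy to see in isolation. The sketch below is standard library only and does not use nbio; the function name run_to_file is hypothetical. It points the child's stdout at a file, so the parent never holds the output in memory.

```python
import subprocess


def run_to_file(command, path):
    """Run command with its stdout sunk into path; return the exit code."""
    with open(path, 'wb') as sink:
        # The child writes straight to the file descriptor; nothing is
        # accumulated in the parent process.
        return subprocess.call(command, stdout=sink)
```

With nbio the equivalent is to end the pipe in a file or another process rather than collecting the result of pipe().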

__call__(*args, **named)

Create a Pipe containing only this process and run it

__gt__(other)

Pipe our output into a filename

__init__(command, stderr=False, stdout=True, stdin=True, **named)

Initialize the Process

command – subprocess.Popen command string or list; if a string, and “shell” is not explicitly set, then “shell=True” is set

stdin – whether to provide stdin writing

stdout – whether to capture stdout

stderr – whether to capture stderr; if -1, then combine stdout and stderr

good_exit – if provided, an iterable giving the set of good exit codes, which will not raise ProcessError when encountered

by_line – if provided, causes the output to be line-buffered so that only full lines are reported; the '\n' character is used to split the output, so there will be no '\n' character at the end of each line

named – passed to the subprocess.Popen() command
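
The good_exit idea is useful for commands such as grep, which exits with 1 when nothing matches. A minimal standard-library sketch of that check follows. It is not the library's code: BadExit is a hypothetical stand-in for ProcessError, and run_checked is a hypothetical helper.

```python
import subprocess


class BadExit(RuntimeError):
    """Hypothetical stand-in for ProcessError."""


def run_checked(command, good_exit=(0,)):
    """Run command; raise BadExit unless its exit code is in good_exit."""
    returncode = subprocess.call(command)
    if returncode not in set(good_exit):
        raise BadExit('%r exited with code %s' % (command, returncode))
    return returncode
```

Passing good_exit=(0, 1) treats an exit code of 1 as success instead of an error.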

__iter__()

Iterate over the results of the process (normally done by the Pipe)

__lt__(other)

Pipe our input from a filename

__or__(other)

Pipe our output into a process, callable or list

pipe = Pipe( Process( 'cat test.txt' ) | Process( 'grep blue' ) | [] )
pipe()

__ror__(other)

Pipe other into self

check_exit()

Check our exit code

iter_read()

Create the thing which iterates our read operation

iter_write(source)

Create a thing which will read from source and write to us

kill()

Kill our underlying subprocess.Popen

start_pipe(stdin, stdout, stderr, **named)

Start the captive process (internal operation)

exception fussy.nbio.ProcessError

Called process returned an error code

Attributes:

process – the Process (if applicable) which raised the error

fussy.nbio.by_line(iterable)

Buffer iterable yielding individual lines
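
Line buffering over arbitrary chunks can be sketched as a small generator. This is an illustration rather than the library's implementation: the name split_lines is hypothetical, and yielding a trailing partial line at the end of input is an assumption about the desired behaviour.

```python
def split_lines(chunks):
    """Yield complete lines (without the newline) from an iterable of strings."""
    buffer = ''
    for chunk in chunks:
        buffer += chunk
        # Emit every complete line currently held in the buffer.
        while '\n' in buffer:
            line, buffer = buffer.split('\n', 1)
            yield line
    if buffer:
        # Assumption: a final line with no trailing newline is still reported.
        yield buffer
```

Chunk boundaries do not need to line up with line boundaries; a line split across two chunks is reassembled before it is yielded.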

fussy.nbio.close(fh)

Close the file/socket/closable thing

fussy.nbio.fileno(fh)

Determine the fileno for the file-like thing

fussy.nbio.pause(duration)

Allow tests to override sleeping using globalsub

fussy.nbio.reader(fh, blocksize=4096)

Produce content blocks from fh without blocking
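
A reader of this kind can be approximated with the standard library as follows. This is a sketch under assumptions, not the library's code: it works on a raw file descriptor on a POSIX system, the name nonblocking_blocks is hypothetical, and the idle value yielded when no data is ready is a placeholder for whatever marker the real reader uses.

```python
import os


def nonblocking_blocks(fd, blocksize=4096, idle=None):
    """Yield blocks read from fd, or idle when no data is available yet."""
    os.set_blocking(fd, False)
    while True:
        try:
            block = os.read(fd, blocksize)
        except BlockingIOError:
            # Nothing to read right now; hand control back to the caller.
            yield idle
            continue
        if not block:
            return  # EOF: every writer has closed its end
        yield block
```

Because the generator yields instead of waiting, a caller can service other pipes between attempts.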

fussy.nbio.writeiter(iterator, fh)

Write content from iterator to fh

To write the contents of one file into another:

writeiter(
    reader( open( filename )),
    fh 
)

To write a requests response object into a tar pipe iteratively:

writeiter(
    response.iter_content( 4096, decode_unicode=False ),
    pipe
)

fussy.nbio.writer(content, fh, encoding='utf8')

Continue writing content (string) to fh until content is consumed

Used by writeiter to write individual bits of content to the fh
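
Writing "until content is consumed" matters because a non-blocking write may accept only part of the data. A standard-library sketch of that loop follows; it is not the library's implementation, the name write_all is hypothetical, and it assumes a raw file descriptor on a POSIX system.

```python
import os


def write_all(fd, content, encoding='utf8'):
    """Write content to fd piece by piece, yielding the bytes written each try."""
    data = content.encode(encoding) if isinstance(content, str) else content
    view = memoryview(data)
    while view:
        try:
            written = os.write(fd, view)
        except BlockingIOError:
            # The pipe is full; report no progress and try again later.
            yield 0
            continue
        # Drop the part that was accepted and keep the remainder.
        view = view[written:]
        yield written
```

Summing the yielded values gives the number of encoded bytes written, which may be larger than the number of characters in the string.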
