The utility module fussy.nbio provides a basic mechanism to stream data through a series of processes.
The nbio module makes it fairly easy to create (potentially long) chains of processes that all run in parallel. Internally, the pipe uses non-blocking IO and generators so that it does not deadlock when an OS pipe buffer fills up.
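To show why non-blocking IO matters here, the following is a minimal standard-library sketch. It is not fussy.nbio's implementation, and the `run_filter` helper and `CAT` command are hypothetical. The sketch writes chunks from an iterable into a child's stdin only when that pipe is writable, and drains the child's stdout whenever data is available, so neither side ever blocks on a full pipe buffer. It assumes a POSIX system, because `selectors` cannot wait on pipes on Windows.

```python
import os
import selectors
import subprocess
import sys

# Hypothetical "cat" replacement that copies stdin to stdout using Python itself
CAT = [sys.executable, '-c',
       'import shutil, sys; shutil.copyfileobj(sys.stdin.buffer, sys.stdout.buffer)']


def run_filter(argv, chunks):
    """Feed byte chunks to argv's stdin while draining its stdout (POSIX only)."""
    proc = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    os.set_blocking(proc.stdin.fileno(), False)  # writes return early instead of blocking
    sel = selectors.DefaultSelector()
    sel.register(proc.stdout, selectors.EVENT_READ)
    sel.register(proc.stdin, selectors.EVENT_WRITE)
    source = iter(chunks)
    pending = b''
    output = []
    while sel.get_map():
        for key, _ in sel.select():
            if key.fileobj is proc.stdout:
                data = os.read(proc.stdout.fileno(), 65536)
                if data:
                    output.append(data)
                else:  # EOF: the child closed its stdout
                    sel.unregister(proc.stdout)
                continue
            if not pending:
                pending = next(source, None)
                if pending is None:  # input exhausted: close stdin so the child sees EOF
                    sel.unregister(proc.stdin)
                    proc.stdin.close()
                    continue
            try:
                written = os.write(proc.stdin.fileno(), pending)
            except BlockingIOError:  # pipe filled up; retry on the next writable event
                written = 0
            pending = pending[written:]
    sel.close()
    proc.stdout.close()
    proc.wait()
    return b''.join(output)
```

With a blocking `write()` of a large input, the parent could stall while the child stalls writing to a stdout pipe nobody is reading; the selector loop avoids that by only acting on descriptors that are ready.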
>>> from fussy.nbio import Process as P
>>> import requests
>>> response = requests.get( 'http://www.vrplumber.com' )
>>> pipe = response.iter_content | P( 'gzip -c' )
>>> result = pipe()
>>> assert result.startswith( '\x1f\x8b' )
Create a process and read output:
>>> from fussy.nbio import Process as P
>>> pipe = P( 'echo "Hello world"' )
>>> pipe()
'Hello world\n'
Pipe the output through another process:
>>> from fussy.nbio import Process as P
>>> pipe = P( 'echo "Hello world"' ) | P( 'cat' )
>>> pipe()
'Hello world\n'
Pipe a generator into a pipe:
>>> from fussy.nbio import Process as P
>>> def gen():
...     for i in range( 20 ):
...         yield str(i)
...
>>> pipe = gen() | P( 'cat' )
>>> pipe()
'012345678910111213141516171819'
Pipe a pipe into a function (note: the output is also still included in the pipe's return value):
>>> from fussy.nbio import Process as P
>>> result = []
>>> pipe = P( 'echo "Hello world"' ) | result.append
>>> pipe()
'Hello world\n'
>>> result
['Hello world\n']
Pipe a pipe to the current process's stdout (the output is printed, and the pipe itself returns an empty string):
>>> from fussy.nbio import Process as P
>>> pipe = P( 'echo "Hello world"' ) | '-'
>>> pipe()
Hello world
''
Wraps subprocess with pipe semantics and generator-based multiplexing, for example:
pipe = open( somefile, 'rb' ) | nbio.Process( ['grep','blue'] ) | nbio.Process( ['wc','-l'] )
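The `|` chaining used throughout these examples relies on Python's operator overloading. The sketch below is a hypothetical, pure-Python illustration of that idea only; the `Stage` class is made up for this example and is not part of fussy.nbio. `__or__` composes two stages, and `__ror__` lets an object with no `__or__` of its own (such as a generator) start a chain from the left-hand side.

```python
class Stage:
    """Hypothetical pipe component wrapping func(iterable) -> iterable."""

    def __init__(self, func, source=None):
        self.func = func
        self.source = source

    def __or__(self, other):
        # stage | stage: compose both functions, keeping any attached source
        first, second = self.func, other.func
        return Stage(lambda items: second(first(items)), self.source)

    def __ror__(self, source):
        # iterable | stage: generators and strings define no __or__, so Python
        # falls back to this reflected method on the right-hand operand
        return Stage(self.func, source)

    def __call__(self):
        # Run the chain and join the results, similar to calling a pipe
        return ''.join(self.func(self.source if self.source is not None else ()))


def upper(items):
    for item in items:
        yield item.upper()


def number(items):
    for i, item in enumerate(items):
        yield '%d:%s ' % (i, item)


pipe = iter(['a', 'b']) | Stage(upper) | Stage(number)
```

Because `|` is left-associative, `source | A | B` first builds `source | A` via `__ror__`, then extends it with `B` via `__or__`.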
Base class for nbio errors
Pipe of N processes which all need to process data in parallel
Iterate over this pipeline, returning combined results as a string
Iterate over the processes in the pipe
If the stdout/stderr of the processes is not captured, then we will yield the results in whatever chunk-size is yielded from the individual processes.
If all of the processes yield DID_NOTHING in a particular cycle, the pipe calls pause() for self.pause_on_silence (normally passed to __call__) before the next iteration.
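The cycle-and-pause behaviour can be pictured with a small round-robin scheduler over generators. This is a hypothetical sketch, not the module's code: `DID_NOTHING` here is a locally defined sentinel, `run_round_robin` is an invented name, and `time.sleep` stands in for pause(). The loop only sleeps when an entire cycle produced nothing, so busy pipes never wait.

```python
import time

DID_NOTHING = object()  # sentinel: "this component made no progress this cycle"


def run_round_robin(components, pause_on_silence=0.01):
    """Advance each generator once per cycle; sleep when a whole cycle is silent.

    Returns the collected values and the number of pauses taken.
    """
    active = list(components)
    results = []
    pauses = 0
    while active:
        progressed = False
        for component in list(active):  # iterate over a copy so we can remove
            try:
                value = next(component)
            except StopIteration:
                active.remove(component)
                progressed = True  # a component finishing counts as progress
                continue
            if value is not DID_NOTHING:
                results.append(value)
                progressed = True
        if active and not progressed:
            pauses += 1
            time.sleep(pause_on_silence)  # stands in for pause()
    return results, pauses


def slow():
    # Idle for two cycles, then produce a value
    yield DID_NOTHING
    yield DID_NOTHING
    yield 'ready'
```

A larger `pause_on_silence` reduces CPU use while every process is idle, at the cost of slightly higher latency once data arrives.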
Add the given PipeComponent to this pipe (note: does not connect stdin/stdout)
Given a python object other, create a PipeComponent for it
The purpose of this method is to allow for fairly “natural” descriptions of tasks. You can pipe to or from files; to or from the string '-' (stdin/stdout); to the empty string '' (collect stdout); or from a regular string (which is treated as input). You can pipe iterables into a pipe, and you can pipe the results of pipes into callables.
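The rules above amount to a type dispatch. The following hypothetical `classify` function (not the module's real method, and returning invented labels) sketches one way to order the checks. The order matters: a string is itself iterable, and file objects are iterable too, so both must be tested before the generic iterable case.

```python
import io
from collections.abc import Iterable


def classify(other):
    """Hypothetical dispatch mirroring the piping rules described above."""
    if isinstance(other, str):
        if other == '-':
            return 'stdio'          # stdin when read from, stdout when written to
        if other == '':
            return 'collect'        # gather stdout into the returned string
        return 'literal-input'      # any other string is fed in as data
    if isinstance(other, io.IOBase):
        return 'file'               # checked before Iterable: files are iterable
    if callable(other):
        return 'callable-sink'      # e.g. result.append receives each chunk
    if isinstance(other, Iterable):
        return 'iterable-source'    # generators, lists, and so on
    raise TypeError('cannot pipe %r' % (other,))
```

A real implementation also has to consider which side of the `|` the object is on; this sketch ignores position and only shows the type checks.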
A particular process in a Pipe
Processes are the most common entry point when using nbio: you create processes and pipe data into or out of them as appropriate to build Pipes.
Under the covers the Process runs subprocess.Popen, and it accepts most of the arguments subprocess.Popen does. By default it captures stdout and pipes data into stdin. If nothing is connected to stdin, then stdin is closed on the first iteration of the pipe. If nothing is connected to stdout (or to stderr, when stderr is captured), the results are returned to the caller joined together with ''.
The implication is that if you do not want to store all of the results in RAM, you need to “sink” the results into a process or file, or not capture the results (pass False for stdout or stderr).
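The difference between collecting and sinking can be shown with plain `subprocess`. The `stream_to_sink` helper and `PRODUCER` command below are hypothetical and do not use the nbio API; the point is that each chunk is handed to a callable and then dropped, so memory use is bounded by the chunk size rather than by the total output.

```python
import subprocess
import sys

# Hypothetical producer that writes 200,000 bytes to stdout
PRODUCER = [sys.executable, '-c', "import sys; sys.stdout.write('x' * 200000)"]


def stream_to_sink(argv, sink, chunk_size=65536):
    """Pass the child's stdout to sink() one chunk at a time instead of joining it."""
    with subprocess.Popen(argv, stdout=subprocess.PIPE) as proc:
        for chunk in iter(lambda: proc.stdout.read(chunk_size), b''):
            sink(chunk)  # only one chunk is held here at a time
    return proc.returncode  # set when the with-block waits for the child
```

For example, passing the `write` method of a file opened in binary mode as `sink` streams the output to disk without accumulating it in RAM.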
Initialize the Process
stdin – whether to provide stdin writing
stdout – whether to capture stdout
stderr – whether to capture stderr; if -1, stderr is combined with stdout
named – passed through to subprocess.Popen()
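One plausible way such flags map onto `subprocess.Popen` is sketched below. The `popen_kwargs` function and its default values are assumptions for illustration, not the documented signature. Note that the module's `-1` convention is its own; the standard library expresses "merge stderr into stdout" with `subprocess.STDOUT`.

```python
import subprocess
import sys


def popen_kwargs(stdin=True, stdout=True, stderr=False, **named):
    """Hypothetical translation of the documented flags into Popen arguments."""
    kwargs = dict(named)  # any other keyword arguments go straight to Popen
    kwargs['stdin'] = subprocess.PIPE if stdin else None
    kwargs['stdout'] = subprocess.PIPE if stdout else None
    if stderr == -1:
        kwargs['stderr'] = subprocess.STDOUT  # merge stderr into the stdout pipe
    elif stderr:
        kwargs['stderr'] = subprocess.PIPE    # capture stderr separately
    else:
        kwargs['stderr'] = None               # inherit the parent's stderr
    return kwargs


# Hypothetical child that writes to both streams, flushing stdout first
NOISY = [sys.executable, '-c',
         "import sys; sys.stdout.write('out '); sys.stdout.flush(); sys.stderr.write('err')"]
```

With `stderr=-1`, both streams arrive on the single stdout pipe, so `communicate()` on the `NOISY` child returns `b'out err'` as its stdout.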
Called process returned an error code
Attributes:
process – the Process (if applicable) which raised the error
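An error hierarchy of this shape can be sketched as follows. The class names `PipeError` and `ProcessFailed` and the `check_exit` helper are hypothetical stand-ins for the base error class and the error-code exception described above; the key idea is that the exception carries a `process` attribute so callers can inspect the failing child.

```python
import subprocess
import sys


class PipeError(Exception):
    """Hypothetical stand-in for the module's base error class."""


class ProcessFailed(PipeError):
    """Hypothetical error raised when a child exits with a non-zero status."""

    def __init__(self, message, process=None):
        super().__init__(message)
        self.process = process  # the process that raised the error, if any


def check_exit(proc):
    """Wait for proc and raise ProcessFailed on a non-zero exit code."""
    returncode = proc.wait()
    if returncode != 0:
        raise ProcessFailed('%r exited with %d' % (proc.args, returncode), process=proc)
    return returncode


# Hypothetical child that always exits with status 3
FAILING = [sys.executable, '-c', 'import sys; sys.exit(3)']
```

Catching the base class (`PipeError` here) handles every error from the sketch, while catching the subclass handles only non-zero exits.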