
Welcome!¶
Welcome to Pipecat … elegant, flexible data logging in Python for connected sensors and instruments. Use Pipecat to log data from battery chargers, GPS, automobiles, gyros, weather, and more!
Documentation¶

Installation¶
Using a Package Manager¶
A package manager (conda, apt, yum, MacPorts, etc) should generally be your first stop for installing Pipecat - it will make it easy to install Pipecat and its dependencies, keep them up-to-date, and even (gasp!) uninstall them cleanly. If your package manager doesn’t support Pipecat yet, drop them a line and let them know you’d like them to add it!
If you’re new to Python or unsure where to start, we strongly recommend taking a look at Anaconda, which the Pipecat developers use during their day-to-day work.
Using Pip / Easy Install¶
If your package manager doesn’t support Pipecat, or doesn’t have the latest version, your next option should be Python setup tools like pip. You can always install the latest stable version of Pipecat and its required dependencies using:
$ pip install pipecat
… following that, you’ll be able to use all of Pipecat’s features.
From Source¶
Finally, if you want to work with the latest, bleeding-edge Pipecat goodness, you can install it using the source code:
$ git clone https://github.com/shead-custom-design/pipecat
$ cd pipecat
$ sudo python setup.py install
The setup script installs Pipecat’s required dependencies and copies Pipecat into your Python site-packages directory, ready to go.

Dependencies¶
Minimum Requirements¶
To use Pipecat you will need, at a minimum, Python 2 or 3 (duh):
- Python 2.7 / Python 3 - http://python.org
plus the following (if you install Pipecat using pip, these are automatically installed for you):
- arrow - Better dates and times for Python - http://arrow.readthedocs.io
- numpy >= 1.8.0 - http://numpy.org
- Pint - makes units easy - https://pint.readthedocs.io
- six - http://pythonhosted.org/six
Source Installation¶
If you’re installing Pipecat from source, you’ll need setuptools to run the Pipecat setup.py script:
- setuptools - https://setuptools.readthedocs.io
Regression Testing¶
The following are required to run Pipecat’s regression tests and view code coverage:
- behave - BDD test framework - http://pythonhosted.org/behave
- coverage - code coverage module - http://nedbatchelder.com/code/coverage
- mock - mocking and testing library - http://www.voidspace.org.uk/python/mock
- nose - unit test framework - https://nose.readthedocs.io/en/latest
Generating Documentation¶
And you’ll need the following to generate this documentation:
- Sphinx - documentation builder - http://sphinx-doc.org
- Sphinx readthedocs theme - https://github.com/snide/sphinx_rtd_theme
- napoleon - http://sphinxcontrib-napoleon.readthedocs.io/en/latest/
- Jupyter - http://ipython.org
- Pandoc - http://johnmacfarlane.net/pandoc

Compatibility¶
A quick disclaimer on backwards-compatibility for Pipecat users:
Pipecat follows the Semantic Versioning standard for assigning version numbers in a way that has specific meaning. As of this writing Pipecat releases are still in the 0.y.z development phase, which means (among other things) that the API may change at any time. We try not to be abusive about it, but you should be prepared for the occasional bump on the road to the 1.0 release.

User Guide¶

Design¶
Records¶
Components in Pipecat manipulate records, which are plain Python dicts - record generators (such as hardware devices) produce records as output, record filters consume records as input and produce modified records as output, while record consumers (such as functions that store data) only take records as input. A record could represent a location reported by the GPS in your phone, an OBD-II reading from your car’s diagnostic computer, or the state of a battery reported by a battery charger. Because a record is a dict, it can contain however many key-value pairs are appropriate for a given device, and you can manipulate that data easily.
Record Keys¶
For interoperability, Pipecat requires (with one exception) that record keys be strings. Using strings as keys makes records easy to understand, manipulate, and serialize, while allowing great flexibility in the choice of keys for devices. Pipecat intentionally does not impose any naming scheme on record keys, although we plan to evolve consistent sets of well-defined keys for specific device classes as we go. Our desire is to encourage developers of new Pipecat components to think broadly about the type of data they store in records. The one exception to the permissive “strings-as-keys” approach is that Pipecat encourages developers to use “hierarchical” keys where appropriate, and requires that hierarchical keys be represented using tuples-of-strings. For example, a battery charger with both internal and external temperature sensors should use (“temperature”, “internal”) as a key instead of some other private naming scheme such as “temperature-internal”, “temperature/internal”, “temperature|internal”, etc. Representing hierarchies in this way makes them explicit, avoids a proliferation of private naming schemes for hierarchical data, and allows downstream components to manipulate hierarchical keys in a consistent way.
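As a small illustration of the key convention described above, the following sketch builds a record by hand and selects every entry beneath a hierarchical prefix. The field names, the values, and the select helper are made up for this example and are not part of Pipecat.

```python
# A hand-built record mixing a plain string key with tuple-of-strings
# hierarchical keys. Names and values are illustrative only.
record = {
    "mode": "charge",
    ("temperature", "internal"): 29.1,
    ("temperature", "external"): 23.6,
}


def select(record, prefix):
    """Return the entries whose hierarchical key starts with `prefix`.

    Hypothetical helper, not a Pipecat function.
    """
    prefix = tuple(prefix)
    return {
        key: value
        for key, value in record.items()
        if isinstance(key, tuple) and key[:len(prefix)] == prefix
    }


temperatures = select(record, ("temperature",))
print(temperatures)
```

Because the hierarchy is explicit in the tuple, the helper never has to guess at a separator character, which is the point of avoiding private naming schemes.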
Record Values¶
Any type can be used as a value in a Pipecat record, and the goal again is to avoid needlessly constraining the ingenuity of Pipecat developers. There are just two caveats:
- We strongly encourage the use of Arrow objects to store timestamp data. Arrow is already a required dependency of Pipecat, so it’s guaranteed to be available.
- We strongly encourage the use of explicit physical units wherever possible. Pipecat provides builtin quantity and unit support from Pint to make this easy. Again, Pint is a required Pipecat dependency, so it’s guaranteed to be available for Pipecat developers.
Record Generators¶
Record generators are any iterable expression (function or object) that produces records. An iterable expression is any expression that can be used with the Python for statement, and you use for loops to read records from generators:
generator = pipecat.device.clock.metronome()
for record in generator:
    pipecat.record.dump(record)
Some examples of record generators include:
- pipecat.device.clock.metronome(), which returns an empty record at fixed time intervals.
- pipecat.http.get(), which retrieves data from an HTTP server at fixed time intervals.
- pipecat.utility.readline(), which returns a record for each line in a file or file-like object.
- pipecat.udp.receive(), which returns a record for each message received on a listening UDP port.
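Since any iterable that produces dicts qualifies, a record generator can be as small as a Python generator function. The sketch below uses only the standard library; counter is a hypothetical name chosen for this example, not a Pipecat component.

```python
import itertools


def counter(start=0):
    """Hypothetical record generator: yields one record per integer, forever."""
    for value in itertools.count(start):
        yield {"count": value}


# Read the first three records with an ordinary for loop.
records = []
for record in counter():
    records.append(record)
    if len(records) == 3:
        break
print(records)
```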
Record Consumers¶
Record consumers are any function or object that consumes records, i.e. iterates over the results of a record generator, returning nothing. Some examples of record consumers include:
- pipecat.queue.send(), which places records consumed from a generator sequentially into a queue.Queue.
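To make the idea of a consumer concrete, here is a minimal standard-library sketch of a function that drains a record source into a queue and returns nothing. The name send_to_queue is hypothetical, and the sketch is not meant to mirror how pipecat.queue.send() is implemented.

```python
import queue


def send_to_queue(source, destination):
    """Hypothetical consumer: place every record from `source` into `destination`."""
    for record in source:
        destination.put(record)


q = queue.Queue()
send_to_queue(iter([{"a": 1}, {"a": 2}]), q)

# Drain the queue to confirm the records arrived in order.
received = [q.get_nowait() for _ in range(q.qsize())]
print(received)
```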
Record Filters¶
Record filters are any function or object that both consumes and generates records. Most of the components provided by Pipecat fall into this category. Examples include:
- pipecat.utility.add_timestamp(), which adds a timestamp field to the records it receives.
- pipecat.device.charger.icharger208b(), which takes records containing strings and converts them into records containing battery charging information.
- pipecat.device.gps.nmea(), which takes records containing strings and converts them into records containing navigational information.
- pipecat.limit.duration(), which passes records without modification until a fixed time interval has expired.
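A filter is simply a generator that wraps another iterable. The following sketch, which uses a hypothetical tag_records function and made-up field names, attaches a constant field to every record that passes through it.

```python
def tag_records(source, key, value):
    """Hypothetical filter: copy each record and attach a constant field."""
    for record in source:
        updated = dict(record)  # copy so the upstream record is left untouched
        updated[key] = value
        yield updated


source = [{"string": "a"}, {"string": "b"}]
tagged = list(tag_records(source, ("device", "name"), "bench-charger"))
print(tagged)
```

Copying the record before modifying it is a choice made for this sketch; it keeps the example free of side effects on the upstream data.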
Pipes¶
When all is said and done, you use Pipecat by hooking together components to create pipes that retrieve, process, and store records as part of some larger task. In the following example, we retrieve data from a battery charger connected via a serial port, and print it to the console:
pipe = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(pipe)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    pipecat.record.dump(record)
If we want to save the records to a CSV file, we simply add an additional component to the pipe:
pipe = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(pipe)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.store.csv.write(pipe, "battery.csv")
for record in pipe:
    pipecat.record.dump(record)
Note from this example how we use a single variable to keep track of the “output” end of the pipe, passing it as the “input” to each component that we connect. Of course, nothing requires that you re-use a variable in this way, but we find that this style avoids a proliferation of otherwise unused symbols and makes reordering, adding and subtracting components in a pipe much easier. For example, it’s easy to comment-out the component we just added without affecting any downstream code:
pipe = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(pipe)
pipe = pipecat.device.charger.icharger208b(pipe)
#pipe = pipecat.store.csv.write(pipe, "battery.csv")
for record in pipe:
    pipecat.record.dump(record)
Similarly, you can easily insert and reorder components without having to worry about renaming variables. Here, we add a component to timestamp the battery charger records, and another component to automatically stop iteration after five seconds of inactivity:
pipe = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(pipe)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.timeout(pipe, timeout=pipecat.quantity(5, pipecat.units.seconds))
pipe = pipecat.store.csv.write(pipe, "battery.csv")
for record in pipe:
    pipecat.record.dump(record)
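Stopping after a period of inactivity is the least obvious of these components, because a plain generator cannot notice the passage of time while its upstream source is blocked. One way to get that behavior, sketched here with the standard library only, is to pull records on a background thread and wait on a queue with a timeout. The names inactivity_timeout and stalling_source are hypothetical, and this is not a description of how pipecat.limit.timeout() works internally.

```python
import queue
import threading
import time

_DONE = object()  # sentinel marking the end of the upstream source


def inactivity_timeout(source, seconds):
    """Yield records from `source`, stopping if none arrives within `seconds`.

    Hypothetical sketch: a daemon thread pulls from `source` so the consumer
    can give up waiting even while the source itself is blocked.
    """
    buffer = queue.Queue()

    def pump():
        for record in source:
            buffer.put(record)
        buffer.put(_DONE)

    threading.Thread(target=pump, daemon=True).start()
    while True:
        try:
            record = buffer.get(timeout=seconds)
        except queue.Empty:
            return  # nothing arrived in time, so end iteration
        if record is _DONE:
            return  # the source finished on its own
        yield record


def stalling_source():
    """Two quick records, then a long stall standing in for a silent device."""
    yield {"n": 1}
    yield {"n": 2}
    time.sleep(2.0)
    yield {"n": 3}


received = list(inactivity_timeout(stalling_source(), seconds=0.2))
print(received)
```

The third record is never delivered because the loop gives up during the stall. The background thread is a daemon so that it does not keep the program alive while it is still blocked.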

Battery Chargers¶
Many modern “smart” battery chargers have data logging capabilities that you can use with Pipecat to view the state of a battery during charging. Typically, these chargers connect to your computer via a serial port or a serial-over-USB cable that acts like a traditional serial port. Data is then sent to the port during charging. For example, data from an iCharger 208B connected to a Mac computer using a USB cable could be read using the following:
import serial
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
for line in port:
    print(line)
$1;1;;12250;3874;93;0;0;0;0;0;0;0;0;291;236;0;20
$1;1;;12250;3875;92;0;0;0;0;0;0;0;0;294;236;0;17
$1;1;;12250;3877;89;0;0;0;0;0;0;0;0;291;236;1;29
Here, we used the pySerial (http://pyserial.readthedocs.io) library to open a serial port and read data from the charger, which sends data to the port one line at a time. Note that the device name for the port - “/dev/cu.SLAB_USBtoUART” in this case - will vary depending on your device and operating system. For example, you might use “COM1” or “COM2” on Windows, or “/dev/ttyS0” on Linux.
Our first step in using Pipecat to decipher the raw device output is to turn the for-loop from the previous example into a pipe:
import pipecat.utility
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
for record in pipe:
    print(record)
{'string': u'$1;1;;12250;3874;93;0;0;0;0;0;0;0;0;291;236;0;20\r\n'}
{'string': u'$1;1;;12250;3875;92;0;0;0;0;0;0;0;0;294;236;0;17\r\n'}
{'string': u'$1;1;;12250;3877;89;0;0;0;0;0;0;0;0;291;236;1;29\r\n'}
In this case, pipecat.utility.readline() converts the raw data into records that store each line of data for further processing. To decode the contents of each line, we add the appropriate Pipecat device to the end of the pipe:
import pipecat.device.charger
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    print(record)
{('battery', 'cell4', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'temperature', 'external'): <Quantity(23.6, 'degC')>, ('battery', 'cell5', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell8', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'mode'): 'charge', ('charger', 'temperature', 'internal'): <Quantity(29.1, 'degC')>, ('battery', 'cell2', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'voltage'): <Quantity(3.874, 'volt')>, ('battery', 'cell6', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell1', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'charge'): <Quantity(0.0, 'hour * milliampere')>, ('battery', 'cell3', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell7', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'current'): <Quantity(930.0, 'milliampere')>, ('charger', 'supply'): <Quantity(12.25, 'volt')>}
{('battery', 'cell4', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'temperature', 'external'): <Quantity(23.6, 'degC')>, ('battery', 'cell5', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell8', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'mode'): 'charge', ('charger', 'temperature', 'internal'): <Quantity(29.4, 'degC')>, ('battery', 'cell2', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'voltage'): <Quantity(3.875, 'volt')>, ('battery', 'cell6', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell1', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'charge'): <Quantity(0.0, 'hour * milliampere')>, ('battery', 'cell3', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell7', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'current'): <Quantity(920.0, 'milliampere')>, ('charger', 'supply'): <Quantity(12.25, 'volt')>}
{('battery', 'cell4', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'temperature', 'external'): <Quantity(23.6, 'degC')>, ('battery', 'cell5', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell8', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'mode'): 'charge', ('charger', 'temperature', 'internal'): <Quantity(29.1, 'degC')>, ('battery', 'cell2', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'voltage'): <Quantity(3.877, 'volt')>, ('battery', 'cell6', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell1', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'charge'): <Quantity(1.0, 'hour * milliampere')>, ('battery', 'cell3', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell7', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'current'): <Quantity(890.0, 'milliampere')>, ('charger', 'supply'): <Quantity(12.25, 'volt')>}
As you can see, pipecat.device.charger.icharger208b() has converted the raw data records into charger-specific records containing human-readable fields whose values have appropriate physical units.
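To give a feel for what a device component does, here is a minimal sketch that decodes one raw line by hand. The field positions and scale factors are inferred from the sample output above rather than taken from the charger's documentation or Pipecat's source. The name decode_line is made up, only a subset of fields is decoded, and plain floats stand in for Pint quantities to keep the sketch dependency-free.

```python
def decode_line(line):
    """Decode one semicolon-delimited charger line into a record.

    Assumption: positions and scale factors are guessed by comparing the raw
    sample lines with the decoded sample records shown in this section.
    """
    fields = line.strip().split(";")
    return {
        ("charger", "supply"): int(fields[3]) / 1000.0,                   # volts
        ("battery", "voltage"): int(fields[4]) / 1000.0,                  # volts
        ("battery", "current"): int(fields[5]) * 10.0,                    # milliamperes
        ("charger", "temperature", "internal"): int(fields[14]) / 10.0,   # degrees Celsius
        ("charger", "temperature", "external"): int(fields[15]) / 10.0,   # degrees Celsius
        ("battery", "charge"): float(fields[16]),                         # milliampere-hours
    }


record = decode_line("$1;1;;12250;3874;93;0;0;0;0;0;0;0;0;291;236;0;20\r\n")
for key in sorted(record):
    print("/".join(key), record[key])
```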
Let’s use pipecat.record.dump() to make the output a little more readable:
import pipecat.record
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    pipecat.record.dump(record)
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 930.0 milliampere
battery/voltage: 3.874 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 920.0 milliampere
battery/voltage: 3.875 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.4 degC
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 1.0 hour * milliampere
battery/current: 890.0 milliampere
battery/voltage: 3.877 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC
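The flattened layout above is easy to approximate for your own formatting needs. The sketch below joins tuple keys with "/" and emits one line per field. The name format_record is hypothetical, and sorting by the joined key is a choice made for this example, not a claim about how pipecat.record.dump() orders its output.

```python
def format_record(record):
    """Return one "key: value" line per field, joining tuple keys with "/".

    Hypothetical helper, not a Pipecat function.
    """
    def label(key):
        return "/".join(key) if isinstance(key, tuple) else key

    items = sorted(record.items(), key=lambda item: label(item[0]))
    return ["{}: {}".format(label(key), value) for key, value in items]


lines = format_record({
    ("charger", "mode"): "charge",
    ("battery", "voltage"): 3.874,
    "timestamp": "2017-12-27T00:21:35",
})
for line in lines:
    print(line)
```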
Now, you can extract just the data you care about from a record:
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    print(record[("battery", "voltage")], record[("battery", "current")])
3.874 volt 930.0 milliampere
3.875 volt 920.0 milliampere
3.877 volt 890.0 milliampere
And you can convert units safely and explicitly:
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    print(record[("battery", "current")].to(pipecat.units.amps))
0.93 ampere
0.92 ampere
0.89 ampere
As an aside, you may be wondering at this point why it’s necessary to explicitly create the serial port and connect it to readline … why not code that functionality directly into icharger208b? The answer is flexibility: by separating Pipecat’s functionality into discrete, well-defined components, those components can be easily combined in new and unexpected ways. For example, you could use icharger208b with a charger that communicated over a network socket instead of a serial port. Or you could “replay” data from a charger stored in a file:
fobj = open("../data/icharger208b-charging-sample")
pipe = pipecat.utility.readline(fobj)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    print(record[("battery", "charge")])
0.0 hour * milliampere
0.0 hour * milliampere
1.0 hour * milliampere
1.0 hour * milliampere
2.0 hour * milliampere
Let’s explore other things we can do with our pipe. To begin, you might want to add additional metadata to the records returned from a device. For example, you might want to append timestamps:
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
for record in pipe:
    pipecat.record.dump(record)
timestamp: 2017-12-27T00:21:35.210018+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 930.0 milliampere
battery/voltage: 3.874 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC
timestamp: 2017-12-27T00:21:37.215579+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 920.0 milliampere
battery/voltage: 3.875 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.4 degC
timestamp: 2017-12-27T00:21:39.219548+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 1.0 hour * milliampere
battery/current: 890.0 milliampere
battery/voltage: 3.877 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC
Note that pipecat.utility.add_timestamp() has added a timestamp field to each record. Timestamps in Pipecat are always recorded using UTC (universal) time, so you will likely want to convert them to your local timezone before formatting them for display:
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
for record in pipe:
    print(
        record["timestamp"].to("local").format("YYYY-MM-DD hh:mm:ss a"),
        record[("battery", "voltage")],
    )
2017-12-26 05:21:41 pm 3.874 volt
2017-12-26 05:21:43 pm 3.875 volt
2017-12-26 05:21:45 pm 3.877 volt
You could also use pipecat.utility.add_field() to append your own custom field to every record that passes through the pipe.
Now let’s consider calculating some simple statistics, such as the minimum and maximum battery voltage while charging. When we iterate over the contents of a pipe using a for loop, we receive one record at a time until the pipe is empty. We could keep track of a “running” minimum and maximum during iteration, and there are use-cases where that is the best way to solve the problem. However, for moderately-sized data, Pipecat provides a more convenient approach:
import pipecat.store
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.store.cache(pipe)
for record in pipe:
    pass
print(len(pipe.table))
print(pipe.table[("battery", "voltage")])
1390
[ 3.874 3.875 3.877 ..., 4.119 4.119 4.119] volt
Here, pipecat.store.cache() creates an in-memory cache that stores every record it receives. We have a do-nothing for loop that reads data from the charger to populate the cache. Once that’s complete, we can use the cache table attribute to retrieve data from the cache using the same keys and syntax we would use with a record. Unlike a record, the cache returns every value for a given key at once (using a Numpy array), which makes it easy to compute the statistics we’re interested in:
import pipecat.store
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.store.cache(pipe)
for record in pipe:
    pass
print("Min:", pipe.table[("battery", "voltage")].min())
print("Max:", pipe.table[("battery", "voltage")].max())
Min: 3.874 volt
Max: 4.152 volt
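For comparison, the “running” alternative mentioned earlier, which avoids holding every record in memory, might look like the following sketch. It uses plain floats and a hypothetical running_extremes helper; the three sample voltages are taken from the first records shown in this section.

```python
def running_extremes(source, key):
    """Track the smallest and largest value of `key` without storing records."""
    lowest = highest = None
    for record in source:
        value = record[key]
        if lowest is None or value < lowest:
            lowest = value
        if highest is None or value > highest:
            highest = value
    return lowest, highest


samples = [{("battery", "voltage"): v} for v in (3.874, 3.875, 3.877)]
lowest, highest = running_extremes(samples, ("battery", "voltage"))
print("Min:", lowest)
print("Max:", highest)
```

The trade-off is that this version answers only the one question it was written for, whereas the cache keeps every field available for later analysis.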
Consolidating fields using the cache is also perfect for generating plots with a library like Toyplot (http://toyplot.readthedocs.io):
import toyplot
canvas = toyplot.Canvas()
axes = canvas.cartesian(grid=(3, 1, 0), label="Battery", ylabel="Voltage (V)")
axes.plot(pipe.table[("battery", "voltage")].to(pipecat.units.volt))
axes = canvas.cartesian(grid=(3, 1, 1), ylabel="Current (A)")
axes.plot(pipe.table[("battery", "current")].to(pipecat.units.amp))
axes = canvas.cartesian(grid=(3, 1, 2), ylabel="Charge (mAH)")
axes.plot(pipe.table[("battery", "charge")].to(pipecat.units.milliamp * pipecat.units.hour))
Note that nothing prevents us from doing useful work in the for loop that populates the cache, and nothing prevents us from accessing the cache within the loop. For example, we might want to display field values from individual records alongside a running average computed from the cache. Or we might want to update our plot periodically as the loop progresses.
Moving on, you will likely want to store records to disk for later access. Pipecat provides components to make this easy too. First, you can add pipecat.store.pickle.write() to the end of a pipe, to write records to disk using Python’s pickle format:
import pipecat.store.pickle
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.store.pickle.write(pipe, "charger.pickle")
for record in pipe:
    pass
Later, you can use pipecat.store.pickle.read() to read the records back in again:
pipe = pipecat.store.pickle.read("charger.pickle")
pipe = pipecat.store.cache(pipe)
for record in pipe:
    pipecat.record.dump(record)
print("Average:", pipe.table[("battery", "voltage")].mean())
timestamp: 2017-12-27T00:21:49.522898+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 930.0 milliampere
battery/voltage: 3.874 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC
timestamp: 2017-12-27T00:21:51.525903+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 920.0 milliampere
battery/voltage: 3.875 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.4 degC
timestamp: 2017-12-27T00:21:53.532511+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 1.0 hour * milliampere
battery/current: 890.0 milliampere
battery/voltage: 3.877 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC
Average: 3.87533333333 volt
This is another example of the interchangeability of the Pipecat components: the pickle writer is a record consumer, and the pickle reader is a record generator. In essence, we “broke” our previous pipe into two separate pipes that communicate via the filesystem. While we won’t go into detail here, a similar approach could be used to communicate between threads using a message queue or between processes over a socket.
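The write-then-read pattern can be sketched with the standard library alone. In the example below, write_records and read_records are hypothetical names, an in-memory io.BytesIO stands in for a file on disk, and storing one pickle per record is an assumption made for this sketch rather than a statement about Pipecat's on-disk format.

```python
import io
import pickle


def write_records(source, stream):
    """Pickle each record to `stream`, then pass it downstream unchanged."""
    for record in source:
        pickle.dump(record, stream)
        yield record


def read_records(stream):
    """Yield records from `stream` until it is exhausted."""
    while True:
        try:
            yield pickle.load(stream)
        except EOFError:
            return


stream = io.BytesIO()  # stands in for a file opened in binary mode
original = [{("battery", "voltage"): 3.874}, {("battery", "voltage"): 3.875}]

# First pipe: iterate to drive the writer.
for record in write_records(original, stream):
    pass

# Second pipe: rewind and read the records back.
stream.seek(0)
restored = list(read_records(stream))
print(restored)
```

Tuple keys survive the round trip because pickle preserves Python types, which a text format such as CSV would not do without extra work.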
There is one final issue that we’ve ignored so far: when to stop logging data. The pipecat.utility.readline() function will read data from the serial port as long as the port is open, blocking indefinitely if no data is arriving. That means that in all the preceding examples the for loop will never end unless the serial port is closed (i.e. the external device is turned off or unplugged), or the code is interrupted using Control-C. While that’s fine for prototyping at the command line, we need a way to stop collecting data and shut down cleanly if we’re going to automate data logging processes. Fortunately, Pipecat provides several easy-to-use functions to do just that:
import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.count(pipe, count=2)
for record in pipe:
    pipecat.record.dump(record)
timestamp: 2017-12-27T00:21:55.588725+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 930.0 milliampere
battery/voltage: 3.874 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC
timestamp: 2017-12-27T00:21:57.597235+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 920.0 milliampere
battery/voltage: 3.875 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.4 degC
Here, pipecat.limit.count() ends the loop when count records have been received. This is often handy during development to limit the amount of data consumed from a device that produces output continuously. However, this approach is no good for devices like our charger that will produce a finite, indeterminate number of records - if the device stops sending records before the count has been reached, the loop will still block. Instead, we could use pipecat.limit.duration() to limit the total amount of time the loop is allowed to run:
import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.duration(pipe, duration=pipecat.quantity(4, pipecat.units.seconds))
for record in pipe:
    pipecat.record.dump(record)
timestamp: 2017-12-27T00:21:57.614914+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 930.0 milliampere
battery/voltage: 3.874 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC
timestamp: 2017-12-27T00:21:59.615434+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 920.0 milliampere
battery/voltage: 3.875 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.4 degC
This approach is an improvement because it puts an upper bound on the amount of time the loop will run, whether the device has stopped sending records or not. However, it’s still error-prone, since we don’t know in advance how long charging will take - if we set the duration too low, it may stop the loop before charging is complete. If we set the duration too high, we will capture all the records we want, but we will likely waste time waiting for records that will never come. Ideally, we would like to exit the loop as soon as the charger tells us it’s finished. Fortunately, the charger provides a field - charger/mode - that can do just that:
import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.until(pipe, key=("charger", "mode"), value="finished")
for record in pipe:
    pipecat.record.dump(record)
timestamp: 2017-12-27T00:22:01.639667+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 110.0 milliampere
battery/voltage: 4.141 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.8 degC
charger/temperature/internal: 32.5 degC
timestamp: 2017-12-27T00:22:03.648685+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 120.0 milliampere
battery/voltage: 4.14 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.9 degC
charger/temperature/internal: 32.7 degC
timestamp: 2017-12-27T00:22:05.653453+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 0.0 milliampere
battery/voltage: 4.138 volt
charger/mode: finished
charger/supply: 12.25 volt
charger/temperature/external: 23.9 degC
charger/temperature/internal: 32.7 degC
pipecat.limit.until() terminates the loop as soon as it receives
a record with the given key and value. This approach finally gets us our
desired behavior (loop ends as soon as the charger is finished), but it
could use just a little more work to make it robust. For example, what
happens if the charger stops sending data before the mode changes? We
could combine pipecat.limit.until() with pipecat.limit.duration(), but
that would still suffer from the
terminate-too-soon / waste-too-much-time problem. Fortunately, we know
from testing that our charger sends records every two seconds (if at
all), and Pipecat provides pipecat.limit.timeout(), which can
terminate the loop if it doesn’t receive a record within a specified
time interval:
import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.until(pipe, key=("charger", "mode"), value="finished")
pipe = pipecat.limit.timeout(pipe, timeout=pipecat.quantity(5, pipecat.units.seconds))
for record in pipe:
    pipecat.record.dump(record)
timestamp: 2017-12-27T00:22:05.673115+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 110.0 milliampere
battery/voltage: 4.141 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.8 degC
charger/temperature/internal: 32.5 degC
timestamp: 2017-12-27T00:22:07.678467+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 120.0 milliampere
battery/voltage: 4.14 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.9 degC
charger/temperature/internal: 32.7 degC
timestamp: 2017-12-27T00:22:09.683829+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 0.0 milliampere
battery/voltage: 4.138 volt
charger/mode: finished
charger/supply: 12.25 volt
charger/temperature/external: 23.9 degC
charger/temperature/internal: 32.7 degC
This example still exits normally when the charger is finished, but it
will also exit cleanly if a record isn’t received within five seconds.
More importantly, you can see how easy it is to combine limit functions
to control when the for loop terminates.
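To make the behavior of a limit function concrete, here is a minimal sketch of an until-style generator in plain Python. It is not Pipecat's implementation: the name until_sketch and the sample records are hypothetical, and the sketch assumes records are dicts whose hierarchical keys are tuples such as ("charger", "mode").

```python
def until_sketch(source, key, value):
    """Yield records from source, stopping after the first match.

    Hypothetical stand-in for a limit function; assumes each record is
    a dict and that hierarchical keys are tuples.
    """
    for record in source:
        yield record
        # Stop once the matching record has been passed downstream.
        if key in record and record[key] == value:
            return


# Made-up records standing in for charger output.
records = [
    {("charger", "mode"): "charge", ("battery", "voltage"): 4.141},
    {("charger", "mode"): "charge", ("battery", "voltage"): 4.140},
    {("charger", "mode"): "finished", ("battery", "voltage"): 4.138},
    {("charger", "mode"): "standby", ("battery", "voltage"): 4.138},
]

seen = list(until_sketch(iter(records), key=("charger", "mode"), value="finished"))
print(len(seen), seen[-1][("charger", "mode")])
```

The matching record is yielded before the generator returns, which mirrors the output above, where the last record dumped has charger/mode: finished.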

GPS Receivers¶
Most GPS receivers have data logging capabilities that you can use with Pipecat to view navigational information. Some receivers connect to your computer via a serial port or a serial-over-USB cable that acts like a traditional serial port. Others can push data to a network socket. For this demonstration, we will receive GPS data sent from an iPhone to a UDP socket:
import pipecat.record
import pipecat.udp
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
for record in pipe:
    pipecat.record.dump(record)
client: 172.10.0.20
message: $GPTXT,01,01,07,Pipecat*12
client: 172.10.0.20
message: $GPGGA,164100,3511.33136,N,10643.48435,W,1,8,0.9,1654.0,M,46.9,M,0,2*50
client: 172.10.0.20
message: $GPRMC,164100,A,3511.33136,N,10643.48435,W,0.00,0.00,311216,003.1,W*7C
client: 172.10.0.20
message: $GPGLL,3511.33136,N,10643.48435,W,164100,A*36
client: 172.10.0.20
message: $HCHDG,129.5,,,8.7,E*29
client: 172.10.0.20
message: $PASHR,164100190,138.24,T,+32.56,+48.49,+00.00,3.141,3.141,35.000,1,0*17
Here, we used pipecat.udp.receive() to open a UDP socket
listening on port 7777 on all available network interfaces (“0.0.0.0”)
and convert the received messages into Pipecat Records, which
we dump to the console. Note that each record includes the address of
the client (the phone in this case), along with a “message” field
containing the raw data of the message. In this case the raw data is in
NMEA format, a widely-used standard for exchanging navigational data. To
decode the contents of each message, we add the appropriate Pipecat
device to the end of the pipe:
import pipecat.device.gps
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
for record in pipe:
    pipecat.record.dump(record)
id: GPTXT
text: Pipecat
altitude: 1654.0 meter
dop: 0.9
geoid-height: 46.9 meter
id: GPGGA
latitude: 35.188856 degree
longitude: -106.724739167 degree
quality: 1
satellites: 8
time: 164100
active: True
date: 311216
id: GPRMC
latitude: 35.188856 degree
longitude: -106.724739167 degree
speed: 0.0 knot
time: 164100
track: 0.0 degree
variation: -3.1 degree
active: True
id: GPGLL
latitude: 35.188856 degree
longitude: -106.724739167 degree
time: 164100
heading: 129.5 degree
id: HCHDG
variation: 8.7 degree
heading: 138.24 degree
heading-accuracy: 35.0 degree
heave: 0.0 meter
id: PASHR
pitch: 48.49 degree
pitch-accuracy: 3.141 degree
roll: 32.56 degree
roll-accuracy: 3.141 degree
time: 164100190
As you can see, pipecat.device.gps.nmea() has converted the raw
NMEA messages into records containing human-readable navigational fields
with appropriate physical units. Note that unlike the
Battery Chargers example, not every record produced by the GPS
receiver has the same fields. The NMEA standard includes many different
types of messages, and most GPS receivers will produce more than one
type. This will increase the complexity of our code - for example, we
might have to test for the presence of a field before extracting it from
a record:
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
for record in pipe:
    if "latitude" in record:
        print("Latitude:", record["latitude"], "Longitude:", record["longitude"])
Latitude: 35.1949926667 degree Longitude: -106.7111135 degree
Latitude: 35.1949926667 degree Longitude: -106.7111135 degree
Latitude: 35.1949926667 degree Longitude: -106.7111135 degree
Latitude: 35.1952843333 degree Longitude: -106.710192667 degree
Latitude: 35.1952843333 degree Longitude: -106.710192667 degree
Latitude: 35.1952843333 degree Longitude: -106.710192667 degree
Alternatively, we might use the record id field to key our code off
a specific type of NMEA message:
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
for record in pipe:
    if record["id"] == "PASHR":
        print("Pitch:", record["pitch"])
Pitch: 66.82 degree
Pitch: 67.3 degree
Pitch: 66.8 degree
Pitch: 66.18 degree
Another alternative would be to add a filter to our pipe so we only receive records that match some criteria:
import pipecat.filter
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
pipe = pipecat.filter.keep(pipe, key="id", value="GPGLL")
for record in pipe:
    pipecat.record.dump(record)
active: True
id: GPGLL
latitude: 35.1949926667 degree
longitude: -106.7111135 degree
time: 164252
active: True
id: GPGLL
latitude: 35.1952843333 degree
longitude: -106.710192667 degree
time: 164257
active: True
id: GPGLL
latitude: 35.1956116667 degree
longitude: -106.709064 degree
time: 164303
active: True
id: GPGLL
latitude: 35.1958851667 degree
longitude: -106.708156167 degree
time: 164308
Note that pipecat.filter.keep() discards all records that don’t
meet the given criteria, which allows our downstream code to rely on the
availability of specific fields.
Regardless of the logic you employ to identify fields of interest, Pipecat always makes it easy to convert units safely and explicitly:
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
for record in pipe:
    if "speed" in record:
        print(record["speed"].to(pipecat.units.mph))
39.9320468464 mph
40.0586325857 mph
40.1276793526 mph
38.5626193033 mph
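For readers curious about the arithmetic behind that conversion, the following sketch reproduces it without a units library. It assumes the standard definitions of the international nautical mile (1852 m) and the statute mile (1609.344 m). The function name knots_to_mph is hypothetical, and the 34.7 knot input was chosen because it reproduces the first value printed above.

```python
METERS_PER_NAUTICAL_MILE = 1852.0
METERS_PER_STATUTE_MILE = 1609.344


def knots_to_mph(knots):
    """Convert a speed in knots to miles per hour."""
    return knots * METERS_PER_NAUTICAL_MILE / METERS_PER_STATUTE_MILE


mph = knots_to_mph(34.7)
print(mph)
```

Doing this by hand works for one pair of units, but it is easy to get a factor wrong, which is the kind of mistake explicit unit conversion is meant to prevent.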
Let’s explore other things we can do with our pipe. To begin, you might want to add additional metadata to the records returned from a device. For example, if you were collecting data from multiple devices you might want to “tag” records with a user-specific unique identifier:
import pipecat.utility
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
pipe = pipecat.filter.keep(pipe, key="id", value="GPGLL")
pipe = pipecat.utility.add_field(pipe, "serial", "1237V")
for record in pipe:
    pipecat.record.dump(record)
active: True
id: GPGLL
latitude: 35.1949926667 degree
longitude: -106.7111135 degree
serial: 1237V
time: 164252
active: True
id: GPGLL
latitude: 35.1952843333 degree
longitude: -106.710192667 degree
serial: 1237V
time: 164257
active: True
id: GPGLL
latitude: 35.1956116667 degree
longitude: -106.709064 degree
serial: 1237V
time: 164303
Now let’s consider calculating some simple statistics, such as our
average speed on a trip. When we iterate over the contents of a pipe
using a for loop, we receive one record at a time until the pipe is
empty. We could keep track of a “running” average during iteration, and
there are use-cases where that is the best way to solve the problem.
However, for moderately-sized data, Pipecat provides a more convenient
approach:
import pipecat.store
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
pipe = pipecat.store.cache(pipe)
for record in pipe:
    pass
print(pipe.table["speed"])
[ 0. 0. 10.59 17.96 4.39 24.14 30.65 33.59 33.28 32.85 34.08 34.78 35.28 34.66 34.46 34.1 34.64 34.41 33.88 33.75 34.7 34.81 34.87 33.51 33.71 35.38 32.09 28.94 18. 0. 1.19 21.23 31.92 33.55 34.91 34.78 33.75 32.71 31.67 31.14 31.45 31.94 31.16 32.27 35.46 35.34 34.06 33.82 34.91 34.72 34.83 34.95 33.38 33.08 27.39 5.21 0. 2.45 22.68 33.1 33.8 34.64 33.96 34.37 34.81 32.75 29.55 21.71 13.8 14.48 27.29 25.21 11.68 13.86 9.16 0. ] knot
Here, pipecat.store.cache() creates an in-memory cache that
stores every record it receives. We have a do-nothing for loop that
reads data from the GPS receiver to populate the cache. Once that’s complete,
we can use the cache’s table attribute to retrieve data from the cache
using the same keys and syntax we would use with a record. Unlike a
record, the cache returns every value for a given key at once (using a
NumPy array), which makes it easy to compute the statistics we’re
interested in:
print("Average speed:", pipe.table["speed"].mean().to(pipecat.units.mph))
Average speed: 30.8533055116 mph
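The idea of column-oriented access can be illustrated with a toy cache written in plain Python. This is only a sketch under stated assumptions, not Pipecat's implementation: the class name ColumnCache is hypothetical, values are plain floats rather than quantities with units, and columns are Python lists instead of NumPy arrays.

```python
from collections import defaultdict


class ColumnCache:
    """Pass records through unchanged while storing each field by key."""

    def __init__(self, source):
        self._source = iter(source)
        self.table = defaultdict(list)

    def __iter__(self):
        return self

    def __next__(self):
        record = next(self._source)  # StopIteration ends the loop naturally
        for key, value in record.items():
            self.table[key].append(value)
        return record


# Made-up speed readings in knots.
cache = ColumnCache([{"speed": 10.0}, {"speed": 20.0}, {"speed": 30.0}])
for record in cache:
    pass

speeds = cache.table["speed"]
average = sum(speeds) / len(speeds)
print(speeds, average)
```

Because the cache is itself iterable, it can sit at the end of a pipe and still hand each record to the loop body.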
Consolidating fields using the cache is also perfect for generating plots with a library like Toyplot (http://toyplot.readthedocs.io):
import toyplot
canvas = toyplot.Canvas(width=600, height=400)
axes = canvas.cartesian(grid=(2, 1, 0), xlabel="Record #", ylabel="Speed (MPH)")
axes.plot(pipe.table["speed"].to(pipecat.units.mph))
axes = canvas.cartesian(grid=(2, 1, 1), xlabel="Record #", ylabel="Track")
axes.plot(pipe.table["track"]);
Note that nothing prevents us from doing useful work in the for loop
that populates the cache, and nothing prevents us from accessing the
cache within the loop. For example, we might want to display field
values from individual records alongside a running average computed from
the cache. Or we might want to update our plot periodically as the loop
progresses.
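As an illustration of the running-average idea, the sketch below computes a mean incrementally while iterating. It uses made-up records with plain float speeds instead of a live pipe, and the helper name running_mean is hypothetical.

```python
def running_mean(values):
    """Yield the mean of all values seen so far, one value at a time."""
    total = 0.0
    for count, value in enumerate(values, start=1):
        total += value
        yield total / count


# Made-up records; in practice these would come from a pipe.
records = [{"speed": 10.0}, {"speed": 20.0}, {"id": "GPGGA"}, {"speed": 60.0}]

# Skip records that have no speed field, as in the earlier examples.
speeds = (record["speed"] for record in records if "speed" in record)
means = list(running_mean(speeds))
print(means)
```

This approach needs constant memory, which makes it a reasonable choice when the data is too large to cache.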

Contributing¶
Even if you’re not in a position to contribute code to Pipecat, there are many ways you can help the project out:
- Send the names of serial ports that work with a device on your platform, so we can document it.
- Let us know if one of our existing devices works with some other model or brand of hardware, so we can document that.
- Send us a file containing sample output from your device.
- Tell us if our code doesn’t work with your device.
- Spread the word!
Getting Started¶
If you haven’t already, you’ll want to get familiar with the Pipecat repository at http://github.com/shead-custom-design/pipecat … there, you’ll find the Pipecat sources, issue tracker, and wiki.
Next, you’ll need to install Pipecat’s Dependencies. Then, you’ll be ready to get Pipecat’s source code and use setuptools to install it. To do this, you’ll almost certainly want to use “develop mode”. Develop mode is a feature provided by setuptools that links the Pipecat source code into the install directory instead of copying it … that way you can edit the source code in your git sandbox, and you don’t have to re-install it to test your changes:
$ git clone https://github.com/sandialabs/pipecat.git
$ cd pipecat
$ python setup.py develop
Versioning¶
Pipecat version numbers follow the Semantic Versioning standard.
Coding Style¶
The Pipecat source code follows the PEP-8 Style Guide for Python Code.
Running Regression Tests¶
To run the Pipecat test suite, simply run regression.py from the top-level source directory:
$ cd pipecat
$ python regression.py
The tests will run, providing feedback on successes / failures.
Test Coverage¶
When you run the test suite with regression.py, it also automatically generates code coverage statistics. To see the coverage results, open .cover/index.html in a web browser.
Building the Documentation¶
To build the documentation, run:
$ cd pipecat
$ python docs/setup.py
Note that significant subsets of the documentation are written using Jupyter notebooks, so the docs/setup.py script requires Jupyter to convert the notebooks into restructured text files for inclusion with the rest of the documentation.
Once the documentation is built, you can view it by opening docs/_build/html/index.html in a web browser.

Release Notes¶
Pipecat 0.3.0 - May 27th, 2017¶
- Added support for motion capture on iOS devices.
- pipecat.record.dump() immediately flushes its output.
- Add sample GPS data.
- Set the Pint application registry when reading records with pipecat.store.pickle.read()
- Added support for OBD-II devices.
- Add example limits to the battery charger section of the user guide.
- Clarify the difference between client and listener addresses for pipecat.udp.receive()
- Stub-out the GPS receiver section of the user guide.
- Add a new module for filtering records.
- Update testing code and expand the section on GPS receivers in the user guide.
- Ensure that pipecat.limit shuts-down upstream sources when terminated.
- Shutdown upstream generators for real this time.
- Handle KeyboardInterrupt gracefully within pipecat.limit.
- Add support for individual cell voltages in pipecat.device.charger.icharger_208b().
- Add a sample executable that monitors battery charger status, pipecat-charger-status.
- Add pipecat.device.serial.readline() for reliably retrieving lines of text from a serial port.
- Make all threads daemon threads so callers can shutdown normally.
- charging-status handles the case where no data has been received.
- Make the poll rate configurable for pipecat.device.serial.readline().
- Include the charger mode in charging-status output.
- Add functionality to retrieve data using HTTP requests.
- Add functionality to parse XML data.
- Add functionality to parse METAR weather data.
- Create a sample weather-monitoring application, pipecat-wind-status.
- Added pipecat.filter.duplicates().
- Add an option to clear pipecat.store.Table so caches can be emptied.
- Improve the logic for extracting METAR data from XML.
- Add exception handling to pipecat.xml.parse() and pipecat.http.get().
- Added separate loggers for pipecat.http and pipecat.xml.
Pipecat 0.2.0 - December 28th, 2016¶
- Improved documentation.
- Added sample charger data.
- Consolidated redundant file-management code.
Pipecat 0.1.0 - December 14th, 2016¶
- Initial Release

API Reference¶
Contents:
pipecat module¶
Elegant, flexible data logging in Python for connected sensors and instruments.
pipecat.connect module¶
Functions for connecting data sources.
pipecat.connect.concatenate(sources)¶
Concatenate records from multiple sources.
Yields all of the records from the first source, then all the records from the second source, and so on until every source has been consumed. Note that this means it only makes sense to use concatenate() with sources that return a bounded number of records!
Parameters: sources (sequence of Record Generators, required)
Yields: record (dict)
pipecat.connect.multiplex(sources)¶
Interleave records from multiple sources.
Yields records from all sources in the order that they arrive.
Parameters: sources (sequence of Record Generators, required)
Yields: record (dict)
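The difference between the two functions can be sketched in plain Python. This is not Pipecat's code: the names concatenate_sketch and multiplex_sketch are hypothetical, and the use of one thread per source feeding a shared queue is an assumption about how interleaving could be done.

```python
import itertools
import queue
import threading


def concatenate_sketch(sources):
    """Yield every record from each source in turn."""
    return itertools.chain.from_iterable(sources)


def multiplex_sketch(sources):
    """Yield records from all sources in the order they arrive."""
    records = queue.Queue()
    done = object()  # sentinel: one is queued per exhausted source

    def worker(source):
        for record in source:
            records.put(record)
        records.put(done)

    sources = list(sources)
    for source in sources:
        threading.Thread(target=worker, args=(source,), daemon=True).start()

    remaining = len(sources)
    while remaining:
        record = records.get()
        if record is done:
            remaining -= 1
        else:
            yield record


a = [{"n": 1}, {"n": 2}]
b = [{"n": 3}]
chained = list(concatenate_sketch([a, b]))
mixed = list(multiplex_sketch([a, b]))
print(chained)
print(sorted(record["n"] for record in mixed))
```

The concatenated order is deterministic, while the multiplexed order depends on thread timing, so only its contents are sorted and printed here.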
pipecat.device module¶
Data sources for specific hardware devices.
See sub-modules of this module for different classes of hardware device supported by Pipecat:
- Automobiles - pipecat.device.auto
- Battery chargers - pipecat.device.charger
- Clocks - pipecat.device.clock
- GPS receivers - pipecat.device.gps
- Motion sensors / accelerometers - pipecat.device.motion
- Serial ports - pipecat.device.serial
- Weather - pipecat.device.weather
pipecat.device.auto module¶
Data sources that retrieve on-board diagnostics from an automobile using OBD-II.
pipecat.device.auto.obd(connection, commands=None, rate=<Quantity(5, 'second')>)¶
Retrieve OBD-II data from an automobile.
This component requires the Python-OBD module (http://python-obd.readthedocs.io).
Parameters:
- connection (obd.OBD instance, required)
- rate (time quantity, required) – Rate at which data will be retrieved.
Yields: record (dict) – Records will contain OBD-II data retrieved from an automobile computer.
pipecat.device.charger module¶
Data sources that retrieve information from battery chargers.
pipecat.device.charger.icharger208b(source, key)¶
Parse data from an iCharger 208B battery charger.
Parses data events emitted by the charger during charge, discharge, etc. Likely works with other models from iCharger, but this is untested. Consider Contributing to let us know.
This model battery charger comes with a USB cable that provides serial-over-USB communication with the host computer. To connect with the charger, you’ll need to open a handle to the appropriate serial port, the name of which will vary between platforms and based on the number of devices currently connected, and feed lines of text to the parser.
Examples
Open a serial port on a Mac OSX computer using pySerial, read lines from the serial port, parse them into records, and print them to stdout:
>>> pipe = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
>>> pipe = pipecat.utility.readline(pipe)
>>> pipe = pipecat.device.charger.icharger208b(pipe, key="line")
>>> for record in pipe:
...     print(record)
Parameters:
- source (Record generator returning records containing a “string” field.)
- key (Record key, optional) – The key in incoming records containing charger data.
Yields: record (dict) – Records will contain information including the charge mode, supply voltage, battery voltage, battery current, internal and external charger temperature, and the total charge added-to / removed-from the battery.
pipecat.device.clock module¶
Data sources that retrieve information from clocks.
pipecat.device.clock.metronome(rate=<Quantity(1.0, 'second')>)¶
Generate an empty record at fixed time intervals using the host clock.
Typically, you would use functions such as pipecat.utility.add_field() or pipecat.utility.add_timestamp() to populate the (otherwise empty) records.
Examples
If you want to know what time it is, at 5-minute intervals:
>>> pipe = pipecat.device.clock.metronome(pipecat.quantity(5, pipecat.units.minutes))
>>> pipe = pipecat.utility.add_timestamp(pipe)
>>> for record in pipe:
...     print(record)
Parameters: rate (time quantity, required) – The amount of time to wait between records.
Yields: record (dict) – Empty record returned at fixed time intervals.
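A rough sketch of the metronome idea in plain Python follows. It makes several assumptions that the documentation does not confirm: the names metronome_sketch and add_timestamp_sketch are hypothetical, the rate is a plain number of seconds rather than a quantity, the first record is produced immediately, and the timestamp is a Unix time from time.time() rather than an arrow timestamp.

```python
import itertools
import time


def metronome_sketch(rate_seconds=1.0):
    """Yield an empty record, then wait, forever."""
    while True:
        yield {}
        time.sleep(rate_seconds)


def add_timestamp_sketch(source, key="timestamp"):
    """Add the current Unix time to every record under key."""
    for record in source:
        record[key] = time.time()
        yield record


pipe = metronome_sketch(rate_seconds=0.01)
pipe = add_timestamp_sketch(pipe)
# The metronome never ends on its own, so take only three records.
ticks = list(itertools.islice(pipe, 3))
print(len(ticks), sorted(ticks[0]))
```

Because the generator is unbounded, it would normally be paired with one of the limit functions rather than itertools.islice.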
pipecat.device.gps module¶
Functions for working with GPS receivers.
pipecat.device.gps.nmea(source, key)¶
Parse NMEA messages from raw strings.
Parameters:
- source (Record generator returning records containing a “string” field.)
- key (Record key, required) – The key in incoming records to parse as NMEA.
Yields: record (dict) – Records will contain multiple fields containing time, position, speed, heading, pitch, roll, and quality information based on device sending the data. Support is provided for GPGGA, GPGLL, GPRMC, GPTXT, HCHDG, and PASHR messages.
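As an illustration of what decoding a position involves, NMEA sentences encode latitude as ddmm.mmmm and longitude as dddmm.mmmm (whole degrees followed by decimal minutes), plus a hemisphere letter. The sketch below converts the coordinates from the GPGGA sample message in the GPS Receivers section into signed decimal degrees. The function name nmea_to_degrees is hypothetical and this is not Pipecat's parser.

```python
def nmea_to_degrees(value, hemisphere):
    """Convert an NMEA (d)ddmm.mmmm coordinate to signed decimal degrees."""
    number = float(value)
    degrees = int(number // 100)      # leading digits are whole degrees
    minutes = number - degrees * 100  # the remainder is decimal minutes
    decimal = degrees + minutes / 60.0
    # South and west are negative by convention.
    return -decimal if hemisphere in ("S", "W") else decimal


latitude = nmea_to_degrees("3511.33136", "N")
longitude = nmea_to_degrees("10643.48435", "W")
print(round(latitude, 6), round(longitude, 6))
```

The results agree with the latitude and longitude fields shown in the decoded GPGGA record (35.188856 and -106.724739167 degrees).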
pipecat.device.motion module¶
Data sources that retrieve motion information from portable devices such as smartphones.
pipecat.device.motion.ios(rate=<Quantity(1, 'second')>)¶
Retrieve motion information from an iOS device.
This component requires the motion module provided by Pythonista.
Parameters: rate (time quantity, required) – Rate at which motion data will be retrieved.
Yields: record (dict) – Records will contain information including the current acceleration due to gravity and the user, along with device attitude.
pipecat.device.serial module¶
Functions for working with serial ports.
pipecat.device.serial.readline(*args, **kwargs)¶
Reliably read lines from a serial port.
Accepts the same parameters as serial.serial_for_url(), plus the following:
Parameters: poll (time quantity, optional) – Time to wait between failures.
Yields: record (dict) – Records will contain each line of text read from the port.
pipecat.device.weather module¶
Data sources that decode weather information.
pipecat.device.weather.metars(source, key='xml')¶
Parse METAR information retrieved from https://aviationweather.gov/metar.
Parameters:
- source (Record generator returning records containing XML data, required)
- key (Record key, optional)
Yields: record (dict) – Records will contain METAR data extracted from XML.
pipecat.filter module¶
Functions to filter records as they pass through a Pipecat pipe.
pipecat.filter.duplicates(source, key)¶
Discard records unless the given key value changes.
Parameters:
- source (Record generator, required)
- key (string or tuple-of-strings, required) – Records will be discarded unless this key value changes.
pipecat.filter.keep(source, key=None, value=None)¶
Discard all records that don’t match the given criteria.
Examples
Discard any record that doesn’t have an ‘id’ key with value “GPGGA”:
>>> pipe = ...  # Defined elsewhere
>>> pipe = pipecat.filter.keep(pipe, key="id", value="GPGGA")
>>> for record in pipe:
...     print(record)
Parameters:
- source (Record generator, required)
- key (string or tuple-of-strings, optional) – Records must contain this key or they will be discarded.
- value (optional.) – Records with field ‘key’ must match this value or they will be discarded.
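The two filters in this module can be approximated with short generators. The sketch below is an interpretation of the descriptions above, not Pipecat's code: the names keep_sketch and duplicates_sketch are hypothetical, and two behaviors are assumptions, namely that a value of None means "match any value" and that duplicates_sketch drops records lacking the key.

```python
def keep_sketch(source, key=None, value=None):
    """Discard records lacking key, or whose key differs from value."""
    for record in source:
        if key is not None:
            if key not in record:
                continue
            if value is not None and record[key] != value:
                continue
        yield record


def duplicates_sketch(source, key):
    """Discard records unless the value stored under key changes."""
    missing = object()  # marks "no previous value seen yet"
    previous = missing
    for record in source:
        if key not in record:
            continue
        if previous is missing or record[key] != previous:
            previous = record[key]
            yield record


records = [
    {"id": "GPGGA", "quality": 1},
    {"id": "GPGLL", "active": True},
    {"id": "GPGLL", "active": True},
    {"id": "GPRMC", "active": True},
]
kept = list(keep_sketch(records, key="id", value="GPGLL"))
changed = list(duplicates_sketch(records, key="id"))
print(len(kept), [record["id"] for record in changed])
```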
pipecat.http module¶
Functions for working with HTTP requests.
pipecat.http.get(*args, **kwargs)¶
Retrieve data using HTTP requests.
Accepts the same parameters as requests.get(), plus the following:
Parameters: poll (time quantity, optional.) – Time to wait between requests.
Yields: record (dict) – Records will contain status, (header, key), encoding, and body keys containing the results returned from each request.
pipecat.http.receive(address, include_body=True, include_client=False, include_method=False, include_path=False, include_version=False)¶
Receives data sent using HTTP requests.
Provides a simple HTTP server that receives client requests, converting each request into Pipecat records.
Parameters: address ((host, port) tuple, required) – IP address and TCP port to be bound for listening to requests.
Yields: record (dict) – Records will contain client, version, method, path, and body keys containing the content of each request.
pipecat.json module¶
Functions for working with JSON data.
pipecat.json.parse(source, key='string', keyout='json')¶
Parse JSON data from records.
This filter parses an incoming record key as JSON, appending the JSON data to the output.
Parameters:
- source (Record generator, required)
- key (Record key, optional) – The key in incoming records to parse as JSON.
- keyout (Record key, optional) – The key in outgoing records where the parsed JSON will be stored.
Yields: record (dict) – Input records with an additional keyout field containing JSON-compatible data.
pipecat.limit module¶
Functions to limit the output of a Pipecat pipeline.
pipecat.limit.count(source, count, name=None)¶
Limits the number of records returned from a source.
Examples
Produce seven records at one-second intervals:
>>> pipe = pipecat.device.clock.metronome()
>>> pipe = pipecat.limit.count(pipe, count=7)
>>> for record in pipe:
...     print(record)
Parameters:
- source (Record generator, required)
- count (int, required) – The number of records that will be returned from source.
- name (string, optional.) – Optional name for this Record generator to use in log output. Defaults to the function name.
pipecat.limit.duration(source, duration, timeout=<Quantity(0.1, 'second')>, name=None)¶
Return records from a source until a fixed time duration has expired.
Examples
Produce records at one-second intervals for three minutes:
>>> pipe = pipecat.device.clock.metronome()
>>> pipe = pipecat.limit.duration(pipe, pipecat.quantity(3, pipecat.units.minutes))
>>> for record in pipe:
...     print(record)
Parameters:
- source (Record generator, required)
- duration (time quantity, required) – Maximum amount of time that records will be returned from source.
- timeout (time quantity, optional.) – Limits the amount of time to block while waiting for output from source. This affects the accuracy of when the function exits.
- name (string, optional.) – Optional name for this Record generator to use in log output. Defaults to the function name.
pipecat.limit.timeout(source, timeout, initial=<Quantity(1, 'hour')>, name=None)¶
Return records from another source until they stop arriving.
Parameters:
- source (Record generator, required)
- timeout (time quantity, required) – Maximum time to wait for the next record before exiting.
- initial (time quantity, optional) – Maximum time to wait for the first record.
- name (string, optional.) – Optional name for this Record generator to use in log output. Defaults to the function name.
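One way a timeout limit could work is to read the source on a background thread and wait on a queue with a deadline. The sketch below shows that technique. It is an assumption about the approach, not Pipecat's implementation: the names timeout_sketch and stalling_source are hypothetical, the timeout is a plain number of seconds, and the separate initial timeout for the first record is not modeled.

```python
import queue
import threading
import time


def timeout_sketch(source, timeout):
    """Yield records from source until none arrives within timeout seconds."""
    records = queue.Queue()
    done = object()  # sentinel marking normal exhaustion of the source

    def worker():
        for record in source:
            records.put(record)
        records.put(done)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        try:
            record = records.get(timeout=timeout)
        except queue.Empty:
            return  # the source went quiet, so stop iterating
        if record is done:
            return
        yield record


def stalling_source():
    """Hypothetical device that sends two records and then goes silent."""
    yield {"n": 1}
    yield {"n": 2}
    time.sleep(1.0)
    yield {"n": 3}


received = list(timeout_sketch(stalling_source(), timeout=0.2))
print(received)
```

The worker is a daemon thread so that a stalled source cannot keep the program alive after the loop has ended.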
pipecat.limit.until(source, key, value, name=None)¶
Return records from another source until a record occurs with a specific key and value.
Examples
Print output from a battery charger until its mode changes to “finished”:
>>> pipe = <battery charger pipeline>
>>> pipe = pipecat.limit.until(pipe, "mode", "finished")
>>> for record in pipe:
...     print(record)
Parameters:
- source (Record generator, required)
- key (Record key, required)
- value (anything, required)
- name (string, optional.) – Optional name for this Record generator to use in log output. Defaults to the function name.
pipecat.queue module¶
Functions for working with queues.
pipecat.queue.receive(queue)¶
Receive records from a queue.
Queues are a handy mechanism for passing data between threads. Use this function to receive records sent over a queue by pipecat.queue.send().
Parameters: queue (queue.Queue, required)
Yields: record (dict)
pipecat.queue.send(source, queue, shutdown=None)¶
Send records from a source to a queue.
Queues are a handy mechanism for passing data between threads. Use this function to send records over a queue to pipecat.queue.receive().
Parameters:
- source (Record generator, required)
- queue (queue.Queue, required)
- shutdown (threading.Event, optional.) – If supplied, callers can set the event to safely stop sending from another thread.
pipecat.record module¶
Functions for manipulating data records.
pipecat.record.add_field(record, key, value)¶
Add a key-value pair to a record.
Parameters:
- record (dict, required) – Dictionary of key-value pairs that constitute a record.
- key (Record key, required) – Record key to be overwritten.
- value (object) – New record value.
pipecat.record.dump(record, fobj=<open file '<stdout>', mode 'w'>)¶
Dump a human-readable text representation of a record to a file-like object.
Parameters:
- record (dict, required) – Dictionary of key-value pairs to be written-out.
- fobj (file-like object, optional)
pipecat.record.remove_field(record, key)¶
Remove a key-value pair from a record.
Parameters:
- record (dict, required) – Dictionary of key-value pairs that constitute a record.
- key (Record key, required) – Record key to be removed.
pipecat.store module¶
Functions for storing data.
class pipecat.store.Cache(source)¶
Bases: object
Cache records in memory for column-oriented access.
next()¶
table¶
pipecat.store.cache(source)¶
Create an in-memory cache for records.
Parameters: source (generator, required) – A source of records to be cached.
Returns: cache
Return type: instance of pipecat.store.Cache.
pipecat.store.csv module¶
Functions for reading and writing CSV data.
pipecat.store.csv.write(source, fobj)¶
Append records to a CSV file.
pipecat.store.pickle module¶
Functions for reading and writing data using Python pickle files.
Warning
Pickle files written using Python 2 cannot be read using Python 3, and vice-versa.
pipecat.store.pickle.read(fobj)¶
Read records from a pickle file.
pipecat.store.pickle.write(source, fobj)¶
Append records to a pickle file.
pipecat.udp module¶
Functions for working with UDP messages.
pipecat.udp.receive(address, maxsize)¶
Receive messages from a UDP socket.
Parameters:
- address ((host, port) tuple, required) – IP address and UDP port to be bound for listening for UDP packets.
- maxsize (int, required) – Maximum length of packets returned from the UDP socket.
Yields: record (dict) – Records will contain a client field with the address of the sending client, and a message field containing the content of the message.
pipecat.utility module¶
Convenience functions for working with data sources.
pipecat.utility.add_field(source, key, value)¶
Adds a key-value pair to every record returned from a source.
Parameters:
- source (Record generator, required)
- key (Record key, required)
- value (any value.)
Yields: record (dict) – Input records containing an additional field key with value value.
pipecat.utility.add_timestamp(source, key='timestamp')¶
Add a timestamp to every record returned from a source.
Parameters:
- source (Record generator, required)
- key (Record key, optional)
Yields: record (dict) – Input records containing an additional field key with an arrow.arrow.Arrow UTC timestamp value.
pipecat.utility.extract_quantities(source, value='value', units='units')¶
Convert values with separate magnitude / units data into pipecat.quantity instances.
If a field value is a dict containing value and units keys, it will be replaced with a matching pipecat.quantity.
Parameters: - source (Record generator, required)
- value (Record key, optional)
- units (Record key, optional)
Yields: record (dict) – Input records with field values converted to pipecat.quantity instances.
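The replacement rule above can be sketched as follows. A namedtuple called Quantity stands in for pipecat.quantity so the example needs no third-party packages; both Quantity and extract_quantities_sketch are hypothetical names used only for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for pipecat.quantity.
Quantity = namedtuple("Quantity", ["magnitude", "units"])


def extract_quantities_sketch(source, value="value", units="units"):
    """Replace {value: ..., units: ...} dicts with Quantity stand-ins."""
    for record in source:
        for field, content in list(record.items()):
            is_pair = isinstance(content, dict) and value in content and units in content
            if is_pair:
                record[field] = Quantity(content[value], content[units])
        yield record


raw = [{"id": 7, "temp": {"value": 21.5, "units": "degC"}}]
converted = list(extract_quantities_sketch(iter(raw)))
```

Fields that are not dicts, or that lack either key, pass through unchanged.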
pipecat.utility.promote_field(source, key)¶
Promote key-value pairs from one field into their own fields.
The field key must exist and contain a dict or dict-like object.
Parameters: - source (Record generator, required)
- key (Record key, required)
Yields: record (dict) – Input records containing additional fields promoted from the value stored in key.
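A short sketch of promotion, assuming the original nested field is left in place alongside the promoted fields (the description above speaks of additional fields). The name promote_field_sketch is hypothetical.

```python
def promote_field_sketch(source, key):
    """Copy every key-value pair stored under record[key] to the top level."""
    for record in source:
        record.update(record[key])  # raises KeyError if key is missing
        yield record


nested = [{"gps": {"lat": 35.1, "lon": -106.6}}]
promoted = list(promote_field_sketch(iter(nested), "gps"))
```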
pipecat.utility.readline(fobj, encoding='utf-8')¶
Extract lines from a file or file-like object.
Parameters: fobj (file-like object, required) – This could be an open file, an instance of io.StringIO, a serial connection, or any other object from which lines of text can be read.
Yields: record (dict) – Records containing a “string” field that stores one line of text.
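The sketch below shows one way such a line reader could be written. It assumes that byte strings are decoded with the given encoding and that line endings are kept as read; neither detail is confirmed by the description above, and readline_sketch is a hypothetical name.

```python
import io


def readline_sketch(fobj, encoding="utf-8"):
    """Yield one record per line read from a file-like object."""
    for line in fobj:
        if isinstance(line, bytes):
            # Assumption: binary sources are decoded using the given encoding.
            line = line.decode(encoding)
        yield {"string": line}


text_records = list(readline_sketch(io.StringIO("alpha\nbeta\n")))
byte_records = list(readline_sketch(io.BytesIO(b"gamma\n")))
```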
pipecat.utility.remove_field(source, key)¶
Remove a key-value pair from every record returned by a source.
Records that don’t contain key are returned unmodified (this is not an error condition).
Parameters: - source (Record generator, required)
- key (Record key, required) – The field to be removed.
Yields: record (dict) – Input records with the key field removed.
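The tolerant behavior for missing keys can be sketched with dict.pop and a default value. The name remove_field_sketch is hypothetical.

```python
def remove_field_sketch(source, key):
    """Yield each input record with key removed, ignoring records that lack it."""
    for record in source:
        record.pop(key, None)  # the default suppresses KeyError when key is absent
        yield record


mixed = [{"voltage": 3.7, "debug": "x"}, {"voltage": 3.6}]
cleaned = list(remove_field_sketch(iter(mixed), "debug"))
```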
pipecat.utility.split_keys(source, delimiter)¶
Convert flat keys into hierarchical keys using a delimiter.
This is useful when working with data such as JSON that can’t represent hierarchical keys explicitly.
Parameters: - source (Record generator, required)
- delimiter (str, required)
Yields: record (dict) – Input records with keys containing the delimiter converted into hierarchical keys.
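The following sketch assumes that a hierarchical key is represented as a tuple of strings, which the description above does not state explicitly. The name split_keys_sketch is hypothetical.

```python
def split_keys_sketch(source, delimiter):
    """Yield records whose delimited string keys are converted to tuple keys."""
    for record in source:
        output = {}
        for key, value in record.items():
            if isinstance(key, str) and delimiter in key:
                # Assumption: hierarchical keys are tuples of their components.
                key = tuple(key.split(delimiter))
            output[key] = value
        yield output


flat = [{"battery/voltage": 3.7, "battery/current": 0.5, "id": 1}]
hierarchical = list(split_keys_sketch(iter(flat), "/"))
```

Keys that do not contain the delimiter are left as plain strings.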
pipecat.utility.trace(source, name=None)¶
Log the behavior of a source for debugging.
Use pipecat.utility.trace() to log the behavior of another record generator, including the records it generates, any exceptions thrown, and notifications when it starts and finishes. Records are passed through this function unmodified, so it can be inserted into an existing pipe without altering its behavior.
Parameters: source (Record generator, required)
Yields: record (dict) – Unmodified input records.
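A pass-through tracer of this kind can be sketched with the standard logging module. The logger name, the message wording, and the name trace_sketch are assumptions made for illustration.

```python
import logging


def trace_sketch(source, name=None):
    """Log start, each record, exceptions, and finish; yield records unmodified."""
    log = logging.getLogger(name or "trace")
    log.info("started")
    try:
        for record in source:
            log.info("record: %r", record)
            yield record
    except Exception:
        # Log the traceback, then re-raise so pipe behavior is unchanged.
        log.exception("exception")
        raise
    log.info("finished")


logging.basicConfig(level=logging.INFO)
samples = [{"voltage": 3.7}, {"voltage": 3.6}]
traced = list(trace_sketch(iter(samples), name="demo"))
```

Re-raising after logging keeps the tracer transparent: downstream stages see the same records and the same errors they would have seen without it.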
pipecat.xml module¶
Functions for working with XML data.
pipecat.xml.parse(source, key='string', keyout='xml')¶
Parse XML data from a record.
Parameters: - source (Record generator, required)
- key (Record key, optional) – Input key containing a string to be parsed as XML.
- keyout (Record key, optional) – Output key where the parsed XML DOM will be stored.
Yields: record (dict) – Input records with a new field keyout containing an XML DOM.
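As a sketch of the key and keyout roles, the example below parses each record's string with xml.etree.ElementTree. Using ElementTree as the DOM representation is an assumption, and parse_sketch is a hypothetical name.

```python
import xml.etree.ElementTree as ET


def parse_sketch(source, key="string", keyout="xml"):
    """Parse record[key] as XML and store the root element under record[keyout]."""
    for record in source:
        # Assumption: the parsed document is an ElementTree root element.
        record[keyout] = ET.fromstring(record[key])
        yield record


lines = [{"string": "<reading unit='V'>3.7</reading>"}]
parsed = list(parse_sketch(iter(lines)))
root = parsed[0]["xml"]
```

The default key of 'string' matches the field produced by a line reader, so the two stages can be chained without renaming fields.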

Support¶
The Pipecat documentation:
Visit our GitHub repository for access to source code, issue tracker, and the wiki:
We also have a continuous integration server that runs the Pipecat regression test suite anytime changes are committed to GitHub:
And here are our test coverage stats, also updated automatically when modifications are committed:
For Pipecat questions, comments, or suggestions, get in touch with the team at:
Otherwise, you can contact Tim directly:
- Timothy M. Shead - tim@shead-custom-design

Credits¶
Included in Pipecat¶
“FOSS Data-Logging for Junsi iCharger”, https://apollo.open-resource.org/mission:log:2012:10:06:free-and-open-source-junsi-icharger-data-logger, accessed December, 2016.
Influential Reading¶
D. Beazley, “Generator Tricks For Systems Programmers Version 2.0,” http://www.dabeaz.com/generators-uk/index.html, accessed December, 2016.