_images/pipecat.png

Welcome!

Welcome to Pipecat … elegant, flexible data logging in Python for connected sensors and instruments. Use Pipecat to log data from battery chargers, GPS, automobiles, gyros, weather, and more!

Documentation


Installation

Using a Package Manager

A package manager (conda, apt, yum, MacPorts, etc.) should generally be your first stop for installing Pipecat: it will make it easy to install Pipecat and its dependencies, keep them up to date, and even (gasp!) uninstall them cleanly. If your package manager doesn’t support Pipecat yet, drop its maintainers a line and let them know you’d like them to add it!

If you’re new to Python or unsure where to start, we strongly recommend taking a look at Anaconda, which the Pipecat developers use during their day-to-day work.


Anaconda Installation

Whether you’re new to Python or an old hand, we highly recommend installing Anaconda. Anaconda includes and conveniently installs Python and other commonly used packages for scientific computing and data science, including many of Pipecat’s dependencies.

Using Pip / Easy Install

If your package manager doesn’t support Pipecat, or doesn’t have the latest version, your next option should be a Python packaging tool such as pip. You can always install the latest stable version of Pipecat and its required dependencies using:

$ pip install pipecat

… following that, you’ll be able to use all of Pipecat’s features.

From Source

Finally, if you want to work with the latest, bleeding-edge Pipecat goodness, you can install it using the source code:

$ git clone https://github.com/shead-custom-design/pipecat
$ cd pipecat
$ sudo python setup.py install

The setup script installs Pipecat’s required dependencies and copies Pipecat into your Python site-packages directory, ready to go.


Dependencies

Minimum Requirements

To use Pipecat you will need, at a minimum, Python 2 or 3 (duh):

plus the following (if you install Pipecat using pip, these are automatically installed for you):

Source Installation

If you’re installing Pipecat from source, you’ll need setuptools to run the Pipecat setup.py script:

Regression Testing

The following are required to run Pipecat’s regression tests and view code coverage:

Generating Documentation

And you’ll need the following to generate this documentation:


Compatibility

A quick disclaimer on backwards-compatibility for Pipecat users:

Pipecat follows the Semantic Versioning standard for assigning version numbers in a way that has specific meaning. As of this writing Pipecat releases are still in the 0.y.z development phase, which means (among other things) that the API may change at any time. We try not to be abusive about it, but you should be prepared for the occasional bump on the road to the 1.0 release.


User Guide



Design

Records

Components in Pipecat manipulate records, which are plain Python dicts. Record generators (such as hardware devices) produce records as output; record filters consume records as input and produce modified records as output; and record consumers (such as functions that store data) only take records as input. A record could represent a location reported by the GPS in your phone, an OBD-II reading from your car’s diagnostic computer, or the state of a battery reported by a battery charger. Because a record is a dict, it can contain as many key-value pairs as are appropriate for a given device, and you can manipulate that data easily.
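The three roles can be sketched with nothing but plain Python generators and dicts. The functions below (counter, add_square, store) are hypothetical illustrations of the terminology, not Pipecat components:

```python
def counter(n):
    """Record generator: yields n records, each a plain dict."""
    for i in range(n):
        yield {"index": i}


def add_square(records):
    """Record filter: consumes records and yields modified copies."""
    for record in records:
        record = dict(record)  # copy, so upstream records are not mutated
        record["square"] = record["index"] ** 2
        yield record


def store(records, destination):
    """Record consumer: iterates over its input and returns nothing."""
    for record in records:
        destination.append(record)


saved = []
pipe = counter(3)
pipe = add_square(pipe)
store(pipe, saved)
print(saved)
# [{'index': 0, 'square': 0}, {'index': 1, 'square': 1}, {'index': 2, 'square': 4}]
```

Because each stage in this sketch is a generator, no records move until the consumer starts iterating.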

Record Keys

For interoperability, Pipecat requires (with one exception) that record keys be strings. Using strings as keys makes records easy to understand, manipulate, and serialize, while allowing great flexibility in the choice of keys for devices. Pipecat intentionally does not impose any naming scheme on record keys, although we plan to evolve consistent sets of well-defined keys for specific device classes as we go. Our goal is to encourage developers of new Pipecat components to think broadly about the type of data they store in records.

The one exception to the permissive “strings-as-keys” approach is that Pipecat encourages developers to use “hierarchical” keys where appropriate, and requires that hierarchical keys be represented as tuples of strings. For example, a battery charger with both internal and external temperature sensors should use ("temperature", "internal") as a key instead of some other private naming scheme such as "temperature-internal", "temperature/internal", "temperature|internal", etc. Representing hierarchies this way makes them explicit, avoids a proliferation of private naming schemes for hierarchical data, and allows downstream components to manipulate hierarchical keys in a consistent way.
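A short sketch of why tuple keys pay off downstream: because the hierarchy is explicit, a component can select a whole branch or render keys for display without parsing strings. flatten_key() is a hypothetical helper; its slash-separated output merely resembles the format pipecat.record.dump() prints later in this guide.

```python
def flatten_key(key, separator="/"):
    """Render a hierarchical tuple key as one string; plain strings pass through."""
    if isinstance(key, tuple):
        return separator.join(key)
    return key


record = {
    ("temperature", "internal"): 29.1,
    ("temperature", "external"): 23.6,
    "mode": "charge",
}

# Select every field under the "temperature" branch, dropping the prefix.
temperatures = {
    key[1:]: value
    for key, value in record.items()
    if isinstance(key, tuple) and key[0] == "temperature"
}

# Print fields in a stable, human-readable order.
for key in sorted(record, key=flatten_key):
    print(flatten_key(key), record[key])
# mode charge
# temperature/external 23.6
# temperature/internal 29.1
```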

Record Values

Any type can be used as a value in a Pipecat record, and the goal again is to avoid needlessly constraining the ingenuity of Pipecat developers. There are just two caveats:

  • We strongly encourage the use of Arrow objects to store timestamp data. Arrow is already a required dependency of Pipecat, so it’s guaranteed to be available.
  • We strongly encourage the use of explicit physical units wherever possible. Pipecat provides built-in quantity and unit support from Pint to make this easy. Again, Pint is a required Pipecat dependency, so it’s guaranteed to be available for Pipecat developers.

Record Generators

Record generators are any iterable expression (function or object) that produces records. An iterable expression is any expression that can be used with the Python for statement, and you use for loops to read records from generators:

import pipecat.device.clock
import pipecat.record

generator = pipecat.device.clock.metronome()
for record in generator:
    pipecat.record.dump(record)

Some examples of record generators include:

Record Consumers

Record consumers are any function or object that consumes records, i.e., iterates over the results of a record generator, returning nothing. Some examples of record consumers include:

Record Filters

Record filters are any function or object that both consumes and generates records. Most of the components provided by Pipecat fall into this category. Examples include:

Pipes

When all is said and done, you use Pipecat by hooking together components to create pipes that retrieve, process, and store records as part of some larger task. In the following example, we retrieve data from a battery charger connected via a serial port, and print it to the console:

import serial
import pipecat.device.charger
import pipecat.record
import pipecat.utility

pipe = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(pipe)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    pipecat.record.dump(record)

If we want to save the records to a CSV file, we simply add an additional component to the pipe:

import pipecat.store.csv
pipe = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(pipe)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.store.csv.write(pipe, "battery.csv")
for record in pipe:
    pipecat.record.dump(record)

Note from this example how we use a single variable to keep track of the “output” end of the pipe, passing it as the “input” to each component that we connect. Of course, nothing requires that you reuse a variable in this way, but we find that this style avoids a proliferation of otherwise unused symbols and makes reordering, adding, and removing components in a pipe much easier. For example, it’s easy to comment out the component we just added without affecting any downstream code:

pipe = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(pipe)
pipe = pipecat.device.charger.icharger208b(pipe)
#pipe = pipecat.store.csv.write(pipe, "battery.csv")
for record in pipe:
    pipecat.record.dump(record)

Similarly, you can easily insert and reorder components without having to worry about renaming variables. Here, we add a component to timestamp the battery charger records, and another component to automatically stop iteration after five seconds of inactivity:

import pipecat.limit
pipe = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(pipe)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.timeout(pipe, timeout=pipecat.quantity(5, pipecat.units.seconds))
pipe = pipecat.store.csv.write(pipe, "battery.csv")
for record in pipe:
    pipecat.record.dump(record)

Battery Chargers

Many modern “smart” battery chargers have data logging capabilities that you can use with Pipecat to view the state of a battery during charging. Typically, these chargers connect to your computer via a serial port or a serial-over-USB cable that acts like a traditional serial port. Data is then sent to the port during charging. For example, data from an iCharger 208B connected to a Mac computer using a USB cable could be read using the following:

import serial
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
for line in port:
    print(line)
$1;1;;12250;3874;93;0;0;0;0;0;0;0;0;291;236;0;20

$1;1;;12250;3875;92;0;0;0;0;0;0;0;0;294;236;0;17

$1;1;;12250;3877;89;0;0;0;0;0;0;0;0;291;236;1;29

Here, we used the pySerial (http://pyserial.readthedocs.io) library to open a serial port and read data from the charger, which sends data to the port one line at a time. Note that the device name for the port, “/dev/cu.SLAB_USBtoUART” in this case, will vary depending on your device and operating system. For example, you might use “COM1” or “COM2” on Windows, or “/dev/ttyS0” on Linux.

Our first step in using Pipecat to decipher the raw device output is to turn the for-loop from the previous example into a pipe:

import pipecat.utility
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
for record in pipe:
    print(record)
{'string': u'$1;1;;12250;3874;93;0;0;0;0;0;0;0;0;291;236;0;20\r\n'}
{'string': u'$1;1;;12250;3875;92;0;0;0;0;0;0;0;0;294;236;0;17\r\n'}
{'string': u'$1;1;;12250;3877;89;0;0;0;0;0;0;0;0;291;236;1;29\r\n'}

In this case, pipecat.utility.readline() converts the raw data into records that store each line of data for further processing. To decode the contents of each line, we add the appropriate Pipecat device to the end of the pipe:

import pipecat.device.charger
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    print(record)
{('battery', 'cell4', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'temperature', 'external'): <Quantity(23.6, 'degC')>, ('battery', 'cell5', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell8', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'mode'): 'charge', ('charger', 'temperature', 'internal'): <Quantity(29.1, 'degC')>, ('battery', 'cell2', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'voltage'): <Quantity(3.874, 'volt')>, ('battery', 'cell6', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell1', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'charge'): <Quantity(0.0, 'hour * milliampere')>, ('battery', 'cell3', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell7', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'current'): <Quantity(930.0, 'milliampere')>, ('charger', 'supply'): <Quantity(12.25, 'volt')>}
{('battery', 'cell4', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'temperature', 'external'): <Quantity(23.6, 'degC')>, ('battery', 'cell5', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell8', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'mode'): 'charge', ('charger', 'temperature', 'internal'): <Quantity(29.4, 'degC')>, ('battery', 'cell2', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'voltage'): <Quantity(3.875, 'volt')>, ('battery', 'cell6', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell1', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'charge'): <Quantity(0.0, 'hour * milliampere')>, ('battery', 'cell3', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell7', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'current'): <Quantity(920.0, 'milliampere')>, ('charger', 'supply'): <Quantity(12.25, 'volt')>}
{('battery', 'cell4', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'temperature', 'external'): <Quantity(23.6, 'degC')>, ('battery', 'cell5', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell8', 'voltage'): <Quantity(0.0, 'volt')>, ('charger', 'mode'): 'charge', ('charger', 'temperature', 'internal'): <Quantity(29.1, 'degC')>, ('battery', 'cell2', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'voltage'): <Quantity(3.877, 'volt')>, ('battery', 'cell6', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell1', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'charge'): <Quantity(1.0, 'hour * milliampere')>, ('battery', 'cell3', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'cell7', 'voltage'): <Quantity(0.0, 'volt')>, ('battery', 'current'): <Quantity(890.0, 'milliampere')>, ('charger', 'supply'): <Quantity(12.25, 'volt')>}

As you can see, pipecat.device.charger.icharger208b() has converted the raw data records into charger-specific records containing human-readable fields whose values have appropriate physical units. Let’s use pipecat.record.dump() to make the output a little more readable:

import pipecat.record
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    pipecat.record.dump(record)
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 930.0 milliampere
battery/voltage: 3.874 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC

battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 920.0 milliampere
battery/voltage: 3.875 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.4 degC

battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 1.0 hour * milliampere
battery/current: 890.0 milliampere
battery/voltage: 3.877 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC
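pipecat.device.charger.icharger208b() hides the decoding work. Purely to illustrate what a device component like this does, here is a hypothetical decoder whose field positions and scale factors were inferred only from the three sample lines and decoded records shown above. It is not the documented iCharger 208B protocol, it returns plain numbers rather than Pint quantities, and it skips the mode, per-cell voltages, and trailing field because the samples don't pin down their encoding.

```python
def decode_icharger_line(line):
    """Hypothetical decoder for one raw line such as
    "$1;1;;12250;3874;93;0;0;0;0;0;0;0;0;291;236;0;20".

    Field positions and scale factors are inferred from sample data only.
    Units live in the comments because plain numbers carry none.
    """
    fields = line.strip().split(";")
    return {
        ("charger", "supply"): int(fields[3]) / 1000,    # millivolts -> volts
        ("battery", "voltage"): int(fields[4]) / 1000,   # millivolts -> volts
        ("battery", "current"): int(fields[5]) * 10,     # tens of mA -> mA
        ("charger", "temperature", "internal"): int(fields[14]) / 10,  # 0.1 degC -> degC
        ("charger", "temperature", "external"): int(fields[15]) / 10,  # 0.1 degC -> degC
        ("battery", "charge"): int(fields[16]),          # mAh
    }


record = decode_icharger_line("$1;1;;12250;3874;93;0;0;0;0;0;0;0;0;291;236;0;20\r\n")
print(record[("battery", "voltage")], record[("battery", "current")])
# 3.874 930
```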

Now, you can extract just the data you care about from a record:

port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    print(record[("battery", "voltage")], record[("battery", "current")])
3.874 volt 930.0 milliampere
3.875 volt 920.0 milliampere
3.877 volt 890.0 milliampere

And you can convert units safely and explicitly:

port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    print(record[("battery", "current")].to(pipecat.units.amps))
0.93 ampere
0.92 ampere
0.89 ampere

As an aside, you may be wondering at this point why it’s necessary to explicitly create the serial port and connect it to readline … why not code that functionality directly into icharger208b? The answer is flexibility: by separating Pipecat’s functionality into discrete, well-defined components, those components can be easily combined in new and unexpected ways. For example, you could use icharger208b with a charger that communicated over a network socket instead of a serial port. Or you could “replay” data from a charger stored in a file:

fobj = open("../data/icharger208b-charging-sample")
pipe = pipecat.utility.readline(fobj)
pipe = pipecat.device.charger.icharger208b(pipe)
for record in pipe:
    print(record[("battery", "charge")])
0.0 hour * milliampere
0.0 hour * milliampere
1.0 hour * milliampere
1.0 hour * milliampere
2.0 hour * milliampere
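The flexibility argument comes down to a simple contract: a readline-style component only needs an iterable of lines. A minimal sketch under that assumption follows; readline_sketch() is hypothetical, not Pipecat's implementation, and it decodes bytes as UTF-8 (also an assumption) because a pySerial port or a file opened in binary mode yields bytes, while a text file yields str.

```python
import io


def readline_sketch(source, encoding="utf-8"):
    """Hypothetical readline-style filter.

    Accepts any iterable of lines (serial port, open file, in-memory
    buffer, list) and yields one {"string": line} record per line.
    """
    for line in source:
        if isinstance(line, bytes):
            line = line.decode(encoding)
        yield {"string": line}


# "Replay" two raw charger lines from memory instead of a live port.
recorded = io.BytesIO(
    b"$1;1;;12250;3874;93;0;0;0;0;0;0;0;0;291;236;0;20\r\n"
    b"$1;1;;12250;3875;92;0;0;0;0;0;0;0;0;294;236;0;17\r\n"
)
for record in readline_sketch(recorded):
    print(repr(record["string"]))
```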

Let’s explore other things we can do with our pipe. To begin, you might want to add additional metadata to the records returned from a device. For example, you might want to append timestamps:

port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
for record in pipe:
    pipecat.record.dump(record)
timestamp: 2017-12-27T00:21:35.210018+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 930.0 milliampere
battery/voltage: 3.874 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC

timestamp: 2017-12-27T00:21:37.215579+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 920.0 milliampere
battery/voltage: 3.875 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.4 degC

timestamp: 2017-12-27T00:21:39.219548+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 1.0 hour * milliampere
battery/current: 890.0 milliampere
battery/voltage: 3.877 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC

Note that pipecat.utility.add_timestamp() has added a timestamp field to each record. Timestamps in Pipecat are always recorded using UTC (universal) time, so you will likely want to convert them to your local timezone before formatting them for display:

port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
for record in pipe:
    print(
        record["timestamp"].to("local").format("YYYY-MM-DD hh:mm:ss a"),
        record[("battery", "voltage")],
    )
2017-12-26 05:21:41 pm 3.874 volt
2017-12-26 05:21:43 pm 3.875 volt
2017-12-26 05:21:45 pm 3.877 volt

You could also use pipecat.utility.add_field() to append your own custom field to every record that passes through the pipe.
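Both of these are simple record filters. The sketch below shows the same idea using only the standard library; add_timestamp_sketch() and add_field_sketch() are hypothetical, and they store timezone-aware datetime objects in place of the Arrow objects Pipecat actually uses. The injectable clock argument and the "operator" field are illustrative choices, and the fixed UTC-07:00 offset simply reproduces the local times in the sample output above (calling astimezone() with no argument would use the system's local zone instead).

```python
import datetime


def utc_now():
    """Default clock: the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(datetime.timezone.utc)


def add_timestamp_sketch(records, clock=utc_now):
    """Hypothetical filter: add a UTC "timestamp" field to each record."""
    for record in records:
        record = dict(record)
        record["timestamp"] = clock()
        yield record


def add_field_sketch(records, key, value):
    """Hypothetical filter: add the same key/value pair to every record."""
    for record in records:
        record = dict(record)
        record[key] = value
        yield record


# A fixed clock keeps the example reproducible.
fixed = datetime.datetime(2017, 12, 27, 0, 21, 35, tzinfo=datetime.timezone.utc)
pipe = iter([{("battery", "voltage"): 3.874}])
pipe = add_timestamp_sketch(pipe, clock=lambda: fixed)
pipe = add_field_sketch(pipe, "operator", "alice")  # hypothetical custom field
records = list(pipe)

utc_minus_7 = datetime.timezone(datetime.timedelta(hours=-7))
for record in records:
    local = record["timestamp"].astimezone(utc_minus_7)
    print(local.strftime("%Y-%m-%d %I:%M:%S %p"), record[("battery", "voltage")])
```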

Now let’s consider calculating some simple statistics, such as the minimum and maximum battery voltage while charging. When we iterate over the contents of a pipe using a for loop, we receive one record at a time until the pipe is empty. We could keep track of a “running” minimum and maximum during iteration, and there are use cases where that is the best way to solve the problem. However, for moderately-sized data, Pipecat provides a more convenient approach:

import pipecat.store
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.store.cache(pipe)
for record in pipe:
    pass
print(len(pipe.table))
print(pipe.table[("battery", "voltage")])
1390
[ 3.874  3.875  3.877 ...,  4.119  4.119  4.119] volt

Here, pipecat.store.cache() creates an in-memory cache that stores every record it receives. We use a do-nothing for loop to read data from the charger and populate the cache. Once that’s complete, we can use the cache’s table attribute to retrieve data using the same keys and syntax we would use with a record. Unlike a record, the cache returns every value for a given key at once (as a Numpy array), which makes it easy to compute the statistics we’re interested in:

import pipecat.store
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.store.cache(pipe)
for record in pipe:
    pass
print("Min:", pipe.table[("battery", "voltage")].min())
print("Max:", pipe.table[("battery", "voltage")].max())
Min: 3.874 volt
Max: 4.152 volt
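A rough sketch of how such a cache could work, assuming every record carries the same keys. CacheSketch and TableSketch are hypothetical; they store each column as a plain list, whereas Pipecat's table hands back Numpy-backed arrays with units, which is what makes calls like .min() and .max() available.

```python
class TableSketch:
    """Column store: one list of values per record key."""

    def __init__(self):
        self._columns = {}
        self._rows = 0

    def append(self, record):
        # Assumes every record has the same keys; otherwise columns misalign.
        for key, value in record.items():
            self._columns.setdefault(key, []).append(value)
        self._rows += 1

    def __len__(self):
        return self._rows  # number of cached records, not number of keys

    def __getitem__(self, key):
        return self._columns[key]


class CacheSketch:
    """Pass records through unchanged while copying them into self.table."""

    def __init__(self, records):
        self._records = records
        self.table = TableSketch()

    def __iter__(self):
        for record in self._records:
            self.table.append(record)
            yield record


records = [{("battery", "voltage"): v} for v in (3.874, 3.875, 3.877)]
pipe = CacheSketch(records)
for record in pipe:
    pass
voltages = pipe.table[("battery", "voltage")]
print(len(pipe.table), min(voltages), max(voltages))
# 3 3.874 3.877
```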

Consolidating fields using the cache is also perfect for generating plots with a library like Toyplot (http://toyplot.readthedocs.io):

import toyplot

canvas = toyplot.Canvas()
axes = canvas.cartesian(grid=(3, 1, 0), label="Battery", ylabel="Voltage (V)")
axes.plot(pipe.table[("battery", "voltage")].to(pipecat.units.volt))

axes = canvas.cartesian(grid=(3, 1, 1), ylabel="Current (A)")
axes.plot(pipe.table[("battery", "current")].to(pipecat.units.amp))

axes = canvas.cartesian(grid=(3, 1, 2), ylabel="Charge (mAh)")
axes.plot(pipe.table[("battery", "charge")].to(pipecat.units.milliamp * pipecat.units.hour));
[Plot: “Battery” figure with three stacked panels, Voltage (V), Current (A), and Charge (mAh), each plotted against record number.]

Note that nothing prevents us from doing useful work in the for loop that populates the cache, and nothing prevents us from accessing the cache within the loop. For example, we might want to display field values from individual records alongside a running average computed from the cache. Or we might want to update our plot periodically as the loop progresses.
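For instance, a running average can be computed inside the loop from the values seen so far. This stand-alone sketch keeps its own running total instead of reading Pipecat's cache, and both add_running_mean() and the "running_mean" field name are hypothetical:

```python
def add_running_mean(records, key):
    """Hypothetical filter: attach the mean of record[key] seen so far."""
    total = 0.0
    count = 0
    for record in records:
        total += record[key]
        count += 1
        record = dict(record)
        record["running_mean"] = total / count
        yield record


records = [{("battery", "voltage"): v} for v in (3.874, 3.875, 3.877)]
for record in add_running_mean(records, ("battery", "voltage")):
    # Show the current value next to the running average.
    print(record[("battery", "voltage")], round(record["running_mean"], 4))
```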

Moving on, you will likely want to store records to disk for later access. Pipecat provides components to make this easy too. First, you can add pipecat.store.pickle.write() to the end of a pipe to write records to disk using Python’s pickle format:

import pipecat.store.pickle
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.store.pickle.write(pipe, "charger.pickle")
for record in pipe:
    pass

Later, you can use pipecat.store.pickle.read() to read the records back in again:

pipe = pipecat.store.pickle.read("charger.pickle")
pipe = pipecat.store.cache(pipe)
for record in pipe:
    pipecat.record.dump(record)
print("Average:", pipe.table[("battery", "voltage")].mean())
timestamp: 2017-12-27T00:21:49.522898+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 930.0 milliampere
battery/voltage: 3.874 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC

timestamp: 2017-12-27T00:21:51.525903+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 920.0 milliampere
battery/voltage: 3.875 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.4 degC

timestamp: 2017-12-27T00:21:53.532511+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 1.0 hour * milliampere
battery/current: 890.0 milliampere
battery/voltage: 3.877 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC

Average: 3.87533333333 volt

This is another example of the interchangeability of the Pipecat components: the pickle writer is a record consumer, and the pickle reader is a record generator. In essence, we “broke” our previous pipe into two separate pipes that communicate via the filesystem. While we won’t go into detail here, a similar approach could be used to communicate between threads using a message queue or between processes over a socket.
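The file-based hand-off can be sketched with the standard pickle module. This is a hypothetical illustration, not Pipecat's on-disk format: it assumes one pickle per record written back-to-back, and a writer that passes each record through so it can sit in a pipe the way the example above uses it. As with any pickle data, only read files you trust.

```python
import os
import pickle
import tempfile


def pickle_write_sketch(records, path):
    """Hypothetical filter: pickle each record to `path`, then pass it on."""
    with open(path, "wb") as stream:
        for record in records:
            pickle.dump(record, stream)
            yield record


def pickle_read_sketch(path):
    """Hypothetical generator: yield pickled records until end of file."""
    with open(path, "rb") as stream:
        while True:
            try:
                yield pickle.load(stream)
            except EOFError:
                return


path = os.path.join(tempfile.mkdtemp(), "charger.pickle")

# First pipe: write records (the file closes when the loop finishes).
for record in pickle_write_sketch([{"voltage": 3.874}, {"voltage": 3.875}], path):
    pass

# Second pipe, possibly in another process later: read them back.
print(list(pickle_read_sketch(path)))
# [{'voltage': 3.874}, {'voltage': 3.875}]
```

If the writing loop is abandoned early, the file in this sketch is only closed when the generator is closed or garbage-collected.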

There is one final issue that we’ve ignored so far: when to stop logging data. The pipecat.utility.readline() function will read data from the serial port as long as the port is open, blocking indefinitely if no data is arriving. That means that for all the preceding examples the for loop will never end unless the serial port is closed (i.e., the external device is turned off or unplugged), or the code is interrupted using Control-C. While that’s fine for prototyping at the command line, we need a way to stop collecting data and shut down cleanly if we’re going to automate data logging processes. Fortunately, Pipecat provides several easy-to-use functions to do just that:

import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.count(pipe, count=2)
for record in pipe:
    pipecat.record.dump(record)
timestamp: 2017-12-27T00:21:55.588725+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 930.0 milliampere
battery/voltage: 3.874 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC

timestamp: 2017-12-27T00:21:57.597235+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 920.0 milliampere
battery/voltage: 3.875 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.4 degC

Here, pipecat.limit.count() ends the loop once count records have been received. This is often handy during development to limit the amount of data consumed from a device that produces output continuously. However, this approach is no good for devices like our charger that produce a finite but unpredictable number of records: if the device stops sending records before the count has been reached, the loop will still block. Instead, we could use pipecat.limit.duration() to limit the total amount of time the loop is allowed to run:

import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.duration(pipe, duration=pipecat.quantity(4, pipecat.units.seconds))
for record in pipe:
    pipecat.record.dump(record)
timestamp: 2017-12-27T00:21:57.614914+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 930.0 milliampere
battery/voltage: 3.874 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.1 degC

timestamp: 2017-12-27T00:21:59.615434+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 0.0 hour * milliampere
battery/current: 920.0 milliampere
battery/voltage: 3.875 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.6 degC
charger/temperature/internal: 29.4 degC

This approach is an improvement because it puts an upper bound on the amount of time the loop will run, whether the device has stopped sending records or not. However, it’s still error-prone, since we don’t know in advance how long charging will take: if we set the duration too low, it may stop the loop before charging is complete. If we set the duration too high, we will capture all the records we want, but we will likely waste time waiting for records that will never come. Ideally, we would like to exit the loop as soon as the charger tells us it’s finished. Fortunately, the charger provides a field, charger/mode, that can do just that:

import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.until(pipe, key=("charger", "mode"), value="finished")
for record in pipe:
    pipecat.record.dump(record)
timestamp: 2017-12-27T00:22:01.639667+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 110.0 milliampere
battery/voltage: 4.141 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.8 degC
charger/temperature/internal: 32.5 degC

timestamp: 2017-12-27T00:22:03.648685+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 120.0 milliampere
battery/voltage: 4.14 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.9 degC
charger/temperature/internal: 32.7 degC

timestamp: 2017-12-27T00:22:05.653453+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 0.0 milliampere
battery/voltage: 4.138 volt
charger/mode: finished
charger/supply: 12.25 volt
charger/temperature/external: 23.9 degC
charger/temperature/internal: 32.7 degC
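For intuition, count() and until() behave like the generator sketches below. They are hypothetical, not Pipecat's code: count_sketch() leans on itertools.islice, which stops without requesting an extra record from upstream, and until_sketch() yields the matching record before stopping, as in the output above where the finished record is printed. A duration limit is left out because enforcing a hard wall-clock limit on a source that may block needs more than a simple generator.

```python
import itertools


def count_sketch(records, count):
    """Hypothetical limit: stop after `count` records."""
    return itertools.islice(records, count)


def until_sketch(records, key, value):
    """Hypothetical limit: stop after the first record where record[key] == value."""
    for record in records:
        yield record  # the matching record is still passed downstream
        if record.get(key) == value:
            return


records = [
    {("charger", "mode"): "charge"},
    {("charger", "mode"): "charge"},
    {("charger", "mode"): "finished"},
    {("charger", "mode"): "idle"},  # never reached by until_sketch
]
print(len(list(count_sketch(iter(records), 2))))
# 2
print([r[("charger", "mode")] for r in until_sketch(iter(records), ("charger", "mode"), "finished")])
# ['charge', 'charge', 'finished']
```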

pipecat.limit.until() terminates the loop as soon as it receives a record with the given key and value. This approach finally gets us our desired behavior (loop ends as soon as the charger is finished), but it could use just a little more work to make it robust. For example, what happens if the charger stops sending data before the mode changes? We could combine pipecat.limit.until() with pipecat.limit.duration(), but that would still suffer from the terminate-too-soon / waste-too-much-time problem. Fortunately, we know from testing that our charger sends records every two seconds (if at all), and Pipecat provides pipecat.limit.timeout(), which can terminate the loop if it doesn’t receive a record within a specified time interval:

import pipecat.limit
port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
pipe = pipecat.utility.readline(port)
pipe = pipecat.device.charger.icharger208b(pipe)
pipe = pipecat.utility.add_timestamp(pipe)
pipe = pipecat.limit.until(pipe, key=("charger", "mode"), value="finished")
pipe = pipecat.limit.timeout(pipe, timeout=pipecat.quantity(5, pipecat.units.seconds))
for record in pipe:
    pipecat.record.dump(record)
timestamp: 2017-12-27T00:22:05.673115+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 110.0 milliampere
battery/voltage: 4.141 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.8 degC
charger/temperature/internal: 32.5 degC

timestamp: 2017-12-27T00:22:07.678467+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 120.0 milliampere
battery/voltage: 4.14 volt
charger/mode: charge
charger/supply: 12.25 volt
charger/temperature/external: 23.9 degC
charger/temperature/internal: 32.7 degC

timestamp: 2017-12-27T00:22:09.683829+00:00
battery/cell1/voltage: 0.0 volt
battery/cell2/voltage: 0.0 volt
battery/cell3/voltage: 0.0 volt
battery/cell4/voltage: 0.0 volt
battery/cell5/voltage: 0.0 volt
battery/cell6/voltage: 0.0 volt
battery/cell7/voltage: 0.0 volt
battery/cell8/voltage: 0.0 volt
battery/charge: 503.0 hour * milliampere
battery/current: 0.0 milliampere
battery/voltage: 4.138 volt
charger/mode: finished
charger/supply: 12.25 volt
charger/temperature/external: 23.9 degC
charger/temperature/internal: 32.7 degC

This example still exits normally when the charger is finished, but it will also exit cleanly if a record isn’t received within five seconds. More importantly, you can see how easy it is to combine limit functions to control when the for loop terminates.
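Each limit function wraps another record generator, which is why limits stack so naturally. The stdlib-only sketch below shows simplified versions of the until and count behaviors to make that concrete. The names until_sketch and count_sketch are hypothetical. The sketch deliberately ignores details the real pipecat.limit functions handle, such as shutting down upstream sources and logging.

```python
def until_sketch(source, key, value):
    """Yield records up to and including the first one where record[key] == value."""
    for record in source:
        yield record
        if record.get(key) == value:
            return  # stop pulling from upstream


def count_sketch(source, count):
    """Yield at most `count` records from source."""
    if count <= 0:
        return
    for index, record in enumerate(source, start=1):
        yield record
        if index >= count:
            return


# Hypothetical charger-like records; hierarchical keys are tuples, as in ("charger", "mode").
records = [
    {("charger", "mode"): "charge"},
    {("charger", "mode"): "charge"},
    {("charger", "mode"): "finished"},
    {("charger", "mode"): "idle"},
]

pipe = iter(records)
pipe = until_sketch(pipe, key=("charger", "mode"), value="finished")
pipe = count_sketch(pipe, count=10)
modes = [record[("charger", "mode")] for record in pipe]
print(modes)  # ['charge', 'charge', 'finished']
```

As with the real pipeline, the record that triggers the until condition is still delivered before the loop ends.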


GPS Receivers

Most GPS receivers have data logging capabilities that you can use with Pipecat to view navigational information. Some receivers connect to your computer via a serial port or a serial-over-USB cable that acts like a traditional serial port. Others can push data to a network socket. For this demonstration, we will receive GPS data sent from an iPhone to a UDP socket:

import pipecat.record
import pipecat.udp
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
for record in pipe:
    pipecat.record.dump(record)
client: 172.10.0.20
message: $GPTXT,01,01,07,Pipecat*12


client: 172.10.0.20
message: $GPGGA,164100,3511.33136,N,10643.48435,W,1,8,0.9,1654.0,M,46.9,M,0,2*50


client: 172.10.0.20
message: $GPRMC,164100,A,3511.33136,N,10643.48435,W,0.00,0.00,311216,003.1,W*7C


client: 172.10.0.20
message: $GPGLL,3511.33136,N,10643.48435,W,164100,A*36


client: 172.10.0.20
message: $HCHDG,129.5,,,8.7,E*29


client: 172.10.0.20
message: $PASHR,164100190,138.24,T,+32.56,+48.49,+00.00,3.141,3.141,35.000,1,0*17

Here, we used pipecat.udp.receive() to open a UDP socket listening on port 7777 on all available network interfaces (“0.0.0.0”) and convert the received messages into Pipecat Records, which we dump to the console. Note that each record includes the address of the client (the phone in this case), along with a “message” field containing the raw data of the message. In this case the raw data is in NMEA format, a widely-used standard for exchanging navigational data. To decode the contents of each message, we add the appropriate Pipecat device to the end of the pipe:
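Each NMEA 0183 sentence ends with a two-digit hexadecimal checksum after the “*”, computed as the bitwise XOR of every character between the leading “$” and the “*”. The following is a minimal sketch of validating that checksum. The helper names are hypothetical, and this is not necessarily how pipecat.device.gps.nmea() treats checksums.

```python
from functools import reduce


def nmea_checksum(body):
    """XOR every character of the sentence body (the text between '$' and '*')."""
    return reduce(lambda acc, char: acc ^ ord(char), body, 0)


def make_sentence(body):
    """Build a complete NMEA sentence by appending a two-digit hex checksum."""
    return "${}*{:02X}".format(body, nmea_checksum(body))


def is_valid(sentence):
    """Return True if the trailing checksum matches the sentence body."""
    sentence = sentence.strip()  # tolerate a trailing CR/LF
    if not sentence.startswith("$") or "*" not in sentence:
        return False
    body, _, checksum = sentence[1:].rpartition("*")
    try:
        return int(checksum, 16) == nmea_checksum(body)
    except ValueError:  # checksum field isn't valid hexadecimal
        return False


# The GPTXT message received from the phone in the output above.
print(is_valid("$GPTXT,01,01,07,Pipecat*12"))  # True
print(make_sentence("GPTXT,01,01,07,Pipecat"))  # $GPTXT,01,01,07,Pipecat*12
```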

import pipecat.device.gps
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
for record in pipe:
    pipecat.record.dump(record)
id: GPTXT
text: Pipecat

altitude: 1654.0 meter
dop: 0.9
geoid-height: 46.9 meter
id: GPGGA
latitude: 35.188856 degree
longitude: -106.724739167 degree
quality: 1
satellites: 8
time: 164100

active: True
date: 311216
id: GPRMC
latitude: 35.188856 degree
longitude: -106.724739167 degree
speed: 0.0 knot
time: 164100
track: 0.0 degree
variation: -3.1 degree

active: True
id: GPGLL
latitude: 35.188856 degree
longitude: -106.724739167 degree
time: 164100

heading: 129.5 degree
id: HCHDG
variation: 8.7 degree

heading: 138.24 degree
heading-accuracy: 35.0 degree
heave: 0.0 meter
id: PASHR
pitch: 48.49 degree
pitch-accuracy: 3.141 degree
roll: 32.56 degree
roll-accuracy: 3.141 degree
time: 164100190

As you can see, pipecat.device.gps.nmea() has converted the raw NMEA messages into records containing human-readable navigational fields with appropriate physical units. Note that unlike the Battery Chargers example, not every record produced by the GPS receiver has the same fields. The NMEA standard includes many different types of messages, and most GPS receivers will produce more than one type. This will increase the complexity of our code - for example, we might have to test for the presence of a field before extracting it from a record:
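The decoded latitude and longitude are decimal degrees, while the raw NMEA messages encode them as degrees plus decimal minutes (ddmm.mmmm or dddmm.mmmm) followed by a hemisphere letter. Here is a sketch of that conversion using the GPGLL values from the raw output above. The function name is hypothetical, it assumes each field contains a decimal point, and it is not taken from Pipecat’s source.

```python
def nmea_to_degrees(field, hemisphere):
    """Convert an NMEA (d)ddmm.mmmm field plus hemisphere letter to signed decimal degrees."""
    dot = field.index(".")
    degrees = int(field[:dot - 2])     # everything before the two whole-minute digits
    minutes = float(field[dot - 2:])   # mm.mmmm
    value = degrees + minutes / 60.0
    # South latitudes and West longitudes are negative.
    return -value if hemisphere in ("S", "W") else value


latitude = nmea_to_degrees("3511.33136", "N")
longitude = nmea_to_degrees("10643.48435", "W")
print(round(latitude, 6), round(longitude, 6))  # 35.188856 -106.724739
```

These match the latitude and longitude fields that pipecat.device.gps.nmea() reported for the same messages.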

pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
for record in pipe:
    if "latitude" in record:
        print("Latitude:", record["latitude"], "Longitude:", record["longitude"])
Latitude: 35.1949926667 degree Longitude: -106.7111135 degree
Latitude: 35.1949926667 degree Longitude: -106.7111135 degree
Latitude: 35.1949926667 degree Longitude: -106.7111135 degree
Latitude: 35.1952843333 degree Longitude: -106.710192667 degree
Latitude: 35.1952843333 degree Longitude: -106.710192667 degree
Latitude: 35.1952843333 degree Longitude: -106.710192667 degree

Alternatively, we might use the record id field to key our code off a specific type of NMEA message:

pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
for record in pipe:
    if record["id"] == "PASHR":
        print("Pitch:", record["pitch"])
Pitch: 66.82 degree
Pitch: 67.3 degree
Pitch: 66.8 degree
Pitch: 66.18 degree

Another alternative would be to add a filter to our pipe so we only receive records that match some criteria:

import pipecat.filter
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
pipe = pipecat.filter.keep(pipe, key="id", value="GPGLL")
for record in pipe:
    pipecat.record.dump(record)
active: True
id: GPGLL
latitude: 35.1949926667 degree
longitude: -106.7111135 degree
time: 164252

active: True
id: GPGLL
latitude: 35.1952843333 degree
longitude: -106.710192667 degree
time: 164257

active: True
id: GPGLL
latitude: 35.1956116667 degree
longitude: -106.709064 degree
time: 164303

active: True
id: GPGLL
latitude: 35.1958851667 degree
longitude: -106.708156167 degree
time: 164308

Note that pipecat.filter.keep() discards all records that don’t meet the given criteria, which allows our downstream code to rely on the availability of specific fields.
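Conceptually, a filter like this is just a generator that skips records. The following simplified sketch follows the key/value rules described in the pipecat.filter.keep() reference. The name keep_sketch is hypothetical. As a design choice, it uses a sentinel object instead of None for “no value constraint”, so that a field whose value is None can still be matched.

```python
_ANY = object()  # sentinel meaning "no value constraint"


def keep_sketch(source, key=None, value=_ANY):
    """Yield only records that contain `key` and, if given, match `value`."""
    for record in source:
        if key is not None:
            if key not in record:
                continue  # required key is missing
            if value is not _ANY and record[key] != value:
                continue  # key present, but the value doesn't match
        yield record


messages = [
    {"id": "GPGGA", "altitude": 1654.0},
    {"id": "GPGLL", "latitude": 35.188856},
    {"id": "HCHDG", "heading": 129.5},
]
kept = [record["id"] for record in keep_sketch(messages, key="id", value="GPGLL")]
print(kept)  # ['GPGLL']
```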

Regardless of the logic you employ to identify fields of interest, Pipecat always makes it easy to convert units safely and explicitly:

pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
for record in pipe:
    if "speed" in record:
        print(record["speed"].to(pipecat.units.mph))
39.9320468464 mph
40.0586325857 mph
40.1276793526 mph
38.5626193033 mph

Let’s explore other things we can do with our pipe. To begin, you might want to add additional metadata to the records returned from a device. For example, if you were collecting data from multiple devices you might want to “tag” records with a user-specific unique identifier:

import pipecat.utility
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
pipe = pipecat.filter.keep(pipe, key="id", value="GPGLL")
pipe = pipecat.utility.add_field(pipe, "serial", "1237V")
for record in pipe:
    pipecat.record.dump(record)
active: True
id: GPGLL
latitude: 35.1949926667 degree
longitude: -106.7111135 degree
serial: 1237V
time: 164252

active: True
id: GPGLL
latitude: 35.1952843333 degree
longitude: -106.710192667 degree
serial: 1237V
time: 164257

active: True
id: GPGLL
latitude: 35.1956116667 degree
longitude: -106.709064 degree
serial: 1237V
time: 164303
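A tagging stage like pipecat.utility.add_field() can be sketched as a generator that sets one extra key on every record. The name add_field_sketch is hypothetical. The reference doesn’t say whether Pipecat copies records or modifies them in place, so this sketch copies them as a design choice.

```python
def add_field_sketch(source, key, value):
    """Yield a copy of each record with `key` set to `value`."""
    for record in source:
        record = dict(record)  # copy, so the upstream record is left untouched
        record[key] = value
        yield record


fixes = [{"id": "GPGLL", "time": "164252"}, {"id": "GPGLL", "time": "164257"}]
tagged = list(add_field_sketch(fixes, "serial", "1237V"))
for record in tagged:
    print(record["time"], record["serial"])
```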

Now let’s consider calculating some simple statistics, such as our average speed on a trip. When we iterate over the contents of a pipe using a for loop, we receive one record at a time until the pipe is empty. We could keep track of a “running” average during iteration, and there are use cases where that is the best way to solve the problem. However, for moderately-sized data, Pipecat provides a more convenient approach:

import pipecat.store
pipe = pipecat.udp.receive(address=("0.0.0.0", 7777), maxsize=1024)
pipe = pipecat.device.gps.nmea(pipe, key="message")
pipe = pipecat.store.cache(pipe)
for record in pipe:
    pass
print(pipe.table["speed"])
[  0.     0.    10.59  17.96   4.39  24.14  30.65  33.59  33.28  32.85  34.08  34.78  35.28  34.66  34.46  34.1   34.64  34.41  33.88  33.75  34.7   34.81  34.87  33.51  33.71  35.38  32.09  28.94  18.     0.     1.19  21.23  31.92  33.55  34.91  34.78  33.75  32.71  31.67  31.14  31.45  31.94  31.16  32.27  35.46  35.34  34.06  33.82  34.91  34.72  34.83  34.95  33.38  33.08  27.39   5.21   0.     2.45  22.68  33.1   33.8  34.64  33.96  34.37  34.81  32.75  29.55  21.71  13.8   14.48  27.29  25.21  11.68  13.86   9.16   0.  ] knot

Here, pipecat.store.cache() creates an in-memory cache that stores every record it receives. We use a do-nothing for loop that reads data from the GPS receiver to populate the cache. Once that’s complete, we can use the cache table attribute to retrieve data from the cache using the same keys and syntax we would use with a record. Unlike a record, the cache returns every value for a given key at once (as a Numpy array), which makes it easy to compute the statistics we’re interested in:

print("Average speed:", pipe.table["speed"].mean().to(pipecat.units.mph))
Average speed: 30.8533055116 mph
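To make column-oriented access concrete, here is a stripped-down, stdlib-only sketch of a cache that passes records through unchanged while building one list per key. It is not pipecat.store.Cache: the real cache returns Numpy arrays with units, whereas this sketch uses plain lists. It also back-fills missing fields with None, an assumption made so that every column stays aligned with the record number.

```python
class CacheSketch(object):
    """Pass records through unchanged while collecting one column per key."""

    def __init__(self, source):
        self._source = iter(source)
        self._count = 0   # number of records seen so far
        self.table = {}   # key -> list of values, one entry per record

    def __iter__(self):
        return self

    def __next__(self):
        record = next(self._source)  # StopIteration ends the for loop
        for key in set(self.table) | set(record):
            # New keys are back-filled with None so columns stay aligned.
            column = self.table.setdefault(key, [None] * self._count)
            column.append(record.get(key))
        self._count += 1
        return record

    next = __next__  # Python 2 iterator protocol


cache = CacheSketch([{"speed": 1.0}, {"speed": 3.0, "track": 90.0}, {"speed": 5.0}])
for record in cache:
    pass  # do-nothing loop that populates the cache
speeds = cache.table["speed"]
print(sum(speeds) / len(speeds))  # 3.0
print(cache.table["track"])  # [None, 90.0, None]
```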

Consolidating fields using the cache is also perfect for generating plots with a library like Toyplot (http://toyplot.readthedocs.io):

import toyplot

canvas = toyplot.Canvas(width=600, height=400)
axes = canvas.cartesian(grid=(2, 1, 0), xlabel="Record #", ylabel="Speed (MPH)")
axes.plot(pipe.table["speed"].to(pipecat.units.mph))
axes = canvas.cartesian(grid=(2, 1, 1), xlabel="Record #", ylabel="Track")
axes.plot(pipe.table["track"]);
[Figure: two stacked plots, Speed (MPH) versus Record # (top) and Track versus Record # (bottom)]

Note that nothing prevents us from doing useful work in the for loop that populates the cache, and nothing prevents us from accessing the cache within the loop. For example, we might want to display field values from individual records alongside a running average computed from the cache. Or we might want to update our plot periodically as the loop progresses.
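If the loop only needs a running statistic, and not the full history, the mean can also be updated incrementally, which needs constant memory no matter how long the pipe runs. The following minimal sketch operates on plain floats rather than Pipecat quantities, and the function name is hypothetical.

```python
def running_mean(values):
    """Yield the mean of all values seen so far, using constant memory."""
    mean = 0.0
    for count, value in enumerate(values, start=1):
        mean += (value - mean) / count  # incremental update of the mean
        yield mean


means = list(running_mean([0.0, 10.0, 20.0, 30.0]))
print(means)  # [0.0, 5.0, 10.0, 15.0]
```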


Contributing

Even if you’re not in a position to contribute code to Pipecat, there are many ways you can help the project out:

  • Send the names of serial ports that work with a device on your platform, so we can document it.
  • Let us know if one of our existing devices works with some other model or brand of hardware, so we can document that.
  • Send us a file containing sample output from your device.
  • Tell us if our code doesn’t work with your device.
  • Spread the word!

Getting Started

If you haven’t already, you’ll want to get familiar with the Pipecat repository at http://github.com/shead-custom-design/pipecat … there, you’ll find the Pipecat sources, issue tracker, and wiki.

Next, you’ll need to install Pipecat’s Dependencies. Then, you’ll be ready to get Pipecat’s source code and use setuptools to install it. To do this, you’ll almost certainly want to use “develop mode”. Develop mode is a feature provided by setuptools that links the Pipecat source code into the install directory instead of copying it … that way you can edit the source code in your git sandbox, and you don’t have to re-install it to test your changes:

$ git clone https://github.com/sandialabs/pipecat.git
$ cd pipecat
$ python setup.py develop

Versioning

Pipecat version numbers follow the Semantic Versioning standard.

Coding Style

The Pipecat source code follows the PEP-8 Style Guide for Python Code.

Running Regression Tests

To run the Pipecat test suite, simply run regression.py from the top-level source directory:

$ cd pipecat
$ python regression.py

The tests will run, providing feedback on successes / failures.

Test Coverage

When you run the test suite with regression.py, it also automatically generates code coverage statistics. To see the coverage results, open .cover/index.html in a web browser.

Building the Documentation

To build the documentation, run:

$ cd pipecat
$ python docs/setup.py

Note that significant subsets of the documentation are written using Jupyter notebooks, so the docs/setup.py script requires Jupyter to convert the notebooks into reStructuredText files for inclusion with the rest of the documentation.

Once the documentation is built, you can view it by opening docs/_build/html/index.html in a web browser.


Release Notes

Pipecat 0.3.0 - May 27th, 2017

  • Added support for motion capture on iOS devices.
  • pipecat.record.dump() immediately flushes its output.
  • Add sample GPS data.
  • Set the Pint application registry when reading records with pipecat.store.pickle.read()
  • Added support for OBD-II devices.
  • Add example limits to the battery charger section of the user guide.
  • Clarify the difference between client and listener addresses for pipecat.udp.receive()
  • Stub-out the GPS receiver section of the user guide.
  • Add a new module for filtering records.
  • Update testing code and expand the section on GPS receivers in the user guide.
  • Ensure that pipecat.limit shuts-down upstream sources when terminated.
  • Shutdown upstream generators for real this time.
  • Handle KeyboardInterrupt gracefully within pipecat.limit.
  • Add support for individual cell voltages in pipecat.device.charger.icharger_208b().
  • Add a sample executable that monitors battery charger status, pipecat-charger-status.
  • Add pipecat.device.serial.readline() for reliably retrieving lines of text from a serial port.
  • Make all threads daemon threads so callers can shutdown normally.
  • charging-status handles the case where no data has been received.
  • Make the poll rate configurable for pipecat.device.serial.readline().
  • Include the charger mode in charging-status output.
  • Add functionality to retrieve data using HTTP requests.
  • Add functionality to parse XML data.
  • Add functionality to parse METAR weather data.
  • Create a sample weather-monitoring application, pipecat-wind-status.
  • Added pipecat.filter.duplicates().
  • Add an option to clear pipecat.store.Table so caches can be emptied.
  • Improve the logic for extracting METAR data from XML.
  • Add exception handling to pipecat.xml.parse() and pipecat.http.get().
  • Added separate loggers for pipecat.http and pipecat.xml.

Pipecat 0.2.0 - December 28th, 2016

  • Improved documentation.
  • Added sample charger data.
  • Consolidated redundant file-management code.

Pipecat 0.1.0 - December 14th, 2016

  • Initial Release

API Reference

Contents:

pipecat module

Elegant, flexible data logging in Python for connected sensors and instruments.

pipecat.connect module

Functions for connecting data sources.

pipecat.connect.concatenate(sources)

Concatenate records from multiple sources.

Yields all of the records from the first source, then all the records from the second source, and so on until every source has been consumed. This means that it only makes sense to use concatenate() with sources that return a bounded number of records!

Parameters: sources (sequence of Record Generators, required)
Yields: record (dict)
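Because concatenate() only needs to drain each source in turn, its behavior can be sketched with itertools.chain. This illustrates the semantics described above and is not necessarily how Pipecat implements it; the function name is hypothetical.

```python
import itertools


def concatenate_sketch(sources):
    """Yield all records from each source in turn; every source must be finite."""
    for record in itertools.chain.from_iterable(sources):
        yield record


first = [{"file": "a", "n": 1}, {"file": "a", "n": 2}]
second = [{"file": "b", "n": 1}]
order = [(r["file"], r["n"]) for r in concatenate_sketch([first, second])]
print(order)  # [('a', 1), ('a', 2), ('b', 1)]
```

An unbounded first source would never let the second one start, which is why the bounded-sources caveat matters.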
pipecat.connect.multiplex(sources)

Interleave records from multiple sources.

Yields records from all sources in the order that they arrive.

Parameters: sources (sequence of Record Generators, required)
Yields: record (dict)
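One way to interleave sources “in the order that they arrive” is to drain each source on its own thread and funnel records through a shared queue. The sketch below assumes finite sources and uses a sentinel object to detect when every source is exhausted. It illustrates the idea and is not Pipecat’s implementation; in particular, exceptions raised inside a source are not propagated to the consumer here.

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

_DONE = object()  # sentinel: one source has been exhausted


def multiplex_sketch(sources):
    """Yield records from all sources in arrival order, one thread per source."""
    sources = list(sources)
    channel = queue.Queue()

    def drain(source):
        try:
            for record in source:
                channel.put(record)
        finally:
            channel.put(_DONE)  # always signal completion, even on error

    for source in sources:
        thread = threading.Thread(target=drain, args=(source,))
        thread.daemon = True  # don't keep the interpreter alive
        thread.start()

    remaining = len(sources)
    while remaining:
        item = channel.get()
        if item is _DONE:
            remaining -= 1
        else:
            yield item


merged = list(multiplex_sketch([[1, 2, 3], [4, 5]]))
print(sorted(merged))  # [1, 2, 3, 4, 5]
```

The interleaving between sources varies from run to run, but records from any single source keep their relative order.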

pipecat.device module

Data sources for specific hardware devices.

See sub-modules of this module for different classes of hardware device supported by Pipecat:

pipecat.device.auto module

Data sources that retrieve on-board diagnostics from an automobile using OBD-II.

pipecat.device.auto.obd(connection, commands=None, rate=<Quantity(5, 'second')>)

Retrieve OBD-II data from an automobile.

This component requires the Python-OBD module (http://python-obd.readthedocs.io).

Parameters:
  • connection (obd.OBD instance, required.)
  • rate (time quantity, optional) – Rate at which data will be retrieved.
Yields:

record (dict) – Records will contain OBD-II data retrieved from an automobile computer.

pipecat.device.charger module

Data sources that retrieve information from battery chargers.

pipecat.device.charger.icharger208b(source, key)

Parse data from an iCharger 208B battery charger.

Parses data events emitted by the charger during charge, discharge, etc. Likely works with other models from iCharger, but this is untested. Consider Contributing to let us know.

This model of battery charger comes with a USB cable that provides serial-over-USB communication with the host computer. To connect with the charger, open a handle to the appropriate serial port (whose name varies by platform and with the number of devices currently connected) and feed lines of text to the parser.

Examples

Open a serial port on a Mac OS X computer using pySerial, read lines from the serial port, parse them into records, and print them to stdout:

>>> import serial
>>> import pipecat.device.charger
>>> import pipecat.utility
>>> port = serial.serial_for_url("/dev/cu.SLAB_USBtoUART", baudrate=128000)
>>> pipe = pipecat.utility.readline(port)
>>> pipe = pipecat.device.charger.icharger208b(pipe, key="string")
>>> for record in pipe:
...     print(record)
Parameters:
  • source (Record generator returning records containing a “string” field.)
  • key (Record key, optional) – The key in incoming records containing charger data.
Yields:

record (dict) – Records will contain information including the charge mode, supply voltage, battery voltage, battery current, internal and external charger temperature, and the total charge added-to / removed-from the battery.

pipecat.device.clock module

Data sources that retrieve information from clocks.

pipecat.device.clock.metronome(rate=<Quantity(1.0, 'second')>)

Generate an empty record at fixed time intervals using the host clock.

Typically, you would use functions such as pipecat.utility.add_field() or pipecat.utility.add_timestamp() to populate the (otherwise empty) records.

Examples

If you want to know what time it is, at 5-minute intervals:

>>> import pipecat.device.clock
>>> import pipecat.utility
>>> pipe = pipecat.device.clock.metronome(pipecat.quantity(5, pipecat.units.minutes))
>>> pipe = pipecat.utility.add_timestamp(pipe)
>>> for record in pipe:
...     print(record)
Parameters: rate (time quantity, optional) – The amount of time to wait between records.
Yields: record (dict) – Empty record returned at fixed time intervals.

pipecat.device.gps module

Functions for working with GPS receivers.

pipecat.device.gps.nmea(source, key)

Parse NMEA messages from raw strings.


Parameters:
  • source (Record generator returning records containing a “string” field.)
  • key (Record key, required) – The key in incoming records to parse as NMEA.
Yields:

record (dict) – Records will contain multiple fields containing time, position, speed, heading, pitch, roll, and quality information, depending on the device sending the data. Support is provided for GPGGA, GPGLL, GPRMC, GPTXT, HCHDG, and PASHR messages.

pipecat.device.motion module

Data sources that retrieve motion information from portable devices such as smartphones.

pipecat.device.motion.ios(rate=<Quantity(1, 'second')>)

Retrieve motion information from an iOS device.

This component requires the motion module provided by Pythonista.

Parameters: rate (time quantity, optional) – Rate at which motion data will be retrieved.
Yields: record (dict) – Records will contain information including the current acceleration due to gravity and the user, along with device attitude.

pipecat.device.serial module

Functions for working with serial ports.

pipecat.device.serial.readline(*args, **kwargs)

Reliably read lines from a serial port.

Accepts the same parameters as serial.serial_for_url(), plus the following:

Parameters: poll (time quantity, optional) – Time to wait between failures.
Yields: record (dict) – Records will contain each line of text read from the port.

pipecat.device.weather module

Data sources that decode weather information.

pipecat.device.weather.metars(source, key='xml')

Parse METAR information retrieved from https://aviationweather.gov/metar.

Parameters:
Yields:

record (dict) – Records will contain METAR data extracted from XML.

pipecat.filter module

Functions to filter records as they pass through a Pipecat pipe.

pipecat.filter.duplicates(source, key)

Discard records unless the given key value changes.

Parameters:
  • source (Record generator, required)
  • key (string or tuple-of-strings, required) – Records will be discarded unless this key value changes.
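The duplicates filter can be sketched as a generator that remembers the last value it let through. The name is hypothetical, and dropping records that lack the key entirely is an assumption, since the reference above doesn’t say how such records are handled.

```python
_UNSET = object()  # sentinel: no value has been seen yet


def duplicates_sketch(source, key):
    """Yield a record only when its `key` value differs from the previous one."""
    previous = _UNSET
    for record in source:
        if key not in record:
            continue  # assumption: records without the key are dropped
        current = record[key]
        if previous is _UNSET or current != previous:
            previous = current
            yield record


modes = ["charge", "charge", "finished", "finished", "charge"]
records = [{"mode": m} for m in modes] + [{"other": 1}]
changes = [r["mode"] for r in duplicates_sketch(records, "mode")]
print(changes)  # ['charge', 'finished', 'charge']
```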
pipecat.filter.keep(source, key=None, value=None)

Discard all records that don’t match the given criteria.

Examples

Discard any record that doesn’t have an ‘id’ key with value “GPGGA”:

>>> import pipecat.filter
>>> # Assume pipe is a record generator defined elsewhere.
>>> pipe = pipecat.filter.keep(pipe, key="id", value="GPGGA")
>>> for record in pipe:
...     print(record)
Parameters:
  • source (Record generator, required)
  • key (string or tuple-of-strings, optional) – Records must contain this key or they will be discarded.
  • value (optional.) – Records with field ‘key’ must match this value or they will be discarded.

pipecat.http module

Functions for working with HTTP requests.

pipecat.http.get(*args, **kwargs)

Retrieve data using HTTP requests.

Accepts the same parameters as requests.get(), plus the following:

Parameters: poll (time quantity, optional) – Time to wait between requests.
Yields: record (dict) – Records will contain status, (header, key), encoding, and body keys containing the results returned from each request.
pipecat.http.receive(address, include_body=True, include_client=False, include_method=False, include_path=False, include_version=False)

Receives data sent using HTTP requests.

Provides a simple HTTP server that receives client requests, converting each request into Pipecat records.

Parameters: address ((host, port) tuple, required) – Host address and TCP port to bind when listening for requests.
Yields: record (dict) – Records will contain client, version, method, path, and body keys containing the content of each request.

pipecat.json module

Functions for working with JSON data.

pipecat.json.parse(source, key='string', keyout='json')

Parse JSON data from records.

This filter parses an incoming record key as JSON, appending the JSON data to the output.

Parameters:
  • source (Record generator, required)
  • key (Record key, optional) – The key in incoming records to parse as JSON.
  • keyout (Record key, optional) – The key in outgoing records where the parsed JSON will be stored.
Yields:

record (dict) – Input records with an additional keyout field containing JSON-compatible data.
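Following the description above, a JSON parsing stage can be sketched with the standard json module. The function name is hypothetical, the sketch copies records rather than modifying them, and malformed JSON simply raises ValueError here, because the real function’s error handling isn’t described.

```python
import json


def json_parse_sketch(source, key="string", keyout="json"):
    """Yield copies of records with record[key] parsed as JSON into record[keyout]."""
    for record in source:
        record = dict(record)  # copy rather than modify the input record
        record[keyout] = json.loads(record[key])
        yield record


lines = [{"string": '{"speed": 12.5, "units": "knot"}'}]
parsed = list(json_parse_sketch(lines))
print(parsed[0]["json"]["speed"])  # 12.5
```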

pipecat.limit module

Functions to limit the output of a Pipecat pipeline.

pipecat.limit.count(source, count, name=None)

Limits the number of records returned from a source.

Examples

Produce seven records at one-second intervals:

>>> import pipecat.device.clock
>>> import pipecat.limit
>>> pipe = pipecat.device.clock.metronome()
>>> pipe = pipecat.limit.count(pipe, count=7)
>>> for record in pipe:
...     print(record)
Parameters:
  • source (Record generator, required)
  • count (int, required) – The number of records that will be returned from source.
  • name (string, optional.) – Optional name for this Record generator to use in log output. Defaults to the function name.
pipecat.limit.duration(source, duration, timeout=<Quantity(0.1, 'second')>, name=None)

Return records from a source until a fixed time duration has expired.

Examples

Produce records at one-second intervals for three minutes:

>>> import pipecat.device.clock
>>> import pipecat.limit
>>> pipe = pipecat.device.clock.metronome()
>>> pipe = pipecat.limit.duration(pipe, pipecat.quantity(3, pipecat.units.minutes))
>>> for record in pipe:
...     print(record)
Parameters:
  • source (Record generator, required)
  • duration (time quantity, required) – Maximum amount of time that records will be returned from source.
  • timeout (time quantity, optional.) – Limits the amount of time to block while waiting for output from source. This affects the accuracy of when the function exits.
  • name (string, optional.) – Optional name for this Record generator to use in log output. Defaults to the function name.
pipecat.limit.timeout(source, timeout, initial=<Quantity(1, 'hour')>, name=None)

Return records from another source until they stop arriving.

Parameters:
  • source (Record generator, required)
  • timeout (time quantity, required) – Maximum time to wait for the next record before exiting.
  • initial (time quantity, optional) – Maximum time to wait for the first record.
  • name (string, optional.) – Optional name for this Record generator to use in log output. Defaults to the function name.
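One way to implement this kind of timeout is to run the upstream source on a background thread and stop as soon as no record arrives within the allowed interval. This is a sketch under several assumptions: plain float seconds stand in for Pipecat time quantities, the separate wait for the first record is modeled by an initial parameter, and the names are hypothetical rather than Pipecat’s code.

```python
import threading
import time

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

_END = object()  # sentinel: the source finished normally


def timeout_sketch(source, timeout, initial=3600.0):
    """Yield records until none arrives within `timeout` seconds."""
    channel = queue.Queue()

    def drain():
        for record in source:
            channel.put(record)
        channel.put(_END)

    thread = threading.Thread(target=drain)
    thread.daemon = True  # an abandoned, blocked source won't prevent exit
    thread.start()

    wait = initial  # the first record may take longer to arrive
    while True:
        try:
            item = channel.get(timeout=wait)
        except queue.Empty:
            return  # nothing arrived in time
        if item is _END:
            return
        yield item
        wait = timeout


def slow_source():
    """Hypothetical device that sends three records, then goes quiet."""
    for index in range(3):
        yield {"index": index}
    time.sleep(2.0)
    yield {"index": 3}


indices = [record["index"] for record in timeout_sketch(slow_source(), timeout=0.2)]
print(indices)  # [0, 1, 2]
```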
pipecat.limit.until(source, key, value, name=None)

Return records from another source until a record occurs with a specific key and value.

Examples

Print output from a battery charger until its mode changes to “finished”:

>>> import pipecat.limit
>>> # Assume pipe is a battery charger pipeline defined elsewhere.
>>> pipe = pipecat.limit.until(pipe, key=("charger", "mode"), value="finished")
>>> for record in pipe:
...     print(record)
Parameters:
  • source (Record generator, required)
  • key (Record key, required)
  • value (anything, required)
  • name (string, optional.) – Optional name for this Record generator to use in log output. Defaults to the function name.

pipecat.queue module

Functions for working with queues.

pipecat.queue.receive(queue)

Receive records from a queue.

Queues are a handy mechanism for passing data between threads. Use this function to receive records sent over a queue by pipecat.queue.send().

Parameters: queue (queue.Queue, required)
Yields: record (dict)
pipecat.queue.send(source, queue, shutdown=None)

Send records from a source to a queue.

Queues are a handy mechanism for passing data between threads. Use this function to send records over a queue to pipecat.queue.receive().

Parameters:

pipecat.record module

Functions for manipulating data records.

pipecat.record.add_field(record, key, value)

Add a key-value pair to a record.

Parameters:
  • record (dict, required) – Dictionary of key-value pairs that constitute a record.
  • key (Record key, required) – Record key to be overwritten.
  • value (object) – New record value.
pipecat.record.dump(record, fobj=sys.stdout)

Dump a human-readable text representation of a record to a file-like object.

Parameters:
  • record (dict, required) – Dictionary of key-value pairs to be written-out.
  • fobj (file-like object, optional)
pipecat.record.remove_field(record, key)

Remove a key-value pair from a record.

Parameters:
  • record (dict, required) – Dictionary of key-value pairs that constitute a record.
  • key (Record key, required) – Record key to be removed.

pipecat.store module

Functions for storing data.

class pipecat.store.Cache(source)

Bases: object

Cache records in memory for column-oriented access.

next()
table
class pipecat.store.Table

Bases: object

append(record)
items()
keys()
reset()
values()
pipecat.store.cache(source)

Create an in-memory cache for records.

Parameters: source (generator, required) – A source of records to be cached.
Returns: cache
Return type: instance of pipecat.store.Cache.

pipecat.store.csv module

Functions for reading and writing CSV data.

pipecat.store.csv.write(source, fobj)

Append records to a CSV file.

pipecat.store.pickle module

Functions for reading and writing data using Python pickle files.

Warning

Pickle files written using Python 2 cannot be read using Python 3, and vice-versa.

pipecat.store.pickle.read(fobj)

Read records from a pickle file.

pipecat.store.pickle.write(source, fobj)

Append records to a pickle file.

pipecat.udp module

Functions for working with UDP messages.

pipecat.udp.receive(address, maxsize)

Receive messages from a UDP socket.

Parameters:
  • address ((host, port) tuple, required) – IP address and UDP port to bind when listening for UDP packets.
  • maxsize (int, required) – Maximum length of packets returned from the UDP socket.
Yields:

record (dict) – Records will contain a client field with the address of the sending client, and a message field containing the content of the message.

pipecat.utility module

Convenience functions for working with data sources.

pipecat.utility.add_field(source, key, value)

Adds a key-value pair to every record returned from a source.

Parameters:
Yields:

record (dict) – Input records containing an additional field key with value value.

pipecat.utility.add_timestamp(source, key='timestamp')

Add a timestamp to every record returned from a source.

Parameters:
Yields:

record (dict) – Input records containing an additional field key with an arrow.arrow.Arrow UTC timestamp value.

pipecat.utility.extract_quantities(source, value='value', units='units')

Convert values with separate magnitude / units data into pipecat.quantity instances.

If a field value is a dict containing value and units keys, it will be replaced with a matching pipecat.quantity.

Parameters:
Yields:

record (dict) – Input records with field values converted to pipecat.quantity instances.

pipecat.utility.promote_field(source, key)

Promote key-value pairs from one field into their own fields.

The field key must exist and contain a dict or dict-like object.

Parameters:
Yields:

record (dict) – Input records containing additional fields promoted from the value stored in key.
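The following sketch shows field promotion with plain dicts. The function name is hypothetical. Two behaviors are assumptions: the sketch keeps the original field (the description above mentions “additional fields”), and promoted keys overwrite any existing keys with the same name.

```python
def promote_field_sketch(source, key):
    """Yield copies of records with the dict stored in `key` promoted to top level."""
    for record in source:
        record = dict(record)
        nested = record[key]  # KeyError if missing: the field must exist
        for nested_key, value in nested.items():
            record[nested_key] = value  # promoted fields overwrite existing ones
        yield record


records = [{"metar": {"station": "KABQ", "wind": 12}}]
promoted = next(promote_field_sketch(records, "metar"))
print(sorted(promoted.keys()))  # ['metar', 'station', 'wind']
```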

pipecat.utility.readline(fobj, encoding='utf-8')

Extract lines from a file or file-like object.

Parameters: fobj (file-like object, required) – This could be an open file, an instance of io.StringIO, a serial connection, or any other object from which lines of text can be read.
Yields: record (dict) – Records containing a “string” field that stores one line of text.
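A line reader that produces records in this shape can be sketched as follows. The name is hypothetical. This section doesn’t state whether Pipecat strips line endings, so removing them here is an assumption. Bytes, as returned by serial ports and binary files, are decoded using the given encoding.

```python
import io


def readline_sketch(fobj, encoding="utf-8"):
    """Yield one {"string": line} record per line of text read from fobj."""
    for line in fobj:
        if isinstance(line, bytes):
            line = line.decode(encoding)  # serial ports and binary files yield bytes
        yield {"string": line.rstrip("\r\n")}  # assumption: line endings removed


text_records = list(readline_sketch(io.StringIO(u"GPGLL\nGPRMC\r\n")))
byte_records = list(readline_sketch(io.BytesIO(b"charge\n")))
print(text_records)
```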
pipecat.utility.remove_field(source, key)

Removes a key-value pair from every record returned by a source.

Records that don’t contain key are returned unmodified (this is not an error condition).

Parameters:
Yields:

record (dict) – Input records with the key field removed.

pipecat.utility.split_keys(source, delimiter)

Convert flat keys into hierarchical keys using a delimiter.

This is useful when working with data such as JSON that can’t represent hierarchical keys explicitly.

Parameters:
Yields:

record (dict) – Input records with keys containing the delimiter converted into hierarchical keys.
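In the user guide, hierarchical keys are written as tuples of strings, such as ("charger", "mode"), and pipecat.record.dump() displays them as charger/mode. Assuming that representation, key splitting can be sketched as follows. The function name is hypothetical, and only str keys are considered.

```python
def split_keys_sketch(source, delimiter):
    """Yield records whose delimited string keys become tuple (hierarchical) keys."""
    for record in source:
        output = {}
        for key, value in record.items():
            if isinstance(key, str) and delimiter in key:
                key = tuple(key.split(delimiter))  # "charger/mode" -> ("charger", "mode")
            output[key] = value
        yield output


flat = [{"charger/mode": "charge", "battery/cell1/voltage": 4.1, "id": 7}]
nested = next(split_keys_sketch(flat, "/"))
print(nested[("charger", "mode")])  # charge
```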

pipecat.utility.trace(source, name=None)

Log the behavior of a source for debugging.

Use pipecat.utility.trace() to log the behavior of another record generator, including the records it generates, any exceptions thrown, and notifications when it starts and finishes. Records are passed through this function unmodified, so it can be inserted into an existing pipe without altering its behavior.

Parameters: source (Record generator, required)
Yields: record (dict) – Unmodified input records.

pipecat.xml module

Functions for working with XML data.

pipecat.xml.parse(source, key='string', keyout='xml')

Parse XML data from a record.

Parameters:
  • source (Record generator, required)
  • key (Record key, optional) – Input key containing a string to be parsed as XML.
  • keyout (Record key, optional) – Output key where the parsed XML DOM will be stored.
Yields:

record (dict) – Input records with a new field keyout containing an XML DOM.
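Here is a sketch of the same idea using xml.dom.minidom from the standard library. Whether Pipecat uses minidom or some other DOM implementation isn’t stated here, so the choice of parser is an assumption. The function name and sample XML are hypothetical.

```python
import xml.dom.minidom


def xml_parse_sketch(source, key="string", keyout="xml"):
    """Yield copies of records with record[key] parsed into a DOM stored at record[keyout]."""
    for record in source:
        record = dict(record)  # copy rather than modify the input record
        record[keyout] = xml.dom.minidom.parseString(record[key])
        yield record


records = [{"string": "<metar><station_id>KABQ</station_id></metar>"}]
dom = next(xml_parse_sketch(records))["xml"]
print(dom.getElementsByTagName("station_id")[0].firstChild.data)  # KABQ
```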


Support

The Pipecat documentation:

Visit our GitHub repository for access to source code, issue tracker, and the wiki:

We also have a continuous integration server that runs the Pipecat regression test suite anytime changes are committed to GitHub:

And here are our test coverage stats, also updated automatically when modifications are committed:

For Pipecat questions, comments, or suggestions, get in touch with the team at:

Otherwise, you can contact Tim directly:


Credits

Included in Pipecat

“FOSS Data-Logging for Junsi iCharger”, https://apollo.open-resource.org/mission:log:2012:10:06:free-and-open-source-junsi-icharger-data-logger, accessed December, 2016.

Influential Reading

D. Beazley, “Generator Tricks For Systems Programmers Version 2.0,” http://www.dabeaz.com/generators-uk/index.html, accessed December, 2016.

Indices and tables