pypelined - stream and pipeline processing service
pypelined Service Templates
The pypelined package is ready for use as a daemon, but does not attempt to provide definitions for every system service.
Depending on the target system, the template may need adjusting, for example when deploying into a virtual environment.
systemd
# /etc/systemd/system/pypelined@.service
[Unit]
Description=stream and pipeline processing service
Documentation=https://pypi.python.org/pypi/pypelined
[Service]
Type=simple
Restart=on-failure
ExecStart=/usr/bin/python -m pypelined /etc/pypelined/%i*.py
User=daemon
Nice=-19
[Install]
WantedBy=multi-user.target
DefaultInstance=default
pypelined Changelog
Beta Releases
v0.1.2 2017-07-20
Bugfixes
AliceApMonBackend: added guard against invalid ApMon INSTANCE_ID
AliceApMonBackend: directly forwarding raw reports
v0.1.1 2017-06-30
Bugfixes
DFSCounter: tweaked acquisition to avoid deadlocks and stale counters
Debug output removed
Missing dependencies added
v0.1.0 2017-??-??
New Features
Basic pypelined core in working condition
pypelined
pypelined package
Subpackages
pypelined.conf package
pypelined.conf.pipelines = []
    pipelines to run
Submodules
pypelined.consumer package
pypelined.provider package
pypelined.utilities package
pypelined.utilities.safe_eval(literal)
    Evaluate a literal value or fall back to string

    Safely performs the evaluation of a literal. If the literal is not valid, it is assumed to be a regular string and returned unchanged.

    Parameters: literal (str) – literal to evaluate, e.g. "1.0" or "{'foo': 3}"
    Returns: evaluated or original literal
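
The documented behaviour of safe_eval can be illustrated with a minimal sketch. This is not the package's actual implementation; it assumes that the standard library's ast.literal_eval is a suitable way to evaluate literals, and the name safe_eval_sketch is hypothetical.

```python
import ast


def safe_eval_sketch(literal):
    """Evaluate a Python literal, or return the input unchanged.

    Hypothetical stand-in for pypelined.utilities.safe_eval, written
    only from its documented behaviour.
    """
    try:
        # literal_eval accepts literals only (numbers, strings, tuples,
        # lists, dicts, sets, booleans, None) and never executes code
        return ast.literal_eval(literal)
    except (ValueError, SyntaxError):
        # not a valid literal: treat it as a regular string
        return literal


print(safe_eval_sketch("1.0"))
print(safe_eval_sketch("{'foo': 3}"))
print(safe_eval_sketch("hello world"))
```

Falling back to the original string means that unquoted values such as hello world pass through unchanged instead of raising an error.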
The pypelined service and framework lets you build and deploy iterative processing pipelines.
Using generators and coroutines with the chainlet library, it is straightforward to create pipelines that fetch, process, and transform streams of data.
Configuration files are written using pure Python, allowing for maximum customization:
# this is a pure python configuration file
import time

from chainlet import funclet, filterlet
from pypelined.conf import pipelines
# NOTE: Socket, decode_json and Telegraf must be imported as well;
# their import paths are not shown in this example

# new pipeline processing element as simple python function
@funclet
def add_time(chunk):
    chunk['tme'] = time.time()
    return chunk

# new pipeline receiving process monitoring reports, modifying them, and sending them to another service
process_chain = Socket(10331) >> decode_json() >> filterlet(lambda value: value.get('rcode') == 0) >> \
    add_time() >> Telegraf(address=('localhost', 10332), name='valid_processes')

# add pipeline for deployment
pipelines.append(process_chain)
Assuming the configuration is saved as myconfig.py, start its pipelines with:
python -m pypelined myconfig.py
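
The data flow of the configuration above (decode, filter on rcode, add a timestamp) can be sketched with plain Python generators. This sketch uses neither chainlet nor pypelined and does no network I/O; the function names and the sample reports are hypothetical and serve only to show how each stage consumes the previous one.

```python
import json
import time


def decode_json_lines(lines):
    """Yield one decoded object per JSON-encoded input line."""
    for line in lines:
        yield json.loads(line)


def keep_successful(reports):
    """Pass on only reports whose 'rcode' is 0."""
    for report in reports:
        if report.get('rcode') == 0:
            yield report


def add_time(reports):
    """Stamp each report with the current time under the 'tme' key."""
    for report in reports:
        report['tme'] = time.time()
        yield report


# made-up sample input standing in for data received from a socket
raw_reports = [
    '{"pid": 1, "rcode": 0}',
    '{"pid": 2, "rcode": 1}',
    '{"pid": 3, "rcode": 0}',
]

# stages are nested instead of chained with >>, but run in the same order
processed = list(add_time(keep_successful(decode_json_lines(raw_reports))))
print([report['pid'] for report in processed])  # prints [1, 3]
```

Each stage is lazy, so a report moves through all stages before the next one is read. This is the property that makes generator pipelines suitable for unbounded streams.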
Contributing and Feedback
The project is hosted on GitHub.
If you have issues or suggestions, check the issue tracker.
For direct contributions, feel free to fork the development branch and open a pull request.