Introduction
The primary use cases of eventkit are
to send events between loosely coupled components;
to compose all kinds of event-driven data pipelines.
The interface is kept as Pythonic as possible, with familiar names from Python and its libraries where possible. For scheduling asyncio is used and there is seamless integration with it.
See the examples and the introduction notebook to get a true feel for the possibilities.
Installation
pip3 install eventkit
Python version 3.6 or higher is required.
Examples
Create an event and connect two listeners
import eventkit as ev
def f(a, b):
print(a * b)
def g(a, b):
print(a / b)
event = ev.Event()
event += f
event += g
event.emit(10, 5)
Create a simple pipeline
import eventkit as ev
event = (
ev.Sequence('abcde')
.map(str.upper)
.enumerate()
)
print(event.run()) # in Jupyter: await event.list()
Output:
[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
Create a pipeline to get a running average and standard deviation
import random
import eventkit as ev
source = ev.Range(1000).map(lambda i: random.gauss(0, 1))
event = source.array(500)[ev.ArrayMean, ev.ArrayStd].zip()
print(event.last().run()) # in Jupyter: await event.last()
Output:
[(0.00790957852672618, 1.0345673260655333)]
Combine async iterators together
import asyncio
import eventkit as ev
async def ait(r):
for i in r:
await asyncio.sleep(0.1)
yield i
async def main():
async for t in ev.Zip(ait('XYZ'), ait('123')):
print(t)
asyncio.get_event_loop().run_until_complete(main()) # in Jupyter: await main()
Output:
('X', '1')
('Y', '2')
('Z', '3')
Real-time video analysis pipeline
self.video = VideoStream(conf.CAM_ID)
scene = self.video | FaceTracker | SceneAnalyzer
lastScene = scene.aiter(skip_to_last=True)
async for frame, persons in lastScene:
...
Distributed computing
The distex library provides a
poolmap
extension method to put multiple cores or machines to use:
from distex import Pool
import eventkit as ev
import bz2
pool = Pool()
# await pool # un-comment in Jupyter
data = [b'A' * 1000000] * 1000
pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()
print(pipe.run()) # in Jupyter: print(await pipe)
pool.shutdown()
Inspired by:
Documentation
The complete API documentation.
eventkit
Release 1.0.2.
Event
- class eventkit.event.Event(name='', _with_error_done_events=True)[source]
Enable event passing between loosely coupled components. The event emits values to connected listeners and has a selection of operators to create general data flow pipelines.
- Parameters:
name (
str
) – Name to use for this event.
- __await__()[source]
Asynchronously await the next emit of an event:
async def coro(): args = await event ...
If the event does an empty
emit()
, then the value ofargs
is set toutil.NO_VALUE
.wait()
and__await__()
are each other’s inverse.
- async __aiter__(skip_to_last=False, tuples=False)
Synonym for
aiter()
with default arguments:async def coro(): async for args in event: ...
aiterate()
and__aiter__()
are each other’s inverse.
-
error_event:
Optional
[Event
] Sub event that emits errors from this event as
emit(source, exception)
.
- done()[source]
True
if event has ended with no more emits coming,False
otherwise.- Return type:
bool
- connect(listener, error=None, done=None, keep_ref=False)[source]
Connect a listener to this event. If the listener is added multiple times then it is invoked just as many times on emit.
The
+=
operator can be used as a synonym for this method:import eventkit as ev def f(a, b): print(a * b) def g(a, b): print(a / b) event = ev.Event() event += f event += g event.emit(10, 5)
- Parameters:
listener – The callback to invoke on emit of this event. It gets the
*args
from an emit as arguments. If the listener is a coroutine function, or a function that returns an awaitable, the awaitable is run in the asyncio event loop.error – The callback to invoke on error of this event. It gets (this event, exception) as two arguments.
done – The callback to invoke on ending of this event. It gets this event as single argument.
keep_ref (
bool
) –True
: A strong reference to the callable is keptFalse
: If the callable allows weak refs and it is garbage collected, then it is automatically disconnected from this event.
- Return type:
- disconnect(listener, error=None, done=None)[source]
Disconnect a listener from this event.
The
-=
operator can be used as a synonym for this method.- Parameters:
listener – The callback to disconnect. The callback is removed at most once. It is valid if the callback is already not connected.
error – The error callback to disconnect.
done – The done callback to disconnect.
- disconnect_obj(obj)[source]
Disconnect all listeners on the given object. (also the error and done listeners).
- Parameters:
obj – The target object that is to be completely removed from this event.
- emit(*args)[source]
Emit a new value to all connected listeners.
- Parameters:
args – Argument values to emit to listeners.
- emit_threadsafe(*args)[source]
Threadsafe version of
emit()
that doesn’t invoke the listeners directly but via the event loop of the main thread.
- run()[source]
Start the asyncio event loop, run this event to completion and return all values as a list:
import eventkit as ev ev.Timer(0.25, count=10).run() -> [0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5]
- Return type:
List
Note
When running inside a Jupyter notebook this will give an error that the asyncio event loop is already running. This can be remedied by applying nest_asyncio or by using the top-level
await
statement of Jupyter:await event.list()
- pipe(*targets)[source]
Form several events into a pipe:
import eventkit as ev e1 = ev.Sequence('abcde') e2 = ev.Enumerate().map(lambda i, c: (i, i + ord(c))) e3 = ev.Star().pluck(1).map(chr) e1.pipe(e2, e3) # or: ev.Event.Pipe(e1, e2, e3) -> ['a', 'c', 'e', 'g', 'i']
- Parameters:
targets (
Event
) – One or more Events that have no source yet, orEvent
constructors that needs no arguments.
- fork(*targets)[source]
Fork this event into one or more target events. Square brackets can be used as a synonym:
import eventkit as ev ev.Range(2, 5)[ev.Min, ev.Max, ev.Sum].zip() -> [(2, 2, 2), (2, 3, 5), (2, 4, 9)]
The events in the fork can be combined by one of the join methods of
Fork
.- Parameters:
targets (
Event
) – One or more events that have no source yet, orEvent
constructors that need no arguments.- Return type:
Fork
- async aiter(skip_to_last=False, tuples=False)[source]
Create an asynchronous iterator that yields the emitted values from this event:
async def coro(): async for args in event.aiter(): ...
__aiter__()
is a synonym foraiter()
with default arguments,- Parameters:
skip_to_last (
bool
) –True
: Backlogged source values are skipped over to yield only the latest value. Can be used as a slipper clutch between a source that produces too fast and the handling that can’t keep up.False
: All events are yielded.
tuples (
bool
) –True
: Always yield arguments as a tuple.False
: Unpack single argument tuples.
- static init(obj, event_names)[source]
Convenience function for initializing multiple events as members of the given object.
- Parameters:
event_names (
Iterable
) – Names to use for the created events.
- static create(obj)[source]
Create an event from a async iterator, awaitable, or event constructor without arguments.
- Parameters:
obj – The source object. If it’s already an event then it is passed as-is.
- static wait(future)[source]
Create a new event that emits the value of the awaitable when it becomes available and then set this event done.
wait()
and__await__()
are each other’s inverse.- Parameters:
future (
Awaitable
) – Future to wait on.- Return type:
Wait
- static aiterate(ait)[source]
Create a new event that emits the yielded values from the asynchronous iterator.
The asynchronous iterator serves as a source for both the time and value of emits.
aiterate()
and__aiter__()
are each other’s inverse.- Parameters:
ait (
AsyncIterable
) –The asynchronous source iterator. It must
await
at least once; If necessary use:await asyncio.sleep(0)
- Return type:
Aiterate
- static sequence(values, interval=0, times=None)[source]
Create a new event that emits the given values. Supply at most one
interval
ortimes
.- Parameters:
values (
Iterable
) – The source values.interval (
float
) – Time interval in seconds between values.times (
Optional
[Iterable
[float
]]) – Relative times for individual values, in seconds since start of event. The sequence should matchvalues
.
- Return type:
Sequence
- static repeat(value=<NoValue>, count=1, interval=0, times=None)[source]
Create a new event that repeats
value
a number ofcount
times.- Parameters:
value – The value to emit.
count – Number of times to emit.
interval (
float
) – Time interval in seconds between values.times (
Optional
[Iterable
[float
]]) – Relative times for individual values, in seconds since start of event. The sequence should matchvalues
.
- Return type:
Repeat
- static range(*args, interval=0, times=None)[source]
Create a new event that emits the values from a range.
- Parameters:
args – Same as for built-in
range
.interval (
float
) – Time interval in seconds between values.times (
Optional
[Iterable
[float
]]) – Relative times for individual values, in seconds since start of event. The sequence should match the range.
- Return type:
Range
- static timerange(start=0, end=None, step=1)[source]
Create a new event that emits the datetime value, at that datetime, from a range of datetimes.
- Parameters:
start –
Start time, can be specified as:
datetime.datetime
.datetime.time
: Today is used as date.int
orfloat
: Number of seconds relative to now. Values will be quantized to the given step.
end –
End time, can be specified as:
datetime.datetime
.datetime.time
: Today is used as date.None
: No end limit.
step – Number of seconds, or
datetime.timedelta
, to space between values.
- Return type:
Timerange
- static timer(interval, count=None)[source]
Create a new timer event that emits at regularly paced intervals the number of seconds since starting it.
- Parameters:
interval (
float
) – Time interval in seconds between emits.count (
Optional
[int
]) – Number of times to emit, orNone
for no limit.
- Return type:
Timer
- static marble(s, interval=0, times=None)[source]
Create a new event that emits the values from a Rx-type marble string.
- Parameters:
s (
str
) – The string with characters that are emitted.interval (
float
) – Time interval in seconds between values.times (
Optional
[Iterable
[float
]]) – Relative times for individual values, in seconds since start of event. The sequence should match the marble string.
- Return type:
Marble
- filter(predicate=<class 'bool'>)[source]
For every source value, apply predicate and re-emit when True.
- Parameters:
predicate – The function to test every source value with. The default is to test the general truthiness with
bool()
.- Return type:
Filter
- skip(count=1)[source]
Drop the first
count
values from source and follow the source after that.- Parameters:
count (
int
) – Number of source values to drop.- Return type:
Skip
- take(count=1)[source]
Re-emit first
count
values from the source and then end.- Parameters:
count (
int
) – Number of source values to re-emit.- Return type:
Take
- takewhile(predicate=<class 'bool'>)[source]
Re-emit values from the source until the predicate becomes False and then end.
- Parameters:
predicate – The function to test every source value with. The default is to test the general truthiness with
bool()
.- Return type:
TakeWhile
- dropwhile(predicate=<function Event.<lambda>>)[source]
Drop source values until the predicate becomes False and after that re-emit everything from the source.
- Parameters:
predicate – The function to test every source value with. The default is to test the inverted general truthiness.
- Return type:
DropWhile
- takeuntil(notifier)[source]
Re-emit values from the source until the
notifier
emits and then end. If the notifier ends without any emit then keep passing source values.- Parameters:
notifier (
Event
) – Event that signals to end this event.- Return type:
TakeUntil
- constant(constant)[source]
On emit of the source emit a constant value:
emit(value) -> emit(constant)
- Parameters:
constant – The constant value to emit.
- Return type:
Constant
- iterate(it)[source]
On emit of the source, emit the next value from an iterator:
emit(a, b, ...) -> emit(next(it))
The time of events follows the source and the values follow the iterator.
- Parameters:
it – The source iterator to use for generating values. When the iterator is exhausted the event is set to be done.
- Return type:
Iterate
- count(start=0, step=1)[source]
Count and emit the number of source emits:
emit(a, b, ...) -> emit(count)
- Parameters:
start – Start count.
step – Add count by this amount for every new source value.
- Return type:
Count
- enumerate(start=0, step=1)[source]
Add a count to every source value:
emit(a, b, ...) -> emit(count, a, b, ...)
- Parameters:
start – Start count.
step – Increase by this amount for every new source value.
- Return type:
Enumerate
- timestamp()[source]
Add a timestamp (from time.time()) to every source value:
emit(a, b, ...) -> emit(timestamp, a, b, ...)
The timestamp is the float number in seconds since the midnight Jan 1, 1970 epoch.
- Return type:
Timestamp
- partial(*left_args)[source]
Pad source values with extra arguments on the left:
emit(a, b, ...) -> emit(*left_args, a, b, ...)
- Parameters:
left_args – Arguments to inject.
- Return type:
Partial
- partial_right(*right_args)[source]
Pad source values with extra arguments on the right:
emit(a, b, ...) -> emit(a, b, ..., *right_args)
- Parameters:
right_args – Arguments to inject.
- Return type:
PartialRight
- star()[source]
Unpack a source tuple into positional arguments, similar to the star operator:
emit((a, b, ...)) -> emit(a, b, ...)
star()
andpack()
are each other’s inverse.- Return type:
Star
- pack()[source]
Pack positional arguments into a tuple:
emit(a, b, ...) -> emit((a, b, ...))
star()
andpack()
are each other’s inverse.- Return type:
Pack
- pluck(*selections)[source]
Extract arguments or nested properties from the source values.
Select which argument positions to keep:
emit(a, b, c, d).pluck(1, 2) -> emit(b, c)
Re-order arguments:
emit(a, b, c).pluck(2, 1, 0) -> emit(c, b, a)
To do an empty emit leave
selections
empty:emit(a, b).pluck() -> emit()
Select nested properties from positional arguments:
emit(person, account).pluck( '1.number', '0.address.street') -> emit(account.number, person.address.street)
If no value can be extracted then
NO_VALUE
is emitted in its place.- Parameters:
selections (
Union
[int
,str
]) – The values to extract.- Return type:
Pluck
- map(func, timeout=None, ordered=True, task_limit=None)[source]
Apply a sync or async function to source values using positional arguments:
emit(a, b, ...) -> emit(func(a, b, ...))
or if
func
returns an awaitable then it will be awaited:emit(a, b, ...) -> emit(await func(a, b, ...))
In case of timeout or other failure,
NO_VALUE
is emitted.- Parameters:
func – The function or coroutine constructor to apply.
timeout – Timeout in seconds since coroutine is started
ordered –
True
: The order of emitted results preserves the order of the source values.False
: Results are in order of completion.
task_limit – Max number of concurrent tasks, or None for no limit.
- Return type:
Map
timeout
,ordered
andtask_limit
apply to async functions only.
- emap(constr, joiner)[source]
Higher-order event map that creates a new
Event
instance for every source value:emit(a, b, ...) -> new Event constr(a, b, ...)
- Parameters:
constr – Constructor function for creating a new event. Apart from returning an
Event
, the constructor may also return an awaitable or an asynchronous iterator, in which case anEvent
will be created.joiner (
AddableJoinOp
) – Join operator to combine the emits of nested events.
- Return type:
Emap
- mergemap(constr)[source]
emap()
that usesmerge()
to combine the nested events:marbles = [ 'A B C D', '_1 2 3 4', '__K L M N'] ev.Range(3).mergemap(lambda v: ev.Marble(marbles[v])) -> ['A', '1', 'K', 'B', '2', 'L', '3', 'C', 'M', '4', 'D', 'N']
- Return type:
Mergemap
- concatmap(constr)[source]
emap()
that usesconcat()
to combine the nested events:marbles = [ 'A B C D', '_ 1 2 3 4', '__ K L M N'] ev.Range(3).concatmap(lambda v: ev.Marble(marbles[v])) -> ['A', 'B', '1', '2', '3', 'K', 'L', 'M', 'N']
- Return type:
Concatmap
- chainmap(constr)[source]
emap()
that useschain()
to combine the nested events:marbles = [ 'A B C D ', '_ 1 2 3 4', '__ K L M N'] ev.Range(3).chainmap(lambda v: ev.Marble(marbles[v])) -> ['A', 'B', 'C', 'D', '1', '2', '3', '4', 'K', 'L', 'M', 'N']
- Return type:
Chainmap
- switchmap(constr)[source]
emap()
that usesswitch()
to combine the nested events:marbles = [ 'A B C D ', '_ K L M N', '__ 1 2 3 4' ] ev.Range(3).switchmap(lambda v: Event.marble(marbles[v])) -> ['A', 'B', '1', '2', 'K', 'L', 'M', 'N'])
- Return type:
Switchmap
- reduce(func, initializer=<NoValue>)[source]
Apply a two-argument reduction function to the previous reduction result and the current value and emit the new reduction result.
- Parameters:
func –
Reduction function:
emit(args) -> emit(func(prev_args, args))
initializer –
First argument of first reduction:
first_result = func(initializer, first_value)
If no initializer is given, then the first result is emitted on the second source emit.
- Return type:
Reduce
- product(start=1)[source]
Total product.
- Parameters:
start – Initial start value.
- Return type:
Product
- ema(n=None, weight=None)[source]
Exponential moving average.
- Parameters:
n (
Optional
[int
]) – Number of periods.weight (
Optional
[float
]) – Weight of new value.
- Return type:
Ema
Give either
n
orweight
. The relation isweight = 2 / (n + 1)
.
- previous(count=1)[source]
For every source value, emit the
count
-th previous value:source: -ab---c--d-e- output: --a---b--c-d-
Starts emitting on the
count + 1
-th source emit.- Parameters:
count (
int
) – Number of periods to go back.- Return type:
Previous
- pairwise()[source]
Emit
(previous_source_value, current_source_value)
tuples. Starts emitting on the second source emit:source: -a----b------c--------d----- output: ------(a,b)--(b,c)----(c,d)-
- Return type:
Pairwise
- changes()[source]
Emit only source values that have changed from the previous value.
- Return type:
Changes
- unique(key=None)[source]
Emit only unique values, dropping values that have already been emitted.
- Parameters:
key – The callable `’key(value)` is used to group values. The default of
None
groups values by equality. The resulting group must be hashable.- Return type:
Unique
- deque(count=0)[source]
Emit a
deque
with the lastcount
values from the source (or less in the lead-in phase).- Parameters:
count – Number of last periods to use, or 0 to use all.
- Return type:
Deque
- array(count=0)[source]
Emit a numpy array with the last
count
values from the source (or less in the lead-in phase).- Parameters:
count – Number of last periods to use, or 0 to use all.
- Return type:
Array
- chunk(size)[source]
Chunk values up in lists of equal size. The last chunk can be shorter.
- Parameters:
size (
int
) – Chunk size.- Return type:
Chunk
- chunkwith(timer, emit_empty=True)[source]
Emit a chunked list of values when the timer emits.
- Parameters:
timer (
Event
) – Event to use for timing the chunks.emit_empty (
bool
) – Emit empty list if no values present since last emit.
- Return type:
ChunkWith
- chain(*sources)[source]
Re-emit from a source until it ends, then move to the next source, Repeat until all sources have ended, ending the chain. Emits from pending sources are queued up:
source 1: -a----b---c| source 2: --2-----3--4| source 3: ------------x---------y--| output: -a----b---c2--3--4x---y--|
- Parameters:
sources (
Event
) – Source events.- Return type:
Chain
- merge(*sources)[source]
Re-emit everything from the source events:
source 1: -a----b-------------c------d-| source 2: ------1-----2------3--4-| source 3: --------x----y--| output: -a----b--1--x--2-y--c-3--4-d-|
- Parameters:
sources – Source events.
- Return type:
Merge
- concat(*sources)[source]
Re-emit everything from one source until it ends and then move to the next source:
source 1: -a----b-----| source 2: --1-----2-----3----4--| source 3: -----------x--y--| output: -a----b---------3----4----x--y--|
- Parameters:
sources – Source events.
- Return type:
Concat
- switch(*sources)[source]
Re-emit everything from one source and move to another source as soon as that other source starts to emit:
source 1: -a----b---c-----d---| source 2: -----------x---y-| source 3: ---------1----2----3-----| output: -a----b--1----2--x---y---|
- Parameters:
sources – Source events.
- Return type:
Switch
- zip(*sources)[source]
Zip sources together: The i-th emit has the i-th value from each source as positional arguments. Only emits when each source has emtted its i-th value and ends when any source ends:
source 1: -a----b------------------c------d---e--f---| source 2: --------1-------2-------3---------4-----| output emit: --------(a,1)---(b,2)----(c,3)----(d,4)-|
- Parameters:
sources – Source events.
- Return type:
Zip
- ziplatest(*sources, partial=True)[source]
Emit zipped values with the latest value from each of the source events. Emits every time when a source emits:
source 1: -a-------------------b-------c---| source 2: ---------------1--------------------2------| output emit: (a,NoValue)---(a,1)-(b,1)---(c,1)--(c,2)--|
- Parameters:
sources – Source events.
partial (
bool
) –True: Use
NoValue
for sources that have not emitted yet.False: Wait until all sources have emitted.
- Return type:
Ziplatest
- delay(delay)[source]
Time-shift all source events by a delay:
source: -abc-d-e---f---| output: ---abc-d-e---f---|
This applies to the source errors and the source done event as well.
- Parameters:
delay – Time delay of all events (in seconds).
- Return type:
Delay
- timeout(timeout)[source]
When the source doesn’t emit for longer than the timeout period, do an empty emit and set this event as done.
- Parameters:
timeout – Timeout value.
- Return type:
Timeout
- throttle(maximum, interval, cost_func=None)[source]
Limit number of emits per time without dropping values. Values that come in too fast are queued and re-emitted as soon as allowed by the limits.
A nested
status_event
emitsTrue
when throttling starts andFalse
when throttling ends.The limit can be dynamically changed with
set_limit
.- Parameters:
maximum – Maximum payload per interval.
interval – Time interval (in seconds).
cost_func – The sum of
cost_func(value)
for every source value inside theinterval
that is to remain under themaximum
. The default is to count every source value as 1.
- Return type:
Throttle
- debounce(delay, on_first=False)[source]
Filter out values from the source that happen in rapid succession.
- Parameters:
delay – Maximal time difference (in seconds) between successive values before debouncing kicks in.
on_first (
bool
) –True: First value is send immediately and following values in the rapid succession are dropped:
source: -abcd----efg- output: -a-------e---
False: Last value of a rapid succession is send after the delay and the values before that are dropped:
source: -abcd----efg-- output: ----d------g-
- Return type:
Debounce