Elkhound


Elkhound is an opinionated, data-centric workflow engine. It makes the following assumptions about your project:

  • The project workflow can be split into several tasks; each task has clearly defined input and output data files, ideally with no side effects. Tasks form a directed acyclic graph (i.e., no loops).
  • Each data file has a clearly defined format and schema. Information between consecutive tasks is transferred primarily via these files, preferably as CSV or gzipped CSV.

Elkhound will help you by:

  • Versioning data files by timestamping them (tasks read input files with the latest timestamp and write output files with the current timestamp).
  • Supporting big data files with convenient schema-aware iterators over rows (using Python generators), lowering the memory footprint.
  • Managing workflows of tasks, making it easy to run arbitrary lists of tasks.
  • Automatically checkpointing intermediate results, thanks to inter-task communication via data files and data file versioning.
  • Managing the project’s parameters by injecting the contents of configuration files and command-line parameters into tasks’ contexts (less temptation to hardcode constants).
  • Logging workflow executions, reporting the context and execution progress, and automatically collecting and archiving logs from different runs in one place to facilitate reproducibility.
  • Facilitating unit testing by injecting mockable data file objects as inputs and outputs of task classes.
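The timestamp-based versioning mentioned above can be illustrated with a short sketch. The filename pattern used here (`<code>_<name>_<YYYYMMDDHHMMSS>.<extension>`) and the `latest_version` helper are assumptions for illustration, not Elkhound's actual naming convention or API:

```python
import re

# Hypothetical filename pattern: <code>_<name>_<YYYYMMDDHHMMSS>.<extension>
PATTERN = re.compile(r'^(\d{4})_\w+_(\d{14})\.')

def latest_version(filenames, code):
    """Pick the file for `code` with the newest timestamp, mirroring
    how a versioning engine resolves inputs on the read side."""
    candidates = [
        (m.group(2), name)
        for name in filenames
        if (m := PATTERN.match(name)) and int(m.group(1)) == code
    ]
    if not candidates:
        raise FileNotFoundError(f'No file for code {code}')
    return max(candidates)[1]  # 14-digit timestamps sort lexicographically

files = [
    '1230_people_20240101120000.csv.gz',
    '1230_people_20240215093000.csv.gz',
    '2110_budget_20240110080000.xlsx',
]
print(latest_version(files, 1230))  # the newest 1230 file wins
```

Because every run writes outputs under a fresh timestamp, earlier results are never overwritten, which is what makes the automatic checkpointing described above possible.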

Getting started

Install Elkhound by running:

pip install elkhound

To run Elkhound workflows, you need to create an engine configuration file and implement your business logic in Task subclasses.

Engine configuration

The engine configuration file (in our example we’ll name it engine.yaml) has three sections:

  • specs, where you describe data files (names, formats, schemas); these files are outputs for some tasks and inputs for others,
  • tasks, where you point to implementations of business logic; tasks define transformations of data files (how to build an output file Z given input files X and Y),
  • workflows, where you bundle groups of target data files.

Data files

Data files are first-class citizens in Elkhound. They are identified by four-digit codes (e.g. 1230, 2110, 4315, 5214). The design of a new workflow should begin with registering data files and, where applicable, defining their schemas. Here is an example of data files defined in an engine configuration file:

specs:
  - code: 1230
    name: people
    extension: csv.gz
    flags:
      - gzipped
    schema:
      - name: name
        type: str
      - name: dob
        type: datetime
      - name: is_employee
        type: bool
  - code: 2110
    name: budget
    extension: xlsx
    flags:
      - binary
  - code: 4315
    name: report
    extension: txt
  - code: 5214
    name: plots
    extension: dir
    flags:
      - directory

Tasks

Tasks are Python classes that take zero or more data files on input and produce zero or more data files on output. Each task class has to implement three methods:

  • get_input_data_file_codes(self) returning a list of input data file codes,
  • get_output_data_file_codes(self) returning a list of output data file codes,
  • run(self, input_files, output_files, context) executing business logic.

Here is an example of tasks registered in an engine configuration file:

tasks:
  - class: myapp.DownloadDataTask
  - class: myapp.GenerateReportTask
  - class: myapp.PlotBudgetTask

In our example we will assume that:

  • DownloadDataTask takes no data files on input, produces 1230 and 2110 on output.
  • GenerateReportTask takes 1230 and 2110 on input, produces 4315 on output.
  • PlotBudgetTask takes 2110 on input, produces 5214 on output.
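The input/output codes above form a small dependency graph. The resolver below is plain Python written for illustration (Elkhound's engine performs this resolution internally): given target codes, it walks producer tasks upstream and emits them in executable order.

```python
# Each task: (name, input codes, output codes), per the example above.
TASKS = [
    ('DownloadDataTask', [], [1230, 2110]),
    ('GenerateReportTask', [1230, 2110], [4315]),
    ('PlotBudgetTask', [2110], [5214]),
]

# Map each data file code to the single task that produces it.
PRODUCER = {code: task for task in TASKS for code in task[2]}

def tasks_for(targets):
    """Return the tasks needed to build `targets`, dependencies first."""
    ordered, seen = [], set()

    def visit(code):
        name, inputs, _ = PRODUCER[code]
        if name in seen:
            return
        for dep in inputs:   # build upstream inputs first
            visit(dep)
        seen.add(name)
        ordered.append(name)

    for code in targets:
        visit(code)
    return ordered

print(tasks_for([4315, 5214]))
```

Asking for 4315 and 5214 schedules all three tasks, with DownloadDataTask first because both other tasks consume its outputs.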

Workflows

Workflows are named lists of targets, i.e., data files to be created. Here is an example (excerpt of an engine configuration file):

workflows:
  monthly_briefing:
    - 4315
    - 5214

Business logic implementation

Each task is implemented as a subclass of elkhound.Task. Its job is to read the input files it needs and create its output files. Here is a simple example:

class GenerateReportTask(Task):
    def get_input_data_file_codes(self):
        return [1230, 2110]

    def get_output_data_file_codes(self):
        return [4315]

    def run(self, input_files, output_files, context=None):
        with output_files[4315].open() as f:
            for _, input_file in input_files.items():
                f.write('Used input file {}\n'.format(input_file.get_path()))

When the run method is called by the engine, the input_files and output_files arguments contain DataFile objects that know the exact path of each file and can assist in opening it in the right mode (read or write, text or binary, gzipped or not). Data file objects also have utility methods for specific situations: for example, when an input file is in CSV format, the corresponding data file object has methods like read_data_frame(), which returns a pandas data frame, and iterate_records(), which returns a generator yielding records one by one (useful when scanning huge files that won’t fit into memory).
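The low-memory behavior of a record iterator like iterate_records() can be sketched with the standard library alone. This is an assumption about the general shape of such an iterator, not Elkhound's actual implementation (which also applies the schema's type conversions):

```python
import csv
import gzip
import os
import tempfile

def iterate_records(path):
    """Yield one dict per row from a gzipped CSV, reading lazily so
    that only one row is held in memory at a time."""
    with gzip.open(path, 'rt', newline='') as f:
        yield from csv.DictReader(f)

# Write a small gzipped CSV and stream it back.
path = os.path.join(tempfile.mkdtemp(), '1230_people.csv.gz')
with gzip.open(path, 'wt', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['name', 'dob', 'is_employee'])
    writer.writerow(['Alice', '1990-05-01', 'true'])

records = list(iterate_records(path))
```

Calling list() here materializes everything for the sake of the example; in a real task you would iterate over the generator directly and process each record as it arrives.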

Running workflows

Here’s an example of how to run a workflow:

python -m elkhound.runner --dir /workspace/foo --engine engine.yaml --targets monthly_briefing --deps

Detailed documentation

Elkhound API


class elkhound.CSVDataFileSpec(code, name, extension='csv', flags=0, schema=None, dialect=<class 'csv.excel'>)

Specification of a CSV data file format.

is_csv() → bool
Returns: Whether the file is a CSV file.
class elkhound.DataFileSpec(code, name, extension, flags=0)

Specification of a data file format.

is_binary() → bool
Returns: Whether the file is binary. If the file is gzipped, specifies whether the underlying file (i.e. after unpacking) is binary.
is_csv() → bool
Returns: Whether the file is a CSV file.
is_directory() → bool
Returns: Whether the “file” is actually a directory. If yes, the binary and gzipped flags should be ignored.
is_gzipped() → bool
Returns: Whether the file is zipped using gzip.
class elkhound.Flag

Flags that can be set for a data file specification.
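The flags=0 default in the spec constructors suggests bit flags. A possible layout, sketched with the standard library, is shown below; the member names follow the flags used in the YAML example (gzipped, binary, directory), but their values and this exact enum shape are assumptions:

```python
from enum import IntFlag, auto

class Flag(IntFlag):
    """Hypothetical layout of a data file spec's flags; the member
    values are an assumption, not Elkhound's definition."""
    GZIPPED = auto()
    BINARY = auto()
    DIRECTORY = auto()

# Flags combine with bitwise OR and can be tested individually.
flags = Flag.GZIPPED | Flag.BINARY
print(bool(flags & Flag.GZIPPED))    # set
print(bool(flags & Flag.DIRECTORY))  # not set
```

An IntFlag keeps the flags argument compatible with the integer default (0 meaning "no flags set") while still allowing readable membership tests.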

class elkhound.Task

A Task describes how to produce output data files from input data files.

This is an abstract base class. Each subclass of Task should implement three methods: get_input_data_file_codes(), get_output_data_file_codes(), and run().

get_input_data_file_codes() → typing.List[int]

Returns data file codes that can be read by the task. One data file can serve as an input for several tasks.

Returns: List of input data file codes.
get_output_data_file_codes() → typing.List[int]

Returns data file codes that can be written by the task. The lowest output file code has to be greater than the highest input file code. At most one task registered in an engine can write a given data file.

Returns: List of output data file codes.
run(input_files: typing.Dict[int, elkhound.file.DataFile], output_files: typing.Dict[int, elkhound.file.DataFile], context: typing.Dict[str, typing.Any])

Runs the task.

Parameters:
  • input_files – Dictionary of DataFile objects, one for each input file code.
  • output_files – Dictionary of DataFile objects, one for each output file code.
  • context – Dictionary of additional information necessary to run the task.
class elkhound.Engine(timestamp: typing.Union[int, NoneType] = None)

Engine orchestrates the execution of tasks. In particular, the engine is responsible for:

  • managing information about data files, tasks and workflows;
  • versioning intermediate data files;
  • sequencing tasks and supplying the right input files.

The engine configuration, containing information about data files, tasks and workflows, can be read from a YAML file using the read() method.

expand_targets(targets: typing.List[str], dependencies: bool = False) → typing.List[int]

Resolve workflow names and optionally add dependencies.

Parameters:
  • targets – List of data file codes and workflow names to resolve.
  • dependencies – Whether to add upstream tasks.
Returns: List of data file codes after expanding workflows and (optionally) adding dependencies.

read(config_file_name)

Reads complete engine configuration from a YAML file.

Parameters: config_file_name – Configuration file name.
run(workspace: str, targets: typing.List[int], context: typing.Dict[str, typing.Any] = None)

Run tasks producing the specified targets (data file codes).

Parameters:
  • workspace – Path to workspace directory containing data files.
  • targets – List of codes of data files to produce.
  • context – Dictionary of additional information necessary to run tasks.
elkhound.run_engine(timestamp: int = None, callback=None, logs: bool = True)

Set up and run an engine instance: read config files, parse command-line arguments, register the file specs and tasks found in the config, set up logging, etc., and run the engine.

It is suggested to call this function from your program’s main function. If additional tweaking is needed between setting up the engine and running it, provide a callback function.

Parameters:
  • timestamp – Optional timestamp of the run, as integer in YYYYMMDDHHMMSS format.
  • callback – Optional function to call just before running the engine. The callback function will receive two arguments: the configured engine instance, and a dictionary of arguments that were about to be passed to the engine’s run() method.
  • logs – Whether to configure logging.