Eskapade-Spark

  • Version: 0.9.0
  • Released: Dec 2018

Eskapade is a lightweight, Python-based data analysis framework, meant for modularizing all sorts of data analysis problems into reusable analysis components. For documentation on Eskapade, please go to this link.

Eskapade-Spark is the Spark-based extension of Eskapade. For documentation on Eskapade-Spark, please go here.

Release notes

Version 0.9

Eskapade-Spark v0.9 (December 2018) contains only one update compared with v0.8:

  • All code has been updated to Eskapade v0.9, in which the core functionality has been split off into the Eskapade-Core package. As such, the code is backwards-incompatible with v0.8.

See release notes for previous versions of Eskapade-Spark.

Installation

requirements

Eskapade-Spark requires Python 3.5+, Eskapade v0.8+ and Spark v2.1.2. These are pre-installed in the Eskapade docker.

pypi

To install the package from pypi, do:

$ pip install Eskapade-Spark

github

Alternatively, you can check out the repository from github and install it yourself:

$ git clone https://github.com/KaveIO/Eskapade-Spark.git eskapade-spark

To (re)install the python code from your local directory, type from the top directory:

$ pip install -e eskapade-spark

python

After installation, you can do the following in Python:

import eskapadespark

Congratulations, you are now ready to use Eskapade-Spark!

Quick run

To see the available Eskapade-Spark examples, do:

$ export TUTDIR=`pip show Eskapade-Spark | grep Location | awk '{ print $2"/eskapadespark/tutorials" }'`
$ ls -l $TUTDIR/

E.g. you can now run:

$ eskapade_run $TUTDIR/esk601_spark_configuration.py

For all available examples, please see the tutorials.

Contact and support

Contact us at: kave [at] kpmg [dot] com

Please note that the KPMG Eskapade group provides support only on a best-effort basis.

Contents

Tutorials

This section contains materials on how to use Eskapade-Spark. All command examples can be run from any directory with write access. For more in-depth explanations of the functionality of the code base, try the API docs.

All Spark Examples in Eskapade

All Eskapade-Spark example macros can be found in the tutorials directory. For ease of use, let’s make a shortcut to the directory containing the tutorials:

$ export TUTDIR=`pip show Eskapade-Spark | grep Location | awk '{ print $2"/eskapadespark/tutorials" }'`
$ ls -l $TUTDIR/

The numbering of the example macros follows the package structure:

  • esk600+: macros for processing Spark datasets and performing analysis with Spark.

These macros are briefly described below. You are encouraged to run all examples to see what they can do for you!

Example esk601: setting the spark configuration

Tutorial macro for configuring Spark in multiple ways.

$ eskapade_run $TUTDIR/esk601_spark_configuration.py

Example esk602: reading csv to a spark dataframe

Tutorial macro for reading CSV files into a Spark data frame.

$ eskapade_run $TUTDIR/esk602_read_csv_to_spark_df.py

Example esk603: writing spark data to csv

Tutorial macro for writing Spark data to a CSV file.

$ eskapade_run $TUTDIR/esk603_write_spark_data_to_csv.py

Example esk604: executing queries

Tutorial macro for applying an SQL query to one or more objects in the DataStore. Such SQL queries can, for instance, be used to filter data.

$ eskapade_run $TUTDIR/esk604_spark_execute_query.py

Example esk605: creating Spark data frames from various input data

Tutorial macro for creating Spark data frames from different types of input data.

$ eskapade_run $TUTDIR/esk605_create_spark_df.py

Example esk606: converting Spark data frames into different data types

Tutorial macro for converting Spark data frames into a different data type and applying transformation functions to the resulting data.

$ eskapade_run $TUTDIR/esk606_convert_spark_df.py

Example esk607: adding a new column to a Spark dataframe

Tutorial macro for adding a new column to a Spark dataframe by applying a Spark built-in or user-defined function to a selection of columns in a Spark dataframe.

$ eskapade_run $TUTDIR/esk607_spark_with_column.py

Example esk608: making histograms of a Spark dataframe

Tutorial macro for making histograms of a Spark dataframe using the Histogrammar package.

$ eskapade_run $TUTDIR/esk608_spark_histogrammar.py

Example esk609: applying map functions on groups of rows

Tutorial macro for applying map functions on groups of rows in Spark data frames.

$ eskapade_run $TUTDIR/esk609_map_df_groups.py

Example esk610: running Spark Streaming word count example

Tutorial macro running Spark Streaming word count example in Eskapade, derived from:

https://spark.apache.org/docs/latest/streaming-programming-guide.html

Counts words in UTF8-encoded, '\n'-delimited text received from a stream every second. The stream can come from either files or a network connection.

$ eskapade_run $TUTDIR/esk610_spark_streaming_wordcount.py

Example esk611: techniques for flattening a time-series in Spark

This macro demonstrates techniques for flattening a time-series in Spark.

$ eskapade_run $TUTDIR/esk611_flatten_time_series.py

Tutorial 6: going Spark

This section provides a tutorial on how to use Apache Spark in Eskapade. Spark works ‘out of the box’ in the Eskapade docker/vagrant image. For details on how to set up a custom Spark installation, see the Spark section in the Appendix.

In this tutorial we will redo Tutorial 1, but use Spark instead of Pandas for data processing. The following paragraphs describe step by step how to run a Spark job, use existing links, and write your own links for Spark queries.

Note

To get familiar with Spark in Eskapade you can follow the exercises in python/eskapadespark/tutorials/tutorial_6.py.

Running the tutorial macro

The very first step to run the tutorial Spark job is:

$ eskapade_run python/eskapadespark/tutorials/tutorial_6.py

Eskapade will start a Spark session, do nothing, and quit - there are no chains/links defined yet. The Spark session is created via the SparkManager which, like the DataStore, is a singleton that configures and controls Spark sessions centrally. It is activated through the magic line:

process_manager.service(SparkManager).create_session(include_eskapade_modules=True)

Note that when the Spark session is created, the following line appears in the logs:

Adding Python modules to egg archive <PATH_TO_ESKAPADE>/lib/es_python_modules.egg

This is the SparkManager ensuring that all Eskapade source code is uploaded and available to the Spark cluster when running in a distributed environment. To include the Eskapade code, the argument include_eskapade_modules needs to be set to True (by default it is False).

If you get an ImportError: No module named pyspark, then most likely SPARK_HOME and PYTHONPATH are not set up correctly. For details, see the Spark section in the Appendix.

Reading data

Spark can read data from various sources, e.g. local disk, HDFS, HIVE tables. Eskapade provides the SparkDfReader link that uses the pyspark.sql.DataFrameReader to read flat CSV files into Spark DataFrames, RDDs, and Pandas DataFrames. To read in the Tutorial data, the following link should be added to the Data chain:

data = Chain('Data')
reader = SparkDfReader(name='Read_LA_ozone', store_key='data', read_methods=['csv'])
reader.read_meth_args['csv'] = (DATA_FILE_PATH,)
reader.read_meth_kwargs['csv'] = dict(sep=',', header=True, inferSchema=True)
data.add(reader)

The DataStore holds a pointer to the Spark dataframe in (distributed) memory. This is different from a Pandas dataframe, where the entire dataframe is stored in the DataStore, because a Spark dataframe residing on the cluster may not fit entirely in the memory of the machine running Eskapade. This means that Spark dataframes are never written to disk in DataStore pickles!
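
To make this concrete, here is a minimal sketch of how a subsequent link or an interactive session could pick up that dataframe handle from the DataStore, assuming the reader above stored it under the key 'data' and the standard Eskapade imports are available:

from eskapade import process_manager, DataStore

ds = process_manager.service(DataStore)
spark_df = ds['data']      # a pyspark.sql.DataFrame: only a handle, the data stays on the cluster
print(spark_df.count())    # an action such as count() triggers the distributed computation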

Spark Streaming

Eskapade supports the use of Spark Streaming, as demonstrated in the word count example tutorials/esk610_spark_streaming_wordcount.py. The data is processed in (near) real time as micro-batches of RDDs (so-called discretized streaming), where the stream originates from either new incoming files or a network connection. As with regular Spark queries, various transformations can be defined and applied in subsequent Eskapade links.

For details on Spark Streaming, see also https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html.
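
For reference, the core of the word count is the standard PySpark Streaming pattern sketched below (plain Spark, outside Eskapade; the esk610 macro wraps these steps in Eskapade links). It assumes a socket source on localhost port 9999, matching the TCP stream example further down.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='wordcount_sketch')
ssc = StreamingContext(sc, 1)                    # micro batches of 1 second
lines = ssc.socketTextStream('localhost', 9999)  # text stream from a network socket
counts = (lines.flatMap(lambda line: line.split(' '))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()                                  # print the word counts of each batch
ssc.start()
ssc.awaitTermination()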

File stream

The word count example using the file stream method can be run by executing in two different terminals:

terminal 1 $ eskapade_run -c stream_type='file' python/eskapadespark/tutorials/esk610_spark_streaming_wordcount.py

terminal 2 $ mkdir /tmp/eskapade_stream_test/
terminal 2 $ for ((i=0; i<=100; i++)); do echo "Hello world" > /tmp/eskapade_stream_test/dummy_$(printf %05d ${i}); sleep 0.2; done

Here the bash for-loop creates a new file containing 'Hello world' in the /tmp/eskapade_stream_test directory every 0.2 seconds. Spark Streaming picks up and processes these files, and in terminal 1 a word count of the processed data is displayed. Output is stored in results/esk610_spark_streaming/data/v0/dstream/wordcount. Only new files in /tmp/eskapade_stream_test are processed, so do not forget to delete this directory afterwards.

TCP stream

The word count example using the TCP stream method can be run by executing in two different terminals:

terminal 1 $ eskapade_run -c stream_type='tcp' python/eskapadespark/tutorials/esk610_spark_streaming_wordcount.py

terminal 2 $ nc -lk 9999

Here nc (netcat) streams data to port 9999, and Spark Streaming listens to this port and processes the incoming data. In terminal 2, random words can be typed (followed by enter), and in terminal 1 a word count of the processed data is displayed. Output is stored in results/esk610_spark_streaming/data/v0/dstream/wordcount.

Release notes

Version 0.9

Eskapade-Spark v0.9 (December 2018) contains only one update compared with v0.8:

  • All code has been updated to Eskapade v0.9, in which the core functionality has been split off into the Eskapade-Core package. As such, the code is backwards-incompatible with v0.8.

Version 0.8

Version 0.8 of Eskapade-Spark (August 2018) splits off the spark-analysis module of Eskapade v0.7 into a separate package. This way, Eskapade v0.8 no longer depends on Spark. The new Eskapade-Spark package does, of course, require Spark to be installed.

In addition, we have included new analysis code for processing (“flattening”) time-series data, so it can be easily used as input for machine learning models. See tutorial example esk611 for details.

Developing and Contributing

Working on Eskapade-Spark

You have some cool feature and/or algorithm you want to add to Eskapade-Spark. How do you go about it?

First clone Eskapade-Spark.

git clone https://github.com/KaveIO/Eskapade-Spark.git eskapade-spark

then

pip install -e eskapade-spark

This will install Eskapade-Spark in editable mode, which allows you to edit the code and run it as you would with a normal installation of Eskapade-Spark.

To make sure that everything works, try executing Eskapade without any arguments, e.g.

eskapade_run --help

or you could just run the tests using the Eskapade test runner, e.g.

eskapade_trial .

That’s it.

Contributing

When contributing to this repository, please first discuss the change you wish to make via issue, email, or any other method with the owners of this repository before making a change. You can find the contact information on the index page.

Note that when contributing, all tests should succeed.

References

API

API Documentation

EskapadeSpark
eskapadespark package
Subpackages
Submodules
eskapadespark.data_conversion module

Project: Eskapade - A python-based package for data analysis.

Module: spark_analysis.data_conversion

Created: 2017/05/30

Description:
Converters between Spark, Pandas, and Python data formats
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

eskapadespark.data_conversion.create_spark_df(spark, data, schema=None, process_methods=None, **kwargs)

Create a Spark data frame from data in a different format.

A Spark data frame is created with either a specified schema or a schema inferred from the input data. The schema can be specified with the keyword argument “schema”.

Functions to transform the data frame after creation can be specified by the keyword argument “process_methods”. The value of this argument is an iterable of (function, arguments, keyword arguments) tuples to apply.

The data frame is created with the createDataFrame function of the SparkSession. Remaining keyword arguments are passed to this function.

>>> spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>> df = create_spark_df(spark,
...                      [[1, 1.1, 'one'], [2, 2.2, 'two']],
...                      schema=['int', 'float', 'str'],
...                      process_methods=[('repartition', (), {'numPartitions': 6})])
>>> df.show()
+---+-----+---+
|int|float|str|
+---+-----+---+
|  2|  2.2|two|
|  1|  1.1|one|
+---+-----+---+
Parameters:
  • spark (pyspark.sql.SparkSession) – SparkSession instance
  • data – input dataset
  • schema – schema of created data frame
  • process_methods (iterable) – methods to apply on the data frame after creation
Returns: created data frame
Return type: pyspark.sql.DataFrame

eskapadespark.data_conversion.df_schema(schema_spec)

Create Spark data-frame schema.

Create a schema for a Spark data frame from a dictionary of (name, data type) pairs, describing the columns. Data types are specified by Python types or by Spark-SQL types from the pyspark.sql.types module.

>>> from collections import OrderedDict as odict
>>> schema_dict = odict()
>>> schema_dict['foo'] = pyspark.sql.types.IntegerType()
>>> schema_dict['bar'] = odict([('descr', str), ('val', float)])
>>> print(schema_dict)
OrderedDict([('foo', IntegerType), ('bar', OrderedDict([('descr', <class 'str'>), ('val', <class 'float'>)]))])
>>> spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>> df = spark.createDataFrame([(1, ('one', 1.1)), (2, ('two', 2.2))], schema=df_schema(schema_dict))
>>> df.show()
+---+---------+
|foo|      bar|
+---+---------+
|  1|[one,1.1]|
|  2|[two,2.2]|
+---+---------+
Parameters: schema_spec (dict) – schema specification
Returns: data-frame schema
Return type: pyspark.sql.types.StructType
Raises: TypeError if data type is specified incorrectly

eskapadespark.data_conversion.hive_table_from_df(spark, df, db, table)

Create a Hive table from a Spark data frame.

Parameters:
  • spark (pyspark.sql.SparkSession) – SparkSession instance
  • df (pyspark.sql.DataFrame) – input data frame
  • db (str) – database for table
  • table (str) – name of table
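
A hypothetical usage sketch (the database and table names are made up, and a Spark session with Hive support enabled is assumed):

from eskapade import process_manager
from eskapadespark.data_conversion import create_spark_df, hive_table_from_df
from eskapadespark.spark_manager import SparkManager

spark = process_manager.service(SparkManager).create_session(enable_hive_support=True)
df = create_spark_df(spark, [[1, 'one'], [2, 'two']], schema=['id', 'label'])
hive_table_from_df(spark, df, db='analytics', table='example_table')
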
eskapadespark.decorators module

Project: Eskapade - A python-based package for data analysis.

Module: spark_analysis.decorators

Created: 2017/05/24

Description:
Decorators for Spark objects
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

eskapadespark.decorators.spark_cls_reduce(self)

Reduce function for Spark classes.

Spark objects connected to distributed data cannot be stored in Pickle files. This custom reduce function enables Pickling of a string representation of the Spark object.

eskapadespark.exceptions module

Project: Eskapade - A python-based package for data analysis.

Created: 2017/03/31

Description:
Eskapade exceptions
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

exception eskapadespark.exceptions.MissingPy4jError(message='', required_by='')

Bases: escore.exceptions.MissingPackageError

Exception raised if Py4J is missing.

__init__(message='', required_by='')

Set missing-package arguments.

Parameters:
  • message (str) – message to show when raised
  • required_by (str) – info on component that requires the package

exception eskapadespark.exceptions.MissingSparkError(message='', required_by='')

Bases: escore.exceptions.MissingPackageError

Exception raised if Spark is missing.

__init__(message='', required_by='')

Set missing-package arguments.

Parameters:
  • message (str) – message to show when raised
  • required_by (str) – info on component that requires the package

eskapadespark.functions module

Project: Eskapade - A python-based package for data analysis.

Module: spark_analysis.functions

Created: 2017/05/24

Description:
Collection of Spark functions defined for Eskapade
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

eskapadespark.functions.calc_asym(var1, var2)

Calculate asymmetry.

Calculate the asymmetry between variables 1 and 2 as (var2 - var1) / (abs(var1) + abs(var2)).

Returns: asymmetry value
Return type: float
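
A quick illustration with plain float inputs, following the formula above:

>>> from eskapadespark.functions import calc_asym
>>> calc_asym(1.0, 3.0)
0.5
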
eskapadespark.functions.is_inf(x)

Test if value is infinite.

eskapadespark.functions.is_nan(x)

Test if value is NaN/null/None.
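
A small sketch of the expected behaviour of is_inf and is_nan for plain Python inputs, based on the descriptions above:

>>> from eskapadespark.functions import is_inf, is_nan
>>> is_nan(float('nan')), is_nan(None), is_nan(1.0)
(True, True, False)
>>> is_inf(float('inf')), is_inf(1.0)
(True, False)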

eskapadespark.functions.spark_query_func(spec)

Get Eskapade Spark-query function.

Get a function that returns a string to be used as a function in a Spark SQL query:

>>> count_fun = spark_query_func('count')
>>> count_fun()
'count(*)'
>>> cov_fun = spark_query_func('cov')
>>> cov_fun('x', 'y')
'covar_pop(if(is_nan(x) or is_inf(x), NULL, x),if(is_nan(y) or is_inf(y), NULL, y))'
>>> my_fun = spark_query_func('my_func::count(if({0:s} == 0, 1, NULL))')
>>> my_fun.__name__
'my_func'
>>> my_fun('my_var')
'count(if(my_var == 0, 1, NULL))'
Parameters: spec (str) – function specification: “name” or “name::definition”
Returns: query function

eskapadespark.functions.spark_sql_func(name, default_func=None)

Get Spark SQL function.

Get a function from pyspark.sql.functions by name. If function does not exist in the SQL-functions module, return a default function, if specified.

Parameters:
  • name (str) – name of function
  • default_func – default function
Returns: Spark SQL function
Raises: RuntimeError if function does not exist
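
A usage sketch (function names are resolved against pyspark.sql.functions, as described above; the fallback function here is just an illustration):

>>> from eskapadespark.functions import spark_sql_func
>>> count_func = spark_sql_func('count')                                # pyspark.sql.functions.count
>>> fallback_func = spark_sql_func('not_a_sql_func', default_func=len)  # falls back to len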

eskapadespark.functions.to_date_time(dt, tz_in=None, tz_out=None)

Convert value to date/time object.

Parameters:
  • dt – value representing a date/time (parsed by pandas.Timestamp)
  • tz_in – time zone to localize data/time value to (parsed by pandas.Timestamp.tz_localize)
  • tz_out – time zone to convert data/time value into (parsed by pandas.Timestamp.tz_convert)
Returns: date/time object
Return type: datetime.datetime

eskapadespark.functions.to_timestamp(dt, tz_in=None)

Convert value to Unix timestamp (ns).

Parameters:
  • dt – value representing a date/time (parsed by pandas.Timestamp)
  • tz_in – time zone to localize data/time value to (parsed by pandas.Timestamp.tz_localize)
Returns: Unix timestamp (ns)
Return type: int
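
A usage sketch for both conversion helpers (the exact tzinfo representation in the first output depends on the installed time-zone library):

>>> from eskapadespark.functions import to_date_time, to_timestamp
>>> to_date_time('2017-05-30 10:00:00', tz_in='Europe/Amsterdam', tz_out='UTC')
datetime.datetime(2017, 5, 30, 8, 0, tzinfo=<UTC>)
>>> to_timestamp('1970-01-01 01:00:00', tz_in='UTC')
3600000000000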

eskapadespark.resources module

Used by autodoc_mock_imports.

eskapadespark.spark_manager module

Project: Eskapade - A python-based package for data analysis.

Created: 2017/02/27

Class: SparkManager

Description:
Process service for managing Spark operations
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

class eskapadespark.spark_manager.SparkManager(config_path=None)

Bases: escore.core.process_services.ProcessService, escore.core.mixin.ConfigMixin

Process service for managing Spark operations.

__init__(config_path=None)

Initialize Spark manager instance.

create_session(enable_hive_support=False, include_eskapade_modules=False, **conf_kwargs)

Get or create Spark session.

Return the Spark-session instance. Create the session if it does not exist yet. If no SparkConfig is set yet, it is created. All keyword arguments are passed to the _create_spark_conf method in this case.

Parameters:
  • enable_hive_support (bool) – switch for enabling Spark Hive support
  • include_eskapade_modules (bool) – switch to include Eskapade modules in Spark job submission. Default is False. Optional.

finish()

Stop Spark session.

get_session()

Get Spark session.

Return running Spark session and check if the Spark context is still alive.

spark_streaming_context

Spark Streaming Context.
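
A minimal sketch of the typical SparkManager lifecycle in a macro, using only the methods listed above (the dataframe content is an arbitrary example):

from eskapade import process_manager
from eskapadespark.spark_manager import SparkManager

sm = process_manager.service(SparkManager)
spark = sm.create_session()                  # get or create the Spark session
df = spark.createDataFrame([(1, 'one'), (2, 'two')], schema=['id', 'label'])
df.show()
sm.finish()                                  # stop the Spark session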

eskapadespark.version module

THIS FILE IS AUTO-GENERATED BY ESKAPADE SETUP.PY.

Module contents

Appendices

Miscellaneous

Collection of miscellaneous Eskapade-related items.

  • See Apache Spark for details on using Spark with Eskapade.

Apache Spark

Eskapade supports the use of Apache Spark for parallel processing of large data volumes. Jobs can run on a single laptop using Spark libraries as well as on a Spark/Hadoop cluster in combination with YARN. This section describes how to set up and configure Spark for use with Eskapade. For examples of running Spark jobs with Eskapade, see the Spark tutorial.

Note

Eskapade supports both batch and streaming processing with Apache Spark.

Requirements

A working setup of the Apache Spark libraries is included in both the Eskapade docker and vagrant image (see section Installation). For installation of Spark libraries in a custom setup, please refer to the Spark documentation.

Spark installation

The environment variables SPARK_HOME and PYTHONPATH need to be set and point to the location of the Spark installation and the Python libraries of Spark and py4j (a dependency). In the Eskapade docker, for example, they are set to:

$ echo $SPARK_HOME
/opt/spark/pro/
$ echo $PYTHONPATH
/opt/spark/pro/python:/opt/spark/pro/python/lib/py4j-0.10.4-src.zip:...

Configuration

The Spark configuration can be set in two ways:

  1. an Eskapade macro (preferred)
  2. an Eskapade link

This is demonstrated in the following tutorial macro:

$ eskapade_run python/eskapadespark/tutorials/esk601_spark_configuration.py

Both methods are described below. For a full explanation of Spark configuration settings, see Spark Configuration. In case configuration settings do not seem to be picked up correctly, please check the Notes at the end of this section.

Eskapade macro (preferred)

This method allows you to specify settings per macro, i.e. per analysis, and is therefore the preferred way of bookkeeping analysis-specific settings.

The easiest way to start a Spark session is:

from eskapade import process_manager
from eskapadespark.spark_manager import SparkManager

# 'settings' is the ConfigObject of the macro
sm = process_manager.service(SparkManager)
spark = sm.create_session(eskapade_settings=settings)
sc = spark.sparkContext

The default Spark configuration file python/eskapade/config/spark/spark.cfg will be picked up. It contains the following settings:

[spark]
spark.app.name=es_spark
spark.jars.packages=org.diana-hep:histogrammar-sparksql_2.11:1.0.4
spark.master=local[*]
spark.driver.host=localhost

The default Spark settings can be adapted here for all macros at once. In case alternative settings are only relevant for a single analysis, those settings can also be specified in the macro, using the arguments of the create_session method of the SparkManager:

from eskapade import process_manager
from eskapadespark.spark_manager import SparkManager

sm = process_manager.service(SparkManager)
spark_settings = [('spark.app.name', 'es_spark_alt_config'), ('spark.master', 'local[42]')]
spark = sm.create_session(eskapade_settings=settings,
                          spark_settings=spark_settings,
                          config_path='/path/to/alternative/spark.cfg',
                          enable_hive_support=False,
                          include_eskapade_modules=False
                         )

Where all arguments are optional:

  • eskapade_settings: default configuration file as specified by the sparkCfgFile key in the ConfigObject (i.e. spark.cfg)
  • config_path: alternative path to the configuration file
  • spark_settings: list of key-value pairs to specify additional Spark settings
  • enable_hive_support: switch to disable/enable Spark Hive support
  • include_eskapade_modules: switch to include/exclude Eskapade modules in Spark job submission (e.g. for user-defined functions)

Parameters

The most important parameters to play with for optimal performance:

  • num-executors
  • executor-cores
  • executor-memory
  • driver-memory

Dynamic allocation

Since version 2.1, Spark allows for dynamic resource allocation. This requires the following settings:

  • spark.dynamicAllocation.enabled=true
  • spark.shuffle.service.enabled=true

Depending on the mode (standalone, YARN, Mesos), an additional shuffle service needs to be set up. See the documentation for details.
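
For instance, a sketch of requesting these settings from an Eskapade macro via the spark_settings argument described above (the shuffle service itself still has to be configured on the cluster):

sm = process_manager.service(SparkManager)
spark = sm.create_session(eskapade_settings=settings,
                          spark_settings=[('spark.dynamicAllocation.enabled', 'true'),
                                          ('spark.shuffle.service.enabled', 'true')])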

Logging

The logging level of Spark can be controlled in two ways:

  1. through $SPARK_HOME/conf/log4j.properties:

log4j.logger.org.apache.spark.api.python.PythonGatewayServer=INFO

  2. through the SparkContext in Python:

spark = process_manager.service(SparkManager).get_session()
spark.sparkContext.setLogLevel('INFO')

PS: the loggers in Python can be controlled through:

import logging
print(logging.Logger.manager.loggerDict) # obtain list of all registered loggers
logging.getLogger('py4j').setLevel('INFO')
logging.getLogger('py4j.java_gateway').setLevel('INFO')

However, not all Spark-related loggers are available here (as they are Java-based).

Notes

There are a few pitfalls w.r.t. setting up Spark correctly:

1. If the environment variable PYSPARK_SUBMIT_ARGS is defined, its settings may override those specified in the macro/link. This can be prevented by unsetting the variable:

$ unset PYSPARK_SUBMIT_ARGS

or in the macro:

import os
del os.environ['PYSPARK_SUBMIT_ARGS']

The former will clear the variable from the shell session, whereas the latter will only clear it in the Python session.

2. In client mode not all driver options set via SparkConf are picked up at job submission because the JVM has already been started. Those settings should therefore be passed through the SPARK_OPTS environment variable, instead of using SparkConf in an Eskapade macro or link:

SPARK_OPTS="--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info --driver-memory 2g"

3. In case a Spark machine is not connected to a network, setting the SPARK_LOCAL_HOSTNAME environment variable or the spark.driver.host key in SparkConf to the value localhost may fix DNS resolution timeouts which prevent Spark from starting jobs.
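
For example, a sketch of setting the latter option from a macro, again via the spark_settings argument of the SparkManager:

spark = process_manager.service(SparkManager).create_session(
    eskapade_settings=settings,
    spark_settings=[('spark.driver.host', 'localhost')])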
