Azure Distributed Data Engineering Toolkit¶
Azure Distributed Data Engineering Toolkit (AZTK) is a Python CLI application for provisioning on-demand Spark on Docker clusters in Azure. It’s a cheap and easy way to get up and running with a Spark cluster, and a great tool for Spark users who want to experiment and start testing at scale.
This toolkit is built on top of Azure Batch but does not require any Azure Batch knowledge to use.
Getting Started¶
The minimum requirements to get started with this package are:
- Python 3.5+, pip 9.0.1+
- An Azure account
Installation¶
Before you start, ensure you are running Python 3.5 or greater by running python --version.
Install from pip¶
It is recommended that you install aztk in a virtual environment:
# venv is included with Python 3.5+, so no separate install is needed
# create a virtual environment called env
python -m venv env
# activate the virtual environment (linux)
source env/bin/activate
# activate the virtual environment (windows)
env/Scripts/activate
To install aztk using pip, run:
pip install aztk
Install from source¶
Clone the repo
git clone https://github.com/Azure/aztk.git
Install aztk:
pip install -e .
Initialize your environment¶
Navigate to the directory you wish to use as your spark development environment, and run:
aztk spark init
This will create a .aztk folder with preset configuration files in your current working directory.
If you would like to initialize your aztk clusters with a specific development toolset, please pass one of the following flags:
aztk spark init --python
aztk spark init --R
aztk spark init --scala
aztk spark init --java
If you wish to have global configuration files that will be read regardless of your current working directory, run:
aztk spark init --global
This will put default configuration files in your home directory, ~/. Please note that configuration files in your current working directory will take precedence over global configuration files in your home directory.
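For example, a sketch of a typical workflow (directory names are illustrative) is to create global defaults once and override them per project:
# create global defaults in your home directory
aztk spark init --global
# a project-local .aztk takes precedence over the global files
cd ~/projects/my-spark-job
aztk spark init --python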
Account Setup¶
To create the necessary Azure resources, either run the account setup script described below or follow the manual resource creation steps.
Account Setup Script¶
Overview¶
The account setup script creates and configures all of the required Azure resources.
The script will create and configure the following resources:
- Resource group
- Storage account
- Batch account
- Azure Active Directory application and service principal
The script outputs all of the necessary information to use aztk; copy the output into the .aztk/secrets.yaml file created when running aztk spark init.
Usage¶
Copy and paste the following into an Azure Cloud Shell:
wget -q https://raw.githubusercontent.com/Azure/aztk/v0.10.2/account_setup.sh &&
chmod 755 account_setup.sh &&
/bin/bash account_setup.sh
A series of prompts will appear, and you can set the values you desire for each field. Default values appear in brackets []
and will be used if no value is provided.
Azure Region [westus]:
Resource Group Name [aztk]:
Storage Account Name [aztkstorage]:
Batch Account Name [aztkbatch]:
Active Directory Application Name [aztkapplication]:
Active Directory Application Credential Name [aztk]:
Once the script has finished running you will see the following output:
service_principal:
tenant_id: <AAD Directory ID>
client_id: <AAD App Application ID>
credential: <AAD App Password>
batch_account_resource_id: </batch/account/resource/id>
storage_account_resource_id: </storage/account/resource/id>
Copy the entire service_principal section into your .aztk/secrets.yaml file. If you do not have a secrets.yaml file, you can create one in your current working directory by running aztk spark init.
Now you are ready to create your first aztk cluster. See Creating a Cluster.
Manual resource creation¶
To finish setting up, you need to fill out your Azure Batch and Azure Storage secrets in .aztk/secrets.yaml. We also recommend that you enter your SSH key info in this file.
Please note that if you use SSH keys and have a non-standard SSH key file name or path, you will need to specify the location of your SSH public and private keys. To do so, set them as shown below:
# SSH keys used to create a user and connect to a server.
# The public key can either be the public key itself(ssh-rsa ...) or the path to the ssh key.
# The private key must be the path to the key.
ssh_pub_key: ~/.ssh/my-public-key.pub
ssh_priv_key: ~/.ssh/my-private-key
Log into Azure¶
If you do not already have an Azure account, go to https://azure.microsoft.com and create an account.
Once you have one, log in and go to the Azure Portal to create your Azure Batch account and Azure Storage account.
Using Azure Active Directory Authentication¶
To get the required keys for your Azure Active Directory (AAD) Service Principal, Azure Batch Account and Azure Storage Account, please follow these instructions. Note that this is the recommended path for use with aztk, as some features require AAD and are disabled if using the alternative Shared Key authentication.
- Register an Azure Active Directory (AAD) Application
- Navigate to Azure Active Directory by searching in “All Services”. Click “Properties” and record the value in the “Directory ID” field. This is your tenant ID.
- Navigate to App Registrations by searching in “All Services”.
- Click the “+ New application registration” option at the top left of the window. Fill in the necessary fields for the “Create” form. For “Application type” use “Web app/ API.”
- Click on the newly created App to reveal more info. Record the Application ID (for use as Client ID). Then click “Settings”, then “Keys.” Create a new password using the provided form; be sure to copy and save the password as it will only be revealed once. This password is used as the credential in secrets.yaml.
- Create a Storage Account
- Click the ‘+’ button at the top left of the screen and search for ‘Storage’. Select ‘Storage account - blob, file, table, queue’ and click ‘Create’
- Fill in the form and create the Storage account.
- Record the Storage account’s resource ID.
- Give your AAD App “Contributor” permissions to your Storage Account. Click “Access Control (IAM)”, then “+ Add” at the top left. Fill in the “Add Permissions” form and save.
- Create a Batch Account
- Click the ‘+’ button at the top left of the screen and search for ‘Compute’. Select ‘Batch’ and click ‘Create’
- Fill in the form and create the Batch account.
- Navigate to your newly created Batch Account and record its resource ID by clicking “Properties” and copying.
- Give your AAD App “Contributor” permissions to your Batch Account. Click “Access Control (IAM)”, then “+ Add” at the top left. Fill in the “Add Permissions” form and save.
- Save your account credentials into the secrets.yaml file
- Open the secrets.yaml file in the .aztk folder in your current working directory (if .aztk doesn’t exist, run aztk spark init). Fill in all of the fields as described below.
- Fill in the service_principal block with your recorded values as shown below:
service_principal:
tenant_id: <AAD Directory ID>
client_id: <AAD App Application ID>
credential: <AAD App Password>
batch_account_resource_id: </batch/account/resource/id>
storage_account_resource_id: </storage/account/resource/id>
Clusters¶
In the Azure Distributed Data Engineering Toolkit, a cluster is primarily designed to run Spark jobs. This document describes how to create a cluster to use for Spark jobs. Alternatively, for getting started and debugging, you can also use the cluster in interactive mode, which allows you to log into the master node and interact with the cluster from there.
Creating a Cluster¶
Creating a Spark cluster only takes a few simple steps after which you will be able to SSH into the master node of the cluster and interact with Spark. You will be able to view the Spark Web UI, Spark Jobs UI, submit Spark jobs (with spark-submit), and even interact with Spark in a Jupyter notebook.
For the advanced user, please note that the default cluster settings are preconfigured in the .aztk/cluster.yaml file that is generated when you run aztk spark init. More information on cluster config here.
Commands¶
Create a Spark cluster:
aztk spark cluster create --id <your_cluster_id> --vm-size <vm_size_name> --size <number_of_nodes>
For example, to create a cluster of 4 Standard_A2 nodes called ‘spark’ you can run:
aztk spark cluster create --id spark --vm-size standard_a2 --size 4
You can find more information on VM sizes here. Please note that you must use the official SKU name when setting your VM size - they usually come in the form: “standard_d2_v2”.
Note: The cluster id (--id) can only contain alphanumeric characters, hyphens, and underscores, and cannot contain more than 64 characters. Each cluster must have a unique cluster id.
By default, you cannot create clusters of more than 20 cores in total. Visit this page to request a core quota increase.
Low priority nodes¶
You can create your cluster with low-priority VMs at an 80% discount by using --size-low-pri instead of --size. Note that these are great for experimental use, but can be taken away at any time. We recommend against this option when doing long running jobs or for critical workloads.
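For example, a sketch of the earlier create command using four low-priority nodes instead of dedicated ones:
aztk spark cluster create --id spark --vm-size standard_a2 --size-low-pri 4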
Mixed Mode¶
You can create clusters with a mix of low-priority and dedicated VMs to reach the optimal balance of price and availability. In Mixed Mode, your cluster will have both dedicated instances and low priority instances. To minimize the potential impact on your Spark workloads, the Spark master node will always be provisioned on one of the dedicated nodes, while each of the low priority nodes will be a Spark worker.
Please note that to use Mixed Mode clusters, you need to authenticate using Azure Active Directory (AAD) by configuring the Service Principal in .aztk/secrets.yaml. You also need to create a Virtual Network (VNET) and provide the resource ID of a Subnet within the VNET in your .aztk/cluster.yaml configuration file.
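As a sketch only (the size fields come from the cluster.yaml template shown later, subnet_id mirrors the Job cluster_configuration shown later, and the subnet resource ID is a placeholder), the relevant .aztk/cluster.yaml entries for a Mixed Mode cluster might look like:
# dedicated nodes (the Spark master is provisioned on one of these)
size: 2
# low priority nodes (Spark workers)
size_low_priority: 8
# resource ID of a subnet within your VNET (placeholder value)
subnet_id: /subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Network/virtualNetworks/<vnet-name>/subnets/<subnet-name>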
Setting your Spark and/or Python versions¶
By default, the Azure Distributed Data Engineering Toolkit will use Spark v2.2.0 and Python v3.5.4. However, you can set your Spark and/or Python versions by configuring the base Docker image used by this package.
Listing clusters¶
You can list all clusters currently running in your account by running
aztk spark cluster list
Viewing a cluster¶
To view details about a particular cluster run:
aztk spark cluster get --id <your_cluster_id>
Note that the cluster is not fully usable until a master node has been selected and its state is idle.
For example, here cluster ‘spark’ has 2 nodes, and node tvm-257509324_2-20170820t200959z is the master and ready to run a job.
Cluster spark
------------------------------------------
State: active
Node Size: standard_a2
Nodes: 2
| Dedicated: 2
| Low priority: 0
Nodes | State | IP:Port | Master
------------------------------------|-----------------|----------------------|--------
tvm-257509324_1-20170820t200959z | idle | 40.83.254.90:50001 |
tvm-257509324_2-20170820t200959z | idle | 40.83.254.90:50000 | *
Deleting a cluster¶
To delete a cluster run:
aztk spark cluster delete --id <your_cluster_id>
Deleting a cluster also permanently deletes any data or logs associated with that cluster. If you wish to persist this data, use the --keep-logs flag.
To delete all clusters:
aztk spark cluster delete --id $(aztk spark cluster list -q)
Skip delete confirmation by using the --force flag.
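For example, to delete the cluster named ‘spark’ without a confirmation prompt while keeping its logs (combining the flags described above):
aztk spark cluster delete --id spark --keep-logs --force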
You are charged for the cluster as long as the nodes are provisioned in your account. Make sure to delete any clusters you are not using to avoid unwanted costs.
Run a command on all nodes in the cluster¶
To run a command on all nodes in the cluster, run:
aztk spark cluster run --id <your_cluster_id> "<command>"
The command is executed through an SSH tunnel.
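For example, a trivial sketch that prints each node’s hostname (the quoted command is only an illustration):
aztk spark cluster run --id spark "hostname"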
Run a command on a specific node in the cluster¶
To run a command on a specific node in the cluster, run:
aztk spark cluster run --id <your_cluster_id> --node-id <your_node_id> "<command>"
This command is executed through an SSH tunnel.
To get the id of nodes in your cluster, run aztk spark cluster get --id <your_cluster_id>.
Copy a file to all nodes in the cluster¶
To securely copy a file to all nodes, run:
aztk spark cluster copy --id <your_cluster_id> --source-path </path/to/local/file> --dest-path </path/on/node>
The file will be securely copied to each node using SFTP.
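For example, a sketch that distributes a local data file to the same path on every node (both paths are placeholders):
aztk spark cluster copy --id spark --source-path ./data/my_dataset.csv --dest-path /tmp/my_dataset.csv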
Interactive Mode¶
All other interaction with the cluster is done via SSH and SSH tunneling. If you didn’t create a user during cluster create (aztk spark cluster create), the first step is to add a user to each node in the cluster.
Make sure that the .aztk/secrets.yaml file has your SSH key (or path to the SSH key), and it will automatically use it to make the SSH connection.
aztk spark cluster add-user --id spark --username admin
Alternatively, you can add the SSH key as a parameter when running the add-user command.
aztk spark cluster add-user --id spark --username admin --ssh-key <your_key_OR_path_to_key>
You can also use a password to create your user:
aztk spark cluster add-user --id spark --username admin --password <my_password>
Using an SSH key is the recommended method.
SSH and Port Forwarding¶
After a user has been created, SSH into the Spark container on the master node with:
aztk spark cluster ssh --id spark --username admin
If you would like to ssh into the host instead of the Spark container on it, run:
aztk spark cluster ssh --id spark --username admin --host
If you ssh into the host and wish to access the running Docker Spark environment, you can run the following:
sudo docker exec -it spark /bin/bash
Now that you’re in, you can change directory to your familiar $SPARK_HOME
cd $SPARK_HOME
To view the SSH command being called, pass the --no-connect flag:
aztk spark cluster ssh --id spark --no-connect
Note that an SSH tunnel and shell will be opened with the default SSH client if one is present. Otherwise, a pure python SSH tunnel is created to forward the necessary ports. The pure python SSH tunnel will not open a shell.
Debugging your Spark Cluster¶
If your cluster is in an unknown or unusable state, you can debug by running:
aztk spark cluster debug --id <cluster-id> --output </path/to/output/directory/>
The debug utility will pull logs from all nodes in the cluster. The utility will check for:
- free diskspace
- docker image status
- docker container status
- docker container logs
- docker container process status
- aztk code & version
- spark component logs (master, worker, shuffle service, history server, etc) from $SPARK_HOME/logs
- spark application logs from $SPARK_HOME/work
Please be careful sharing the output of the debug command, as secrets and application code are present in the output.
Pass the --brief flag to only download the most essential logs from each node:
aztk spark cluster debug --id <cluster-id> --output </path/to/output/directory/> --brief
This command will retrieve:
- stdout file from the node’s startup
- stderr file from the node’s startup
- the docker log for the spark container
Interact with your Spark cluster¶
By default, the aztk spark cluster ssh
command port forwards the Spark Web UI to localhost:8080, Spark Jobs UI to localhost:4040, and Spark History Server to your localhost:18080. This can be configured in .aztk/ssh.yaml.
Docker¶
Azure Distributed Data Engineering Toolkit runs Spark on Docker.
Supported Azure Distributed Data Engineering Toolkit images are hosted publicly on Docker Hub.
By default, the aztk/spark:v0.1.0-spark2.3.0-base image will be used.
To select an image other than the default, you can set your Docker image at cluster creation time with the optional --docker-repo parameter:
aztk spark cluster create ... --docker-repo <name_of_docker_image_repo>
To customize the Docker configuration, you can pass command line options to the docker run command with the optional --docker-run-options parameter:
aztk spark cluster create ... "--docker-run-options=<command_line_options_for_docker_run>"
For example, if I wanted to use Spark v2.2.0 and start my container in privileged mode and with a kernel memory limit of 100MB, I could run the following cluster create command:
aztk spark cluster create ... --docker-repo aztk/base:spark2.2.0 "--docker-run-options=--privileged --kernel-memory 100m"
Using a custom Docker Image¶
You can build your own Docker image on top of or beneath one of our supported base images, OR you can modify the supported Dockerfiles and build your own image that way.
Once you have your Docker image built and hosted publicly, you can then use the --docker-repo parameter in your aztk spark cluster create command to point to it.
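For example, assuming you have published an image named my_username/my_repo:latest (a placeholder), the create command would look like:
aztk spark cluster create --id spark --vm-size standard_a2 --size 4 --docker-repo my_username/my_repo:latest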
Using a custom Docker Image that is Privately Hosted¶
To use a private docker image you will need to provide a docker username and password that have access to the repository you want to use.
In .aztk/secrets.yaml, set up your docker configuration:
docker:
username: <myusername>
password: <mypassword>
If your private repository is not on Docker Hub (Azure Container Registry, for example), you can provide the endpoint here too:
docker:
username: <myusername>
password: <mypassword>
endpoint: <https://my-custom-docker-endpoint.com>
Building Your Own Docker Image¶
Building your own Docker Image provides more customization over your cluster’s environment. For some, this may look like installing specific, and even private, libraries that their Spark jobs require. For others, it may just be setting up a version of Spark, Python or R that fits their particular needs.
The Azure Distributed Data Engineering Toolkit supports custom Docker images. To guarantee that your Spark deployment works, we recommend that you build on top of one of our supported images.
To build your own image, you can either build on top of or beneath one of our supported images, OR you can just modify one of the supported Dockerfiles to build your own.
Building on top¶
You can build on top of our images by referencing the aztk/spark image in the FROM keyword of your Dockerfile:
# Your custom Dockerfile
FROM aztk/spark:v0.1.0-spark2.3.0-base
...
Building beneath¶
To build beneath one of our images, modify one of our Dockerfiles so that the FROM keyword pulls from your Docker image’s location (as opposed to the default which is a base Ubuntu image):
# One of the Dockerfiles that AZTK supports
# Change the FROM statement to point to your hosted image repo
FROM my_username/my_repo:latest
...
Please note that for this method to work, your Docker image must have been built on Ubuntu.
Custom Docker Image Requirements¶
If you are building your own custom image and not building on top of a supported image, the following requirements are necessary.
Please make sure that the following environment variables are set:
- AZTK_DOCKER_IMAGE_VERSION
- JAVA_HOME
- SPARK_HOME
You also need to make sure that PATH is correctly configured with $SPARK_HOME
- PATH=$SPARK_HOME/bin:$PATH
By default, these are set as follows:
ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk-amd64
ENV SPARK_HOME /home/spark-current
ENV PATH $SPARK_HOME/bin:$PATH
If you are using your own version of Spark, make sure that it is symlinked to “/home/spark-current”. $SPARK_HOME must also point to “/home/spark-current”.
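As a minimal sketch only (not one of the supported Dockerfiles; the base image, Spark download URL, and version tag are assumptions), a custom image satisfying these requirements might look like:
# Illustrative only: a bare-bones image that satisfies the AZTK environment requirements
FROM ubuntu:16.04
ENV AZTK_DOCKER_IMAGE_VERSION 0.1.0
ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk-amd64
ENV SPARK_HOME /home/spark-current
ENV PATH $SPARK_HOME/bin:$PATH
# install Java, download a Spark distribution, and symlink it to /home/spark-current as required
RUN apt-get update && apt-get install -y openjdk-8-jdk curl && \
    curl -sL https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz | tar -xz -C /opt && \
    ln -s /opt/spark-2.2.0-bin-hadoop2.7 /home/spark-current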
Configuration Files¶
This section refers to the files in the directory .aztk that are generated from the aztk spark init command.
cluster.yaml¶
The core settings for a cluster are configured in the cluster.yaml file. Once you have set your desired values in .aztk/cluster.yaml, you can create a cluster using aztk spark cluster create.
This is the default cluster configuration:
# id: <id of the cluster to be created>
id: spark_cluster
# Toolkit configuration [Required] You can use `aztk toolkit` command to find which toolkits are available
toolkit:
software: spark
version: 2.2
# environment: python
# Optional version for the environment
# environment_version:
# Optional docker repository(To bring your custom docker image. Just specify the Toolkit software, version and environment if using default images)
# docker_repo: <name of docker image repo (for more information, see https://github.com/Azure/aztk/blob/master/docs/12-docker-image.md)>
# Optional command line options to pass to `docker run`
# docker_run_options: <additional command line options to pass to `docker run` (for more information, see https://github.com/Azure/aztk/blob/master/docs/12-docker-image.md)>
# vm_size: <vm-size, see available options here: https://azure.microsoft.com/pricing/details/batch//>
vm_size: standard_a2
# size: <number of dedicated nodes in the cluster, note that clusters must contain all dedicated or all low priority nodes>
size: 2
# size_low_priority: <number of low priority nodes in the cluster, mutually exclusive with size setting>
# username: <username for the linux user to be created> (optional)
username: spark
# Enable plugins
plugins:
# - name: spark_ui_proxy
# - name: jupyterlab
# - name: jupyter
# - name: hdfs
# - name: rstudio_server
# Allow master node to also be a worker <true/false> (Default: true)
# worker_on_master: true
# wait: <true/false>
wait: true
Running aztk spark cluster create will create a cluster of 2 Standard_A2 nodes called ‘spark_cluster’ with a linux user named ‘spark’. This is equivalent to running the command
aztk spark cluster create --id spark_cluster --vm-size standard_a2 --size 2 --username spark --wait
NOTE: This assumes that your SSH-key is configured in the .aztk/secrets.yaml file.
ssh.yaml¶
This is the default ssh cluster configuration:
# username: <name of the user account to ssh into>
username: spark
# job_ui_port: <local port where the job ui is forwarded to>
job_ui_port: 4040
# job_history_ui_port: <local port where the job history ui is forwarded to>
job_history_ui_port: 18080
# web_ui_port: <local port where the spark master web ui is forwarded to>
web_ui_port: 8080
# jupyter_port: <local port where jupyter is forwarded to>
jupyter_port: 8888
# name_node_ui_port: <local port where the Name Node UI is forwarded to>
name_node_ui_port: 50070
# rstudio_server_port: <local port where the rstudio server is forwarded to>
rstudio_server_port: 8787
# connect: <true/false, connect to spark master or print connection string (--no-connect)>
connect: true
Running the command aztk spark cluster ssh --id <cluster_id> will ssh into the master node of the Spark cluster. It will also forward the Spark Job UI to localhost:4040, the Spark master’s web UI to localhost:8080, and Jupyter to localhost:8888.
Note that all of the settings in ssh.yaml will be overridden by parameters passed on the command line.
Spark Configuration¶
The repository comes with default Spark configuration files which provision your Spark cluster just the same as you would locally. After running aztk spark init to initialize your working environment, you can view and edit these files at .aztk/spark-defaults.conf, .aztk/spark-env.sh and .aztk/core-site.xml. Please note that you can bring your own Spark configuration files by copying your spark-defaults.conf, spark-env.sh and core-site.xml into your .aztk/ directory.
If using aztk job submission, please note that both spark.shuffle.service.enabled and spark.dynamicAllocation.enabled must be set to true so that the number of executors registered with an application can scale as nodes in the job’s cluster come online.
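For reference, these two settings correspond to the following lines in .aztk/spark-defaults.conf:
spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled true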
The following settings available in spark-defaults.conf and spark-env.sh are not supported:
spark-env.sh:
- SPARK_LOCAL_IP
- SPARK_PUBLIC_DNS
- SPARK_MASTER_HOST
- SPARK_MASTER_PORT
- SPARK_WORKER_PORT
- SPARK_MASTER_WEBUI_PORT
- Any options related to YARN client mode or Mesos
spark-defaults.conf:
- spark.master
History Server¶
If you want to use Spark’s history server, please set the following values in your .aztk/spark-defaults.conf file:
spark.eventLog.enabled true
spark.eventLog.dir <path>
spark.history.fs.logDirectory <path>
Please note that the paths for spark.eventLog.dir and spark.history.fs.logDirectory should most likely match so that the history server reads the logs that each Spark job writes. Also note that while the paths can be local (file:/), it is recommended that the paths be accessible by every node in the cluster so that the history server, which runs on the Spark master node, has access to all application logs. HDFS, WASB, ADL, or any other Hadoop API compliant storage system may be used.
If using WASB, ADL or other cloud storage services, be sure to set your keys in .aztk/core-site.xml. For more information, see the Cloud Storage documentation.
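As an illustrative sketch (the container, storage account, and directory names below are placeholders, following the WASB URL format shown in the Cloud Storage section), pointing both settings at the same remote location could look like:
spark.eventLog.enabled true
spark.eventLog.dir wasbs://MY_CONTAINER@MY_STORAGE_ACCOUNT.blob.core.windows.net/spark-events
spark.history.fs.logDirectory wasbs://MY_CONTAINER@MY_STORAGE_ACCOUNT.blob.core.windows.net/spark-events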
Configuring Spark Storage¶
The Spark cluster can be configured to use different cloud supported storage offerings (such as Azure Storage Blobs, Azure Data Lake Storage, or any other supported Spark file system). More information can be found in the Cloud Storage documentation.
Placing JARS¶
Additional JAR files can be added to the cluster by simply adding them to the .aztk/jars directory. These JARS will automatically be added to Spark’s default JAR directory. In the case of a naming conflict, the file in .aztk/jars will overwrite the file in the cluster. Typically, new JARS must be registered with Spark. To do this, either run the Spark Submit command with a path to the JARS:
aztk spark cluster submit --id <my_cluster_id> --jars $SPARK_HOME/jars/my_jar_file_1.jar <my_application> <my_parameters>
Or update the .aztk/spark-defaults.conf file as shown below to have it registered for all Spark applications.
spark.jars $spark_home/jars/my_jar_file_1.jar,$spark_home/jars/my_jar_file_2.jar
Note: This tool automatically registers several JARS for default cloud storage in the spark-defaults.conf file. If you want to modify this file, simply append any additional JARS to the end of this list.
Azure Files¶
The ability to load a file share on the cluster is really useful when you want to be able to share data across all the nodes, and/or want that data to be persisted longer than the lifetime of the cluster. Azure Files provides a very easy way to mount a share into the cluster and have it accessible to all nodes. This is useful in cases where you have small data sets you want to process (less than 1GB) or have notebooks that you want to re-use between clusters.
Mounting an Azure Files share in the cluster only requires updating the cluster.yaml file at .aztk/cluster.yaml. For example, the following configuration will load two file shares into the cluster, one with my notebooks and one with a small data set that I have previously uploaded to Azure Files.
azure_files:
- storage_account_name: STORAGE_ACCOUNT_NAME
storage_account_key: STORAGE_ACCOUNT_KEY
# Name of the file share in Azure Files
file_share_path: data
# Mount point on the node in the cluster
mount_path: /mnt/data
- storage_account_name: STORAGE_ACCOUNT_NAME
storage_account_key: STORAGE_ACCOUNT_KEY
# Name of the file share in Azure Files
file_share_path: notebooks
# Mount point on the node in the cluster
mount_path: /mnt/notebooks
From the cluster I can now access both of these file shares directly by navigating to /mnt/data or /mnt/notebooks respectively.
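For example, once the shares above are mounted, a PySpark job could read the data set straight from the mount point (the file name is a placeholder):
# read a csv from the Azure Files share mounted at /mnt/data
dataframe = spark.read.csv('file:///mnt/data/my_dataset.csv')
# print the first 5 rows
dataframe.show(5)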
Plugins¶
Plugins can either be one of the aztk supported plugins or the path to a local file.
Supported Plugins¶
AZTK ships with a library of default plugins that enable auxiliary services to use with your Spark cluster.
Currently the following plugins are supported:
- JupyterLab
- Jupyter
- HDFS
- RStudioServer
- TensorflowOnSpark
- OpenBLAS
- mvBLAS
Enable a plugin using the CLI¶
If you are using the aztk CLI and wish to enable a supported plugin, you need to update your .aztk/cluster.yaml configuration file.
Add or uncomment the plugins section and set the plugins you desire to enable as follows:
plugins:
- name: jupyterlab
- name: jupyter
- name: hdfs
- name: spark_ui_proxy
- name: rstudio_server
args:
version: "1.1.383"
Enable a plugin using the SDK¶
If you are using the aztk SDK and wish to enable a supported plugin, you need to import the necessary plugins from the aztk.spark.models.plugins module and add them to your ClusterConfiguration object’s plugin list:
from aztk.spark.models.plugins import JupyterPlugin, HDFSPlugin
cluster_config = ClusterConfiguration(
...# Other config,
plugins=[
JupyterPlugin(),
HDFSPlugin(),
]
)
Custom script plugin¶
This allows you to run your custom code on the cluster.
Run a custom script plugin with the CLI¶
Example¶
plugins:
- script: path/to/my/script.sh
- name: friendly-name
script: path/to/my-other/script.sh
target: host
target_role: all-nodes
Options¶
- script: Required. Path to the script you want to run.
- name: Optional. Friendly name. By default, this will be the name of the script file.
- target: Optional. Where to run the plugin (default: spark-container). Can be spark-container or host.
- target_role: Optional. The role of the node on which this script runs (default: master). Can be master, worker, or all-nodes.
Submitting an Application¶
Submitting a job to your Spark cluster in this package mimics the experience of a typical standalone cluster. A spark job will be submitted to the system and run to completion.
Spark-Submit¶
The spark-submit experience is mostly the same as any regular Spark cluster with a few minor differences. You can take a look at aztk spark cluster --help for more detailed information and options.
Run a Spark job:
aztk spark cluster submit --id <name_of_spark_cluster> --name <name_of_spark_job> <executable> <executable_params>
For example, to run a local pi.py file on a Spark cluster, simply specify the local path of the file:
aztk spark cluster submit --id spark --name pipy examples/src/main/python/pi.py 100
To run a remotely hosted pi.py file on a Spark cluster, specify the remote path of the file and use the ‘--remote’ flag:
aztk spark cluster submit --id spark --name pipy --remote wasbs://path@remote/pi.py 100
NOTE: The job name (--name) must be at least 3 characters long, can only contain alphanumeric characters and hyphens (but not underscores), and cannot contain uppercase letters. Each job you submit must have a unique name.
Monitoring job¶
If you have set up an SSH tunnel with port forwarding, you can navigate to http://localhost:8080 and http://localhost:4040 to view the progress of the job using the Spark UI.
Getting output logs¶
The default setting when running a job is --wait. This will simply submit a job to the cluster and wait for the job to run. If you want to just submit the job and not wait, use the --no-wait flag and tail the logs manually:
aztk spark cluster submit --id spark --name pipy --no-wait examples/src/main/python/pi.py 1000
aztk spark cluster app-logs --id spark --name pipy --tail
Cloud storage¶
Cloud storage for spark enables you to have a persisted storage system backed by a cloud provider. Spark supports this by placing the appropriate storage jars and updating the core-site.xml file accordingly.
Azure Storage Blobs (WASB)¶
Pre-built into this package is native support for connecting your Spark cluster to Azure Blob Storage (aka WASB). The required WASB jars are automatically placed in the Spark cluster and the permissions are pulled from your core-site.xml file under .aztk/core-site.xml.
To connect to your Azure Storage account, make sure that the storage fields in your .aztk/core-site.xml file are properly filled out. This tool already has the basic template for using WASB filled out in the .aztk/core-site.xml file. Simply uncomment the entries in the “Azure Storage Blobs (WASB)” section and fill out the properties for MY_STORAGE_ACCOUNT_NAME, MY_STORAGE_ACCOUNT_SUFFIX and MY_STORAGE_ACCOUNT_KEY.
Once you have correctly filled out the .aztk/core-site.xml with your storage credentials, you will be able to access your storage accounts from your Spark job.
Reading and writing to and from Azure blobs is easily achieved by using the wasb syntax. For example, reading a csv file using Pyspark would be:
# read csv data into data
dataframe = spark.read.csv('wasbs://MY_CONTAINER@MY_STORAGE_ACCOUNT.blob.core.windows.net/MY_INPUT_DATA.csv')
# print off the first 5 rows
dataframe.show(5)
# write the csv back to storage
dataframe.write.csv('wasbs://MY_CONTAINER@MY_STORAGE_ACCOUNT.blob.core.windows.net/MY_OUTPUT_DATA.csv')
Azure Data Lake (ADL)¶
Pre-built into this package is native support for connecting your Spark cluster to Azure Data Lake (aka ADL). The required ADL jars are automatically placed in the Spark cluster and the permissions are pulled from your core-site.xml file under .aztk/core-site.xml.
To connect to your Azure Data Lake account, make sure that the storage fields in your .aztk/core-site.xml file are properly filled out. This tool already has the basic template for using ADL filled out in the .aztk/core-site.xml file. Simply uncomment the entries in the “ADL (Azure Data Lake) Configuration” section and fill out the properties for MY_AAD_TENANT_ID, MY_AAD_CLIENT_ID and MY_AAD_CREDENTIAL.
Once you have correctly filled out the .aztk/core-site.xml with your Azure Data Lake credentials, you will be able to access your ADL storage repositories from your Spark job.
Reading and writing to and from Azure Data Lake Storage is easily achieved by using the adl syntax. For example, reading a csv file using Pyspark would be:
# read csv data into data
dataframe = spark.read.csv('adl://MY_ADL_STORAGE_ACCOUNT.azuredatalakestore.net/MY_INPUT_DATA.csv')
# print off the first 5 rows
dataframe.show(5)
# write the csv back to storage
dataframe.write.csv('adl://MY_ADL_STORAGE_ACCOUNT.azuredatalakestore.net/MY_OUTPUT_DATA.csv')
Note: The implementation of the ADL connector is designed to always access ADLS through a secure channel, so there is no adls file system scheme name. You will always use adl. For more information please take a look at https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-data-lake-store.
Note: In order to use ADL you must first create an AAD application and give it permissions to your ADL Storage account. There is a good tutorial on how to create the required AAD security objects to use ADL here. Not shown in this tutorial is that, as a last step, you will need to give the application you created permissions to your ADL Storage account.
Additional file system connectors¶
You can quickly add support for additional data repositories by adding the necessary JARS to your cluster and configuring the spark-defaults.conf and core-site.xml files accordingly.
Adding Jars¶
To add jar files to the cluster, simply add them to your local .aztk/jars directory. These will automatically get loaded into your cluster and placed under $SPARK_HOME/jars
Registering Jars¶
To register the jars, update the .aztk/spark-defaults.conf file and add the path to the jar file(s) to the spark.jars property
spark.jars $spark_home/jars/my_jar_file_1.jar,$spark_home/jars/my_jar_file_2.jar
Configuring the file system¶
Configuring the file system requires an update to the .aztk/core-site.xml file. Each file system is unique and requires different setup in core-site.xml. In .aztk/core-site.xml, we have preloaded templates to add WASB and ADL.
GPU¶
Use GPUs to accelerate your Spark applications. When using a GPU enabled Azure VM, your docker image will contain CUDA-8.0 and cuDnn-6.0 by default. See Docker Image for more information about the AZTK Docker images.
[NOTE: Azure does not have GPU enabled VMs in all regions. Please use this link to make sure that your Batch account is in a region that has GPU enabled VMs]
AZTK uses Nvidia-Docker to expose the VM’s GPU(s) inside the container. Nvidia drivers (ver. 384) are installed at runtime.
Tutorial¶
Create a cluster specifying a GPU enabled VM
aztk spark cluster create --id gpu-cluster --vm-size standard_nc6 --size 1
Submit an application to the cluster that will take advantage of the GPU
aztk spark cluster submit --id gpu-cluster --name gpu-app ./examples/src/main/python/gpu/nubma_example.py
Installation Location¶
By default, CUDA is installed at /usr/local/cuda-8.0.
Jobs¶
In the Azure Distributed Data Engineering Toolkit, a Job is an entity that runs against an automatically provisioned and managed cluster. Jobs run a collection of Spark applications and persist the outputs.
Creating a Job¶
Creating a Job starts with defining the necessary properties in your .aztk/job.yaml file. Jobs have one or more applications to run as well as values that define the Cluster the applications will run on.
Job.yaml¶
Each Job has one or more applications given as a List in Job.yaml. Applications are defined using the following properties:
applications:
- name:
application:
application_args:
-
main_class:
jars:
-
py_files:
-
files:
-
driver_java_options:
-
driver_library_path:
driver_class_path:
driver_memory:
executor_memory:
driver_cores:
executor_cores:
Please note: the only required fields are name and application. All other fields may be removed or left blank.
NOTE: The Application name can only contain alphanumeric characters including hyphens and underscores, and cannot contain more than 64 characters. Each application must have a unique name.
Jobs also require a definition of the cluster on which the Applications will run. The following properties define a cluster:
cluster_configuration:
vm_size: <the Azure VM size>
size: <the number of nodes in the Cluster>
toolkit:
software: spark
version: 2.2
subnet_id: <resource ID of a subnet to use (optional)>
Please Note: For more information about Azure VM sizes, see Azure Batch Pricing. And for more information about Docker repositories see Docker.
The only required fields are vm_size and either size or size_low_priority; all other fields can be left blank or removed.
A Job definition may also include a default Spark Configuration. The following are the properties to define a Spark Configuration:
spark_configuration:
spark_defaults_conf: </path/to/your/spark-defaults.conf>
spark_env_sh: </path/to/your/spark-env.sh>
core_site_xml: </path/to/your/core-site.xml>
Please note: including a Spark Configuration is optional. Spark Configuration values defined as part of an application will take precedence over the values specified in these files.
Below we will define a simple, functioning job definition.
# Job Configuration
job:
id: test-job
cluster_configuration:
vm_size: standard_f2
size: 3
applications:
- name: pipy100
application: /path/to/pi.py
application_args:
- 100
- name: pipy200
application: /path/to/pi.py
application_args:
- 200
Once submitted, this Job will run two applications, pipy100 and pipy200, on an automatically provisioned Cluster with 3 dedicated Standard_f2 size Azure VMs. Immediately after both pipy100 and pipy200 have completed, the Cluster will be destroyed. Application logs will be persisted and available.
Commands¶
Submit a Spark Job:
aztk spark job submit --id <your_job_id> --configuration </path/to/job.yaml>
NOTE: The Job id (--id) can only contain alphanumeric characters, hyphens, and underscores, and cannot contain more than 64 characters. Each Job must have a unique id.
Low priority nodes¶
You can create your Job with low-priority VMs at an 80% discount by using --size-low-pri instead of --size. Note that these are great for experimental use, but can be taken away at any time. We recommend against this option when doing long running jobs or for critical workloads.
Listing Jobs¶
You can list all Jobs currently running in your account by running
aztk spark job list
Viewing a Job¶
To view details about a particular Job, run:
aztk spark job get --id <your_job_id>
For example, here Job ‘pipy’ has 2 applications which have already completed.
Job pipy
------------------------------------------
State: | completed
Transition Time: | 21:29PM 11/12/17
Applications | State | Transition Time
------------------------------------|----------------|-----------------
pipy100 | completed | 21:25PM 11/12/17
pipy200 | completed | 21:24PM 11/12/17
Deleting a Job¶
To delete a Job run:
aztk spark job delete --id <your_job_id>
Deleting a Job also permanently deletes any data or logs associated with that Job. If you wish to persist this data, use the --keep-logs flag.
You are only charged for the Job while it is active; Jobs handle provisioning and destroying infrastructure, so you are only charged for the time that your applications are running.
Stopping a Job¶
To stop a Job run:
aztk spark job stop --id <your_job_id>
Stopping a Job will end any currently running Applications and will prevent any new Applications from running.
Get information about a Job’s Application¶
To get information about a Job’s Application:
aztk spark job get-app --id <your_job_id> --name <your_application_name>
Getting a Job’s Application’s log¶
To get a job’s application logs:
aztk spark job get-app-logs --id <your_job_id> --name <your_application_name>
Stopping a Job’s Application¶
To stop an application that is running or going to run on a Job:
aztk spark job stop-app --id <your_job_id> --name <your_application_name>
Migration Guide¶
0.6.0 to 0.7.0¶
This guide will describe the steps needed to update a 0.6.0 aztk installation to 0.7.0.
Installation from pip¶
AZTK is now published on pip! If you installed from github previously, please reinstall.
To uninstall run:
pip3 uninstall aztk
The following command will get the latest release of aztk (please ensure you are using Python 3.5+):
pip3 install aztk
Or, you can install 0.7.0 specifically using:
pip3 install aztk==0.7.0
Configuration Files¶
A number of changes have been made that affect previously init’ed aztk environments. To limit potential issues with previous versions, we recommend that you replace any existing .aztk directories.
- Backup your existing .aztk directory by renaming it to .aztk.old.
- Run aztk spark init to create a new .aztk directory.
- Copy the values from .aztk.old/secrets.yaml to .aztk/secrets.yaml.
- Update the new .aztk/cluster.yaml with values from .aztk.old/cluster.yaml if applicable. Please be aware of the new toolkit section that replaces docker_repo for supported images. Similarly for .aztk/job.yaml.
- Update the new defaults in .aztk/spark-defaults.conf, .aztk/core-site.xml and .aztk/spark-env.sh if applicable.
- Be sure not to copy over the .aztk.old/jars directory. All jars that were placed there by default have been moved onto the Docker image. You can add any custom jars you had by placing them in .aztk/jars/.
- Create your new 0.7.0 cluster!
cluster.yaml¶
In cluster.yaml, the toolkit key has been added. It is used to select the default, supported Docker images. Please refer to the configuration file documentation.
Docker images¶
A major backwards-incompatible refactor of the Docker images has occurred. Previous Docker images will no longer work with 0.7.0. To update to a new supported docker image, you will need to update your .aztk/cluster.yaml configuration file with the toolkit block in place of docker_repo. If you do not do so, cluster creation will fail!
Please refer to the configuration file documentation for more information on the toolkit block in cluster.yaml.
Custom scripts deprecation and Plugins¶
Custom scripts have been deprecated in favor of Plugins. Plugins have a number of advantages, including the ability to execute on the host (and not in the Spark Docker container). A number of supported plugins are shipped with aztk; please refer to the plugin documentation to learn more.
SDK samples¶
Create the Spark client¶
You can get the values for this by either running the Getting Started script or using Batch Labs.
import os
import sys
import time
import aztk.spark
from aztk.error import AztkError
# set your secrets
secrets_configuration = aztk.spark.models.SecretsConfiguration(
service_principal=aztk.spark.models.ServicePrincipalConfiguration(
tenant_id="<org>.onmicrosoft.com",
client_id="",
credential="",
batch_account_resource_id="",
storage_account_resource_id="",
),
ssh_pub_key="")
# create a client
client = aztk.spark.Client(secrets_configuration)
List available clusters¶
# list available clusters
clusters = client.cluster.list()
Create a new cluster¶
configuration_file_path = "/path/to/spark/configuration/files"
spark_configuration = aztk.spark.models.SparkConfiguration(
spark_defaults_conf=os.path.join(configuration_file_path, 'spark-defaults.conf'),
spark_env_sh=os.path.join(configuration_file_path, 'spark-env.sh'),
core_site_xml=os.path.join(configuration_file_path, 'core-site.xml'),
jars=[
os.path.join(configuration_file_path, 'jars', jar)
for jar in os.listdir(os.path.join(configuration_file_path, 'jars'))
])
# configure my cluster
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id="sdk-test",
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
size=2,
vm_size="standard_f2",
spark_configuration=spark_configuration)
# create a cluster, and wait until it is ready
try:
cluster = client.cluster.create(cluster_configuration, wait=True)
except AztkError as e:
raise e
Get an existing cluster¶
# get details of the cluster
cluster = client.cluster.get(cluster.id)
Run an application on the cluster¶
# define a Spark application to run
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy1",
application=os.path.join(ROOT_PATH, 'examples', 'src', 'main', 'python', 'pi.py'),
application_args="10")
# submit the application and wait until it is finished running
client.cluster.submit(cluster.id, app1)
Get the logs of an application¶
# get logs for app, print to console
app1_logs = client.get_application_log(cluster_id=cluster_configuration.cluster_id, application_name=app1.name)
print(app1_logs.log)
Get status of app¶
# get status of application
status = client.cluster.get_application_state(cluster_configuration.cluster_id, app1.name)
Stream logs of app, print to console as it runs¶
# stream logs of app, print to console as it runs
current_bytes = 0
while True:
app1_logs = client.cluster.get_application_log(
id=cluster_configuration.cluster_id, application_name=app1.name, tail=True, current_bytes=current_bytes)
print(app1_logs.log, end="")
if app1_logs.application_state == 'completed':
break
current_bytes = app1_logs.total_bytes
time.sleep(1)
Delete the cluster¶
# delete the cluster
client.cluster.delete(cluster.id)
Define a custom plugin¶
Full example¶
from aztk.spark.models.plugins import PluginConfiguration, PluginFile, PluginPort, PluginTarget, PluginTargetRole
cluster_config = ClusterConfiguration(
...# Other config,
plugins=[
PluginConfiguration(
name="my-custom-plugin",
files=[
PluginFile("file.sh", "/my/local/path/to/file.sh"),
PluginFile("data/one.json", "/my/local/path/to/data/one.json"),
PluginFile("data/two.json", "/my/local/path/to/data/two.json"),
],
execute="file.sh", # This must be one of the files defined in the file list and match the target path,
env=dict(
SOME_ENV_VAR="foo"
),
args=["arg1"], # Those arguments are passed to your execute script
ports=[
PluginPort(internal="1234"), # Internal only (for node communication, for example)
PluginPort(internal="2345", public=True), # Open the port to the public (accessible when you ssh in); used for UIs, for example
],
# Pick where you want the plugin to run
target=PluginTarget.Host, # The script will be run on the host; the default is to run in the spark container
target_role=PluginTargetRole.All, # Whether the plugin should run only on the master, only on workers, or on all nodes. You can use environment variables (see below) to have different master/worker config
)
]
)
Parameters¶
PluginConfiguration¶
- name (required | string): Name of your plugin. This will be used for creating a folder, so it is recommended to use a simple name containing only letters, dashes, and underscores.
- files (required | List[PluginFile|PluginTextFile]): List of files to upload.
- execute (required | str): Script to execute. This script must be defined in the files above and must match its remote path.
- args (optional | List[str]): List of arguments to be passed to your execute script.
- env (optional | dict): List of environment variables to access in the script (this can be used to pass arguments to your script instead of args).
- ports (optional | List[PluginPort]): List of ports to open if the script is running in a container. A port can also be marked public, and it will then be accessible when you ssh into the master node.
- target (optional | PluginTarget): Define where the execute script should be running. Potential values are PluginTarget.SparkContainer (default) and PluginTarget.Host.
- target_role (optional | PluginTargetRole): Whether the plugin should run only on the master, only on workers, or on all nodes. You can use environment variables (see below) to have different master/worker configuration.
PluginFile¶
- target (required | str): Where the file should be dropped relative to the plugin working directory.
- local_path (required | str): Path to the local file you want to upload (this can come from the plugin's parameters).
Environment variables available in the plugin¶
AZTK provides a few environment variables that can be used in your plugin script:
- AZTK_IS_MASTER: Is the plugin running on the master node. Can be either true or false.
- AZTK_IS_WORKER: Is a worker set up on the current node (this might also be a master if you have worker_on_master set to true). Can be either true or false.
- AZTK_MASTER_IP: Internal IP of the master node.
Debug your plugin¶
When your plugin is not working as expected, there are a few things you can do to investigate the issue:
- Check the logs; you can either use the debug tool or BatchLabs.
- Navigate to startup/wd/logs/plugins.
- If you see a file named <your-plugin-name>.txt under that folder, it means that your plugin started correctly, and you can check this file to see what your execute script logged. If this file doesn’t exist, the script was not run on this node. There could be multiple reasons for this:
- If you want your plugin to run in the spark container, check the startup/wd/logs/docker.log file for information.
- If you want your plugin to run on the host, check startup/stdout.txt and startup/stderr.txt.
The log could mention that you picked the wrong target or target role for that plugin, which is why this plugin is not running on this node.
aztk package¶
aztk.models package¶
class aztk.models.ApplicationLog(name: str, cluster_id: str, log: str, total_bytes: int, application_state: azure.batch.models.batch_service_client_enums.TaskState, exit_code: int)
Bases: object

class aztk.models.Cluster(pool: azure.batch.models.cloud_pool_py3.CloudPool, nodes: azure.batch.models.compute_node_paged.ComputeNodePaged = None)
Bases: object
class aztk.models.ClusterConfiguration(*args, **kwargs)
Bases: aztk.core.models.model.Model
Cluster configuration model
Parameters:
- cluster_id (str) – Id of the Aztk cluster
- toolkit (aztk.models.Toolkit) – Toolkit to be used for this cluster
- size (int) – Number of dedicated nodes for this cluster
- size_low_priority (int) – Number of low priority nodes for this cluster
- vm_size (int) – Azure VM size to be used for each node
- subnet_id (str) – Full resource id of the subnet to be used (required for mixed mode clusters)
- plugins (List[aztk.models.plugins.PluginConfiguration]) – List of plugins to be used
- file_shares (List[aztk.models.FileShare]) – List of file shares to be used
- user_configuration (aztk.models.UserConfiguration) – Configuration of the user to be created on the master node to ssh into
Fields: cluster_id (string), toolkit (nested model, merged with ModelMergeStrategy.merge by default), size (integer), size_low_priority (integer), vm_size (string), subnet_id (string), plugins (list), file_shares (list), user_configuration (nested model), scheduling_target (enum)
class aztk.models.ClusterState
Bases: enum.Enum
An enumeration.
- deleting = 'deleting'
- resizing = 'resizing'
- steady = 'steady'
- stopping_resize = 'stopping'
class aztk.models.DockerConfiguration(**kwargs)
Bases: aztk.core.models.model.Model
Configuration for connecting to private docker
Fields: endpoint (string), username (string), password (string)
class aztk.models.Enum
Bases: object
Generic enumeration.
Derive from this class to define new enumerations.
Bases:
aztk.core.models.model.Model
Model String field
Model String field
Model String field
Model String field
class aztk.models.Model(**kwargs)
Bases: object
Base class for all aztk models
To implement model wide validation implement the __validate__ method

class aztk.models.NodeOutput(id: str, output: Union[tempfile.SpooledTemporaryFile, str] = None, error: Exception = None)
Bases: object
class aztk.models.PluginConfiguration(**kwargs)
Bases: aztk.core.models.model.Model
Plugin manifest that should be returned in the main.py of your plugin
Args:
- name: Name of the plugin. Used to reference the plugin.
- runOn: Where the plugin should run.
- execute: Path to the file to execute (this must match the target of one of the files).
- files: List of files to upload.
- args: List of arguments to pass to the executing script.
- env: Dict of environment variables to pass to the script.
Fields: name (string), files (list), execute (string), args (list), env (list), target (enum), target_role (enum), ports (list)
class aztk.models.PluginFile(target: str = None, local_path: str = None)
Bases: aztk.core.models.model.Model
Reference to a file for a plugin.
Fields: target (string), local_path (string)
class aztk.models.PluginPort(**kwargs)
Bases: aztk.core.models.model.Model
Definition for a port that should be opened on a node.
- internal: Port on the node
- public: [Optional] Port available to the user. If none, won't open any port to the user
- name: [Optional] Name to differentiate ports if you have multiple
Fields: internal (integer), public (model field), name (integer), expose_publicly, public_port
class aztk.models.PluginTarget
Bases: enum.Enum
Where this plugin should run
- SparkContainer = 'spark-container'
- Host = 'host'

class aztk.models.PluginTargetRole
Bases: enum.Enum
An enumeration.
- Master = 'master'
- Worker = 'worker'
- All = 'all-nodes'
class aztk.models.PortForwardingSpecification(**kwargs)
Bases: aztk.core.models.model.Model
Fields: remote_port (integer), local_port (integer)
class aztk.models.SchedulingTarget
Bases: enum.Enum
Target where the task will get scheduled. For Spark this is where the driver will live.
- Master = 'master': Only the master is allowed to run the task.
- Any = 'any': Any node (not recommended if using low priority nodes) (default).
class aztk.models.SecretsConfiguration(**kwargs)
Bases: aztk.core.models.model.Model
Fields:
- service_principal: nested model (when merging models, the nested model is merged; default: ModelMergeStrategy.merge)
Field is another model
Parameters: - model (aztk.core.models.Model) – Model object that field should be
- merge_strategy (ModelMergeStrategy) – When merging models how should the nested model be merged. Default: ModelMergeStrategy.merge
- docker: nested model (merge strategy default: ModelMergeStrategy.merge)
- ssh_pub_key: string field
- ssh_priv_key: string field
class aztk.models.ServicePrincipalConfiguration(**kwargs)
Bases: aztk.core.models.model.Model
Container class for AAD authentication
Fields: tenant_id (string), client_id (string), credential (string), batch_account_resource_id (string), storage_account_resource_id (string)
Bases:
aztk.core.models.model.Model
Container class for shared key authentication
Model String field
Model String field
Model String field
Model String field
Model String field
Model String field
class aztk.models.Software
Bases: object
Enum with list of available softwares
- spark = 'spark'
-
class
aztk.models.
Task
(**kwargs)[source]¶ Bases:
aztk.core.models.model.Model
-
id
¶ Model String field
-
node_id
¶ Model String field
-
state
¶ Model String field
-
state_transition_time
¶ Model String field
-
command_line
¶ Model String field
-
exit_code
¶ Model Integer field
-
start_time
¶ Field that should be a datetime
-
end_time
¶ Field that should be a datetime
-
failure_info
¶ Model String field
-
-
class
aztk.models.
TaskState
[source]¶ Bases:
enum.Enum
An enumeration.
-
Running
= 'running'¶
-
Completed
= 'completed'¶
-
Failed
= 'failed'¶
-
Preparing
= 'preparing'¶
-
-
class
aztk.models.
TextPluginFile
(target: str, content: Union[str, _io.StringIO])[source]¶ Bases:
aztk.core.models.model.Model
Reference to a file for a plugin.
Args: target (str): Where the file should be uploaded, relative to the plugin working dir.
content (str|io.StringIO): Content of the file. Can either be a string or a StringIO.
-
target
¶ Model String field
-
-
class
aztk.models.
Toolkit
(**kwargs)[source]¶ Bases:
aztk.core.models.model.Model
Toolkit for a cluster. This will help pick the docker image needed
Parameters: - software (str) – Name of the toolkit (spark)
- version (str) – Version of the toolkit
- environment (str) – Which environment to use for this toolkit
- environment_version (str) – If there are multiple versions for an environment, you can specify which one
- docker_repo (str) – Optional docker repo
- docker_run_options (str) – Optional command-line options for docker run
-
software
¶ Model String field
-
version
¶ Model String field
-
environment
¶ Model String field
-
environment_version
¶ Model String field
-
docker_repo
¶ Model String field
-
docker_run_options
¶ Model String field
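A short, hypothetical example of constructing a Toolkit; the version and environment values are illustrative and not an exhaustive list of supported values:
from aztk.models import Toolkit

toolkit = Toolkit(
    software="spark",        # currently the only documented software
    version="2.3.0",         # example Spark version
    environment="anaconda",  # assumed example environment name; optional
)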
-
class
aztk.models.
Union
[source]¶ Bases:
typing.Final
Union type; Union[X, Y] means either X or Y.
To define a union, use e.g. Union[int, str]. Details:
The arguments must be types and there must be at least one.
None as an argument is a special case and is replaced by type(None).
Unions of unions are flattened, e.g.:
Union[Union[int, str], float] == Union[int, str, float]
Unions of a single argument vanish, e.g.:
Union[int] == int # The constructor actually returns int
Redundant arguments are skipped, e.g.:
Union[int, str, int] == Union[int, str]
When comparing unions, the argument order is ignored, e.g.:
Union[int, str] == Union[str, int]
When two arguments have a subclass relationship, the least derived argument is kept, e.g.:
class Employee: pass class Manager(Employee): pass Union[int, Employee, Manager] == Union[int, Employee] Union[Manager, int, Employee] == Union[int, Employee] Union[Employee, Manager] == Employee
Corollary: if Any is present it is the sole survivor, e.g.:
Union[int, Any] == Any
Similar for object:
Union[int, object] == object
To cut a tie: Union[object, Any] == Union[Any, object] == Any.
You cannot subclass or instantiate a union.
You cannot write Union[X][Y] (what would it mean?).
You can use Optional[X] as a shorthand for Union[X, None].
aztk.spark package¶
aztk.spark.models package¶
aztk.spark.models.plugins package¶
-
class
aztk.spark.models.plugins.
HDFSPlugin
[source]¶ Bases:
aztk.models.plugins.plugin_configuration.PluginConfiguration
-
class
aztk.spark.models.plugins.
ResourceMonitorPlugin
[source]¶ Bases:
aztk.models.plugins.plugin_configuration.PluginConfiguration
-
class
aztk.spark.models.plugins.
SimplePlugin
[source]¶ Bases:
aztk.models.plugins.plugin_configuration.PluginConfiguration
-
class
aztk.spark.models.
ApplicationConfiguration
(name=None, application=None, application_args=None, main_class=None, jars=None, py_files=None, files=None, driver_java_options=None, driver_library_path=None, driver_class_path=None, driver_memory=None, executor_memory=None, driver_cores=None, executor_cores=None, max_retry_count=None)[source]¶ Bases:
object
-
class
aztk.spark.models.
ApplicationLog
(application_log: aztk.models.application_log.ApplicationLog)[source]¶ Bases:
aztk.models.application_log.ApplicationLog
-
class
aztk.spark.models.
ApplicationState
[source]¶ Bases:
enum.Enum
An enumeration.
-
Running
= 'running'¶
-
Completed
= 'completed'¶
-
Failed
= 'failed'¶
-
Preparing
= 'preparing'¶
-
-
class
aztk.spark.models.
Cluster
(cluster: aztk.models.cluster.Cluster)[source]¶ Bases:
aztk.models.cluster.Cluster
-
class
aztk.spark.models.
ClusterConfiguration
(*args, **kwargs)[source]¶ Bases:
aztk.models.cluster_configuration.ClusterConfiguration
-
spark_configuration
¶ Field is another model
Parameters: - model (aztk.core.models.Model) – Model object that field should be
- merge_strategy (ModelMergeStrategy) – When merging models how should the nested model be merged. Default: ModelMergeStrategy.merge
-
worker_on_master
¶ Model Boolean field
-
-
class
aztk.spark.models.
DockerConfiguration
(**kwargs)[source]¶ Bases:
aztk.models.secrets_configuration.DockerConfiguration
-
class
aztk.spark.models.
Enum
[source]¶ Bases:
object
Generic enumeration.
Derive from this class to define new enumerations.
-
class
aztk.spark.models.
File
(name: str, payload: _io.StringIO)[source]¶ Bases:
aztk.models.file.File
-
class
aztk.spark.models.
FileShare
[source]¶ Bases:
aztk.models.file_share.FileShare
-
class
aztk.spark.models.
Job
(cloud_job_schedule: azure.batch.models.cloud_job_schedule_py3.CloudJobSchedule, tasks: List[aztk.models.task.Task] = None, pool: azure.batch.models.cloud_pool_py3.CloudPool = None, nodes: azure.batch.models.compute_node_paged.ComputeNodePaged = None)[source]¶ Bases:
object
-
class
aztk.spark.models.
JobConfiguration
(id=None, applications=None, vm_size=None, spark_configuration=None, toolkit=None, max_dedicated_nodes=0, max_low_pri_nodes=0, subnet_id=None, scheduling_target: aztk.models.scheduling_target.SchedulingTarget = None, worker_on_master=None)[source]¶ Bases:
object
-
class
aztk.spark.models.
JobState
[source]¶ Bases:
enum.Enum
An enumeration.
-
active
= 'active'¶
-
completed
= 'completed'¶
-
disabled
= 'disabled'¶
-
terminating
= 'terminating'¶
-
deleting
= 'deleting'¶
-
-
class
aztk.spark.models.
List
[source]¶ Bases:
list
,typing.MutableSequence
-
class
aztk.spark.models.
Model
(**kwargs)[source]¶ Bases:
object
Base class for all aztk models
To implement model wide validation implement __validate__ method
-
class
aztk.spark.models.
PluginConfiguration
(**kwargs)[source]¶ Bases:
aztk.models.plugins.plugin_configuration.PluginConfiguration
-
class
aztk.spark.models.
PortForwardingSpecification
(**kwargs)[source]¶ Bases:
aztk.models.port_forward_specification.PortForwardingSpecification
-
class
aztk.spark.models.
RemoteLogin
(remote_login: aztk.models.remote_login.RemoteLogin)[source]¶ Bases:
aztk.models.remote_login.RemoteLogin
-
class
aztk.spark.models.
SchedulingTarget
[source]¶ Bases:
enum.Enum
Target where task will get scheduled. For spark this is where the driver will live.
-
Master
= 'master'¶ Only master is allowed to run task
-
Any
= 'any'¶ Any node(Not recommended if using low pri) (Default)
-
-
class
aztk.spark.models.
SecretsConfiguration
(**kwargs)[source]¶ Bases:
aztk.models.secrets_configuration.SecretsConfiguration
-
class
aztk.spark.models.
ServicePrincipalConfiguration
(**kwargs)[source]¶ Bases:
aztk.models.secrets_configuration.ServicePrincipalConfiguration
-
class
aztk.spark.models.
SharedKeyConfiguration
(**kwargs)[source]¶ Bases:
aztk.models.secrets_configuration.SharedKeyConfiguration
-
class
aztk.spark.models.
SparkConfiguration
(*args, **kwargs)[source]¶ Bases:
aztk.core.models.model.Model
-
spark_defaults_conf
¶ Model String field
-
spark_env_sh
¶ Model String field
-
core_site_xml
¶ Model String field
-
jars
¶ Field that should be a list
-
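A hedged sketch of building a SparkConfiguration. The fields are plain strings and a list; this sketch assumes the string fields hold the raw contents of the configuration files generated by aztk spark init, which may not match your setup.
from aztk.spark.models import SparkConfiguration

# Paths are illustrative; adjust to where your .aztk configuration lives.
with open(".aztk/spark-defaults.conf") as f:
    spark_defaults = f.read()

spark_configuration = SparkConfiguration(
    spark_defaults_conf=spark_defaults,
    spark_env_sh=None,    # optional spark-env.sh contents
    core_site_xml=None,   # optional core-site.xml contents
    jars=[],              # optional list of jars (semantics assumed)
)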
-
class
aztk.spark.models.
SparkToolkit
(version: str, environment: str = None, environment_version: str = None)[source]¶ Bases:
aztk.models.toolkit.Toolkit
aztk.spark.client module¶
-
class
aztk.spark.client.
Client
(secrets_configuration: aztk.spark.models.models.SecretsConfiguration)[source]¶ Bases:
aztk.client.client.CoreClient
The client used to create and manage Spark clusters
-
cluster
¶
-
job
¶
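A minimal sketch of constructing the Spark client and reaching its operation objects, reusing the secrets object from the earlier sketch:
from aztk.spark.client import Client

client = Client(secrets)       # secrets: a fully populated SecretsConfiguration
cluster_ops = client.cluster   # Spark cluster operations
job_ops = client.job           # Spark job operations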
-
-
class
aztk.spark.client.cluster.
ClusterOperations
(context)[source]¶ Bases:
aztk.spark.client.base.operations.SparkBaseOperations
Spark ClusterOperations object
-
_core_cluster_operations
¶
-
_spark_base_cluster_operations
¶ aztk.spark.client.cluster.CoreClusterOperations
-
create
(cluster_configuration: aztk.spark.models.models.ClusterConfiguration, wait: bool = False)[source]¶ Create a cluster.
Parameters: - cluster_configuration (
ClusterConfiguration
) – Configuration for the cluster to be created. - wait (
bool
) – if True, this function will block until the cluster creation is finished.
Returns: A Cluster object representing the state and configuration of the cluster.
Return type: aztk.spark.models.Cluster
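For illustration, a hedged sketch of calling create; the ClusterConfiguration keyword names (cluster_id, size, vm_size, toolkit) are assumptions based on the cluster.yaml settings and are not documented in this section:
from aztk.spark.models import ClusterConfiguration, SparkToolkit

cluster_config = ClusterConfiguration(
    cluster_id="my-test-cluster",           # placeholder cluster id
    size=2,                                 # number of dedicated nodes (assumed field name)
    vm_size="standard_d2_v2",               # example VM size
    toolkit=SparkToolkit(version="2.3.0"),  # example Spark version
)

# `client` is the Spark client from the earlier sketch.
cluster = client.cluster.create(cluster_config, wait=True)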
-
delete
(id: str, keep_logs: bool = False)[source]¶ Delete a cluster.
Parameters: Returns: True if the deletion process was successful.
Return type:
-
get
(id: str)[source]¶ Get details about the state of a cluster.
Parameters: id ( str
) – the id of the cluster to get.Returns: A Cluster object representing the state and configuration of the cluster. Return type: aztk.spark.models.Cluster
-
list
()[source]¶ List all clusters.
Returns: - List of Cluster objects each representing the state
- and configuration of the cluster.
Return type: List[aztk.spark.models.Cluster]
-
submit
(id: str, application: aztk.spark.models.models.ApplicationConfiguration, remote: bool = False, wait: bool = False, internal: bool = False)[source]¶ Submit an application to a cluster.
Parameters: - id (
str
) – the id of the cluster to submit the application to. - application (
aztk.spark.models.ApplicationConfiguration
) – Application definition - remote (
bool
) – If True, the application file will not be uploaded, it is assumed to be reachable by the cluster already. This is useful when your application is stored in a mounted Azure File Share and not the client. Defaults to False. - internal (
bool
) – if True, this will connect to the node using its internal IP. Only use this if running within the same VNET as the cluster. This only applies if the cluster’s SchedulingTarget is not set to SchedulingTarget.Any. Defaults to False. - wait (
bool
, optional) – If True, this function blocks until the application has completed. Defaults to False.
Returns:
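A sketch of submitting an application with this method; the keyword arguments mirror the ApplicationConfiguration constructor shown earlier, and the name and path are placeholders:
from aztk.spark.models import ApplicationConfiguration

app = ApplicationConfiguration(
    name="pi-example",             # placeholder application name
    application="/path/to/pi.py",  # placeholder path to the application file
    application_args=["100"],
)

# `client` is the Spark client from the earlier sketch.
client.cluster.submit(id="my-test-cluster", application=app, wait=True)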
-
create_user
(id: str, username: str, password: str = None, ssh_key: str = None)[source]¶ Create a user on every node in the cluster
Parameters: - username (
str
) – name of the user to create. - pool_id (
str
) – id of the cluster to create the user on. - ssh_key (
str
, optional) – ssh public key to create the user with, must use ssh_key or password. Defaults to None. - password (
str
, optional) – password for the user, must use ssh_key or password. Defaults to None.
Returns:
-
get_application_state
(id: str, application_name: str)[source]¶ Get the state of a submitted application
Parameters: Returns: the state of the application
Return type:
-
list_applications
(id: str)[source]¶ Get all tasks that have been submitted to the cluster
Parameters: id ( str
) – the name of the cluster the tasks belong toReturns: list of aztk applications Return type: [aztk.spark.models.Application]
-
run
(id: str, command: str, host=False, internal: bool = False, timeout=None)[source]¶ Run a bash command on every node in the cluster
Parameters: - id (
str
) – the id of the cluster to run the command on. - command (
str
) – the bash command to execute on the node. - internal (
bool
) – if true, this will connect to the node using its internal IP. Only use this if running within the same VNET as the cluster. Defaults to False. - container_name=None (
str
, optional) – the name of the container to run the command in. If None, the command will run on the host VM. Defaults to None. - timeout=None (
str
, optional) – The timeout in seconds for establishing a connection to the node. Defaults to None.
Returns: list of NodeOutput objects containing the output of the run command
Return type: List[aztk.spark.models.NodeOutput]
-
node_run
(id: str, node_id: str, command: str, host=False, internal: bool = False, timeout=None, block=True)[source]¶ Run a bash command on the given node
Parameters: - id (
str
) – the id of the cluster to run the command on. - node_id (
str
) – the id of the node in the cluster to run the command on. - command (
str
) – the bash command to execute on the node. - internal (
bool
) – if True, this will connect to the node using its internal IP. Only use this if running within the same VNET as the cluster. Defaults to False. - container_name=None (
str
, optional) – the name of the container to run the command in. If None, the command will run on the host VM. Defaults to None. - timeout=None (
str
, optional) – The timeout in seconds for establishing a connection to the node. Defaults to None. - block=True (
bool
, optional) – If True, the command blocks until execution is complete.
Returns: object containing the output of the run command
Return type: aztk.spark.models.NodeOutput
-
copy
(id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout: int = None)[source]¶ Copy a file to every node in a cluster.
Parameters: - id (
str
) – the id of the cluster to copy files with. - source_path (
str
) – the local path of the file to copy. - destination_path (
str
, optional) – the path on each node the file is copied to. - container_name (
str
, optional) – the name of the container to copy to or from. If None, the copy operation will occur on the host VM. Defaults to None. - internal (
bool
, optional) – if True, this will connect to the node using its internal IP. Only use this if running within the same VNET as the cluster. Defaults to False. - timeout (
int
, optional) – The timeout in seconds for establishing a connection to the node. Defaults to None.
Returns: A list of NodeOutput objects representing the output of the copy command.
Return type: List[aztk.spark.models.NodeOutput]
-
download
(id: str, source_path: str, destination_path: str = None, host: bool = False, internal: bool = False, timeout: int = None)[source]¶ Download a file from every node in a cluster.
Parameters: - id (
str
) – the id of the cluster to copy files with. - source_path (
str
) – the path of the file to copy from. - destination_path (
str
, optional) – the local directory path where the output should be written. If None, a SpooledTemporaryFile will be returned in the NodeOutput object, else the file will be written to this path. Defaults to None. - container_name (
str
, optional) – the name of the container to copy to or from. If None, the copy operation will occur on the host VM, Defaults to None. - internal (
bool
, optional) – if True, this will connect to the node using its internal IP. Only use this if running within the same VNET as the cluster. Defaults to False. - timeout (
int
, optional) – The timeout in seconds for establishing a connection to the node. Defaults to None.
Returns: A list of NodeOutput objects representing the output of the copy command.
Return type: List[aztk.spark.models.NodeOutput]
-
diagnostics
(id, output_directory: str = None, brief: bool = False)[source]¶ Download diagnostics information from every node in a cluster.
Parameters: Returns: A list of NodeOutput objects representing the output of the copy command.
Return type: List[aztk.spark.models.NodeOutput]
-
get_application_log
(id: str, application_name: str, tail=False, current_bytes: int = 0)[source]¶ Get the log for a running or completed application
Parameters: - id (
str
) – the id of the cluster to run the command on. - application_name (
str
) – str - tail (
bool
, optional) – If True, get the remaining bytes after current_bytes. Otherwise, the whole log will be retrieved. Only use this if streaming the log as it is being written. Defaults to False. - current_bytes (
int
) – Specifies the last seen byte, so only the bytes after current_bytes are retrieved. Only useful if streaming the log as it is being written. Only used if tail is True.
Returns: a model representing the output of the application.
Return type: aztk.spark.models.ApplicationLog
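As an illustration of how tail and current_bytes combine to stream a log while an application runs, here is a rough sketch; the attribute names read off the returned log model (log, total_bytes, application_state) are assumptions and may differ:
import time

current_bytes = 0
while True:
    app_log = client.cluster.get_application_log(
        id="my-test-cluster", application_name="pi-example",
        tail=True, current_bytes=current_bytes)
    print(app_log.log, end="")                    # assumption: new log text is on .log
    current_bytes = app_log.total_bytes           # assumption: last seen byte count
    if app_log.application_state == "completed":  # assumption: state attribute and value
        break
    time.sleep(5)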
-
get_remote_login_settings
(id: str, node_id: str)[source]¶ Get the remote login information for a node in a cluster
Parameters: Returns: Object that contains the ip address and port combination to login to a node
Return type:
-
wait
(id: str, application_name: str)[source]¶ Wait until the application has completed
Parameters: Returns:
-
get_configuration
(id: str)[source]¶ Get the initial configuration of the cluster
Parameters: id ( str
) – the id of the clusterReturns: aztk.spark.models.ClusterConfiguration
-
ssh_into_master
(id, username, ssh_key=None, password=None, port_forward_list=None, internal=False)[source]¶ Open an SSH tunnel to the Spark master node and forward the specified ports
Parameters: - id (
str
) – the id of the cluster - username (
str
) – the name of the user to open the ssh session with - ssh_key (
str
, optional) – the ssh_key to authenticate the ssh user with. Must specify either ssh_key or password. - password (
str
, optional) – the password to authenticate the ssh user with. Must specify either password or ssh_key. - port_forward_list (
aztk.spark.models.PortForwardingSpecification
, optional) – List of the ports to forward. - internal (
str
, optional) – if True, this will connect to the node using its internal IP. Only use this if running within the same VNET as the cluster. Defaults to False.
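A sketch of opening an SSH tunnel to the master and forwarding the Spark web UI; the port numbers, username and key path are placeholders, and whether ssh_key takes a path or the key contents is an assumption:
from aztk.spark.models import PortForwardingSpecification

client.cluster.ssh_into_master(
    id="my-test-cluster",
    username="spark-user",   # placeholder user created with create_user
    ssh_key="~/.ssh/id_rsa", # assumption: key used to authenticate the session
    port_forward_list=[
        PortForwardingSpecification(remote_port=8080, local_port=8080),  # example port
    ],
)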
-
-
class
aztk.spark.client.job.
JobOperations
(context)[source]¶ Bases:
aztk.spark.client.base.operations.SparkBaseOperations
Spark JobOperations object
-
_core_job_operations
¶ aztk.client.job.CoreJobOperations
-
list
()[source]¶ List all jobs.
Returns: List of aztk.models.Job objects each representing the state and configuration of the job. Return type: List[Job]
-
delete
(id, keep_logs: bool = False)[source]¶ Delete a job.
Parameters: Returns: True if the deletion process was successful.
Return type:
-
get
(id)[source]¶ Get details about the state of a job.
Parameters: id ( str
) – the id of the job to get.Returns: A Job object representing the state and configuration of the job. Return type: aztk.spark.models.Job
-
get_application
(id, application_name)[source]¶ Get information on a submitted application
Parameters: Returns: object representing that state and output of an application
Return type:
-
get_application_log
(id, application_name)[source]¶ Get the log for a running or completed application
Parameters: Returns: a model representing the output of the application.
Return type:
-
list_applications
(id)[source]¶ List all applications defined as a part of a job
Parameters: id ( str
) – the id of the job to list the applications ofReturns: a list of all applications defined as a part of the job Return type: List[aztk.spark.models.Application]
-
stop_application
(id, application_name)[source]¶ Stops a submitted application
Parameters: Returns: True if the stop was successful, else False
Return type:
-
submit
(job_configuration: aztk.spark.models.models.JobConfiguration, wait: bool = False)[source]¶ Submit a job
Jobs consist of a cluster definition and one or more application definitions which run on the cluster. The job’s cluster will be allocated and configured, then the applications will be executed with their output stored in Azure Storage. When all applications have completed, the cluster will be automatically deleted.
Parameters: - job_configuration (
aztk.spark.models.JobConfiguration
) – Model defining the job’s configuration. - wait (
bool
) – If True, blocks until job is completed. Defaults to False.
Returns: Model representing the state of the job.
Return type: aztk.spark.models.Job
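A sketch of defining and submitting a Job; the keyword arguments mirror the JobConfiguration constructor shown earlier, and all values are placeholders:
from aztk.spark.models import ApplicationConfiguration, JobConfiguration, SparkToolkit

job_config = JobConfiguration(
    id="my-test-job",
    applications=[
        ApplicationConfiguration(name="pi-example", application="/path/to/pi.py"),  # placeholder
    ],
    vm_size="standard_d2_v2",
    toolkit=SparkToolkit(version="2.3.0"),
    max_dedicated_nodes=2,
)

# `client` is the Spark client from the earlier sketch.
job = client.job.submit(job_config, wait=False)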
-
aztk.client module¶
-
class
aztk.client.
CoreClient
[source]¶ Bases:
object
The base AZTK client that all other clients inherit from.
This client should not be used directly. Only software specific clients should be used.
-
class
aztk.client.base.
BaseOperations
(context)[source]¶ Bases:
object
Base operations that all other operations have as an attribute
-
batch_client
¶ azure.batch.batch_service_client.BatchServiceClient
– Client used to interact with the Azure Batch service.
-
blob_client
¶ azure.storage.blob.BlockBlobService
– Client used to interact with the Azure Storage Blob service.
-
secrets_configuration
¶ aztk.models.SecretsConfiguration
– Model that holds AZTK secrets used to authenticate with Azure and the clusters.
-
get_cluster_configuration
(id: str) → aztk.models.cluster_configuration.ClusterConfiguration[source]¶ Get the initial configuration of a cluster
Parameters: id ( str
) – the id of the cluster
Returns: Object representing the cluster’s configuration
Return type: aztk.models.ClusterConfiguration
-
get_cluster_data
(id: str) → aztk.internal.cluster_data.cluster_data.ClusterData[source]¶ Gets the ClusterData object to manage data related to the given cluster
Parameters: id ( str
) – the id of the cluster to getReturns: Object used to manage the data and storage functions for a cluster Return type: aztk.models.ClusterData
-
ssh_into_node
(id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False)[source]¶ Open an ssh tunnel to a node
Parameters: - id (
str
) – the id of the cluster the node is in - node_id (
str
) – the id of the node to open the ssh tunnel to - username (
str
) – the username to authenticate the ssh session - ssh_key (
str
, optional) – ssh public key to create the user with, must use ssh_key or password. Defaults to None. - password (
str
, optional) – password for the user, must use ssh_key or password. Defaults to None. - port_forward_list (
List[PortForwardingSpecification
, optional) – list of PortForwardingSpecifications. The defined ports will be forwarded to the client. - internal (
bool
, optional) – if True, this will connect to the node using its internal IP. Only use this if running within the same VNET as the cluster. Defaults to False.
Returns:
-
create_user_on_node
(id, node_id, username, ssh_key=None, password=None)[source]¶ Create a user on a node
Parameters: - id (
str
) – id of the cluster to create the user on. - node_id (
str
) – id of the node in the cluster to create the user on. - username (
str
) – name of the user to create. - ssh_key (
str
, optional) – ssh public key to create the user with, must use ssh_key or password. - password (
str
, optional) – password for the user, must use ssh_key or password.
Returns:
-
create_user_on_cluster
(id, nodes, username, ssh_pub_key=None, password=None)[source]¶ Create a user on every node in the cluster
Parameters: - username (
str
) – name of the user to create. - id (
str
) – id of the cluster to create the user on. - nodes (
List[ComputeNode]
) – list of nodes to create the user on - ssh_key (
str
, optional) – ssh public key to create the user with, must use ssh_key or password. Defaults to None. - password (
str
, optional) – password for the user, must use ssh_key or password. Defaults to None.
Returns:
-
generate_user_on_node
(id, node_id)[source]¶ Create a user with an autogenerated username and ssh_key on the given node.
Parameters: Returns: A tuple of the form (username:
str
, ssh_key:Cryptodome.PublicKey.RSA
)Return type:
-
generate_user_on_cluster
(id, nodes)[source]¶ Create a user with an autogenerated username and ssh_key on the cluster
Parameters: Returns: A tuple of the form (username:
str
, ssh_key:Cryptodome.PublicKey.RSA
)Return type:
-
delete_user_on_node
(id: str, node_id: str, username: str) → str[source]¶ Delete a user on a node
Parameters: Returns:
-
delete_user_on_cluster
(username, id, nodes)[source]¶ Delete a user on every node in the cluster
Parameters: Returns:
-
node_run
(id, node_id, command, internal, container_name=None, timeout=None, block=True)[source]¶ Run a bash command on the given node
Parameters: - id (
str
) – the id of the cluster to run the command on. - node_id (
str
) – the id of the node in the cluster to run the command on. - command (
str
) – the bash command to execute on the node. - internal (
bool
) – if True, this will connect to the node using its internal IP. Only use this if running within the same VNET as the cluster. Defaults to False. - container_name=None (
str
, optional) – the name of the container to run the command in. If None, the command will run on the host VM. Defaults to None. - timeout=None (
str
, optional) – The timeout in seconds for establishing a connection to the node. Defaults to None. - block=True (
bool
, optional) – If True, the command blocks until execution is complete.
Returns: object containing the output of the run command
Return type:
-
get_remote_login_settings
(id: str, node_id: str)[source]¶ Get the remote login information for a node in a cluster
Parameters: Returns: Object that contains the ip address and port combination to login to a node
Return type:
-
run
(id, command, internal, container_name=None, timeout=None)[source]¶ Run a bash command on every node in the cluster
Parameters: - id (
str
) – the id of the cluster to run the command on. - command (
str
) – the bash command to execute on the node. - internal (
bool
) – if true, this will connect to the node using its internal IP. Only use this if running within the same VNET as the cluster. Defaults to False. - container_name=None (
str
, optional) – the name of the container to run the command in. If None, the command will run on the host VM. Defaults to None. - timeout=None (
str
, optional) – The timeout in seconds for establishing a connection to the node. Defaults to None.
Returns: list of NodeOutput objects containing the output of the run command
Return type: List[aztk.models.NodeOutput]
-
get_application_log
(id: str, application_name: str, tail=False, current_bytes: int = 0)[source]¶ Get the log for a running or completed application
Parameters: - id (
str
) – the id of the cluster to run the command on. - application_name (
str
) – str - tail (
bool
, optional) – If True, get the remaining bytes after current_bytes. Otherwise, the whole log will be retrieved. Only use this if streaming the log as it is being written. Defaults to False. - current_bytes (
int
) – Specifies the last seen byte, so only the bytes after current_bytes are retrieved. Only useful if streaming the log as it is being written. Only used if tail is True.
Returns: a model representing the output of the application.
Return type:
-
create_task_table
(id: str)[source]¶ Create an Azure Table Storage to track tasks
Parameters: id ( str
) – the id of the cluster
-
list_task_table_entries
(id)[source]¶ list tasks in a storage table
Parameters: id ( str
) – the id of the clusterReturns: a list of models representing all entries in the Task table Return type: [aztk.models.Task]
-
get_task_from_table
(id, task_id)[source]¶ Get a task from the cluster’s storage table
Parameters: id ( str
) – the id of the clusterReturns: the task with id task_id from the cluster’s storage table Return type: [aztk.models.Task]
-
insert_task_into_task_table
(id, task)[source]¶ Insert a task into the table
Parameters: id ( str
) – the id of the clusterReturns: a model representing an entry in the Task table Return type: aztk.models.Task
-
update_task_in_task_table
(id, task)[source]¶ Update a task in the table
Parameters: id ( str
) – the id of the clusterReturns: a model representing an entry in the Task table Return type: aztk.models.Task
-
delete_task_table
(id)[source]¶ Delete the table that tracks tasks
Parameters: id ( str
) – the id of the clusterReturns: True if the deletion was successful Return type: bool
-
list_tasks
(id)[source]¶ list tasks in a storage table
Parameters: id ( str
) – the id of the clusterReturns: a list of models representing all entries in the Task table Return type: [aztk.models.Task]
-
get_recent_job
(id)[source]¶ Get the most recently run job in an Azure Batch job schedule
Parameters: id ( str
) – the id of the job scheduleReturns: the most recently run job on the job schedule Return type: [azure.batch.models.Job]
-
get_task_state
(id: str, task_name: str)[source]¶ Get the status of a submitted task
Parameters: Returns: the status state of the task
Return type:
-
list_batch_tasks
(id: str)[source]¶ List the Batch tasks that have been submitted to the cluster
Parameters: id ( str
) – the name of the cluster the task was submitted toReturns: list of aztk tasks Return type: [aztk.models.Task]
-
-
class
aztk.client.cluster.
CoreClusterOperations
(context)[source]¶ Bases:
aztk.client.base.base_operations.BaseOperations
-
create
(cluster_configuration: aztk.models.cluster_configuration.ClusterConfiguration, software_metadata_key: str, start_task, vm_image_model)[source]¶ Create a cluster.
Parameters: - cluster_configuration (
aztk.models.ClusterConfiguration
) – Configuration for the cluster to be created - software_metadata_key (
str
) – the key for the primary software that will be run on the cluster - start_task (
azure.batch.models.StartTask
) – Batch StartTask definition to configure the Batch Pool - vm_image_model (
azure.batch.models.VirtualMachineConfiguration
) – Configuration of the virtual machine image and settings
Returns: A Cluster object representing the state and configuration of the cluster.
Return type: aztk.models.Cluster
-
get
(id: str)[source]¶ Get the state and configuration of a cluster
Parameters: id ( str
) – the id of the cluster to get.Returns: A Cluster object representing the state and configuration of the cluster. Return type: aztk.models.Cluster
-
copy
(id, source_path, destination_path=None, container_name=None, internal=False, get=False, timeout=None)[source]¶ Copy files to or from every node in a cluster.
Parameters: - id (
str
) – the id of the cluster to copy files with. - source_path (
str
) – the path of the file to copy from. - destination_path (
str
, optional) – the local directory path where the output should be written. If None, a SpooledTemporaryFile will be returned in the NodeOutput object, else the file will be written to this path. Defaults to None. - container_name (
str
, optional) – the name of the container to copy to or from. If None, the copy operation will occur on the host VM. Defaults to None. - internal (
bool
, optional) – if True, this will connect to the node using its internal IP. Only use this if running within the same VNET as the cluster. Defaults to False. - get (
bool
, optional) – If True, the file is downloaded from every node in the cluster. Otherwise, the file is copied from the client to the node. Defaults to False. - timeout (
int
, optional) – The timeout in seconds for establishing a connection to the node. Defaults to None.
Returns: A list of NodeOutput objects representing the output of the copy command.
Return type: List[aztk.models.NodeOutput]
-
delete
(id: str, keep_logs: bool = False)[source]¶ Delete a cluster.
Parameters: Returns: True if the deletion process was successful.
Return type: bool
-
list
(software_metadata_key)[source]¶ List clusters running the specified software.
Parameters: software_metadata_key ( str
) – the key of the primary software running on the cluster. This filters out non-aztk clusters and aztk clusters running other software.Returns: list of clusters running the software defined by software_metadata_key Return type: List[aztk.models.Cluster]
-
-
class
aztk.client.job.
CoreJobOperations
(context)[source]¶ Bases:
aztk.client.base.base_operations.BaseOperations
-
submit
(job_configuration, start_task, job_manager_task, autoscale_formula, software_metadata_key: str, vm_image_model, application_metadata)[source]¶ Submit a job
Jobs consist of a cluster definition and one or more application definitions which run on the cluster. The job’s cluster will be allocated and configured, then the applications will be executed with their output stored in Azure Storage. When all applications have completed, the cluster will be automatically deleted.
Parameters: - job_configuration (
aztk.models.JobConfiguration
) – Model defining the job’s configuration. - start_task (
azure.batch.models.StartTask
) – Batch StartTask definition to configure the Batch Pool - job_manager_task (
azure.batch.models.JobManagerTask
) – Batch JobManagerTask definition to schedule the defined applications on the cluster. - autoscale_formula (
str
) – formula that defines the numbers of nodes allocated to the cluster. - software_metadata_key (
str
) – the key of the primary softare running on the cluster. - vm_image_model –
- application_metadata (
List[str]
) – list of the names of all applications that will be run as a part of the job
Returns: Model representing the Azure Batch JobSchedule state.
Return type: azure.batch.models.CloudJobSchedule
-
aztk.error module¶
Contains all errors used in AZTK. All errors should inherit from AztkError
-
exception
aztk.error.
AztkAttributeError
[source]¶ Bases:
aztk.error.AztkError
-
exception
aztk.error.
ClusterNotReadyError
[source]¶ Bases:
aztk.error.AztkError
-
exception
aztk.error.
AzureApiInitError
[source]¶ Bases:
aztk.error.AztkError
-
exception
aztk.error.
InvalidPluginConfigurationError
[source]¶ Bases:
aztk.error.AztkError
-
exception
aztk.error.
InvalidModelError
(message: str, model=None)[source]¶ Bases:
aztk.error.AztkError
-
exception
aztk.error.
MissingRequiredAttributeError
(message: str, model=None)[source]¶ Bases:
aztk.error.InvalidModelError
-
exception
aztk.error.
InvalidPluginReferenceError
(message: str, model=None)[source]¶ Bases:
aztk.error.InvalidModelError
-
exception
aztk.error.
InvalidModelFieldError
(message: str, model=None, field=None)[source]¶ Bases:
aztk.error.InvalidModelError
Writing docs¶
Docs are located in the docs folder. We are using sphinx
to generate the docs and then hosting them on readthedocs.
Start docs autobuild to test locally¶
sphinx-autobuild docs docs/_build/html --watch aztk
Open docs/_build/index.html
Publish the docs¶
Docs should be published automatically to Read the Docs under the latest
tag as soon as you push to master.
When you create a git tag, readthedocs can also build the docs for that tag.
Writing a model¶
Getting started¶
In aztk/models
create a new file with the name of your model my_model.py
In aztk/models/__init__.py
add from .my_model import MyModel
Create a new class MyModel
that inherits Model
from aztk.core.models import Model, fields
class MyModel(Model):
"""
MyModel is a sample model
Args:
input1 (str): This is the first input
"""
input1 = fields.String()
def __validate__(self):
pass
Available fields types¶
Check aztk/core/models/fields.py
for the sources
Field: Base field class
String: Field that validates it is given a string
Integer: Field that validates it is given an int
Float: Field that validates it is given a float
Boolean: Field that validates it is given a boolean
List: Field that validates it is given a list and can also automatically convert entries to the given model type.
Model: Field that maps to another model. If passed a dict it will automatically try to convert to the Model type
Enum: Field whose value should be an enum. It will convert automatically to the enum if given the value.
Add validation¶
The fields provide basic validation automatically. A field without a default will be marked as required.
To provide model wide validation implement a __validate__
method and raise an InvalidModelError
if there are any problems with the values
def __validate__(self):
if 'secret' in self.input1:
raise InvalidModelError("Input1 contains secrets")
Convert dict to model¶
When inheriting from Model
it comes with a from_dict
class method which allows you to convert a dict into an instance of the class
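For example, continuing with the MyModel class above (a minimal sketch):
# Build a MyModel instance from a plain dict and read the validated field back.
my_model = MyModel.from_dict({"input1": "hello"})
print(my_model.input1)  # hello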
Tests¶
AZTK comes with a testing library that can be used for verification, and debugging. Please note that some tests will provision and test real resources in Azure, and as a result, will cost money to run. See Integration Tests for more details.
Integration Tests¶
Integration tests use the credentials given in your .aztk/secrets.yaml
file to spin up real Clusters and Jobs to verify the functionality of the library. Please note that these tests will cost money to run. All created Clusters and Jobs will be deleted when the test completes.
Since each integration test spins up a Cluster or Job, you may want to run the tests in parallel to reduce the time needed to complete the testing library:
pytest $path_to_repo_root -n <5>
Note: $path_to_repo_root represents the path to the root of the aztk repository, and is only required if you are running the tests from a different location.
Please note that the number passed to the -n
flag determines the number of tests you wish to run in parallel. Parallelizing the tests will increase the number of CPU cores used at one time, so please verify that you have the available core quota in your Batch account.