shiftmanager

Admin tools for Amazon Redshift.

Installation

Install shiftmanager from PyPI:

pip install shiftmanager

Basic Usage

Get started by creating a Redshift instance with your cluster details:

from shiftmanager import Redshift
redshift = Redshift(host='myhost', user='myuser', password='mypass')

Or provide connection parameters via environment variables:

# Assumes PGHOST, PGUSER, and PGPASSWORD are set.
from shiftmanager import Redshift
redshift = Redshift()

A database connection will be established the first time it’s needed and persisted for the length of the session as Redshift.connection:

>>> statement = redshift.alter_user('chad', wlm_query_slot_count=2)
Connecting to myhost...

Methods that generate database commands will return a SQL string. You can review the statement and execute the changes in an additional step:

>>> statement = redshift.alter_user('chad', wlm_query_slot_count=2)
>>> print(statement)
ALTER USER chad SET wlm_query_slot_count = 2
>>> redshift.execute(statement)

Or execute the statement within the method call by specifying the execute keyword argument:

redshift.alter_user('chad', wlm_query_slot_count=2, execute=True)

In some cases, the returned SQL might not be a single statement but rather a batch of multiple statements. To provide some safety in these cases, the execute method (whether invoked explicitly or through a keyword argument) always initiates a transaction, performing a rollback if any statement produces an error.
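
For example, a multi-statement batch passed to execute runs as a single transaction (the table and group names here are illustrative):

batch = """
CREATE TABLE my_schema.audit_log (id INTEGER, note VARCHAR(256));
GRANT SELECT ON my_schema.audit_log TO GROUP reporting;
"""
# If either statement fails, both are rolled back.
redshift.execute(batch)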

You can use a Redshift instance within a larger script, or you can use shiftmanager as a command-line tool for one-off admin tasks. If you want to make jumping into shiftmanager as quick as possible, see Configuring shiftmanager For Your Environment.

Creating Users

Easily generate strong passwords with random_password and create new user accounts with create_user:

password = redshift.random_password()
# Create a new superuser account
statement = redshift.create_user('newuser', password, createuser=True)

To modify existing accounts, use alter_user.
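
For example, to rotate an account's password and set an expiration date (the values here are illustrative):

new_password = redshift.random_password()
statement = redshift.alter_user('chad', password=new_password,
                                valid_until='2030-01-01')
redshift.execute(statement)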

Schema Reflection, Deep Copies, Deduping, and Migrations

shiftmanager provides several features that reflect existing schema structure from your cluster, powered by sqlalchemy-redshift, a Redshift dialect for SQLAlchemy.

Use table_definition as a pg_dump replacement that understands Redshift-specific structure like distkeys, sortkeys, and compression encodings:

>>> batch = redshift.table_definition('my_table', schema='my_schema')
>>> print(batch)
CREATE TABLE my_schema.my_table (
        id CHAR(36) ENCODE lzo,
        email_address VARCHAR(256) ENCODE raw
) DISTSTYLE KEY DISTKEY (id) SORTKEY (id)

;
ALTER TABLE my_schema.my_table OWNER TO chad;
GRANT ALL ON my_schema.my_table TO clarissa

Reflecting table structure can be particularly useful when performing deep copies. Amazon’s documentation on deep copies suggests four potential strategies, but advises:

Use the original table DDL. If the CREATE TABLE DDL is available, this is the best method.

The deep_copy method codifies this best practice, using table_definition behind the scenes to recreate the relevant DDL:

>>> batch = redshift.deep_copy('my_table', schema='my_schema')
>>> print(batch)
LOCK TABLE my_schema.my_table;
ALTER TABLE my_schema.my_table RENAME TO my_table$outgoing;

CREATE TABLE my_schema.my_table (
        id CHAR(36) ENCODE lzo,
        email_address VARCHAR(256) ENCODE raw
) DISTSTYLE KEY DISTKEY (id) SORTKEY (id)

;
ALTER TABLE my_schema.my_table OWNER TO chad;
GRANT ALL ON my_schema.my_table TO clarissa

INSERT INTO my_schema.my_table SELECT * from my_schema.my_table$outgoing;
DROP TABLE my_schema.my_table$outgoing

To remove duplicate records while recreating the table, pass in the distinct=True keyword argument. If you have rows you consider duplicates despite some difference in their values, you can use the deduplicate_partition_by option. For example, say that rows include a message_id column that should be unique, but also a when_recorded column set when the record is ingested. To retain only the most recently ingested row for each unique id, call:

redshift.deep_copy('my_table',
                   deduplicate_partition_by='message_id',
                   deduplicate_order_by='when_recorded DESC NULLS LAST')

When using deduplicate_partition_by, only the first row returned for any given value of the partitioning columns is retained. It’s strongly suggested that you supply a value for deduplicate_order_by to determine how that initial row is chosen.

deep_copy can also be used to migrate an existing table to a new structure, providing a convenient way to alter distkeys, sortkeys, and column encodings. Additional keyword arguments will be passed to the reflected_table method, altering the reflected Table:

>>> kwargs = dict(redshift_distkey='email_address', redshift_sortkey=('email_address', 'id'))
>>> batch = redshift.deep_copy('my_table', schema='my_schema', **kwargs)
>>> print(batch)
LOCK TABLE my_schema.my_table;
ALTER TABLE my_schema.my_table RENAME TO my_table$outgoing;

CREATE TABLE my_schema.my_table (
        id CHAR(36) ENCODE lzo,
        email_address VARCHAR(256) ENCODE raw
) DISTSTYLE KEY DISTKEY (email_address) SORTKEY (email_address, id);

ALTER TABLE my_schema.my_table OWNER TO chad;
GRANT ALL ON my_schema.my_table TO clarissa;

INSERT INTO my_schema.my_table SELECT * from my_schema.my_table$outgoing;
DROP TABLE my_schema.my_table$outgoing;

If you pass analyze_compression=True to deep_copy, compression encodings will be updated in the resultant table based on results of running ANALYZE COMPRESSION to determine optimal encodings for the existing data.
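
For example, reusing the table from the earlier examples:

# Rewrite the table with freshly analyzed compression encodings.
redshift.deep_copy('my_table', schema='my_schema',
                   analyze_compression=True, execute=True)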

Copy JSON to Redshift

To be written. See copy_json_to_table.
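
In the meantime, here is a minimal sketch based on the copy_json_to_table parameters documented in the API section below; the bucket, key path, and table names are placeholders, and AWS credentials are assumed to be available to boto (or set via set_aws_credentials):

# Placeholder records, bucket, key path, and table name.
records = [{'id': 'a1', 'email': 'a@example.com'},
           {'id': 'b2', 'email': 'b@example.com'}]
redshift.copy_json_to_table(bucket='my-bucket',
                            keypath='loads/my_table/',
                            data=records,
                            table='my_schema.my_table',
                            slices=4)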

Configuring shiftmanager For Your Environment

If you use shiftmanager as a command line interface for administering Redshift, it can be inconvenient to type in cluster details every time you open a new session. We recommend writing a short setup script as a ~/.shiftmanager.py file or the like:

from shiftmanager import Redshift

# We're assuming PGPASSWORD and PGUSER are set here.
# You might want to pull those in from an encrypted file.
dev = Redshift(host="my-dev-host")
prod = Redshift(host="my-prod-host")

You can then invoke your script interactively like:

$ ipython -i ~/.shiftmanager.py

And have immediate access to the objects you set up.

To make this super convenient, add an alias to your .bashrc or the like:

alias shiftmanager="ipython -i ~/.shiftmanager.py"

Acknowledgments

Thanks to Blame Society Productions for letting us use a screenshot from Chad Vader: Day Shift Manager as our banner image.

API

Contents:

Redshift API

class shiftmanager.Redshift(database=None, user=None, password=None, host=None, port=5439, aws_access_key_id=None, aws_secret_access_key=None, security_token=None, **kwargs)

Interface to Redshift.

This class will default to environment variables for all arguments.

For methods requiring S3, AWS keys are not required if you have environment variables set for boto to pick up: http://boto.readthedocs.org/en/latest/s3_tut.html#creating-a-connection

Parameters:
  • database (str) – envvar equivalent: PGDATABASE
  • user (str) – envvar equivalent: PGUSER
  • password (str) – envvar equivalent: PGPASSWORD
  • host (str) – envvar equivalent: PGHOST
  • port (int) – envvar equivalent: PGPORT
  • aws_access_key_id (str) – envvar equivalent: AWS_ACCESS_KEY_ID
  • aws_secret_access_key (str) – envvar equivalent: AWS_SECRET_ACCESS_KEY
  • security_token (str) – envvar equivalent: AWS_SECURITY_TOKEN or AWS_SESSION_TOKEN
  • kwargs (dict) – Additional keyword arguments sent to psycopg2.connect
alter_user(name, password=None, valid_until=None, createdb=None, createuser=None, rename=None, execute=False, **parameters)

Return a SQL str that alters an existing user.

Parameters:
  • name (str) – The name of the user account to alter.
  • password (str) – A new password for the user account.
  • valid_until (str or datetime) – An absolute time after which the user account password is no longer valid.
  • createdb (boolean) – Allow the new user account to create databases.
  • createuser (boolean) – Create a superuser with all database privileges.
  • rename (str) – New name to assign the user.
  • execute (boolean) – Execute the command in addition to returning it.
  • parameters – Additional keyword arguments are interpreted as configuration parameters whose values will be set by additional ALTER USER statements added to the batch. For values set to None, the parameter will be reset, letting system defaults take effect.
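
For example, a configuration parameter can be set and later reset to its default by passing None (a minimal sketch reusing the parameter shown earlier):

# Set a configuration parameter for the user, then reset it to the default.
redshift.alter_user('chad', wlm_query_slot_count=4, execute=True)
redshift.alter_user('chad', wlm_query_slot_count=None, execute=True)
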
aws_credentials
static chunked_json_slices(*args, **kwds)

Given an iterator of dicts, chunk them into slices and write to temp files on disk. Clean up when leaving scope.

Parameters:
  • data (iter of dicts) – Iterable of dictionaries to be serialized to chunks
  • slices (int) – Number of chunks to generate
  • dir (str) – Dir to write chunks to. Will default to $HOME/.shiftmanager/tmp/
  • clean_on_exit (bool, default True) – Clean up chunks on disk when context exits
Returns:

  • stamp (str) – Timestamp that prepends the filenames of chunks written to disk
  • chunk_files (list) – List of filenames

connection

A psycopg2.connect connection to Redshift.

Instantiation is delayed until the object is first used.

copy_json_to_table(*args, **kwargs)

Given a list of JSON-able dicts, COPY them to the given table.

This function will partition the blobs into slices number of files, write them to the S3 bucket, write the jsonpaths file, COPY them to the table, and then optionally clean up everything in the bucket.

Parameters:
  • bucket (str) – S3 bucket for writes
  • keypath (str) – S3 key path for writes
  • data (iterable of dicts) – Iterable of JSON-able dicts
  • jsonpaths (dict) – Redshift jsonpaths file. If None, will autogenerate with alphabetical order
  • table (str) – Table name for COPY
  • slices (int) – Number of slices in your cluster. This many files will be generated on S3 for efficient COPY.
  • clean_up_s3 (bool) – Clean up S3 bucket after COPY completes
  • local_path (str) – Local path to write chunked JSON. Defaults to $HOME/.shiftmanager/tmp/
  • clean_up_local (bool) – Clean up local chunked JSON after COPY completes.
copy_table_to_redshift(redshift_table_name, bucket_name, key_prefix, pg_table_name=None, pg_select_statement=None, temp_file_dir=None, cleanup_s3=True, delete_statement=None, manifest_max_keys=None, line_bytes=104857600, canned_acl=None)

Writes the contents of a Postgres table to Redshift.

The approach here attempts to maximize speed and minimize local disk usage. The fastest method of extracting data from Postgres is the COPY command, which we use here, but pipe the output to the split and gzip shell utilities to create a series of compressed files. As files are created, a separate thread uploads them to S3 and removes them from local disk.

Due to the use of external shell utilities, this function can only run on an operating system with GNU coreutils installed (available by default on Linux, and via Homebrew on macOS).

Parameters:
  • redshift_table_name (str) – The Redshift table into which the data will be loaded
  • bucket_name (str) – The name of the S3 bucket to be written to
  • key_prefix (str) – The key path within the bucket to write to
  • pg_table_name (str) – Optional Postgres table name to export; use this if you do not need to restrict the export to a subset of the table
  • pg_select_statement (str) – Optional SELECT statement to export a subset of the data instead of a whole table
  • temp_file_dir (str) – Optional location for temporary files
  • cleanup_s3 (bool) – Clean up the S3 location on failure. Defaults to True.
  • delete_statement (str or None) – When not None, this statement will be run in the same transaction as the (final) COPY statement. This is useful when you want to clean up a previous backfill at the same time as issuing a new backfill.
  • manifest_max_keys (int or None) – If None, all S3 keys will be sent to Redshift in a single COPY transaction. Otherwise, this parameter sets an upper limit on the number of S3 keys included in the COPY manifest. If more keys were produced, then additional COPY statements will be issued. This is useful for particularly large loads that may timeout in a single transaction.
  • line_bytes (int) – The maximum number of bytes to write to a single file (before compression); defaults to 100 MB
  • canned_acl (str) – A canned ACL to apply to objects uploaded to S3
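
A minimal usage sketch, assuming Postgres connection details are available in the environment; the bucket, key prefix, and table names are placeholders:

# Exports public.my_table from Postgres and loads it into
# my_schema.my_table in Redshift via the given S3 location.
redshift.copy_table_to_redshift('my_schema.my_table',
                                'my-bucket',
                                'backfills/my_table/',
                                pg_table_name='public.my_table')
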
copy_table_to_s3(bucket_name, key_prefix, pg_table_name=None, pg_select_statement=None, temp_file_dir=None, cleanup_s3=True, line_bytes=104857600, canned_acl=None)

Writes the contents of a Postgres table to S3.

The approach here attempts to maximize speed and minimize local disk usage. The fastest method of extracting data from Postgres is the COPY command, which we use here, but pipe the output to the split and gzip shell utilities to create a series of compressed files. As files are created, a separate thread uploads them to S3 and removes them from local disk.

Due to the use of external shell utilities, this function can only run on an operating system with GNU coreutils installed (available by default on Linux, and via Homebrew on macOS).

Parameters:
  • bucket_name (str) – The name of the S3 bucket to be written to
  • key_prefix (str) – The key path within the bucket to write to
  • pg_table_name (str) – Optional Postgres table name to export; use this if you do not need to restrict the export to a subset of the table
  • pg_select_statement (str) – Optional SELECT statement to export a subset of the data instead of a whole table
  • temp_file_dir (str) – Optional location for temporary files
  • cleanup_s3 (bool) – Clean up the S3 location on failure. Defaults to True.
  • line_bytes (int) – The maximum number of bytes to write to a single file (before compression); defaults to 100 MB
  • canned_acl (str) – A canned ACL to apply to objects uploaded to S3
Returns:

Return type:

(Final key prefix, List of S3 keys)

create_pg_connection(**kwargs)

Create a psycopg2.connect connection to Postgres.

See https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS for supported parameters.

create_user(name, password, valid_until=None, createdb=False, createuser=False, groups=None, execute=False, **parameters)

Return a SQL str defining a new user.

Parameters:
  • name (str) – The name of the user account to create.
  • password (str) – The password for the new user.
  • valid_until (str or datetime) – An absolute time after which the user account password is no longer valid.
  • createdb (boolean) – Allow the new user account to create databases.
  • createuser (boolean) – Create a superuser with all database privileges.
  • groups (list of str) – Existing groups that the user will belong to.
  • execute (boolean) – Execute the command in addition to returning it.
  • parameters – Additional keyword arguments are interpreted as configuration parameters whose values will be set by additional ALTER USER statements added to the batch.
deep_copy(table, schema=None, copy_privileges=True, use_cache=True, cascade=False, distinct=False, analyze_compression=False, analyze=True, deduplicate_partition_by=None, deduplicate_order_by=None, execute=False, **kwargs)

Return a SQL str defining a deep copy of table.

This method can be used to simply sort and clean an unvacuumable table, or it can be used to migrate to a revised table structure. You can use the reflected_table method with overrides to generate a new table structure, then pass that revised object in as table.

Parameters:
  • table (str or Table) – The table to reflect
  • schema (str) – The database schema in which to look for table (only used if table is str)
  • copy_privileges (bool) – Reflect ownership and grants on the existing table and include them in the return value
  • use_cache (bool) – Use cached results for the privilege query, if available
  • cascade (bool) – Drop any dependent views when dropping the source table
  • distinct (bool) – Deduplicate the table by adding DISTINCT to the SELECT statement; also see deduplicate_partition_by for more control
  • analyze_compression (bool) – Update the column compression encodings based on results of an ANALYZE COMPRESSION statement on the table.
  • analyze (bool) – Add an ‘ANALYZE table’ command at the end of the batch to update statistics, since this is not done automatically for INSERTs
  • deduplicate_partition_by (str or None) – A string giving a list of columns like ‘col1, col2’ to be passed to ‘ROW_NUMBER() OVER (PARTITION BY {columns})’ so that only the first row for a given set of values will be retained; it’s strongly suggested that you also set deduplicate_order_by so that results are deterministic
  • deduplicate_order_by (str or None) – A string like ‘col3 DESC NULLS LAST, col4 ASC NULLS LAST’ to be used as the ‘ORDER BY’ clause of the deduplication window function, with the first row in sort order being the one retained; will be ignored if deduplicate_partition_by is not also set
  • execute (bool) – Execute the command in addition to returning it.
  • kwargs – Additional keyword arguments will be passed unchanged to the reflected_table method.
engine

A sqlalchemy.engine which wraps connection.

execute(batch, parameters=None)

Execute a batch of SQL statements using this instance’s connection.

Statements are executed within a transaction.

Parameters:
  • batch (str) – The batch of SQL statements to execute.
  • parameters (list or dict) – Values to bind to the batch, passed to cursor.execute
static gen_jsonpaths(json_doc, list_idx=None)

Generate Redshift jsonpath file for given JSON document or dict.

If an array is present, you can specify an index to use for that field in the jsonpaths result. Right now only a single index is supported.

Results will be ordered alphabetically by default.

Parameters:
  • json_doc (str or dict) – Dictionary or JSON-able string
  • list_idx (int) – Index for array position
Returns:

Return type:

Dict
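
For illustration, a jsonpaths dict might be generated from a sample record and then supplied to copy_json_to_table via its jsonpaths argument (the field names here are placeholders):

# Build a jsonpaths dict from a representative record; keys are ordered
# alphabetically by default. The result can be passed to copy_json_to_table.
sample = {'id': 'a1', 'email': 'a@example.com'}
jsonpaths = Redshift.gen_jsonpaths(sample)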

get_bucket(*args, **kwargs)

Get boto.s3.bucket. Caches existing buckets.

Parameters: bucket_name (str) – The name of the bucket to retrieve
get_s3_connection(ordinary_calling_fmt=False)

Get a new S3 connection.

Parameters: ordinary_calling_fmt (bool) – Initialize the connection with OrdinaryCallingFormat
get_table_names(schema=None, **kwargs)

Return a list naming all tables and views defined in schema.

meta

A MetaData instance used for reflection calls.

mogrify(batch, parameters=None, execute=False)
pg_connection

A psycopg2.connect connection to Postgres.

Instantiation is delayed until the object is first used.

pg_execute_and_commit_single_statement(statement)

Execute a single Postgres statement and commit it.

preparer

A Redshift-aware identifier preparer.

static random_password(length=64)

Return a strong password valid for Redshift.

Constraints:

  • 8 to 64 characters in length.
  • Must contain at least one uppercase letter, one lowercase letter, and one number.
  • Can use any printable ASCII characters (ASCII code 33 to 126) except ' (single quote), " (double quote), \, /, @, ` (backtick), or space.
  • See Redshift’s CREATE USER docs
reflected_privileges(relation, schema=None, use_cache=True)

Return a SQL str which recreates all privileges for relation.

Parameters:
  • relation (str or Table) – The table or view to reflect
  • schema (str) – The database schema in which to look for relation (only used if relation is str)
  • use_cache (bool) – Use cached results for the privilege query, if available
reflected_table(name, *args, **kwargs)

Return a Table reflected from the database.

This is simply a convenience method which passes arguments to the Table constructor, so you may override various properties of the existing table. In particular, Redshift-specific attributes like distkey and sortkey can be set through redshift_* keyword arguments (redshift_distkey='col1', redshift_interleaved_sortkey=('col1', 'col2'), etc.)

The return value is suitable input for the table_definition or deep_copy methods, useful for changing the structure of an existing table.

extend_existing is set to True by default.

Notes

See SQLAlchemy’s documentation on Overriding Reflected Columns and sqlalchemy-redshift’s DDLCompiler docs.
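
For example, a sketch that reflects a table with an overridden sortkey and prints the corresponding DDL, reusing the names from the earlier examples:

# Reflect the existing table, overriding its sortkey, then render the DDL.
table = redshift.reflected_table('my_table', schema='my_schema',
                                 redshift_sortkey=('email_address', 'id'))
print(redshift.table_definition(table))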

set_aws_credentials(aws_access_key_id, aws_secret_access_key, security_token=None)

Set AWS credentials. These are required for any methods that interact with S3.

Parameters:
  • aws_access_key_id (str) –
  • aws_secret_access_key (str) –
  • security_token (str or None) – Temporary security token (if using creds from STS)
set_aws_role(aws_account_id, aws_role_name)

Set AWS IAM role. This role will be assumed by the Redshift cluster when reading from S3 during COPY statements. If not set, the access key and secret from set_aws_credentials will be used.

Parameters:
  • aws_account_id (str) –
  • aws_role_name (str) –
table_definition(table, schema=None, copy_privileges=True, use_cache=True, analyze_compression=False)

Return a str containing the necessary SQL statements to recreate table.

Parameters:
  • table (str or Table) – The table to reflect
  • schema (str) – The database schema in which to look for table (only used if table is str)
  • copy_privileges (bool) – Reflect ownership and grants on the existing table and include them in the return value
  • use_cache (bool) – Use cached results for the privilege query, if available
  • analyze_compression (bool) – Update the column compression encodings based on results of an ANALYZE COMPRESSION statement on the table
table_exists(table_name)

Check Redshift for whether a table exists.

Parameters: table_name (str) – The name of the table to check for
Returns:
Return type: boolean
unload_table_to_s3(*args, **kwargs)

Given a table in Redshift, UNLOAD it to S3

Parameters:
  • bucket (str) – S3 bucket for writes
  • keypath (str) – S3 key path for writes
  • table (str) – Table name for UNLOAD
  • schema (str) – Schema that the table resides in. Defaults to None.
  • col_str (str) – Comma-separated string of columns to unload. Defaults to ‘*’.
  • where (str) – SQL WHERE clause string to filter the select statement in the unload. Defaults to None, meaning no WHERE clause is applied.
  • to_json (boolean) – Defaults to True
  • options (str) – Additional options to be included in the UNLOAD command. Defaults to None, in which case the following options are used: MANIFEST, GZIP, ALLOWOVERWRITE
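
A minimal usage sketch with placeholder bucket, key path, and filter values:

# Unload my_schema.my_table to S3, filtered by a WHERE clause.
redshift.unload_table_to_s3(bucket='my-bucket',
                            keypath='unloads/my_table/',
                            table='my_table',
                            schema='my_schema',
                            where="when_recorded >= '2020-01-01'")
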
view_definition(view, schema=None, copy_privileges=True, use_cache=True, execute=False, **kwargs)

Return a SQL str defining view.

Parameters:
  • view (str or Table) – The view to reflect
  • schema (str) – The database schema in which to look for view (only used if view is str)
  • copy_privileges (bool) – Reflect ownership and grants on the existing view and include them in the return value
  • use_cache (bool) – Use cached results for the privilege query, if available
  • execute (bool) – Execute the command in addition to returning it
  • kwargs – Additional keyword arguments will be passed unchanged to get_view_definition()
write_dict_to_key(data, key, close=False)

Given a Boto S3 Key, write a given dict to that key as JSON.

Parameters:
  • data (dict) –
  • key (boto.s3.Key) –
  • close (bool, default False) – Close key after write
write_file_to_s3(f, bucket, s3_key_path)

Given a file object, write its contents to an S3 key.

Parameters:
  • f (file) – The file object to upload
  • bucket (boto.s3.bucket.Bucket) – The bucket to be written to
  • s3_key_path (str) – The key path to write to
write_filename_to_s3(filename, bucket, s3_key_path)

Given the name of a local file, write its contents to an S3 key.

Parameters:
  • filename (str) – Path of the local file to upload
  • bucket (boto.s3.bucket.Bucket) – The bucket to be written to
  • s3_key_path (str) – The key path to write to
write_string_to_s3(chunk, bucket, s3_key_path, canned_acl=None)

Given a string chunk that represents a piece of a CSV file, write the chunk to an S3 key.

Parameters:
  • chunk (str) – String blob representing a chunk of a larger CSV
  • bucket (boto.s3.bucket.Bucket) – The bucket to be written to
  • s3_key_path (str) – The key path to write the chunk to
  • canned_acl (str) – A canned ACL to apply to the uploaded object
