
shiftmanager¶
Admin tools for Amazon Redshift.
Basic Usage¶
Get started by creating a Redshift instance with your cluster details:
from shiftmanager import Redshift
redshift = Redshift(host='myhost', user='myuser', password='mypass')
Or provide connection parameters via environment variables:
# Assumes PGHOST, PGUSER, and PGPASSWORD are set.
from shiftmanager import Redshift
redshift = Redshift()
A database connection will be established the first time it’s needed and persisted for the length of the session as Redshift.connection:
>>> statement = redshift.alter_user('chad', wlm_query_slot_count=2)
Connecting to myhost...
Methods that generate database commands will return a SQL string. You can review the statement and execute the changes in an additional step:
>>> statement = redshift.alter_user('chad', wlm_query_slot_count=2)
>>> print(statement)
ALTER USER chad SET wlm_query_slot_count = 2
>>> redshift.execute(statement)
Or execute the statement within the method call by specifying the execute keyword argument:
redshift.alter_user('chad', wlm_query_slot_count=2, execute=True)
In some cases, the returned SQL might not be a single statement but rather a batch of multiple statements. To provide some safety in these cases, the execute method (whether invoked explicitly or through a keyword argument) always initiates a transaction, performing a rollback if any statement produces an error.
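For example, passing more than one configuration parameter to alter_user yields a batch of ALTER USER statements that commit or roll back together (the parameter names and values below are illustrative):
# Illustrative batch: each parameter becomes its own ALTER USER statement.
batch = redshift.alter_user('chad',
                            wlm_query_slot_count=2,
                            statement_timeout=60000)
redshift.execute(batch)  # both statements commit, or neither does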
You can use a Redshift instance within a larger script, or you can use shiftmanager as a command-line tool for one-off admin tasks. If you want to make jumping into shiftmanager as quick as possible, see Configuring shiftmanager For Your Environment.
Creating Users¶
Easily generate strong passwords with random_password and create new user accounts with create_user:
password = redshift.random_password()
# Create a new superuser account
statement = redshift.create_user('newuser', password, createuser=True)
To modify existing accounts, use alter_user.
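For instance, a minimal sketch of resetting an existing account's password (the user name is a placeholder):
# Generate a new password and apply it to an existing account.
new_password = redshift.random_password()
statement = redshift.alter_user('chad', password=new_password)
redshift.execute(statement)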
Schema Reflection, Deep Copies, Deduping, and Migrations¶
shiftmanager provides several features that reflect existing schema structure from your cluster, powered by sqlalchemy-redshift, a Redshift dialect for SQLAlchemy.
Use table_definition as a pg_dump replacement that understands Redshift-specific structure like distkeys, sortkeys, and compression encodings:
>>> batch = redshift.table_definition('my_table', schema='my_schema')
>>> print(batch)
CREATE TABLE my_schema.my_table (
id CHAR(36) ENCODE lzo,
email_address VARCHAR(256) ENCODE raw
) DISTSTYLE KEY DISTKEY (id) SORTKEY (id)
;
ALTER TABLE my_schema.my_table OWNER TO chad;
GRANT ALL ON my_schema.my_table TO clarissa
Reflecting table structure can be particularly useful when performing deep copies. Amazon’s documentation on deep copies suggests four potential strategies, but advises:
Use the original table DDL. If the CREATE TABLE DDL is available, this is the best method.
The deep_copy method codifies this best practice, using table_definition behind the scenes to recreate the relevant DDL:
>>> batch = redshift.deep_copy('my_table', schema='my_schema')
>>> print(batch)
LOCK TABLE my_schema.my_table;
ALTER TABLE my_schema.my_table RENAME TO my_table$outgoing;
CREATE TABLE my_schema.my_table (
id CHAR(36) ENCODE lzo,
email_address VARCHAR(256) ENCODE raw
) DISTSTYLE KEY DISTKEY (id) SORTKEY (id)
;
ALTER TABLE my_schema.my_table OWNER TO chad;
GRANT ALL ON my_schema.my_table TO clarissa
INSERT INTO my_schema.my_table SELECT * from my_schema.my_table$outgoing;
DROP TABLE my_schema.my_table$outgoing
To remove duplicate records while recreating the table, pass in the distinct=True keyword argument.
If you have rows you consider duplicates despite some difference in their values, you can use the deduplicate_partition_by option. For example, say that rows include a message_id column that should be unique, but also a when_recorded column set when the record is ingested. To retain only the most recently ingested row for each unique id, call:
redshift.deep_copy('my_table',
                   deduplicate_partition_by='message_id',
                   deduplicate_order_by='when_recorded DESC NULLS LAST')
When using deduplicate_partition_by, only the first row returned for any given value of the partitioning columns is retained. It’s strongly suggested that you supply a value for deduplicate_order_by to determine how that initial row is chosen.
deep_copy can also be used to migrate an existing table to a new structure, providing a convenient way to alter distkeys, sortkeys, and column encodings. Additional keyword arguments will be passed to the reflected_table method, altering the reflected Table:
>>> kwargs = dict(redshift_distkey='email_address', redshift_sortkey=('email_address', 'id'))
>>> batch = redshift.deep_copy('my_table', schema='my_schema', **kwargs)
>>> print(batch)
LOCK TABLE my_schema.my_table;
ALTER TABLE my_schema.my_table RENAME TO my_table$outgoing;
CREATE TABLE my_schema.my_table (
id CHAR(36) ENCODE lzo,
email_address VARCHAR(256) ENCODE raw
) DISTSTYLE KEY DISTKEY (email_address) SORTKEY (email_address, id);
ALTER TABLE my_schema.my_table OWNER TO chad;
GRANT ALL ON my_schema.my_table TO clarissa;
INSERT INTO my_schema.my_table SELECT * from my_schema.my_table$outgoing;
DROP TABLE my_schema.my_table$outgoing;
If you pass analyze_compression=True to deep_copy, compression encodings will be updated in the resultant table based on results of running ANALYZE COMPRESSION to determine optimal encodings for the existing data.
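For example (a sketch; the table and schema names are placeholders):
# Recreate the table with encodings chosen by ANALYZE COMPRESSION.
redshift.deep_copy('my_table', schema='my_schema',
                   analyze_compression=True, execute=True)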
Copy JSON to Redshift¶
To be written. See copy_json_to_table.
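In the meantime, the sketch below shows one plausible call, based only on the parameters documented in the API reference; the bucket, key path, table name, and records are placeholders, and AWS credentials are assumed to be available via environment variables or set_aws_credentials:
# Illustrative only: chunk the records, stage them in S3, and COPY them in.
records = [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
redshift.copy_json_to_table(bucket='my-bucket',
                            keypath='tmp/my_table/',
                            data=records,
                            table='my_schema.my_table',
                            slices=8)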
Configuring shiftmanager For Your Environment¶
If you use shiftmanager as a command line interface for administering Redshift, it can be inconvenient to type in cluster details every time you open a new session. We recommend writing a short setup script as a ~/.shiftmanager.py file or the like:
from shiftmanager import Redshift
# We're assuming PGPASSWORD and PGUSER are set here.
# You might want to pull those in from an encrypted file.
dev = Redshift(host="my-dev-host")
prod = Redshift(host="my-prod-host")
You can then invoke your script interactively like:
$ ipython -i ~/.shiftmanager.py
And have immediate access to the objects you set up.
To make this super convenient, add an alias to your .bashrc or the like:
alias shiftmanager="ipython -i ~/.shiftmanager.py"
Acknowledgments¶
Thanks to Blame Society Productions for letting us use a screenshot from Chad Vader: Day Shift Manager as our banner image.
API¶
Contents:
Redshift API¶
- class shiftmanager.Redshift(database=None, user=None, password=None, host=None, port=5439, aws_access_key_id=None, aws_secret_access_key=None, security_token=None, **kwargs)¶
Interface to Redshift.
This class will default to environment variables for all arguments.
For methods requiring S3, AWS keys are not required if you have environment variables set for boto to pick up: http://boto.readthedocs.org/en/latest/s3_tut.html#creating-a-connection
Parameters: - database (str) – envvar equivalent: PGDATABASE
- user (str) – envvar equivalent: PGUSER
- password (str) – envvar equivalent: PGPASSWORD
- host (str) – envvar equivalent: PGHOST
- port (int) – envvar equivalent: PGPORT
- aws_access_key_id (str) – envvar equivalent: AWS_ACCESS_KEY_ID
- aws_secret_access_key (str) – envvar equivalent: AWS_SECRET_ACCESS_KEY
- security_token (str) – envvar equivalent: AWS_SECURITY_TOKEN or AWS_SESSION_TOKEN
- kwargs (dict) – Additional keyword arguments sent to psycopg2.connect
- alter_user(name, password=None, valid_until=None, createdb=None, createuser=None, rename=None, execute=False, **parameters)¶
Return a SQL str that alters an existing user.
Parameters: - name (str) – The name of the user account to alter.
- password (str) – A new password for the user.
- valid_until (str or datetime) – An absolute time after which the user account password is no longer valid.
- createdb (boolean) – Allow the new user account to create databases.
- createuser (boolean) – Create a superuser with all database privileges.
- rename (str) – New name to assign the user.
- execute (boolean) – Execute the command in addition to returning it.
- parameters – Additional keyword arguments are interpreted as configuration parameters whose values will be set by additional ALTER USER statements added to the batch. For values set to None, the parameter will be reset, letting system defaults take effect.
- aws_credentials¶
- static chunked_json_slices(*args, **kwds)¶
Given an iterator of dicts, chunk them into slices and write to temp files on disk. Clean up when leaving scope.
Returns: - stamp (str) – Timestamp that prepends the filenames of chunks written to disk
- chunk_files (list) – List of filenames
- connection¶
A psycopg2.connect connection to Redshift. Instantiation is delayed until the object is first used.
- copy_json_to_table(*args, **kwargs)¶
Given a list of JSON-able dicts, COPY them to the given table_name.
This function will partition the blobs into slices number of files, write them to the S3 bucket, write the jsonpaths file, COPY them to the table, then optionally clean up everything in the bucket.
Parameters: - bucket (str) – S3 bucket for writes
- keypath (str) – S3 key path for writes
- data (iterable of dicts) – Iterable of JSON-able dicts
- jsonpaths (dict) – Redshift jsonpaths file. If None, will autogenerate with alphabetical order
- table (str) – Table name for COPY
- slices (int) – Number of slices in your cluster. This many files will be generated on S3 for efficient COPY.
- clean_up_s3 (bool) – Clean up S3 bucket after COPY completes
- local_path (str) – Local path to write chunked JSON. Defaults to $HOME/.shiftmanager/tmp/
- clean_up_local (bool) – Clean up local chunked JSON after COPY completes.
- copy_table_to_redshift(redshift_table_name, bucket_name, key_prefix, pg_table_name=None, pg_select_statement=None, temp_file_dir=None, cleanup_s3=True, delete_statement=None, manifest_max_keys=None, line_bytes=104857600, canned_acl=None)¶
Writes the contents of a Postgres table to Redshift.
The approach here attempts to maximize speed and minimize local disk usage. The fastest method of extracting data from Postgres is the COPY command, which we use here, piping the output to the split and gzip shell utilities to create a series of compressed files. As files are created, a separate thread uploads them to S3 and removes them from local disk. Due to the use of external shell utilities, this function can only run on an operating system with GNU core-utils installed (available by default on Linux, and via homebrew on MacOS).
Parameters: - redshift_table_name (str) – Redshift table to which json files are to be written
- bucket_name (str) – The name of the S3 bucket to be written to
- key_prefix (str) – The key path within the bucket to write to
- pg_table_name (str) – Optional Postgres table name to be written to json if user does not want to specify subset
- pg_select_statement (str) – Optional select statement if user wants to specify subset of table
- temp_file_dir (str) – Optional; location for temporary files
- cleanup_s3 (bool) – Optional; clean up the S3 location on failure. Defaults to True.
- delete_statement (str or None) – When not None, this statement will be run in the same transaction as the (final) COPY statement. This is useful when you want to clean up a previous backfill at the same time as issuing a new backfill.
- manifest_max_keys (int or None) – If None, all S3 keys will be sent to Redshift in a single COPY transaction. Otherwise, this parameter sets an upper limit on the number of S3 keys included in the COPY manifest. If more keys were produced, then additional COPY statements will be issued. This is useful for particularly large loads that may timeout in a single transaction.
- line_bytes (int) – The maximum number of bytes to write to a single file (before compression); defaults to 100 MB
- canned_acl (str) – A canned ACL to apply to objects uploaded to S3
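A sketch of a typical call (names are placeholders; a Postgres connection is assumed to be configured through the usual PG* environment variables or create_pg_connection):
# Extract a Postgres table, stage gzipped chunks in S3, then COPY into Redshift.
redshift.copy_table_to_redshift('my_schema.my_table',
                                bucket_name='my-bucket',
                                key_prefix='backfills/my_table/',
                                pg_table_name='public.my_table')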
- copy_table_to_s3(bucket_name, key_prefix, pg_table_name=None, pg_select_statement=None, temp_file_dir=None, cleanup_s3=True, line_bytes=104857600, canned_acl=None)¶
Writes the contents of a Postgres table to S3.
The approach here attempts to maximize speed and minimize local disk usage. The fastest method of extracting data from Postgres is the COPY command, which we use here, piping the output to the split and gzip shell utilities to create a series of compressed files. As files are created, a separate thread uploads them to S3 and removes them from local disk. Due to the use of external shell utilities, this function can only run on an operating system with GNU core-utils installed (available by default on Linux, and via homebrew on MacOS).
Parameters: - bucket_name (str) – The name of the S3 bucket to be written to
- key_prefix (str) – The key path within the bucket to write to
- pg_table_name (str) – Optional Postgres table name to be written to json if user does not want to specify subset
- pg_select_statement (str) – Optional select statement if user wants to specify subset of table
- temp_file_dir (str) – Optional; location for temporary files
- cleanup_s3 (bool) – Optional; clean up the S3 location on failure. Defaults to True.
- line_bytes (int) – The maximum number of bytes to write to a single file (before compression); defaults to 100 MB
- canned_acl (str) – A canned ACL to apply to objects uploaded to S3
Return type: (Final key prefix, List of S3 keys)
- create_pg_connection(**kwargs)¶
Create a psycopg2.connect connection to Postgres.
See https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS for supported parameters.
- create_user(name, password, valid_until=None, createdb=False, createuser=False, groups=None, execute=False, **parameters)¶
Return a SQL str defining a new user.
Parameters: - name (str) – The name of the user account to create.
- password (str) – The password for the new user.
- valid_until (str or datetime) – An absolute time after which the user account password is no longer valid.
- createdb (boolean) – Allow the new user account to create databases.
- createuser (boolean) – Create a superuser with all database privileges.
- groups (list of str) – Existing groups that the user will belong to.
- execute (boolean) – Execute the command in addition to returning it.
- parameters – Additional keyword arguments are interpreted as configuration parameters whose values will be set by additional ALTER USER statements added to the batch.
- deep_copy(table, schema=None, copy_privileges=True, use_cache=True, cascade=False, distinct=False, analyze_compression=False, analyze=True, deduplicate_partition_by=None, deduplicate_order_by=None, execute=False, **kwargs)¶
Return a SQL str defining a deep copy of table.
This method can be used to simply sort and clean an unvacuumable table, or it can be used to migrate to a revised table structure. You can use the reflected_table method with overrides to generate a new table structure, then pass that revised object in as table.
Parameters: - table (str or Table) – The table to reflect
- schema (str) – The database schema in which to look for table (only used if table is str)
- copy_privileges (bool) – Reflect ownership and grants on the existing table and include them in the return value
- use_cache (bool) – Use cached results for the privilege query, if available
- cascade (bool) – Drop any dependent views when dropping the source table
- distinct (bool) – Deduplicate the table by adding DISTINCT to the SELECT statement; also see deduplicate_partition_by for more control
- analyze_compression (bool) – Update the column compression encodings based on results of an ANALYZE COMPRESSION statement on the table.
- analyze (bool) – Add an ‘ANALYZE table’ command at the end of the batch to update statistics, since this is not done automatically for INSERTs
- deduplicate_partition_by (str or None) – A string giving a list of columns like ‘col1, col2’ to be passed to ‘ROW_NUMBER() OVER (PARTITION BY {columns})’ so that only the first row for a given set of values will be retained; it’s strongly suggested that you also set deduplicate_order_by so that results are deterministic
- deduplicate_order_by (str or None) – A string like ‘col3 DESC NULLS LAST, col4 ASC NULLS LAST’ used as the ORDER BY within the window function for deduplication, with the first row in sort order being the one retained; will be ignored if deduplicate_partition_by is not also set
- execute (bool) – Execute the command in addition to returning it.
- kwargs – Additional keyword arguments will be passed unchanged to the reflected_table method.
- engine¶
A sqlalchemy.engine which wraps connection.
- execute(batch, parameters=None)¶
Execute a batch of SQL statements using this instance’s connection.
Statements are executed within a transaction.
Parameters: - batch (str) – The batch of SQL statements to execute.
- parameters (list or dict) – Values to bind to the batch, passed to cursor.execute
- static gen_jsonpaths(json_doc, list_idx=None)¶
Generate Redshift jsonpath file for given JSON document or dict.
If an array is present, you can specify an index to use for that field in the jsonpaths result. Right now only a single index is supported.
Results will be ordered alphabetically by default.
Return type: Dict
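A minimal sketch (the record below is illustrative; keys are ordered alphabetically in the result):
# Build a jsonpaths dict for a sample record, suitable for writing out
# as the jsonpaths file used by COPY.
doc = {'message_id': 'abc-123', 'when_recorded': '2016-01-01T00:00:00Z'}
jsonpaths = redshift.gen_jsonpaths(doc)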
- get_bucket(*args, **kwargs)¶
Get boto.s3.bucket. Caches existing buckets.
Parameters: bucket_name (str) –
- get_s3_connection(ordinary_calling_fmt=False)¶
Get new S3 Connection
Parameters: ordinary_calling_fmt (bool) – Initialize connection with OrdinaryCallingFormat
- get_table_names(schema=None, **kwargs)¶
Return a list naming all tables and views defined in schema.
- meta¶
A MetaData instance used for reflection calls.
- mogrify(batch, parameters=None, execute=False)¶
- pg_connection¶
A psycopg2.connect connection to Postgres. Instantiation is delayed until the object is first used.
- pg_execute_and_commit_single_statement(statement)¶
Execute single Postgres statement
- preparer¶
A Redshift-aware identifier preparer.
- static random_password(length=64)¶
Return a strong password valid for Redshift.
Constraints:
- 8 to 64 characters in length.
- Must contain at least one uppercase letter, one lowercase letter, and one number.
- Can use any printable ASCII characters (ASCII code 33 to 126) except ' (single quote), " (double quote), \, /, @, ` or space.
- See Redshift’s CREATE USER docs
- reflected_privileges(relation, schema=None, use_cache=True)¶
Return a SQL str which recreates all privileges for relation.
- reflected_table(name, *args, **kwargs)¶
Return a Table reflected from the database.
This is simply a convenience method which passes arguments to the Table constructor, so you may override various properties of the existing table. In particular, Redshift-specific attributes like distkey and sortkey can be set through redshift_* keyword arguments (redshift_distkey='col1', redshift_interleaved_sortkey=('col1', 'col2'), etc.)
The return value is suitable input for the table_definition or deep_copy methods, useful for changing the structure of an existing table.
extend_existing is set to True by default.
Notes
See SQLAlchemy’s documentation on Overriding Reflected Columns and sqlalchemy-redshift’s DDLCompiler docs.
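For example, a sketch of reflecting a table with an overridden distkey and feeding the result to deep_copy (the table, schema, and column names are placeholders):
# Reflect the existing table, overriding its distkey, then build the
# deep-copy batch for the revised structure.
table = redshift.reflected_table('my_table', schema='my_schema',
                                 redshift_distkey='email_address')
batch = redshift.deep_copy(table)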
- set_aws_credentials(aws_access_key_id, aws_secret_access_key, security_token=None)¶
Set AWS credentials. These will be required for any methods that need interaction with S3.
- set_aws_role(aws_account_id, aws_role_name)¶
Set AWS IAM role. This role will be assumed by the Redshift cluster when reading from S3 during COPY statements. If not set, the access key and secret from set_aws_credentials will be used.
- table_definition(table, schema=None, copy_privileges=True, use_cache=True, analyze_compression=False)¶
Return a str containing the necessary SQL statements to recreate table.
Parameters: - table (str or Table) – The table to reflect
- schema (str) – The database schema in which to look for table (only used if table is str)
- copy_privileges (bool) – Reflect ownership and grants on the existing table and include them in the return value
- use_cache (bool) – Use cached results for the privilege query, if available
- table_exists(table_name)¶
Check Redshift for whether a table exists.
Parameters: table_name (str) – The name of the table for whose existence we’re checking
Return type: boolean
- unload_table_to_s3(*args, **kwargs)¶
Given a table in Redshift, UNLOAD it to S3.
Parameters: - bucket (str) – S3 bucket for writes
- keypath (str) – S3 key path for writes
- table (str) – Table name for UNLOAD
- schema (str) – Schema that table resides in. Defaults to None
- col_str (str) – Comma separated string of columns to unload. Defaults to ‘*’
- where (str) – SQL where clause string to filter select statement in unload. Defaults to None, meaning no WHERE clause is applied
- to_json (boolean) – Defaults to True
- options (str) – Additional options to be included in UNLOAD command. Defaults to None, in which case the following options are used: MANIFEST, GZIP, ALLOWOVERWRITE
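A sketch of a typical call using the documented parameters (the bucket and key path are placeholders):
# UNLOAD the table to S3 as gzipped JSON with a manifest (the defaults).
redshift.unload_table_to_s3(bucket='my-bucket',
                            keypath='exports/my_table/',
                            table='my_table',
                            schema='my_schema')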
- view_definition(view, schema=None, copy_privileges=True, use_cache=True, execute=False, **kwargs)¶
Return a SQL str defining view.
Parameters: - view (str or Table) – The view to reflect
- schema (str) – The database schema in which to look for view (only used if view is str)
- copy_privileges (bool) – Reflect ownership and grants on the existing view and include them in the return value
- use_cache (bool) – Use cached results for the privilege query, if available
- execute (bool) – Execute the command in addition to returning it
- kwargs – Additional keyword arguments will be passed unchanged to get_view_definition()
- write_dict_to_key(data, key, close=False)¶
Given a Boto S3 Key, write a given dict to that key as JSON.
- write_file_to_s3(f, bucket, s3_key_path)¶
Given a string chunk that represents a piece of a CSV file, write the chunk to an S3 key.
- write_filename_to_s3(filename, bucket, s3_key_path)¶
Given a string chunk that represents a piece of a CSV file, write the chunk to an S3 key.
- write_string_to_s3(chunk, bucket, s3_key_path, canned_acl=None)¶
Given a string chunk that represents a piece of a CSV file, write the chunk to an S3 key.