Welcome to data-migrator!

data-migrator is a declarative, pipes-and-filters style database transformation library, written Python/Django style.

Data transformation is an important process in software development when architectures evolve and data has to move along. This requires fast and easy restructuring of data in a frictionless and repeatable way. Quite quickly, ad-hoc scripts or a transfer-the-data-first, upgrade-it-in-SQL-second approach turns out not to be flexible enough. For those cases data-migrator is an answer.

Once you've installed data-migrator, we recommend:

  • to take a look at the Example first
  • after that continue with the Tutorial
  • or read the About if you want some more background

Example

Core of data-migrator is the unix pipe and filter paradigm to build data transformers. Source data is read from a database or some other source. It is piped to a filter written with data-migrator, which emits for example SQL INSERT statements. These can be piped to a target client and thereby loaded into the target database.

Consider the most basic datapump in mysql, which is a direct pipe between source and target database:

$ mysqldump -u [uname] -p[pass] source_db table  | mysql target_db

In this example mysqldump will export the table as SQL statements and the new database will process them. Now if you want to do something extra and repeatable with respect to the data, you could use all kinds of unix filtering with sed, awk, or your favorite scripting. It is not very hard to imagine what Pythonistas would do, especially if extra transformations or simplifications are needed. The basic packages are quite expressive and one would easily set up something like:

$ mysql source_db -e 'select * from table' -B  | python my_filter.py | mysql target_db

With my_filter.py being a basic filter built on the csv module:

import sys, csv

# mysql -B outputs tab separated values with a header row
reader = csv.DictReader(sys.stdin, delimiter='\t')

for row in reader:
  print('INSERT INTO `table` (a,b) VALUES ("%(a)s", %(b)s)' % row)

Exploring the options for manipulation is left as an exercise to the reader, but do accept that as soon as things become just a little more complex (think: splitting into two tables, reversing columns, renaming columns, mixing, joining, filtering, transforming), more declarative support is helpful. That is why we came up with data-migrator. One could simply replace the transformation with:

from data_migrator import models, transform
from data_migrator.emitters import MySQLEmitter

def parse_b(v):
  if v == 'B':
    return 'transformed_B'
  else:
    return v.lower()

class Result(models.Model):
  id   = models.IntField(pos=0) # keep id
  uuid = models.UUIDField()     # generate new uuid4 field
  # replace NULLs and trim
  a    = models.StringField(pos=1, default='NO_NULL', max_length=5, nullable='NULL', replacement=lambda x:x.upper())
  # parse this field
  b    = models.StringField(pos=2, parse=parse_b, name='my_b')

  class Meta:
    table_name = 'new_table_name'

# django-esc like creating and saving of additional records (to a manager)
Result(a='my a', b='my b').save()

if __name__ == "__main__":
  transform.Transformer(models=[Result], emitter=MySQLEmitter).process()

  assert(len(Result.objects) > 1)

This gives a nice, self-explanatory transformer, which will generate something like:

-- transformation for Result to table new_table_name
-- input headers: id,a,b
-- stats: in=10,dropped=0,out=10

SET SQL_SAFE_UPDATES = 0; -- you need this to delete without WHERE clause
DELETE FROM `new_table_name`;
ALTER TABLE `new_table_name` AUTO_INCREMENT = 1;

INSERT INTO `new_table_name` (`id`, `uuid`, `a`, `my_b`) VALUES (0, "ac7100b9-c9ad-4069-8ca5-8db1ebd36fa3", "MY A", "my b");
INSERT INTO `new_table_name` (`id`, `uuid`, `a`, `my_b`) VALUES (1, "38211712-0eb2-4433-b28f-e3fe33492e7a", "NO_NULL", "some value");
INSERT INTO `new_table_name` (`id`, `uuid`, `a`, `my_b`) VALUES (2, "a3478903-aed9-462c-8f47-7a89013bc6ea", "CHOPP", "transformed_B");
...

If you can see the value of this package, continue with the installation of data-migrator, after which there are some more pages in the Tutorial to help you get a grasp of this library.

Installation

Using pip (or …)

Category: Stable version
Precondition: pip (or setuptools) is installed

Execute the following command to install data-migrator with pip:

pip install data-migrator

To update an already installed data-migrator version, use:

pip install -U data-migrator

Hint

See also pip related information for installing Python packages.

Using a Source Distribution

After unpacking the data-migrator source distribution, enter the newly created directory “data-migrator-<version>” and run:

python setup.py install

Using the Github Repository

Category: Bleeding edge
Precondition: pip is installed

Run the following command to install the newest version from the Github repository:

pip install -e git+https://github.com/schubergphilis/data-migrator

To install a tagged version from the Github repository, use:

pip install -e git+https://github.com/schubergphilis/data-migrator@<tag>

where <tag> is the placeholder for an existing tag. You can drop the -e and not install in editable mode if you do not expect to edit the source. Running in editable mode allows you to work on the code while the library is installed and available. Be sure to make that pull request ;-)

Tutorial

Getting Started

Obviously first, install data-migrator.

Next create a directory for your migration scripts. Your mileage may vary, but we assume you have client access to the source data. As discussed in the example, we assume you have a client like mysql spitting out CSVs in some form, and we also expect you have client access to the target database. To automate and make it repeatable, just use make. We add some Makefile-foo here, but do not worry. Basically what we want to execute is something like this:

mysql [SOURCE] -e "SELECT * FROM table" -B | python transform.py | mysql [TARGET]

In a more generic way and running the clients directly on the host, we will get:

TARGETS = table
OPTIONS ?=-p 2 --debug
OUT_DIR ?= results

table.qry='SELECT t.* FROM table LIMIT 0,100'

default: clean install all

all: $(TARGETS)

install:
      pip install data-migrator

clean:
      @rm -rf $(OUT_DIR)
      @find . -name '*.pyc' -delete

$(OUT_DIR)/%.sql: | $(OUT_DIR)
      ssh [SOURCE_HOST] "sudo mysql [SOURCE_DB] -e $($*.qry) -B" | python transform_$*.py $(OPTIONS) -o $(OUT_DIR)

$(TARGETS):%:$(OUT_DIR)/%.sql

$(OUT_DIR):
      mkdir -p $@

upload:
      ssh [TARGET_HOST] "sudo mysql [TARGET_DB]" < $(OUT_DIR)/table.sql

Note that we use a simple query and extract only the first 100 lines. The rest of the magic of the Makefile is to separate the extraction from the loading, and to make it easy to extend the script with more tables and sources. Note that in this case we are defining the extract query in the Makefile, and we are using sudo rights to extract and upload the data. You might want to have an opinion about that.

We now have the groundwork for extracting a table, transforming it and loading it. The next step is to build the filter and transform the data into something the target database can accept. As in the example, we can build a simple transformer:

from data_migrator import models, transform
from data_migrator.emitters import MySQLEmitter

def parse_b(v):
  if v == 'B':
    return 'transformed_B'
  else:
    return v.lower()

class Result(models.Model):
  id   = models.IntField(pos=0) # keep id
  uuid = models.UUIDField()     # generate new uuid4 field
  # replace NULLs and trim
  a    = models.StringField(pos=1, default='NO_NULL', max_length=5, nullable='NULL', replacement=lambda x:x.upper())
  # parse this field
  b    = models.StringField(pos=2, parse=parse_b, name='my_b')

  class Meta:
    table_name = 'new_table_name'

# django-esc like creating and saving (to a manager)
Result(a='my a', b='my b').save()

if __name__ == "__main__":
  transform.Transformer(models=[Result], emitter=MySQLEmitter).process()

  # prove we have objects
  assert(len(Result.objects) > 1)

And we now have a nice, self-explanatory transformer, which will generate something like:

-- transformation for Result to table new_table_name
-- input headers: id,a,b
-- stats: in=10,dropped=0,out=10

SET SQL_SAFE_UPDATES = 0; -- you need this to delete without WHERE clause
DELETE FROM `new_table_name`;
ALTER TABLE `new_table_name` AUTO_INCREMENT = 1;

INSERT INTO `new_table_name` (`id`, `uuid`, `a`, `my_b`) VALUES (0, "ac7100b9-c9ad-4069-8ca5-8db1ebd36fa3", "MY A", "my b");
INSERT INTO `new_table_name` (`id`, `uuid`, `a`, `my_b`) VALUES (1, "38211712-0eb2-4433-b28f-e3fe33492e7a", "NO_NULL", "some value");
INSERT INTO `new_table_name` (`id`, `uuid`, `a`, `my_b`) VALUES (2, "a3478903-aed9-462c-8f47-7a89013bc6ea", "CHOPP", "transformed_B");

There you are, you have setup your first pipeline. Execute this by running:

$ make table  # extract the data from the database, transform it
$ make upload # load it into the database

You can look up the intermediate result by viewing the generated SQL in results/new_table_name.sql. data-migrator does not focus on the database schema (yet!), so the table is expected to exist in the target system. By default the system (or actually the emitter) wipes the data, it does not recreate the table. If you have issues with the Python libraries, run make install to install the library from this Makefile.
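If wiping is not what you want, the preamble can be overridden with the prefix option in the Meta block (see the Meta class reference). A minimal sketch, assuming the listed statements simply replace the default preamble:

from data_migrator import models

class Result(models.Model):
  id = models.IntField(pos=0) # keep id

  class Meta:
    table_name = 'new_table_name'
    # replace the default DELETE/ALTER preamble with our own statements
    prefix = ['SET SQL_SAFE_UPDATES = 0;']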

Now go ahead and add more fields. See fields reference for more details about the options of the fields.

Extending Types

This is the next tutorial, see the previous one.

Once you have your base transformation up and running, it is time to extend the fields. Out of the box we already offer various extensions; the full list of Field types can be found in the reference.

Translating IDs

One of the patterns we see in the evolution towards microservices is a move away from fully normalized database schemas. Modern systems rely more on auto-completion, data storage is cheaper, and there are probably a lot more reasons to store full strings instead of IDs. In data-migrator this is easily supported for hardcoded values with a small Python function:

M = {
  0: "NOT SET",
  1: "Hallo",
  2: "Hello",
  3: "Bonjour"
}

def parse_b(v):
  return M.get(int(v), M[0])

class Result(models.Model):
  id = models.IntField(pos=0) # keep id
  ....
  # replace ID with value
  b  = models.StringField(pos=2, parse=parse_b, default="How are you doing?")

Note the values are parsed as strings from the CSV reader. NULL is by default translated to None, which is replaced by the default value and will never reach the parse function.

Merging columns

Another migration pattern is to merge separate (boolean) columns back into a single enumeration column. To support that, use a row parser instead of a single-value parser. If no pos is given, the parser is row based: it receives the full row instead of a single column value:

def employment_type(row):
  if row[26] == "1":          # contractor
    return 'contractor'
  elif row[27] == "1":        # intern
    return 'intern'
  else:
    return 'perm'

class Result(models.Model):
  ....
  b  = models.StringField(parse=employment_type, default="perm")

Dynamic lookups

At times one needs to look up values in the target database. Do not be shy to generate dynamic lookups using SELECT statements that run during import into the target database.

class Result(models.Model):
  recruiter_uuid = models.StringField(pos=38,
    replacement=lambda x:'(SELECT uuid FROM `persons` WHERE `mail`="%s" limit 0,1)' % x)

This can of course be combined with Python based transformations to fix deleted values:

def recruiter(v):
  if v is None or v in ['missing1@mail.com', 'missing2@mail.com']:
      return 'default_person@mail.com'
  else:
    return v

class Result(models.Model):
  recruiter_uuid = models.StringField(pos=38, parse=recruiter,
    replacement=lambda x:'(SELECT uuid FROM `persons` WHERE `mail`="%s" limit 0,1)' % x)

The output is a value for the target database, which serves as the input for a query on that same target database. By the way, replacement by an output string is considered the default, so the replacement can also be provided directly as a format string:

...

class Result(models.Model):
  recruiter_uuid = models.StringField(pos=38, parse=recruiter,
    replacement='(SELECT uuid FROM `persons` WHERE `mail`="{}" limit 0,1)')

Table lookups

For larger tables there is support for table-driven lookups from external CSV files. It is also possible to generate the map (just key/value pairs) ad hoc by other means. data-migrator offers a helper function read_map_from_csv() to read the CSV.

from data_migrator.contrib.read import read_map_from_csv

class Result(models.Model):
  country = models.MappingField(pos=33, default='NLD',
    data_map=read_map_from_csv(f=open('data/country.csv'), delimiter=';', key='country_id', value='alpha3'))

Combining table lookups

The table lookup and column reduction can also be combined. Consider a multi-state entity with specific lookup values to be merged into one lookup value:

from data_migrator.contrib.read import read_map_from_csv

LOOKUP1 = read_map_from_csv(f=open('data/state1.csv'), delimiter=';', key='id', value='name')
LOOKUP2 = read_map_from_csv(f=open('data/state2.csv'), delimiter=';', key='id', value='name')

def parse_lookup(row):
  return LOOKUP1.get(row[1], LOOKUP2.get(row[2], ''))

class Result(models.Model):
  ...
  state = models.StringField(parse=parse_lookup)

Flatten multi values

The most extensive case is many-to-many flattening, for example tagging multiple values to a main entity. This is mostly implemented in a three-table structure, following the classic normalization approach:

  • a table with the main entity (for example persons),
  • a table with the attributes in a fixed id,value structure, and
  • a many-to-many table linking the attributes to the main entities.

A simple approach to flatten this is to encode it as a JSON list. To transform the data, use a four-step approach:

  1. Extract the data from the old system fully expanded
  2. Read the CSV and flatten to a map of lists
  3. Link the values at read time replacing the main ID with lists
  4. Emit the whole as a JSON list

The first step relies on queries like:

SELECT
        P.person_id,
        S.name as skill
FROM person_skill_m2m P
INNER JOIN skills S
        ON S.id=P.skill_id;

After that, loading and emitting to JSON is simply a matter of using the MappingField:

from data_migrator.contrib.read import read_map_from_csv

class Result(models.Model):
  skills = models.MappingField(pos=0, default=[], as_json=True,
    data_map=read_map_from_csv(f=open('results/skill.csv'), key='candidate_id', value='skill', as_list=True))

Now take these examples and mix your own. It is standard Python; we have no doubt you can come up with all kinds of amazing transformations.

Validation

See the previous tutorial.

Another aspect is to have some control over the data received. Out of the box, data-migrator offers a wide range of facilities.

None column test

The current list of Field types contains Nullable and non-Nullable fields. In some cases you might want to exclude records containing nulls. This is a generic function in the manager and can be set per model in the Meta block:

class Result(models.Model):
  id = models.IntField(pos=0) # keep id
  a  = models.StringField(pos=1)
  b  = models.StringField(pos=2, nullable=None)

  class Meta:
    drop_if_none = ['b']

The moment records are saved they are validated, and records are dropped if b turns out to be None.

Unique columns

A common pattern is to drop non-unique values (the later duplicates are dropped), for example when scanning for unique email addresses. This functionality is also available out of the box:

class Result(models.Model):
  id = models.IntField(pos=0) # keep id
  a  = models.StringField(pos=1, unique=True)

  class Meta:
    drop_non_unique = True

Uniqueness is checked when the input is parsed, before any records are written. Use fail_non_unique if you do not want to drop the record but rather fail the transformation completely.
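For example, to abort the whole run instead of silently dropping duplicates, a sketch using the fail_non_unique option from the Meta reference:

from data_migrator import models

class Result(models.Model):
  id = models.IntField(pos=0) # keep id
  a  = models.StringField(pos=1, unique=True)

  class Meta:
    fail_non_unique = True  # fail the whole run on a duplicate value of a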

Complex Unique columns

A more complex situation is when a combination of (input) columns needs to be checked. Consider for example the de-duplication of membership records. This is solved by using a parse function and a hidden column:

def compound(row):
  return row[0] + row[1] + row[2]

class Result(models.Model):
  id  = models.IntField(pos=0) # keep id
  key = models.HiddenField(parse=compound, unique=True)
  a   = models.StringField(pos=1)

  class Meta:
    drop_non_unique = True

In this example key is set to check uniqueness but not used as output in the end result.

Advanced record generation

See the previous tutorial.

A typical case in system evolution is the growing complexity of entities into more detailed records. For example, a system that maintains groupings can be expanded into more interlinked entities in the new system. This might generate more (linked) entities. data-migrator is built for these kinds of situations.

One to many records

Linked to every model is a manager, which contains all logic to parse and emit records. By default this is a simple manager that scans one record per row. In cases where the system has to generate many records per row, this is easily achieved by adding a dedicated manager:

from data_migrator import models

class ResultManager(models.Manager):
  def transform(self, row, previous, model):
    defaults = {'c':'default value'}
    res = [
      model(**defaults).scan(row),
      model(**defaults).scan(row),
      model(**defaults).scan(row)
    ]
    res[1].a += 'some postfix'
    res[2].a += 'alternative postfix'
    return res

class Result(models.Model):
  id = models.IntField(pos=0) # keep id
  a  = models.StringField(pos=1)
  b  = models.StringField(pos=2, nullable=None)
  c  = models.StringField()

  class Meta:
    manager = ResultManager

This example is not really useful; it just generates three records instead of one.

Multi models generation

The manager setup is one of the big reasons to use this package instead of simple scripts. In the model migrations we came across, there were steps in which we had to generate secondary models (for example permissions or links between models).

from data_migrator import models, transform
from data_migrator.emitters import MySQLEmitter

class PermissionManager(models.Manager):
  def transform(self, row, previous, model):
    defaults = {
      'model_id': previous[0][0].a, # due to model order, Permission sees the Result objects generated first
      'user': row[0]
    }
    res = [
      model(right='read_result', **defaults),
      model(right='update_result', **defaults),
      model(right='delete_result', **defaults)
    ]
    return res

class Permission(models.Model):
  id       = models.IntField()
  right    = models.StringField()
  model_id = models.StringField()
  user     = models.StringField()

  class Meta:
    manager = PermissionManager

class Result(models.Model):
  id = models.IntField(pos=0) # keep id
  a  = models.StringField(pos=1)
  b  = models.StringField(pos=2, nullable=None)


if __name__ == "__main__":
  transform.Transformer(models=[Result, Permission], emitter=MySQLEmitter).process()

The order in the transformer models list is important and determines the order of object creation. This is an example of how one model can be used in the generation of a second one.

Anonymization

See the previous tutorial.

With the ongoing focus on privacy and GDPR becoming enforceable, more emphasis will be put on anonymization of data. For this data-migrator is also quite useful.

Independent Field Anonymization

Anonymization is just as simple as adding a definition to a field:

from data_migrator import models
from data_migrator.anonymizors import SimpleStringAnonymizor

class Result(models.Model):
  id = models.IntField(pos=0) # keep id
  a  = models.StringField(pos=1, anonymize=SimpleStringAnonymizor)
  b  = models.StringField(pos=2, nullable=None)

It does not get more complex than this. The string is replaced by a garbled string with the same length.

Option Anonymization

Let's now assume you have some field that denotes gender and you want to garble it too, but also want to stay true to some distribution:

from data_migrator import models
from data_migrator.anonymizors import ChoiceAnonymizor

class Result(models.Model):
  id     = models.IntField(pos=0) # keep id
  gender = models.StringField(pos=1, anonymize=ChoiceAnonymizor(['M','F',None], weights=[0.2,0.5,0.3]))
  b      = models.StringField(pos=2, nullable=None)

Now the field will be filled with a random choice according to a specific distribution.

For more details, see the data_migrator.anonymizors module reference.
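If the built-in anonymizers do not fit, you can write your own by implementing the __call__() method of BaseAnonymizor (see the reference). A minimal sketch; MaskAnonymizor is a hypothetical name, not part of the library:

from data_migrator import models
from data_migrator.anonymizors.base import BaseAnonymizor

class MaskAnonymizor(BaseAnonymizor):
  def __call__(self, v):
    # hypothetical: keep the last two characters and mask the rest
    return '*' * (len(v) - 2) + v[-2:] if v else v

class Result(models.Model):
  id = models.IntField(pos=0) # keep id
  a  = models.StringField(pos=1, anonymize=MaskAnonymizor())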

Datalake loads and Kinesis

See the previous tutorial.

A new use case is the initialization of data lakes. In those cases, systems, applications and microservices are extended with event message emitters that publish to buses like Kafka and Kinesis. Sometimes one is better off controlling the initial load of such systems by emitting the current database as messages. A first use case for data-migrator was implemented with AWS Kinesis.

Controlling Kinesis

For Kinesis, two extensions were added: a JSONEmitter and a new transformer. The KinesisTransformer uses Boto3, so see the Boto documentation for how to handle credentials and profiles (you need to set up the ~/.aws/credentials and ~/.aws/config files).

from datetime import datetime

from data_migrator.contrib import KinesisTransformer
from data_migrator import models

def Meta(row):
    return {
        "eventType": "model.create",
        "timestamp": datetime.utcnow().isoformat() + 'Z',
        "producedBy": "data-migrator"
    }

class TrialModel(models.Model):
    _meta           = models.DictField(parse=Meta)
    uuid            = models.UUIDField(pos=3)
    value           = models.IntField(pos=8)
    created_by      = models.StringField(pos=10)
    created_at      = models.StringField(pos=9)
    updated_by      = models.StringField(pos=12)
    updated_at      = models.StringField(pos=11)

if __name__ == "__main__":
    KinesisTransformer(models=[TrialModel]).process(
      stream_name='my-stream',
      profile_name='prd',
    )

It does not get more complex than this, but obviously there is a little more to it when checking whether the model is OK. The transformer therefore has two extra options: trial, to do a trial run (meaning do not actually send to Kinesis), and output, to set the folder to export the model to (the name of the file is based on the model_name).

$ python transform_trialmodel.py --help
usage: transform_trialmodel.py [-h] [-o OUTDIR] [-i INPUT] [--debug]
                                    [-q] [-p ROWS] [-t] [--stream STREAM]
                                    [--profile PROFILE]

Basic Transformer parser

optional arguments:
  -h, --help            show this help message and exit
  -o OUTDIR, --outdir OUTDIR
                        output directory
  -i INPUT, --input INPUT
                        input file
  --debug               enter debug mode
  -q, --quiet           quiet mode, no output
  -p ROWS, --rows ROWS  input rows to print
  -t, --trial           trial run only, no actual upload to Kinesis
  --stream STREAM       name of the stream
  --profile PROFILE     name of the profile

Concepts

The Scan Emit Loop

Core to data-migrator is the declarative definition of the target model, indeed in a Django-esque way. Columns of the target table are defined as fields and each field has many settings. A Field is a definition of what to perform when scanning, transforming and emitting the record. Output is abstracted to an extensible set of output writers, called emitters. The whole is controlled by a standard transformer engine.

The scan-emit loop is the basis of data-migrator. Once everything is set up, the transformer will by default read stdin and send every CSV row to the model for scanning. Out of the box, the fields define a scan loop:

  1. select the specified column from the row.
  2. test the value against the nullable marker and replace it by None if it matches.
  3. validate the input (if validator is provided).
  4. parse the input (if parser is provided).
  5. store as native python value (aka NULL=>None).

Once all fields are parsed, the resulting object can be checked for None values or uniqueness. It can be dropped, or the filter can fail because of violations. These are all declarative settings on the Model through the Meta settings. Otherwise the record is saved (accessible via Model.objects.all()) and emitted. This is done by a dedicated emitter, like the MySQL INSERT statement generator. Emitting provides the following features:

  1. trim if string and max_length is set (note the full string is stored in the intermediate object!).
  2. validate the output (if validate_output is provided).
  3. replace the value with some output string (if provided).
  4. anonymize the value (added as of version 0.6.0).
  5. write in a dedicated format as dictated by the emitter.
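For intuition only, the two loops roughly combine as in the following sketch. This is a simplified illustration built on the documented manager and emitter calls, not the library's actual implementation:

import csv
import sys

def run(model, emitter_class):
  # scan: select, nullable test, validate, parse, store per field
  for row in csv.reader(sys.stdin):
    model.objects.scan_row(row)
  # emit: trim, validate output, replace, anonymize, format per field
  emitter = emitter_class(manager=model.objects)
  for obj in model.objects.all():
    for line in emitter.emit(obj):  # emit returns a list of strings
      print(line)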

API Reference

These pages contain the API reference of data-migrator and are mostly generated automatically from the source. See the tutorial for more instructions on how to use the various parts of this library.

Transformer class reference

This document covers features of the Transformer class.

Transformer

class data_migrator.transform.Transformer(models=None, reader=None, dataset=None, argparser=None, outdir=None, emitter=<class 'data_migrator.emitters.mysql.MySQLEmitter'>)[source]

Main transformation engine

Use this class as your main entry to build your Transformer

>>> if __name__ == "__main__":
>>>    t = transform.Transformer(models=[Model])
>>>    t.process()
__init__(models=None, reader=None, dataset=None, argparser=None, outdir=None, emitter=<class 'data_migrator.emitters.mysql.MySQLEmitter'>)[source]
Parameters:
  • models (list) – list of all models to be processed in this transformer
  • reader – reference to an external reader if not the default
  • dataset – a tablib dataset to read from
  • argparser – reference to another argument parser if not the default_parser
  • outdir – output directory for results, otherwise scan from argparser
  • emitter – emitter to be used for this transformation

Note that the order of models is relevant for the generation

process()[source]

Main processing loop

Model class reference

This document covers features of the Model class.

Model

class data_migrator.models.base.Model(**kwargs)[source]

Model is the foundation for every transformation.

Each non-abstract Model class must have a BaseManager instance added to it. data-migrator ensures that your model class has at least a standard SimpleManager specified, in case you do not add your own specialization of BaseManager through the Meta class manager attribute.

objects

reference to manager

emit(escaper=None)[source]

output and escape this object instance to a dict.

Returns:object transformed according to field definitions
Return type:map

Note

HiddenFields are not emitted

classmethod json_schema()[source]

generate the json schema representation of this model.

Returns:dict with python representation of json schema.
save()[source]

Save this object and add it to the list.

Returns:self, so that methods can be chained
scan(row)[source]

scan model from row based on field definition scanners.

Returns:self, so that methods can be chained
update(**kwargs)[source]

Update method for chaining operations.

Returns:self, so that methods can be chained
Raises:DataException – raised if trying to set a non-defined field on a strict model.
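Because scan(), update() and save() all return self, these calls can be chained; a hypothetical snippet:

from data_migrator import models

class Result(models.Model):
  id = models.IntField(pos=0)
  a  = models.StringField(pos=1)

row = ['1', 'some value']
# scan a raw row, tweak a field and store the object in the manager
obj = Result().scan(row).update(a='other value').save()
assert obj in Result.objects.all()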

ModelBase

class data_migrator.models.base.ModelBase[source]

Metaclass for all models.

Note

the model structure is the foundation of data-migrator and is taken from Django (https://github.com/django/django)

Meta class reference

This document covers features of the Meta class. The meta class defines model specific settings and is used as an inner class in the model:

from data_migrator import models

class SampleModel(models.Model):
  a = models.IntField(pos=1)

  class Meta:
    drop_if_none = True

Every model can have its own meta class to define model specific options.

Note

Technically, Meta is just a container and forwarded to Options

class data_migrator.models.options.Options(cls, meta, fields)[source]
__init__(cls, meta, fields)[source]

Options is the Model Meta data container

The Options class is the true meta data container and parser for a Model. It contains all flag and field references for model handling. Use these flags in the Meta subclass of a Model.

Parameters:
  • cls – the Model this Options object is referring to
  • meta – the reference to a Meta class
  • fields (list) – list of all field definitions
drop_if_none

list – names of the columns to check for None. It is a list of field names as defined. If set, data-migrator will check that these fields are not None and drop the record if one of the columns is.

drop_non_unique

boolean – If True, data-migrator will drop values if the column uniqueness check fails (after parsing). Default is False.

Any field can be defined as a unique column. Any field set so is checked after scanning and just before saving.

emitter

BaseEmitter – If set, data-migrator will use this emitter instead of the default emitter.

fail_non_unique

boolean – If True, data-migrator will fail as a whole if the column uniqueness check fails (after parsing). Default is False.

Any field can be defined as a unique column. Any field set so is checked after scanning and just before saving.

fail_non_validated

boolean – If True, data-migrator will fail as a whole if the column validation check fails (after parsing). Default is False.

Any field can have its own validator, this is a rough method to prevent bad data from being transformed and loaded.

fail_on_data_exception

boolean – If True, data-migrator will fail as a whole if row parsing contains data issues. Default is True.

file_name

string – If set, data-migrator will use this as file_name for the emitter instead of the default filename based on model_name.

table_name

string – If set, data-migrator will use this as table_name for the emitter instead of the default table_name based on model_name.

prefix

string – If set, data-migrator will use this list of statements as a preamble in the generation of the output statements.

By default an emitter uses this to clear the old state.

remark

string – If set, data-migrator will use this as the remark attribute in the Model, default='remark'. Use this for example if you have a remark field in your model and need to free the keyword.

strict

boolean – If True, data-migrator will be strict on the model and does not allow values outside of the definitions. Default is None.

manager

BaseManager – If set, data-migrator will use this as the manager for this model.

This is useful if the transform method needs to be overridden.

Raises:DefinitionException – raised if any of the definitions is not to spec.

Note

Note that only Null<xxx>Fields can actually be None after scanning and parsing. Non-Null fields are set to their default value.
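Combined in a model, a few of these options look as follows; an illustrative sketch only, using options from the list above:

from data_migrator import models

class Result(models.Model):
  id = models.IntField(pos=0)
  a  = models.StringField(pos=1, unique=True)
  b  = models.NullStringField(pos=2)

  class Meta:
    table_name      = 'new_table_name'  # emit to this table instead of the model based default
    drop_if_none    = ['b']             # drop records where b is None
    drop_non_unique = True              # drop records that violate the uniqueness of a
    strict          = True              # disallow values outside of the definitions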

Model field reference

This document contains the API reference of the base class BaseField, including the field options and field types data_migrator offers.

Note

Technically, these fields are defined in data_migrator.models.fields, but for convenience they're imported into data_migrator.models. The standard convention is to use from data_migrator import models and refer to fields as models.<Foo>Field.

Field options

class data_migrator.models.fields.BaseField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

Base column definition for the transformation DSL

The following arguments are available to all field types. All are optional.

Parameters:
  • pos (int) – If positive or zero, this denotes the column in the source data to select and store in this field. If not set (or negative), the field is interpreted as not selecting a single column from the source but as taking the full row in the parse function.
  • name (str) – The name of this field. By default this is the name provided in the model declaration. This attribute is to replace that name by the final column name.
  • default – The default value to use if the source column is found to be a null field or if the parse function returns None. This attribute has default values for Fields that are not Null<xxx>Fields. For example NullStringField has both NULL and empty string as empty value. StringField only has empty string as empty value. With this field it can be changed to some other standard value. Consider a Country field as string and setting it to the home country by default.
  • key (boolean) – If set, this indicates the field is a key field for identification of the object.
  • nullable (str) – If set it will match the source column value and consider this a None value. By default this attribute is set to None. Note that for non-Null fields, None will be translated to the default.
  • replacement – If set, this is a pre-emit replacement function. This could be used to insert dynamic replacement lookup select queries, adding more indirection into the data generation. The value can be either a function or a string.
  • required (boolean) – If set, this indicates the field is required to be set.
  • parse – If set this is the parsing function to replace the read value into something to use further down the data migration. Use this for example to clean phone numbers, translate country definitions into alpha3 codes, or to translate ID’s into values based on a separately loaded lookup table.
  • validate – Expects a function that returns a boolean, and used to validate the input data. Expecting data within a range or a specific format, add a column validator here. Raises ValidationException if set and false.
  • max_length (int) – In case of StringField use this to trim string values to maximum length.
  • unique (boolean) –

    If True, data-migrator will check uniqueness of intermediate values (after parsing). Default is False.

    In relationship with the default manager this will keep track of values for this field. The manager can raise exceptions if uniqueness is violated. Note that it is up to the manager to either fail or drop the record if the exception is raised.

  • anonymize – An additional function that will be called at emit time to anonymize the data
  • validate_output – A pre-emit validator used to scan the bare output and raise exceptions if output is not as expected.
  • creation_order – An automatically generated attribute used to determine order of specification, and used in the emitting of dataset.
emit(v, escaper=None)[source]

helper function to export this field.

Expects a value from the model to be emitted

Parameters:
  • v – value to emit
  • escaper – escaper function to apply on value
Returns:

emitted value.

Raises:

ValidationException – raised if explicit validation fails.

json_schema(name=None)[source]

generate json_schema representation of this field

Parameters:name – name if not taken from this field
Returns:python representation of json schema
scan(row)[source]

scan row and harvest distinct value.

Takes a row of data and parses the required fields out of this.

Parameters:row (list) – array of source data
Returns:parsed and processed value.
Raises:ValidationException – raised if explicit validation fails.

Note

Use this with HiddenField and a row parse function if some combination of fields (aka a compound key) is expected to be unique and not to be violated.
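To illustrate how these options combine in one field definition, a sketch; the to_alpha3 parse function and the column positions are hypothetical:

from data_migrator import models

def to_alpha3(v):
  # hypothetical cleanup: translate a country name to an alpha-3 code
  return {'Netherlands': 'NLD', 'Belgium': 'BEL'}.get(v, v)

class Result(models.Model):
  country = models.StringField(pos=3,
    name='country_code',  # emit under this column name
    default='NLD',        # used when the source column is null
    nullable='NULL',      # treat the literal string NULL as null
    parse=to_alpha3,      # clean the scanned value
    max_length=3)         # trim on emit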

Field types

class data_migrator.models.fields.ArrayField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

JSON array field

class data_migrator.models.fields.BooleanField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

Boolean field handler.

a bool that takes any cased permutation of true, yes, or 1 and translates this into True, and anything else into False.

class data_migrator.models.fields.DateTimeField(f=None, **kwargs)[source]

Basic datetime field handler

__init__(f=None, **kwargs)[source]
Parameters:f – format of the datetime. Default is %Y-%m-%dT%H:%M:%SZ (RFC 3339)
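A hypothetical usage with a non-default format:

from data_migrator import models

class TrialModel(models.Model):
  # override the default RFC 3339 format with a custom one
  created_at = models.DateTimeField(pos=9, f='%d-%m-%Y')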
data_migrator.models.fields.DictField

alias of ObjectField

class data_migrator.models.fields.HiddenField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

Non emitting Field for validation and checking.

a field that accepts values but does not emit them. It is useful for uniqueness checks and more. Combine this with a row parse function to check the complete row.

class data_migrator.models.fields.IntField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

Basic integer field handler

data_migrator.models.fields.IntegerField

alias of IntField

class data_migrator.models.fields.JSONField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

a field that takes the values and emits a JSON encoded string. Great for maps and lists to be stored in a string-like db field.

emit(v, escaper=None)[source]

Emit is overwritten to add the to_json option.

data_migrator.models.fields.ListField

alias of ArrayField

class data_migrator.models.fields.MappingField(data_map, as_json=False, strict=False, **kwargs)[source]

Map based field translator.

a field that takes the values and translates these according to a map. Great for identity column replacements. If needed, the output can be emitted as JSON, for example if the map returns lists.

__init__(data_map, as_json=False, strict=False, **kwargs)[source]
Parameters:
  • data_map – The data_map needed to translate. Note the field returns the default if it is not able to map the key.
  • as_json – If True, the field will be output as json encoded. Default is False
  • strict – If True, the value must be found in the map. Default is False
emit(v, escaper=None)[source]

Emit is overwritten to add the to_json option

class data_migrator.models.fields.ModelField(fields, strict=None, **kwargs)[source]

Model relation for hierarchical structures.

a field that takes another model to build hierarchical structures.

__init__(fields, strict=None, **kwargs)[source]
Parameters:
  • fields – relationship to another model.
  • strict (boolean) – model is considered strict.
emit(v, escaper=None)[source]

Emit is overwritten to add the to_json option

class data_migrator.models.fields.NullField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

NULL returning field by generating None

json_schema(name=None)[source]

generate json_schema representation of this field

class data_migrator.models.fields.NullIntField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

Null integer field handler.

a field that accepts the column to be integer and can also be None, which is not the same as 0 (zero).

class data_migrator.models.fields.NullStringField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

Null String field handler.

a field that accepts the column to be string and can also be None, which is not the same as empty string (“”).

class data_migrator.models.fields.ObjectField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

JSON object field

class data_migrator.models.fields.StringField(pos=-1, name='', default=None, nullable='NULL', key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None)[source]

String field handler, a field that accepts the column to be string.

class data_migrator.models.fields.UTCNowField(f=None, **kwargs)[source]

UTCNow generating field.

a field that generates the current UTC timestamp

class data_migrator.models.fields.UUIDField(*args, **kwargs)[source]

UUID generating field.

a field that generates a str(uuid.uuid4())

Manager class reference

SimpleManager

class data_migrator.models.manager.SimpleManager(*args, **kwargs)[source]

The default manager to handle models; it contains all standard logic and generates one object per row

transform(row, previous, model)[source]

specific transform implementation, instantiates one object from a row

BaseManager

class data_migrator.models.manager.BaseManager(*args, **kwargs)[source]

BaseManager is the foundation for all managers; it contains the base logic to maintain models: parse, keep and emit.

Extend this class for actual managers

all()[source]

return all results

save(o)[source]

save object(s) to this list

scan_row(row, previous=None)[source]

scan one row and save to list

Parameters:
  • row – current row to scan
  • previous (list) – list of list of previous objects in this scan
Returns:

list of saved objects

scan_rows(rows)[source]

scan many rows

stats()[source]

return current stats

transform(row, previous, model)[source]

defines the instantiation of objects from a row

Override this function in your own manager. Models are ordered, and generated records are offered to subsequent managers too.

Parameters:
  • row (list) – all input data for a new row
  • previous (list) – all generated objects from previous managers in chain
  • model (Model) – Model this manager is linked to
Returns:

list of all generated objects

Emitter class reference

Emitters are used to export models to output format.

This module contains all classes for emitters: the base and the actual implementations. The system currently provides MySQLEmitter (the default), CSVEmitter, JSONEmitter and UpdateEmitter. An emitter provides the export format for the scanned and cleaned datasets. It also provides preambles and postambles in the output files, for example to clean the target table before loading it.

The following classes are defined in this module:

The basic structure for emitting is a combination between BaseManager and BaseEmitter:

e = Emitter(manager=Model.objects)
print(e.preamble(header=[..my header lines to add..]))
for l in Model.objects.all():
  print(e.emit(l))  # emit returns a list of strings!

Note

At this moment data-migrator does not actively take part in schema migrations of any sort; it is purely about cleaning, anonymizing and transforming data (yet!).

MySQLEmitter

class data_migrator.emitters.mysql.MySQLEmitter(*args, **kwargs)[source]

MySQL emitter to output MySQL specific insert statements

base_template

base template to output the object

extension

str – file extension for output file of this emitter. Defaults to .sql

emit(l)[source]

Output the result set of an object as a MySQL INSERT statement

preamble(headers)[source]

override the preamble method to make it specific for MySQL

CSVEmitter

class data_migrator.emitters.csv.CSVEmitter(*args, **kwargs)[source]

CSV emitter to output delimited data

base_template

base template to output the object

extension

str – file extension for output file of this emitter. Defaults to .csv

emit(l)[source]

Output the result set of an object as CSV string

JSONEmitter

class data_migrator.emitters.json_emit.JSONEmitter(*args, **kwargs)[source]

JSON emitter to output as JSON Messages

extension

str – file extension for output file of this emitter. Defaults to .json

emit(l)[source]

Output the result set of an object as JSON dump

UpdateEmitter

class data_migrator.emitters.update.UpdateEmitter(*args, **kwargs)[source]

MySQL emitter to output MySQL specific update statements

base_template

base template to output the object

extension

str – file extension for output file of this emitter. Defaults to .sql

Note: at least one field needs to have key=True

emit(l)[source]

Output the result set of an object as a MySQL UPDATE statement

preamble(headers)[source]

override the preamble method to make it specific for MySQL

BaseEmitter

class data_migrator.emitters.base.BaseEmitter(extension=None, manager=None)[source]

Base for emitters of the data-migrator.

manager

BaseManager – reference to the manager that is calling this emitter to export objects from that manager

model_class

Model – reference to the model linked to the class

extension

str – file extension for output file of this emitter

note: model_class and manager are linked together
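A new output format can be added by extending BaseEmitter; a minimal, hypothetical sketch (TabEmitter is not part of the library and the exact hooks may differ, see the emitter sources for the authoritative interface):

from data_migrator.emitters.base import BaseEmitter

class TabEmitter(BaseEmitter):
  """hypothetical emitter producing one tab separated line per object"""
  extension = '.tsv'

  def emit(self, l):
    # Model.emit() returns a dict of emitted values; emit() must return a list of lines
    record = l.emit()
    return ['\t'.join(str(v) for v in record.values())]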

Anonymizor class reference

Anonymizers are used to anonymize data while outputting.

This module contains all classes for anonymizers:

Note

Also check the data_migrator.contrib module for some country specific anonymizers

SimpleStringAnonymizor

class data_migrator.anonymizors.strings.SimpleStringAnonymizor[source]

SimpleStringAnonymizor translates to random printable chars

TextAnonymizor

class data_migrator.anonymizors.strings.TextAnonymizor[source]

TextAnonymizor translates to random chars taking whitespace and punctuation into account.

ChoiceAnonymizor

class data_migrator.anonymizors.lists.ChoiceAnonymizor(choices, weights=None)[source]

ChoiceAnonymizor returns some choices with optional probabilities

>>> ChoiceAnonymizor(['M', 'F', None], weights=[0.3, 0.3, 0.4])()
'M'
choices

list – list of choices to select from

weights

list – optional list of weights

BaseAnonymizor

class data_migrator.anonymizors.base.BaseAnonymizor[source]

BaseType for anonymizers of the data-migrator.

Instantiate the anonymizer in the field definition; the instance is called at translation time.

Implement the __call__() method to implement your specific anonymizor.

__call__(v)[source]

output the anonymized object.

Parameters:v – object to anonymize
Returns:anonymized value

Contrib reference

Commonly used helper functions, to support in the setup of specific transformations.

contrib.read

data_migrator.contrib.read.read_map_from_csv(key=0, value=1, f=None, delimiter='\t', header=True, as_list=False, first=True, unique=False)[source]

Generates a map from a CSV file and adds some validation and list parsing. The function returns a map for a MappingField to use as input for its data_map.

>>> from data_migrator.contrib.read import read_map_from_csv
>>> table_map = read_map_from_csv(f=open('table.csv'), delimiter=';',
                                  key='id', value='name')
>>> len(table_map)
10

Note that by default the CSV is expected to have headers.

Parameters:
  • f – Filehandle to read the csv from into the map
  • delimiter – Option to select another delimiter, other than \t
  • key – Name or position of the Key, if header is false, the ordinal position is expected (default first)
  • value – Name or position of the Value, if header is false, the ordinal position is expected (default second)
  • as_list (boolean) – If True, data-migrator will add all values for a key to a list. Default is False.
  • first (boolean) – If True, data-migrator will keep the first value if non-unique values are found for a key. Default is True.
  • unique (boolean) – If True, data-migrator will treat non-unique values for a key as a violation and raise a NonUniqueDataException. Default is False.
  • header (boolean) – If True, data-migrator will treat the first row as a header row. Default is True
Returns:

a key, value map from the csv

Return type:

map

Raises:NonUniqueDataException – raised if unique is set and a non-unique key value is found.
data_migrator.contrib.read.read_model_from_csv(model, f=None, delimiter='\t', header=True)[source]

Reads a model from a csv.

>>> from data_migrator.contrib.read import read_model_from_csv
>>> read_model_from_csv(model=model.TestModel, f=open('table.csv'),
                        delimiter=';')
>>> len(model.TestModel.objects)
10

Note that by default the CSV is expected to have headers.

Parameters:
  • model – reference to Model to read
  • f – Filehandle to read the csv from into the map
  • delimiter – Option to select another delimiter, other than \t
  • header (boolean) – If True, data-migrator will treat the first row as a header row. Default is True
Returns:

None, but Model.objects will contain the read records

contrib.dutch

Commonly used Dutch support functions for cleaning and anonymization

class data_migrator.contrib.dutch.PhoneAnonymizor[source]

PhoneAnonymizor generates a random dutch phonenumber, like +3142097272

>>> len(PhoneAnonymizor()('020-1234583'))
11
>>> len(PhoneAnonymizor()('06-12345678'))
11
class data_migrator.contrib.dutch.ZipCodeAnonymizor[source]

ZipCodeAnonymizor generates a random dutch zipcode, like ‘4897 LD’

>>> len(ZipCodeAnonymizor()('1234 aa'))
7
data_migrator.contrib.dutch.clean_phone(v)[source]

Cleans phone numbers to dutch clean format

clean_phone cleans phone numbers: it removes all non-numeric characters and spaces, and adds the Dutch country code (+31) if no country code is provided

>>> clean_phone('00 31 6 - 20 20 20 20')
'+31620202020'
>>> clean_phone('06 20 20 20 20')
'+31620202020'
>>> clean_phone('020 -123 345 6')
'+31201233456'
>>> clean_phone('+440.203.020.23')
'+44020302023'
>>> clean_phone('+440a203a020a23')
'+44020302023'
>>> clean_phone('+440 ada 203.020 // 23')
'+44020302023'
>>> clean_phone('31 (6) - 20 20 20 20')
'+31620202020'
Parameters:v (str) – value to clean
Returns:cleaned phone number
Return type:str
data_migrator.contrib.dutch.clean_zip_code(v)[source]

Cleans a dutch zipcode

>>> clean_zip_code('1234 aa')
'1234AA'
>>> clean_zip_code('1234AB')
'1234AB'
>>> clean_zip_code('1234   Ba')
'1234BA'
>>> clean_zip_code('1 2 3 4 A B')
'1 2 3 4 A B'
>>> clean_zip_code('blabla')
'blabla'
Parameters:v (str) – zipcode to clean
Returns:cleaned zip code
Return type:str

Exceptions reference

Explicit exceptions for this package

exception data_migrator.exceptions.DataException[source]

Error in input data

exception data_migrator.exceptions.DefinitionException[source]

Error in definition

exception data_migrator.exceptions.InternalException[source]

Unexpected internal error

exception data_migrator.exceptions.NonUniqueDataException[source]

Non unique data based on key found

exception data_migrator.exceptions.NullDataException[source]

data = NULL exception

exception data_migrator.exceptions.ValidationException[source]

Validate Error in input data

About

Data transformation is a classic problem in computing, but often underestimated in modern software development. Everybody working with persistent data will somehow be involved in restructuring existing data in databases or files when systems and architectures evolve. A wide range of practices exist, ranging from ad-hoc scripts to sophisticated ETL processes. We were upgrading existing codebases at Schuberg Philis, moving from a monolithic application to a microservice architecture. At that moment we found ourselves writing ad-hoc Python data migration scripts. Table by table the transformation was done, simply by exporting existing data as CSVs. With some simple Python scripts - a single read/print loop - new INSERT statements for the target database were generated.

How hard can it be? Just some basic processing and INSERT statement emitting, but soon we had to clean and fix data, generate multiple records out of single rows, combine columns, and look up data from other sources. The scripts became larger, with more exceptions: unreadable and hard to maintain. That was the moment we came up with the idea of applying a more declarative approach. Being pretty charmed by the Django model, we adopted that as a base. Soon after, a standardized system based on declarative definitions originated.

This package is a simple alternative to writing ad-hoc scripts. It is easy to learn, easy to extend and highly expressive in building somewhat more complex transformations. Think of, for example:

  • renaming, reordering columns
  • changing types
  • lookup data, advanced transformations
  • generating permission records in separate tables for main data

Alternatives

There are many packages that offer parts to do this stuff, some packages we know:

  • ruffus: A nice data transformation package, however more geared to long running computational processes (DNA Sequencing) and less to frictionless table transformation.
  • petl: the name says it all: extract, transform and load. You can do cool stuff with it, basically moving table manipulation into Python. We preferred a more direct map-and-transform approach.
  • tablib: very handy if you need to open various sources, a nice extension to data-migrator
  • pandas: A big one; if you need more data analysis and less data migration, this could be your favorite poison.
  • Your average bulk of SQL scripts. Still a useful tool, but we love the ease of repeatability using the pipes-and-filters paradigm in data-migrator.
  • R: R is not Python, but tremendously handy if you need large data set processing and statistics.

Next step

Now see the example and move on to the installation

Contribute


Feel welcome if you would like to contribute. The details on how to do so are described in CONTRIBUTING.md.

Build Environment

data-migrator uses the following tools for its build environment:

Function Tool Location
Repository Github Repo https://github.com/schubergphilis/data-migrator
Issues Github Issues https://github.com/schubergphilis/data-migrator/issues
Registry PyPI https://pypi.python.org/pypi/data-migrator
Build CircleCI https://circleci.com/gh/schubergphilis/data-migrator
Documentation Read the Docs / Sphinx http://data-migrator.readthedocs.io/en/latest/
Test Coverage Python Coverage to Codacy http://coverage.readthedocs.io/en/latest/
Code Quality Codacy https://www.codacy.com/app/schubergphilis/data-migrator
Packaging Setuptools https://setuptools.readthedocs.io/en/latest/
Python Version Tox https://tox.readthedocs.io/en/latest/

Contributor Code of Conduct

As contributors and maintainers of these projects, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities.

We are committed to making participation in these projects a harassment-free experience for everyone, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, or nationality.

Examples of unacceptable behavior by participants include:

  • The use of sexualized language or imagery
  • Personal attacks
  • Trolling or insulting/derogatory comments
  • Public or private harassment
  • Publishing other’s private information, such as physical or electronic addresses, without explicit permission
  • Other unethical or unprofessional conduct.

Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct. By adopting this Code of Conduct, project maintainers commit themselves to fairly and consistently applying these principles to every aspect of managing this project. Project maintainers who do not follow or enforce the Code of Conduct may be permanently removed from the project team.

This code of conduct applies both within project spaces and in public spaces when an individual is representing the project or its community.

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by opening an issue or contacting one or more of the project maintainers.

This Code of Conduct is copied from PyPA. and is adapted from the Contributor Covenant, version 1.2.0 available at http://contributor-covenant.org/version/1/2/0/.

Overview of releases

This is a short overview of releases:

  • [0.6.0] - 2017-08-18 data-migrator with anonymization.
  • [0.5.0] - 2017-04-21 data-migrator is Python 3.6 compatible.
  • [0.4.6] - 2017-04-13 First full open source release with all build tools and documentation in place.

For more details see the CHANGELOG.md
