Welcome to tlpipe’s documentation!

Introduction

  • This is a Python project for the Tianlai data processing pipeline.
  • This software can run simply as a single process on a single compute node, but for higher performance it can also use the Message Passing Interface (MPI) to run data processing tasks in a distributed and parallel manner on multiple compute nodes or supercomputers.
  • It mainly focuses on data processing for the Tianlai cylinder array; its basic framework and many tasks also work for Tianlai dish array data, although it is not specifically tuned for that at present.
  • It covers data processing tasks from reading raw observation data files, through RFI flagging and relative and absolute calibration, to map-making, etc. It also provides some plotting tasks for data visualization.
  • Currently, foreground subtraction and power spectrum estimation have not been implemented, but they may come in the near future.

Installation

Python version

The package works only with Python 2, version >= 2.7.5. Python 3 is not supported currently.
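You can check your Python version with, for example:

$ python --version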

Prerequisites

For the installation and proper functioning of tlpipe, the following packages are required:

  • h5py, Pythonic interface to the HDF5 binary data format;
  • healpy, Healpix tools package for Python;
  • pyephem, Basic astronomical computations for Python;
  • numpy, Base N-dimensional array package for Python;
  • scipy, The fundamental package for scientific computing with Python;
  • matplotlib, A python 2D plotting library;
  • caput, Cluster Astronomical Python Utilities;
  • cora, A package for simulating skies for 21cm Intensity Mapping;
  • aipy, Astronomical Interferometry in PYthon;
  • cython, A static compiler for Python, optional;
  • mpi4py, MPI for Python, optional.

Note

tlpipe can work without MPI support, in which case only a single process is invoked; but to process large amounts of data in a parallel and distributed manner, mpi4py is needed.
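If you are unsure whether MPI and mpi4py work in your environment, a quick sanity check (a minimal standalone sketch, not part of tlpipe) is to save the following as, say, check_mpi.py and run it with mpiexec:

# check_mpi.py -- run with: mpiexec -n 2 python check_mpi.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
# each process reports its rank; you should see one line per process
print 'process %d of %d on %s' % (comm.rank, comm.size, MPI.Get_processor_name())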

Installation guide

After you have successfully installed the prerequisites, do the following.

First clone this package

$ git clone https://github.com/TianlaiProject/tlpipe.git

Then change to the top directory of this package and install it by one of the usual methods: either the standard

$ python setup.py install [--user]

or to develop the package

$ python setup.py develop [--user]

It should also be installable directly with pip using the command

$ pip install [-e] git+https://github.com/TianlaiProject/tlpipe.git

Note

If you have installed tlpipe in develop mode, you do not need to re-install the package every time you change its (pure Python) code. This is useful if you are a developer of the package or want to make development contributions to it.

Tutorial

Note

This is intended as a tutorial for users of the tlpipe package who will just use the tasks already provided in the package to do data analysis. Developers of this package, and those who want to do development or make contributions, may want to refer to the Developer’s guide for a deeper introduction.

Prepare for the input pipe file

An input pipe file is actually a Python script, so it follows plain Python syntax; but to emphasize that it is used as an input pipe file for a data analysis pipeline, it is usually named with the suffix “.pipe” instead of “.py”.

The only required argument to run a data analysis pipeline is the input pipe file, in which one specifies all tasks to be imported and executed, all parameter settings for each task, and the execution order (or flow control) of the pipeline.

Here we take the waterfall plot as an example to show how to write an input pipe file.

Non-iterative pipeline
  1. Create and open a file named plot_wf.pipe (the name can be chosen arbitrarily);

  2. Specify a variable pipe_tasks to hold the analysis tasks that will be imported and executed, and (optionally) a variable pipe_outdir to set the output directory (the default value is ‘./output/’). You can set other pipeline-related parameters according to your needs or just use the default values. All parameters and their default values can be checked with the method show_params(); note that all these parameters should be prepended with the prefix “pipe_”;

    # -*- mode: python; -*-
    
    # input file for the analysis pipeline
    # execute this pipeline by either command of the following two:
    # tlpipe dir/to/plot_wf.pipe
    # mpiexec -n N tlpipe dir/to/plot_wf.pipe
    
    
    pipe_tasks = []
    pipe_outdir = './output/'
    pipe_logging = 'notset'
    # pipe_logging = 'info'
    
  3. Import tasks and set task parameters:

    1. Import Dispatch to select data to plot;

      import glob
      data_dir = 'dir/to/data' # your data directory
      files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list
      
      
      # data selection
      from tlpipe.timestream import dispatch
      pipe_tasks.append(dispatch.Dispatch)
      ### parameters for Dispatch
      dp_input_files = files # data files as list
      dp_freq_select = (500, 510) # frequency indices, from 500 to 510
      dp_feed_select = [1, 2, 32, 33] # feed no. as a list
      dp_out = 'dp'
      
    2. Import Detect to find and mask noise source signal;

      # find and mask noise source signal
      from tlpipe.timestream import detect_ns
      pipe_tasks.append(detect_ns.Detect)
      ### parameters for Detect
      dt_in = dp_out
      # dt_feed = 1
      dt_out = 'dt'
      
    3. Import Plot to plot the waterfall;

      from tlpipe.plot import plot_waterfall
      pipe_tasks.append(plot_waterfall.Plot)
      ### parameters for Plot
      pwf_in = dt_out
      pwf_flag_ns = True # mask noise source signal
      pwf_fig_name = 'waterfall/wf' # figure name to save
      pwf_out = 'pwf'
      

The final input pipe file looks like this:

# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf.pipe
# mpiexec -n N tlpipe dir/to/plot_wf.pipe


pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'


import glob
data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list


# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'

Note

  1. To show all pipeline-related parameters and their default values, you can do:

    >>> from tlpipe.pipeline import pipeline
    >>> pipeline.Manager.prefix
    'pipe_'
    >>> pipeline.Manager.show_params()
    Parameters of Manager:
    copy:  True
    tasks:  []
    logging:  info
    flush:  False
    timing:  False
    overwrite:  False
    outdir:  output/
    
  2. Each imported task should be appended to the list pipe_tasks in order to be executed by the pipeline;

  3. Each task’s parameters should be prepended with its own prefix. See the source file of each task to get its prefix and all the parameters that can be set. You can also get the prefix and parameters (and their default values) by the following method (taking Dispatch for example):

    >>> from tlpipe.timestream import dispatch
    >>> dispatch.Dispatch.prefix
    'dp_'
    >>> dispatch.Dispatch.show_params()
    Parameters of task Dispatch:
    out:  None
    requires:  None
    in:  None
    iter_start:  0
    iter_step:  1
    input_files:  None
    iter_num:  None
    copy:  False
    iterable:  False
    output_files:  None
    time_select:  (0, None)
    stop:  None
    libver:  latest
    corr:  all
    exclude:  []
    check_status:  True
    dist_axis:  0
    freq_select:  (0, None)
    feed_select:  (0, None)
    tag_output_iter:  True
    tag_input_iter:  True
    start:  0
    mode:  r
    pol_select:  (0, None)
    extra_inttime:  150
    days:  1.0
    drop_days:  0.0
    exclude_bad:  True
    
  4. Usually the input of a task is either read from the data files, for example:

    dp_input_files = files # data files as list
    

    or is the output of a previously executed task (to construct a task chain), for example:

    dt_in = dp_out
    
    pwf_in = dt_out
    
Iterative pipeline

To make the pipeline run iteratively over several days of data, or over more than one group of data (a list of files is treated as a separate group), set the parameter iterable of each task you want to iterate to True, and optionally specify a number of iterations. If no iteration number is specified, the pipeline will run iteratively until all input data has been processed. Taking the above waterfall plot again as an example, suppose you want to iteratively plot the waterfall of 2 days of data, or of two separate groups of data; the input pipe file plot_wf_iter.pipe then looks like this:

# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf_iter.pipe
# mpiexec -n N tlpipe dir/to/plot_wf_iter.pipe


pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'


import glob
data_dir1 = 'dir1/to/data' # your data directory
data_dir2 = 'dir2/to/data' # your data directory

###  one way
files = sorted(glob.glob(data_dir1+'/*.hdf5')) # more than 1 day's data files as a list

### or another way
group1 = sorted(glob.glob(data_dir1+'/*.hdf5'))
group2 = sorted(glob.glob(data_dir2+'/*.hdf5'))
files = [ group1, group2 ] # or two groups of data, each as a list of data files


# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_iterable = True
dp_iter_num = 2 # set the number of iterations
dp_tag_input_iter = False
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_iterable = True
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_iterable = True
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'

Note

The number of iterations needs to be set only once, in the first task: after the first task has been executed the specified number of iterations, it will no longer produce output for the subsequent tasks, and those tasks will stop iterating when there is no input for them.

Non-trivial control flow

You can run several tasks iteratively, and then run some other tasks non-iteratively once the iterative tasks have all finished.

For example, if you want the waterfall plot of two days of averaged data, you can iteratively run several tasks, with each iteration processing one day of data, and then combine (accumulate and average) the two days of data and plot its waterfall, as shown in plot_wf_nontrivial.pipe:

# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf_nontrivial.pipe
# mpiexec -n N tlpipe dir/to/plot_wf_nontrivial.pipe


pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'


import glob
data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # at least 2 days data files as a list


# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_iterable = True
dp_iter_num = 2 # set the number of iterations
dp_tag_input_iter = False
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_iterable = True
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_iterable = True
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'

# convert raw timestream to timestream
from tlpipe.timestream import rt2ts
pipe_tasks.append(rt2ts.Rt2ts)
### parameters for Rt2ts
r2t_in = dt_out # can also be pwf_out as it is the same
r2t_iterable = True
r2t_out = 'r2t'

# re-order the data to have RA from 0 to 2pi
from tlpipe.timestream import re_order
pipe_tasks.append(re_order.ReOrder)
### parameters for ReOrder
ro_in = r2t_out
ro_iterable = True
ro_out = 'ro'

# accumulate the re-ordered data from different days
from tlpipe.timestream import accumulate
pipe_tasks.append(accumulate.Accum)
### parameters for Accum
ac_in = ro_out
ac_iterable = True
ac_out = 'ac'

# barrier above iterative tasks before executing the following tasks.
from tlpipe.timestream import barrier
pipe_tasks.append(barrier.Barrier)
### parameters for Barrier

# average the accumulated data
from tlpipe.timestream import average
pipe_tasks.append(average.Average)
### parameters for Average
av_in = ac_out
av_output_files = [ 'average/file_%d.hdf5' %i for i in range(1, 7) ] # here save intermediate results
av_out = 'av'

# waterfall plot of the averaged data
from tlpipe.plot import plot_waterfall
pipe_tasks.append((plot_waterfall.Plot, 'pwf1_')) # here use a new prefix pwf1_ instead of the default pwf_ to discriminate from the previous plot_waterfall
### parameters for Plot
pwf1_in = av_out
pwf1_input_files = av_output_files # here you can read data from the saved intermediate data files if you do not set pwf1_in
pwf1_flag_ns = True
pwf1_fig_name = 'vis_av/vis'
pwf1_out = 'pwf1'

Note

Notice the use of the task Barrier to block the control flow before the execution of its subsequent tasks. As the task Barrier does not get its input from any other task, the pipeline will restart at the beginning every time it gets to execute Barrier. Once everything before Barrier has been executed, it will unblock its subsequent tasks and allow them to proceed normally.

Note

Note that in real data analysis the data should be RFI flagged, calibrated, and possibly further processed before accumulating and averaging; here, for simplicity and ease of understanding, we have omitted all those processes. One can refer to the real data analysis pipeline input files in the package’s input directory.

Execute the same task several times

Special care needs to be taken when executing the same task several times. Since the input pipe file is just a plain Python script, it is first executed before the parameter parsing process, so the assignment of a variable overrides an earlier variable of the same name during the execution of the pipe file script. Therefore, to execute the same task several times, a different prefix should be set for each of these tasks (i.e., except for the first occurrence, which can just use the task’s default prefix, all others need to set a different prefix). To do this, append a 2-tuple to the list pipe_tasks, with its first element being the imported task and the second element being the new prefix to use. See for example the line

pipe_tasks.append((plot_waterfall.Plot, 'pwf1_')) # here use a new prefix pwf1_ instead of the default pwf_ to discriminate from the previous plot_waterfall

in plot_wf_nontrivial.pipe in the above example.

Save intermediate data

To save data that has been processed by a task (useful for break-point recovery, etc.), just set the output_files parameter of this task to a list of file names (data can be saved only as HDF5 files); the data will then be split into almost equal chunks along the time axis and each chunk saved to one of the data files. For example, see the line

av_output_files = [ 'average/file_%d.hdf5' %i for i in range(1, 7) ] # here save intermediate results

in plot_wf_nontrivial.pipe in the above example.

Recovery from intermediate data

You can recover the pipeline from a break point (where you have saved the intermediate data) by reading data from the data files you have saved. To do this, instead of setting the in parameter, set the input_files parameter to a list whose elements are the saved data files. For example, see the line

pwf1_input_files = av_output_files # here you can read data from the saved intermediate data files if you do not set pwf1_in

in plot_wf_nontrivial.pipe in the above example.

Note

If both the in parameter and the input_files parameter are set, the task will get its input from the in parameter instead of reading data from input_files, as reading the data from files is much slower. So in order to recover from the break point, you should not set the in parameter, or should set in to None, which is the default value.

Run the pipeline

Single process run

If you do not have an MPI environment installed, or you just want a single-process run, just do (in case plot_wf.pipe is in your working directory)

$ tlpipe plot_wf.pipe

or (in case plot_wf.pipe isn’t in your working directory)

$ tlpipe dir/to/plot_wf.pipe

If you want to submit and run the pipeline in the background, do something like

$ nohup tlpipe dir/to/plot_wf.pipe &> output.txt &

Multiple processes run

To run the pipeline in a parallel and distributed manner on a cluster using multiple processes, you can do something like (in case plot_wf.pipe is in your working directory)

$ mpiexec -n N tlpipe plot_wf.pipe

or (in case plot_wf.pipe isn’t in your working directory)

$ mpiexec -n N tlpipe dir/to/plot_wf.pipe

If you want to submit and run the pipeline in the background on several nodes, for example node2, node3, and node4, do something like

$ nohup mpiexec -n N -host node2,node3,node4 --map-by node tlpipe dir/to/plot_wf.pipe &> output.txt &

Note

In the above commands, N is the number of processes you want to run!

Pipeline products and intermediate results

Pipeline products and intermediate results will be in the directory set by pipe_outdir.

Other executable commands

  • h5info: check what is in an HDF5 data file (or a list of them); a rough h5py-based alternative is sketched after the examples below. To use it, do something like

    $ h5info data.hdf5
    

    or

    $ h5info data1.hdf5, data2.hdf5, data3.hdf5
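    If you prefer to inspect a file from Python, a rough h5py-based alternative (a sketch, not a tlpipe command) is:

    import h5py

    def show(name):
        print name  # print the name of each group and dataset

    f = h5py.File('data.hdf5', 'r')
    f.visit(show)  # recursively visit everything in the file
    f.close()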
    

Developer’s guide

Write a general task

A pipeline task is a subclass of TaskBase intended to perform some small, modular piece of the analysis.

To write a general task, you can use the following template general_task.py:

"""A general task template."""

from tlpipe.pipeline.pipeline import TaskBase, PipelineStopIteration


class GeneralTask(TaskBase):
    """A general task template."""

    # input parameters and their default values as a dictionary
    params_init = {
                    'task_param': 'param_val',
                  }

    # prefix of this task
    prefix = 'gt_'

    def __init__(self, parameter_file_or_dict=None, feedback=2):

        # Read in the parameters.
        super(self.__class__, self).__init__(parameter_file_or_dict, feedback)

        # Do some initialization here if necessary
        print 'Initialize the task.'

    def setup(self):
        # Set up works here if necessary
        print "Setting up the task."

    def next(self):
        # Doing the actual work here
        print 'Executing the task with paramter task_param = %s' % self.params['task_param']
        # stop the task
        raise PipelineStopIteration()

    def finish(self):
        # Finishing works here if necessary
        print "Finished the task."

The developer of the task must specify what input parameters the task expects (if any) and a prefix, as well as the code that performs the actual processing for the task.

Input parameters are specified by adding a class attribute params_init, which is a dictionary whose entries are key and default-value pairs. A prefix is used to identify and read the corresponding parameters for this task from the input pipe file.

To perform the actual processing for the task, you can first do any necessary initialization in __init__(), then implement the three methods setup(), next() and finish(). Usually the only method that must be implemented is next(), in which the actual processing is done; the other methods __init__(), setup() and finish() are not needed if there is no special initialization, setup, or finishing work to do. These methods are executed in order, with next() possibly being executed many times. Iteration of next() is halted by raising a PipelineStopIteration.
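As an illustration of this lifecycle, here is a minimal sketch (the class name and its parameter are made up for this example) of a task whose next() runs a fixed number of times before stopping itself:

from tlpipe.pipeline.pipeline import TaskBase, PipelineStopIteration


class CountTask(TaskBase):
    """A hypothetical task that executes next() a fixed number of times."""

    params_init = {
                    'n_iter': 3, # how many times next() should run
                  }

    prefix = 'ct_'

    def setup(self):
        self.count = 0

    def next(self):
        if self.count >= self.params['n_iter']:
            # halt the iteration of next()
            raise PipelineStopIteration()
        print 'iteration %d of %d' % (self.count + 1, self.params['n_iter'])
        self.count += 1

    def finish(self):
        print 'Done after %d iterations.' % self.count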

To make the template work, you could put it somewhere like tlpipe/tlpipe/timestream/, and write an input pipe file like general_task.pipe:

# -*- mode: python; -*-

# input file for pipeline manager
# execute this pipeline by either command of the following two:
# tlpipe dir/to/general_task.pipe
# mpiexec -n N tlpipe dir/to/general_task.pipe


pipe_tasks = []
pipe_outdir = './output/'


from tlpipe.timestream import general_task
pipe_tasks.append(general_task.GeneralTask)
### parameters for GeneralTask
gt_task_param = 'new_val'

then execute the task by running

$ tlpipe general_task.pipe

Write a task to process timestream data

To write a task to process the timestream data (i.e., the visibility and auxiliary data), you can use the following template ts_template.py:

"""Timestream task template."""

import timestream_task


class TsTemplate(timestream_task.TimestreamTask):
    """Timestream task template."""

    params_init = {
                    'task_param': 'param_val',
                  }

    prefix = 'tt_'

    def process(self, ts):

        print 'Executing the task with paramter task_param = %s' % self.params['task_param']
        print
        print 'Timestream data is contained in %s' % ts

        return super(TsTemplate, self).process(ts)

Here, instead of inheriting from TaskBase, we inherit from its subclass TimestreamTask, and implement the method process() (and maybe also __init__(), setup(), and finish() if necessary). The timestream data is contained in the argument ts, which may be an instance of RawTimestream or Timestream.

Note

You do not need to override the method next() here, because in the class OneAndOne, which is the superclass of TimestreamTask, we have

class OneAndOne(TaskBase):

    def next(self, input=None):
        # ...
        output = self.read_process_write(input)
        # ...
        return output

    def read_process_write(self, input):
        # ...
        output = self.process(input)
        # ...
        return output

Use data operate functions in timestream tasks

To write a task to process the timestream data, you (in most cases) only need to implement process() with the input timestream data contained in its argument ts, as stated above. To help with the data processing, you can use some of the data operate functions defined in the corresponding timestream data container class, which can automatically split the data along one axis or several axes among multiple processes and iteratively process all these data slices. For example, to write a task that processes the raw timestream data along the axis of baseline, i.e., processes one time-frequency slice of the raw data at a time, you can write the task like ts_task.py:

"""A task to process the raw timestream data along the axis of baseline."""

import timestream_task


class TsTask(timestream_task.TimestreamTask):
    """A task to process the raw timestream data along the axis of baseline."""

    params_init = {
                  }

    prefix = 'tt_'

    def process(self, ts):

        # distribute data along the axis of baseline
        ts.redistribute('baseline')

        # use data operate function of `ts`
        ts.bl_data_operate(self.func)

        return super(TsTask, self).process(ts)

    def func(self, vis, vis_mask, li, gi, bl, ts, **kwargs):
        """Function that does the actual task."""

        # `vis` is the time-frequency slice of the visibility
        print vis.shape
        # `vis_mask` is the time-frequency slice of the visibility mask
        print vis_mask.shape
        # `li`, `gi` is the local and global index of this slice
        # `bl` is the corresponding baseline
        print li, gi, bl

To execute the task, put it somewhere like tlpipe/tlpipe/timestream/, and write an input pipe file like ts_task.pipe:

# -*- mode: python; -*-

# input file for pipeline manager
# execute this pipeline by either command of the following two:
# tlpipe dir/to/ts_task.pipe
# mpiexec -n N tlpipe dir/to/ts_task.pipe


pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'
pipe_timing = True
pipe_flush = True


import glob
data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list


# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_out = 'dp'

from tlpipe.timestream import ts_task
pipe_tasks.append(ts_task.TsTask)
### parameters for TsTask
tt_in = dp_out
tt_out = 'tt'

then execute the task by running

$ tlpipe ts_task.pipe

These are some data operate functions that you can use; a short usage sketch follows the lists below:

Data operate functions of RawTimestream and Timestream:

class tlpipe.container.timestream_common.TimestreamCommon
data_operate(func, op_axis=None, axis_vals=0, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
all_data_operate(func, copy_data=False, **kwargs)
time_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
freq_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_and_freq_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
freq_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)

Additional data operate functions of Timestream:

class tlpipe.container.timestream.Timestream
pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
freq_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_freq_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_freq_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
freq_pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
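As a rough illustration of using one of these, the following sketch processes the data frequency by frequency. It is only a sketch: it assumes the callback signature mirrors that of bl_data_operate shown in ts_task.py above (func(vis, vis_mask, li, gi, axis_val, ts, **kwargs)) and that 'frequency' is the axis name accepted by redistribute(); check the container's source before relying on it.

"""A sketch of a task that operates on the data frequency by frequency."""

import timestream_task


class FreqTask(timestream_task.TimestreamTask):
    """Hypothetical task: process one time-baseline slice per frequency."""

    params_init = {
                  }

    prefix = 'ft_'

    def process(self, ts):

        # distribute data along the frequency axis
        ts.redistribute('frequency')

        # iteratively apply self.func to each frequency slice
        ts.freq_data_operate(self.func)

        return super(FreqTask, self).process(ts)

    def func(self, vis, vis_mask, li, gi, fr, ts, **kwargs):
        """Function that does the actual work on one frequency slice."""

        # `vis` and `vis_mask` are the time-baseline slices of the visibility
        # and its mask; `li`, `gi` are the local and global frequency indices,
        # and `fr` is assumed to be the corresponding frequency value
        print li, gi, vis.shape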

Reference

core – Core functionalities

Constants

constants Various constants.

Tianlai array model

tl_array

kiyopy – Kiyoshi Masui’s parameter parsing package

Parameter parser

parse_ini

Custom exceptions

custom_exceptions Custom Exceptions.

Utilities

utils A few utilities that I have found the need for.

pipeline – Pipeline control and tasks

Pipeline

pipeline

container – Timestream data containers

Data containers

container
timestream_common
raw_timestream
timestream

timestream – Timestream operating tasks

Timestream base task

timestream_task

Operating tasks

dispatch
detect_ns
rfi_flagging
line_rfi
time_flag
freq_flag
multiscale_flag
combine_mask
sir_operate
rfi_stats
bad_detect
delay_transform
ns_cal
rt2ts
ps_fit
ps_cal
apply_gain
temperature_convert
phs2src
phs2zen
phase_closure
ps_subtract
daytime_mask
sun_mask
re_order
accumulate
barrier
average
freq_rebin
map_making
gen_beam

rfi – RFI flagging methods

Surface fitting methods

surface_fit
local_fit Local fit method.
local_average_fit Local average fit method.
local_median_fit Local median fit method.
local_minimum_fit Local minimum fit method.
gaussian_filter Gaussian filter method.
interpolate Spline interpolation method.

Combinatorial thresholding methods

combinatorial_threshold
var_threshold The VarThreshold method.
sum_threshold The SumThreshold method.

Mathematical morphological methods

dilate_operator This implements the mathematical morphological dilate operation.
sir_operator This implements the scale-invariant rank (SIR) operator.

cal – Calibration

Coming soon…

map – Map-making

Driftscan map-making

Richard Shaw’s m-mode formalism method.

foreground – Foreground subtraction

Coming soon…

ps – Power spectrum estimation

Coming soon…

plot – Tasks for plotting

Plotting tasks

plot_integral
plot_slice
plot_waterfall
plot_phase

utils – Utility functions

Utilities

date_util Date and time utils.
np_util
path_util Path utils.
pickle_util Some functions related to pickle.
robust_stats Robust statistical utilities.
sg_filter
rpca_decomp
multiscale
hist_eq
