Welcome to tlpipe’s documentation!¶
Introduction¶
- This is a Python project for the Tianlai data processing pipeline.
- This software can run as a single process on a single compute node, but for higher performance it can also use the Message Passing Interface (MPI) to run data processing tasks in a distributed and parallel manner on multiple compute nodes / supercomputers.
- It mainly focuses on data processing for the Tianlai cylinder array; its basic framework and many tasks also work for the Tianlai dish array's data, although it is not specifically tuned for that at present.
- It can fulfill data processing tasks from reading data from raw observation data files, to RFI flagging, to relative and absolute calibration, to map-making, etc. It also provides some plotting tasks for data visualization.
- Currently, foreground subtraction and power spectrum estimation have not been implemented, but they may come in the near future.
Installation¶
Python version¶
The package works only with Python 2, version >= 2.7.5. Python 3 is not supported currently.
Prerequisites¶
For the installation and proper working of tlpipe, the following packages are required:
- h5py, Pythonic interface to the HDF5 binary data format;
- healpy, Healpix tools package for Python;
- pyephem, Basic astronomical computations for Python;
- numpy, Base N-dimensional array package for Python;
- scipy, The fundamental package for scientific computing with Python;
- matplotlib, A python 2D plotting library;
- caput, Cluster Astronomical Python Utilities;
- cora, A package for simulating skies for 21cm Intensity Mapping;
- aipy, Astronomical Interferometry in PYthon;
- cython, A static compiler for Python, optional;
- mpi4py, MPI for Python, optional.
Note
tlpipe can work without MPI support, in which case only a single process is invoked, but in order to process large amounts of data in a parallel and distributed manner, mpi4py is needed.
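If you are unsure whether mpi4py works in your environment, a quick standalone check (not part of tlpipe; the file name check_mpi.py is arbitrary) is:

# check_mpi.py -- each MPI process reports its rank
from mpi4py import MPI

comm = MPI.COMM_WORLD
print 'process %d of %d is running' % (comm.rank, comm.size)

Run it with, e.g., mpiexec -n 4 python check_mpi.py; each of the 4 processes should print its own rank.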
Installation guide¶
After you have successfully installed the prerequisites, do the following.
First clone this package
$ git clone https://github.com/TianlaiProject/tlpipe.git
Then change to the top directory of this package, install it by the usual methods, either the standard
$ python setup.py install [--user]
or to develop the package
$ python setup.py develop [--user]
It should also be installable directly with pip using the command
$ pip install [-e] git+https://github.com/TianlaiProject/tlpipe.git
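A quick sanity check of the installation, assuming you run it with the same Python interpreter you installed into, is:

# verify that the installed package can be imported
import tlpipe
print tlpipe.__file__   # shows where the installed package lives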
Note
If you have installed tlpipe in the develop mode, you don't need to re-install the package every time after you have changed its (pure Python) code. This is useful when you are a developer of the package or want to make some development/contributions to it.
Tutorial¶
Note
This is intended to be a tutorial for users of the tlpipe package who will just use the tasks already provided in the package to do some data analysis. Developers of this package, and those who want to do some development/contribution, may want to refer to the Developer's guide for a deeper introduction.
Prepare for the input pipe file¶
An input pipe file is actually a Python script file, so it follows plain Python syntax, but to emphasize that it is just used as an input pipe file for a data analysis pipeline, it is usually named with the suffix ".pipe" instead of ".py".
The only required argument to run a data analysis pipeline is the input pipe file, in which one specifies all tasks to be imported and executed, all parameter settings for each task, and also the execution order (or flow control) of the pipeline.
Here we take the waterfall plot as an example to show how to write an input pipe file.
Non-iterative pipeline¶
Create and open a file named plot_wf.pipe (the name can be chosen arbitrarily);
Specify a variable pipe_tasks to hold the analysis tasks that will be imported and executed, and (optionally) a variable pipe_outdir to set the output directory (the default value is './output/'). You can set other parameters related to the pipeline according to your needs, or just use the default values. All parameters and their default values can be checked by the method show_params(); note: all these parameters should be prepended with the prefix "pipe_":
# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf.pipe
# mpiexec -n N tlpipe dir/to/plot_wf.pipe

pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'
Import tasks and set task parameters:
Import Dispatch to select the data to plot:
import glob

data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list

# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_out = 'dp'
Import Detect to find and mask the noise source signal:
# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_out = 'dt'
Import Plot to plot the waterfall:
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'
The final input pipe file (plot_wf.pipe) looks like this:
# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf.pipe
# mpiexec -n N tlpipe dir/to/plot_wf.pipe

pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'

import glob
data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list

# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'
Note
To show all pipeline related parameters and their default values, you can do:
>>> from tlpipe.pipeline import pipeline
>>> pipeline.Manager.prefix
'pipe_'
>>> pipeline.Manager.show_params()
Parameters of Manager:
copy:  True
tasks:  []
logging:  info
flush:  False
timing:  False
overwrite:  False
outdir:  output/
Each imported task should be appended to the list pipe_tasks in order to be executed by the pipeline;
Each task's parameters should be prepended with its own prefix. See the source file of each task to get the prefix and all parameters that can be set. You can also get the prefix and parameters (and their default values) by the following method (take Dispatch for example):
>>> from tlpipe.timestream import dispatch
>>> dispatch.Dispatch.prefix
'dp_'
>>> dispatch.Dispatch.show_params()
Parameters of task Dispatch:
out:  None
requires:  None
in:  None
iter_start:  0
iter_step:  1
input_files:  None
iter_num:  None
copy:  False
iterable:  False
output_files:  None
time_select:  (0, None)
stop:  None
libver:  latest
corr:  all
exclude:  []
check_status:  True
dist_axis:  0
freq_select:  (0, None)
feed_select:  (0, None)
tag_output_iter:  True
tag_input_iter:  True
start:  0
mode:  r
pol_select:  (0, None)
extra_inttime:  150
days:  1.0
drop_days:  0.0
exclude_bad:  True
Usually the input of one task is either read from the data files, for example:
dp_input_files = files # data files as list
or is the output of a previously executed task (to construct a task chain), for example:
dt_in = dp_out
pwf_in = dt_out
Iterative pipeline¶
To make the pipeline run iteratively over several days of data, or over more than one group of data (a list of files treated as a separate group), you should set the parameter iterable of each task you want to iterate to True, and optionally specify an iteration number. If no iteration number is specified, the pipeline will run iteratively until all input data has been processed. Taking again the above waterfall plot as an example, suppose you want to iteratively plot the waterfall of 2 days of data, or of two separate groups of data; the input pipe file plot_wf_iter.pipe looks like:
# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf_iter.pipe
# mpiexec -n N tlpipe dir/to/plot_wf_iter.pipe

pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'

import glob
data_dir1 = 'dir1/to/data' # your data directory
data_dir2 = 'dir2/to/data' # your data directory

### one way
files = sorted(glob.glob(data_dir1+'/*.hdf5')) # more than 1 day's data files as a list
### or another way
group1 = sorted(glob.glob(data_dir1+'/*.hdf5'))
group2 = sorted(glob.glob(data_dir2+'/*.hdf5'))
files = [ group1, group2 ] # or two groups of data, each as a list of data files

# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_iterable = True
dp_iter_num = 2 # set the number of iterations
dp_tag_input_iter = False
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_iterable = True
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_iterable = True
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'
Note
The number of iterations needs to be set only once, in the first task; after the first task has been executed for the specified number of iterations, it no longer produces output for the subsequent tasks, and those tasks stop iterating when there is no input for them.
Non-trivial control flow¶
You can run several tasks iteratively, and then run some other tasks non-iteratively after the iterative tasks have all finished.
For example, if you want the waterfall plot of two days' averaged data, you can iteratively run several tasks, each iteration processing one day of data, and then combine (accumulate and average) the two days of data and plot its waterfall, as shown in plot_wf_nontrivial.pipe:
# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf_nontrivial.pipe
# mpiexec -n N tlpipe dir/to/plot_wf_nontrivial.pipe

pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'

import glob
data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # at least 2 days data files as a list

# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_iterable = True
dp_iter_num = 2 # set the number of iterations
dp_tag_input_iter = False
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_iterable = True
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_iterable = True
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'

# convert raw timestream to timestream
from tlpipe.timestream import rt2ts
pipe_tasks.append(rt2ts.Rt2ts)
### parameters for Rt2ts
r2t_in = dt_out # can also be pwf_out as it is the same
r2t_iterable = True
r2t_out = 'r2t'

# re-order the data to have RA from 0 to 2pi
from tlpipe.timestream import re_order
pipe_tasks.append(re_order.ReOrder)
### parameters for ReOrder
ro_in = r2t_out
ro_iterable = True
ro_out = 'ro'

# accumulate the re-ordered data from different days
from tlpipe.timestream import accumulate
pipe_tasks.append(accumulate.Accum)
### parameters for Accum
ac_in = ro_out
ac_iterable = True
ac_out = 'ac'

# barrier above iterative tasks before executing the following tasks
from tlpipe.timestream import barrier
pipe_tasks.append(barrier.Barrier)
### parameters for Barrier

# average the accumulated data
from tlpipe.timestream import average
pipe_tasks.append(average.Average)
### parameters for Average
av_in = ac_out
av_output_files = [ 'average/file_%d.hdf5' %i for i in range(1, 7) ] # here save intermediate results
av_out = 'av'

# waterfall plot of the averaged data
from tlpipe.plot import plot_waterfall
pipe_tasks.append((plot_waterfall.Plot, 'pwf1_')) # here use a new prefix pwf1_ instead of the default pwf_ to discriminate from the previous plot_waterfall
### parameters for Plot
pwf1_in = av_out
pwf1_input_files = av_output_files # here you can read data from the saved intermediate data files if you do not set pwf1_in
pwf1_flag_ns = True
pwf1_fig_name = 'vis_av/vis'
pwf1_out = 'pwf1'
Note
Notice the use of the task Barrier to block the control flow before the execution of its subsequent tasks. As the task Barrier does not get its input from any other task, the pipeline restarts at the beginning every time it gets to execute Barrier. Once everything before Barrier has been executed, it unblocks its subsequent tasks and allows them to proceed normally.
Note
Note that in real data analysis, the data should be RFI flagged, calibrated, and perhaps otherwise processed before the data accumulation and averaging; here, for simplicity and ease of understanding, we have omitted all those processes. One can refer to the real data analysis pipeline input files in the package's input directory.
Execute the same task several times¶
Special care needs to be taken when executing the same task several times. Since the input pipe file is just a plain Python script, it is executed before the parameter-parsing process, and during that execution the assignment of a variable overrides any earlier assignment to a variable of the same name. So to execute the same task several times, a different prefix should be set for each of these tasks (i.e., except for the first occurrence, which can just use the task's default prefix, all others need to set a different prefix). To do this, you need to append a 2-tuple to the list pipe_tasks, with its first element being the imported task and the second element being the new prefix to use. See for example the line
pipe_tasks.append((plot_waterfall.Plot, 'pwf1_')) # here use a new prefix pwf1_ instead of the default pwf_ to discriminate from the previous plot_waterfall
in plot_wf_nontrivial.pipe in the above example.
Save intermediate data¶
To save data that has been processed by one task (useful, e.g., for break-point recovery), you can just set the output_files parameter of this task to a list of file names (data can only be saved as HDF5 files); the data will then be split into almost equal chunks along the time axis, and each chunk saved to one of the files. For example, see the line
av_output_files = [ 'average/file_%d.hdf5' %i for i in range(1, 7) ] # here save intermediate results
in plot_wf_nontrivial.pipe in the above example.
Recovery from intermediate data¶
You can recover the pipeline from a break point (where you have saved the intermediate data) by reading data from the data files you have saved. To do this, instead of setting the in parameter, you set the input_files parameter to a list whose elements are the saved data files. For example, see the line
pwf1_input_files = av_output_files # here you can read data from the saved intermediate data files if you do not set pwf1_in
in plot_wf_nontrivial.pipe in the above example.
Note
If both the in parameter and the input_files parameter are set, the task gets its input from the in parameter instead of reading from input_files, because reading the data from files is much slower. So in order to recover from a break point, you should not set the in parameter, or should set in to None, which is its default value.
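For example, to resume the above pipeline from the files saved by the Average task, the final plotting task could be configured like the following sketch (file names taken from plot_wf_nontrivial.pipe above):

# do not set pwf1_in (leave it at its default None), so the task reads from the saved files
pwf1_input_files = [ 'average/file_%d.hdf5' %i for i in range(1, 7) ]
pwf1_flag_ns = True
pwf1_fig_name = 'vis_av/vis'
pwf1_out = 'pwf1'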
Run the pipeline¶
Single process run¶
If you do not have an MPI environment installed, or you just want a single process run, just do (in case plot_wf.pipe is in your working directory)
$ tlpipe plot_wf.pipe
or (in case plot_wf.pipe isn't in your working directory)
$ tlpipe dir/to/plot_wf.pipe
If you want to submit and run the pipeline in the background, do something like
$ nohup tlpipe dir/to/plot_wf.pipe &> output.txt &
Multiple processes run¶
To run the pipeline in a parallel and distributed manner on a cluster using multiple processes, you can do something like (in case plot_wf.pipe is in your working directory)
$ mpiexec -n N tlpipe plot_wf.pipe
or (in case plot_wf.pipe isn't in your working directory)
$ mpiexec -n N tlpipe dir/to/plot_wf.pipe
If you want to submit and run the pipeline in the background on several nodes, for example node2, node3, node4, do something like
$ nohup mpiexec -n N -host node2,node3,node4 --map-by node tlpipe dir/to/plot_wf.pipe &> output.txt &
Note
In the above commands, N is the number of processes you want to run!
Pipeline products and intermediate results¶
Pipeline products and intermediate results will be in the directory set by pipe_outdir.
Other executable commands¶
h5info: Check what’s in a (or a list of) HDF5 data file(s). For its use, do something like
$ h5info data.hdf5
or
$ h5info data1.hdf5, data2.hdf5, data3.hdf5
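If you prefer, you can also inspect an HDF5 data file directly from Python with h5py; a minimal sketch (the file name is just an example) is:

# list the groups and datasets (with shapes and dtypes) in an HDF5 file
import h5py

def show(name, obj):
    if isinstance(obj, h5py.Dataset):
        print name, obj.shape, obj.dtype
    else:
        print name

with h5py.File('data.hdf5', 'r') as f:
    f.visititems(show)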
Developer’s guide¶
Write a general task¶
A pipeline task is a subclass of TaskBase intended to perform some small, modular piece of analysis. To write a general task, you can use the following template general_task.py:
"""A general task template."""

from tlpipe.pipeline.pipeline import TaskBase, PipelineStopIteration


class GeneralTask(TaskBase):
    """A general task template."""

    # input parameters and their default values as a dictionary
    params_init = {
        'task_param': 'param_val',
    }

    # prefix of this task
    prefix = 'gt_'

    def __init__(self, parameter_file_or_dict=None, feedback=2):

        # Read in the parameters.
        super(self.__class__, self).__init__(parameter_file_or_dict, feedback)

        # Do some initialization here if necessary
        print 'Initialize the task.'

    def setup(self):
        # Set up works here if necessary
        print "Setting up the task."

    def next(self):
        # Doing the actual work here
        print 'Executing the task with parameter task_param = %s' % self.params['task_param']

        # stop the task
        raise PipelineStopIteration()

    def finish(self):
        # Finishing works here if necessary
        print "Finished the task."
The developer of the task must specify what input parameters the task expects (if any) and a prefix, as well as code to perform the actual processing for the task.
Input parameters are specified by adding a class attribute params_init, which is a dictionary whose entries are key and default value pairs. The prefix is used to identify and read the corresponding parameters from the input pipe file for this task.
To perform the actual processing for the task, you could first do the necessary initialization in __init__(), then implement the three methods setup(), next() and finish(). Usually the only method that must be implemented is next(), in which the actual processing work is done; the other methods __init__(), setup() and finish() are not needed if there is no special initialization, setting up, or finishing work to do. These methods are executed in order, with next() possibly being executed many times. Iteration of next() is halted by raising a PipelineStopIteration.
To make it work, you could put it somewhere like tlpipe/tlpipe/timestream/, and write an input pipe file like general_task.pipe:
# -*- mode: python; -*-

# input file for pipeline manager
# execute this pipeline by either command of the following two:
# tlpipe dir/to/general_task.pipe
# mpiexec -n N tlpipe dir/to/general_task.pipe

pipe_tasks = []
pipe_outdir = './output/'

from tlpipe.timestream import general_task
pipe_tasks.append(general_task.GeneralTask)
### parameters for GeneralTask
gt_task_param = 'new_val'
then execute the task by running
$ tlpipe general_task.pipe
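As with the built-in tasks, once the module is importable you can inspect its prefix and parameters interactively; a sketch (the full listing also includes the parameters inherited from TaskBase):

>>> from tlpipe.timestream import general_task
>>> general_task.GeneralTask.prefix
'gt_'
>>> general_task.GeneralTask.show_params()   # shows task_param plus the inherited parameters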
Write a task to process timestream data¶
To write a task to process the timestream data (i.e., the visibility and auxiliary data), you can use the following template ts_template.py:
"""Timestream task template."""

import timestream_task


class TsTemplate(timestream_task.TimestreamTask):
    """Timestream task template."""

    params_init = {
        'task_param': 'param_val',
    }

    prefix = 'tt_'

    def process(self, ts):

        print 'Executing the task with parameter task_param = %s' % self.params['task_param']
        print
        print 'Timestream data is contained in %s' % ts

        return super(TsTemplate, self).process(ts)
Here, instead of inheriting from TaskBase, we inherit from its subclass TimestreamTask, and implement the method process() (and maybe also __init__(), setup(), and finish() if necessary). The timestream data is contained in the argument ts, which may be an instance of RawTimestream or Timestream.
Note
You do not need to override the method next() now, because in the class OneAndOne, which is the super class of TimestreamTask, we have

class OneAndOne(TaskBase):

    def next(self, input=None):
        # ...
        output = self.read_process_write(input)
        # ...
        return output

    def read_process_write(self, input):
        # ...
        output = self.process(input)
        # ...
        return output
Use data operate functions in timestream tasks¶
To write a task to process the timestream data, you (in most cases) only need to implement process() with the input timestream data contained in its argument ts, as stated above. To help with the data processing, you can use some of the data operate functions defined in the corresponding timestream data container class, which can automatically split the data along one axis (or several axes) among multiple processes and iteratively process all these data slices. For example, to write a task that processes the raw timestream data along the baseline axis, i.e., that processes a time-frequency slice of the raw data each time, you can write the task like ts_task.py:
"""A task to process the raw timestream data along the axis of baseline."""

import timestream_task


class TsTask(timestream_task.TimestreamTask):
    """A task to process the raw timestream data along the axis of baseline."""

    params_init = {
    }

    prefix = 'tt_'

    def process(self, ts):

        # distribute data along the axis of baseline
        ts.redistribute('baseline')

        # use data operate function of `ts`
        ts.bl_data_operate(self.func)

        return super(TsTask, self).process(ts)

    def func(self, vis, vis_mask, li, gi, bl, ts, **kwargs):
        """Function that does the actual task."""

        # `vis` is the time-frequency slice of the visibility
        print vis.shape
        # `vis_mask` is the time-frequency slice of the visibility mask
        print vis_mask.shape
        # `li`, `gi` is the local and global index of this slice
        # `bl` is the corresponding baseline
        print li, gi, bl
To execute the task, put it somewhere like tlpipe/tlpipe/timestream/, and write an input pipe file like ts_task.pipe:
# -*- mode: python; -*-

# input file for pipeline manager
# execute this pipeline by either command of the following two:
# tlpipe dir/to/ts_task.pipe
# mpiexec -n N tlpipe dir/to/ts_task.pipe

pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'
pipe_timing = True
pipe_flush = True

import glob
data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list

# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_out = 'dp'

from tlpipe.timestream import ts_task
pipe_tasks.append(ts_task.TsTask)
### parameters for TsTask
tt_in = dp_out
tt_out = 'tt'
then execute the task by running
$ tlpipe ts_task.pipe
These are some data operate functions that you can use:
Data operate functions of RawTimestream and Timestream:
- class tlpipe.container.timestream_common.TimestreamCommon
  data_operate(func, op_axis=None, axis_vals=0, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  all_data_operate(func, copy_data=False, **kwargs)
  time_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  freq_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  time_and_freq_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  time_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  freq_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
Additional data operate functions of Timestream:
- class tlpipe.container.timestream.Timestream
  pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  time_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  freq_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  time_freq_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  time_freq_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  time_pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
  freq_pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
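As an illustration of how these might be used, the following is a minimal sketch of a task that operates on the data frequency by frequency with freq_data_operate(). The callback signature is an assumption, made by analogy with the bl_data_operate() example above (with a frequency index in place of the baseline), and the 'frequency' axis name is likewise assumed; check the container source for the exact interface.

"""A sketch (not from the tlpipe source) of a task using freq_data_operate."""

import timestream_task


class FreqTask(timestream_task.TimestreamTask):
    """Illustrative task that processes the timestream data frequency by frequency."""

    params_init = {
    }

    prefix = 'ft_'

    def process(self, ts):

        # distribute data along the frequency axis (axis name assumed)
        ts.redistribute('frequency')

        # iterate over per-frequency slices of the data
        ts.freq_data_operate(self.func)

        return super(FreqTask, self).process(ts)

    def func(self, vis, vis_mask, li, gi, fi, ts, **kwargs):
        """Callback; argument list assumed by analogy with the baseline example."""

        # `vis` / `vis_mask`: the slice of the visibility / mask at this frequency
        # `li`, `gi`: local and global index of the slice; `fi`: the frequency
        print li, gi, vis.shape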
Reference¶
kiyopy – Kiyoshi Masui's parameter parsing package¶
Parameter parser¶
parse_ini
Custom exceptions¶
custom_exceptions – Custom exceptions.
container – Timestream data containers¶
Data containers¶
container
timestream_common
raw_timestream
timestream
timestream – Timestream operating tasks¶
Timestream base task¶
timestream_task
Operating tasks¶
dispatch
detect_ns
rfi_flagging
line_rfi
time_flag
freq_flag
multiscale_flag
combine_mask
sir_operate
rfi_stats
bad_detect
delay_transform
ns_cal
rt2ts
ps_fit
ps_cal
apply_gain
temperature_convert
phs2src
phs2zen
phase_closure
ps_subtract
daytime_mask
sun_mask
re_order
accumulate
barrier
average
freq_rebin
map_making
gen_beam
rfi – RFI flagging methods¶
Surface fitting methods¶
surface_fit
local_fit – Local fit method.
local_average_fit – Local average fit method.
local_median_fit – Local median fit method.
local_minimum_fit – Local minimum fit method.
gaussian_filter – Gaussian filter method.
interpolate – Spline interpolation method.
Combinatorial thresholding methods¶
combinatorial_threshold
var_threshold – The VarThreshold method.
sum_threshold – The SumThreshold method.
Mathematical morphological methods¶
dilate_operator – Implements the mathematical morphological dilate operation.
sir_operator – Implements the scale-invariant rank (SIR) operator.
cal – Calibration¶
Coming soon…
foreground – Foreground subtraction¶
Coming soon…
ps – Power spectrum estimation¶
Coming soon…
utils – Utility functions¶
Utilities¶
date_util – Date and time utils.
np_util
path_util – Path utils.
pickle_util – Some functions related to pickle.
robust_stats – Robust statistical utilities.
sg_filter
rpca_decomp
multiscale
hist_eq