API#
Reading from Tables#
- daskms.xds_from_ms(ms, columns=None, index_cols=None, group_cols=None, **kwargs)#
Generator yielding a series of xarray datasets representing the contents of a Measurement Set. It defers to xds_from_table(), which should be consulted for more information.
- Parameters:
- ms : str
  Measurement Set filename
- columns : tuple or list, optional
  Columns present on the resulting dataset. Defaults to all if None.
- index_cols : tuple or list, optional
  Sequence of indexing columns. Defaults to [].
- group_cols : tuple or list, optional
  Sequence of grouping columns. Defaults to ['FIELD_ID', 'DATA_DESC_ID'].
- **kwargs : optional
- Returns:
- datasets : list of xarray.Dataset
  xarray datasets for each group
- daskms.xds_from_table(table_name, columns=None, index_cols=None, group_cols=None, **kwargs)#
Create multiple xarray.Dataset objects from CASA table table_name, with the rows lexicographically sorted according to the columns in index_cols. If group_cols is supplied, the table data is grouped into multiple xarray.Dataset objects, each associated with a permutation of the unique values for the columns in group_cols.
- Parameters:
- table_name : str
  CASA table
- columns : list or tuple, optional
  Columns present on the returned dataset. Defaults to all if None.
- index_cols : list or tuple, optional
  List of CASA table indexing columns. Defaults to ().
- group_cols : list or tuple, optional
  List of columns on which to group the CASA table. Defaults to ().
- table_schema : dict or str or list of dict or str, optional
  A schema dictionary defining the dimension naming scheme for each column in the table. For example:

  ```python
  {
      "UVW": {"dims": ("uvw",)},
      "DATA": {"dims": ("chan", "corr")},
  }
  ```

  will result in the UVW and DATA arrays having dimensions ('row', 'uvw') and ('row', 'chan', 'corr') respectively.

  A string can be supplied, which will be matched against existing default schemas. Examples here include MS, ANTENNA and SPECTRAL_WINDOW, corresponding to Measurement Sets, the ANTENNA subtable and the SPECTRAL_WINDOW subtable, respectively.

  By default, the end of table_name will be inspected to see if it matches any default schemas.

  It is also possible to supply a list of strings or dicts defining a sequence of schemas which are combined. Later elements in the list override previous elements. In the following example, the standard UVW MS component name scheme is overridden with "my-uvw":

  ```python
  ["MS", {"UVW": {"dims": ("my-uvw",)}}]
  ```
- table_keywords : {False, True}, optional
  If True, returns table keywords. Changes the return type of the function into a tuple.
- column_keywords : {False, True}, optional
  If True, returns keywords for each column on the table. Changes the return type of the function into a tuple.
- table_proxy : {False, True}, optional
  If True, returns the Table Proxy associated with the Dataset.
- taql_where : str, optional
  TAQL where clause. For example, to exclude auto-correlations:

  ```python
  xds_from_table("WSRT.MS", taql_where="ANTENNA1 != ANTENNA2")
  ```
- chunks : list of dicts or dict, optional
  A {dim: chunk} dictionary, specifying the chunking strategy of each dimension in the schema. Defaults to {'row': 100000}, which will partition the row dimension into chunks of 100000.

  If a dict, the chunking strategy is applied to each group. If a list of dicts, each element is applied to the associated group. The last element is extended over the remaining groups if there are insufficient elements.

  It is also possible to specify the individual chunks for multiple dimensions:

  ```python
  {
      "row": (40000, 60000, 40000, 60000),
      "chan": (16, 16, 16, 16),
      "corr": (1, 2, 1),
  }
  ```

  The above chunks a 200,000 row, 64 channel and 4 correlation space into 4 x 4 x 3 = 48 chunks, but requires prior knowledge of dimensionality, probably obtained with an initial call to xds_from_table().
- Returns:
- datasets : list of xarray.Dataset
  Datasets for each group, each ordered by indexing columns
- table_keywords : dict, optional
  Returned if table_keywords is True
- column_keywords : dict, optional
  Returned if column_keywords is True
- table_proxy : daskms.TableProxy, optional
  Returned if table_proxy is True
Notes
Both group_cols and index_cols should consist of columns that are part of the table index.

However, this may not always be possible, as CASA tables may not always contain indexing columns. The ANTENNA or SPECTRAL_WINDOW Measurement Set subtables are examples in which the row id serves as the index.

Generally, calling

```python
antds = list(xds_from_table("WSRT.MS::ANTENNA"))
```

is fine, since the data associated with each row of the ANTENNA table has the same shape, and so a dask or numpy array can be constructed around the contents of the table.

This may not be the case for the SPECTRAL_WINDOW subtable. Here, each row defines a separate spectral window, but each spectral window may contain different numbers of frequencies. In this case, it is probably better to group the subtable by row.

There is a special group column "__row__" that can be used to group the table by row:

```python
for spwds in xds_from_table("WSRT.MS::SPECTRAL_WINDOW", group_cols="__row__"):
    ...
```

If "__row__" is used for grouping, then no other column may be used. It should also only be used for small tables, as the number of datasets produced may be prohibitively large.
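As a sanity check on the chunking arithmetic described for the chunks parameter above, the chunk counts can be verified in plain Python (this is ordinary arithmetic, not a dask-ms call):

```python
# Per-dimension chunk sizes from the chunks example above
row_chunks = (40000, 60000, 40000, 60000)   # 200,000 rows in 4 chunks
chan_chunks = (16, 16, 16, 16)              # 64 channels in 4 chunks
corr_chunks = (1, 2, 1)                     # 4 correlations in 3 chunks

# Chunk sizes must sum to the full dimension extents
assert sum(row_chunks) == 200_000
assert sum(chan_chunks) == 64
assert sum(corr_chunks) == 4

# The total number of chunks is the product of per-dimension chunk counts
n_chunks = len(row_chunks) * len(chan_chunks) * len(corr_chunks)
print(n_chunks)  # 4 * 4 * 3 = 48
```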
Writing to Tables#
- daskms.xds_to_table(xds, table_name, columns='ALL', descriptor=None, table_keywords=None, column_keywords=None, table_proxy=False)#
Generates a list of Datasets representing write operations from the specified arrays in xarray.Dataset's into the CASA table columns specified by table_name and columns. This is a lazy operation: it is only executed when a dask.compute() or xarray.Dataset.compute() method is called.
- Parameters:
- xds : xarray.Dataset or list of xarray.Dataset
  Dataset(s) containing the specified columns. If a list of datasets is provided, the concatenation of the columns in sequential datasets will be written.
- table_name : str
  CASA table path
- columns : tuple or list or "ALL"
  List of column names to write to the table. "ALL" is a special marker which specifies that all columns should be written. If you wish to write an "ALL" array to a column, use columns=['ALL'].
- descriptor : None or AbstractBuilderFactory or str
  A class describing how CASA table descriptors and data managers are constructed. Some defaults are available, such as ms and ms_subtable. If None, defaults are used.
- table_keywords : dict, optional
  Dictionary of table keywords to add to existing keywords. The operation is performed immediately, not lazily.
- column_keywords : dict, optional
  Dictionary of {column: keywords} to add to existing column keywords. The operation is performed immediately, not lazily.
- table_proxy : {False, True}
  If True, returns the table proxy.
- Returns:
- write_datasets : list of xarray.Dataset
  Datasets containing arrays representing write operations into a CASA Table
- table_proxy : daskms.TableProxy, optional
  The Table Proxy associated with the datasets
Variables and Datasets#
- class daskms.Variable(dims, data, attrs=None)#
Replicates a minimal subset of xarray Variable's functionality. Exists to allow xarray to be an optional dask-ms dependency.
- dims#
Dimension schema. For example, ("row", "chan", "corr").
- data#
Array
- attrs#
Array metadata dictionary
- property chunks#
Array chunks if wrapping a dask array else None
- property dtype#
Array data type
- property ndim#
Number of array dimensions
- property shape#
Array shape
- property values#
Returns actual array values
- class daskms.Dataset(data_vars, coords=None, attrs=None)#
Replicates a minimal subset of xarray Dataset's functionality. Exists to allow xarray to be an optional dask-ms dependency.
- __init__(data_vars, coords=None, attrs=None)#
- Parameters:
- data_vars : dict
  Dictionary of variables of the form {name: (dims, array [, attrs])}. attrs is optional.
- coords : dict, optional
  Dictionary of coordinates of the form {name: (dims, array [, attrs])}. attrs is optional.
- attrs : dict, optional
  Dictionary of Dataset attributes
- assign(**kwargs)#
Creates a new Dataset from existing variables combined with those supplied in **kwargs.
- Returns:
Dataset
Dataset containing existing variables combined with those in **kwargs.
- assign_attrs(**kwargs)#
Creates a new Dataset from existing attributes combined with those supplied in **kwargs.
- Returns:
Dataset
Dataset containing existing attributes combined with those in **kwargs.
- assign_coords(**kwargs)#
Creates a new Dataset from existing coordinates combined with those supplied in **kwargs.
- Returns:
Dataset
Dataset containing existing coordinates combined with those in **kwargs.
- property attrs#
Dataset attributes
- property chunks#
A
{dim: chunks}
dictionary
- compute(**kwargs)#
Calls dask compute on the dask arrays in this Dataset, returning a new Dataset.
- Returns:
Dataset
Dataset containing computed arrays.
- property coords#
Dataset coordinates
- copy()#
Returns a copy of the Dataset
- property data_vars#
Dataset variables
- property dims#
A
{dim: size}
dictionary
- drop_vars(names, *, errors)#
Drop variables from the Dataset
- Parameters:
- names : str or iterable of str
  Variable names
- errors : {"raise", "ignore"}
  If "raise", a ValueError is raised if the specified variables are missing. If "ignore", the missing variables are ignored.
- Returns:
- dataset : Dataset
  New dataset without the specified variables
- property sizes#
A
{dim: size}
dictionary
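To illustrate how the {name: (dims, array)} form of data_vars relates to the {dim: size} mapping exposed by dims and sizes, here is a sketch (not dask-ms's implementation) that infers dimension sizes from variable shapes; infer_dims and the use of bare shape tuples in place of real arrays are illustrative assumptions:

```python
def infer_dims(data_vars):
    """Build a {dim: size} mapping from {name: (dims, shape)} entries,
    checking that dimension sizes are consistent across variables."""
    sizes = {}
    for name, (dims, shape) in data_vars.items():
        if len(dims) != len(shape):
            raise ValueError(f"{name}: {len(dims)} dims for {len(shape)}-d data")
        for dim, size in zip(dims, shape):
            # A dimension must have the same size wherever it appears
            if sizes.setdefault(dim, size) != size:
                raise ValueError(f"inconsistent size for dimension {dim!r}")
    return sizes

# Shape tuples stand in for real (dask or numpy) arrays
sizes = infer_dims({
    "DATA": (("row", "chan", "corr"), (100, 64, 4)),
    "UVW": (("row", "uvw"), (100, 3)),
})
print(sizes)  # {'row': 100, 'chan': 64, 'corr': 4, 'uvw': 3}
```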
TableProxies#
- class daskms.TableProxy(*args, **kwargs)#
Proxies calls to a pyrap.tables.table object via a concurrent.futures.ThreadPoolExecutor.
- addcols(*args, **kwargs)#
Proxies calls to addcols() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- addrows(*args, **kwargs)#
Proxies calls to addrows() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- colnames(*args, **kwargs)#
Proxies calls to colnames() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- getcell(*args, **kwargs)#
Proxies calls to getcell() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- getcellslice(*args, **kwargs)#
Proxies calls to getcellslice() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- getcol(*args, **kwargs)#
Proxies calls to getcol() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- getcoldesc(*args, **kwargs)#
Proxies calls to getcoldesc() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- getcolkeywords(*args, **kwargs)#
Proxies calls to getcolkeywords() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- getcolnp(*args, **kwargs)#
Proxies calls to getcolnp() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- getcolslice(*args, **kwargs)#
Proxies calls to getcolslice() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- getdminfo(*args, **kwargs)#
Proxies calls to getdminfo() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- getkeywords(*args, **kwargs)#
Proxies calls to getkeywords() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- getvarcol(*args, **kwargs)#
Proxies calls to getvarcol() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- iswritable(*args, **kwargs)#
Proxies calls to iswritable() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- nrows(*args, **kwargs)#
Proxies calls to nrows() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- putcellslice(*args, **kwargs)#
Proxies calls to putcellslice() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- putcol(*args, **kwargs)#
Proxies calls to putcol() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- putcolkeywords(*args, **kwargs)#
Proxies calls to putcolkeywords() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- putcolnp(*args, **kwargs)#
Proxies calls to putcolnp() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- putcolslice(*args, **kwargs)#
Proxies calls to putcolslice() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- putkeyword(*args, **kwargs)#
Proxies calls to putkeyword() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- putkeywords(*args, **kwargs)#
Proxies calls to putkeywords() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- putvarcol(*args, **kwargs)#
Proxies calls to putvarcol() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- setmaxcachesize(*args, **kwargs)#
Proxies calls to setmaxcachesize() via a ThreadPoolExecutor.
- Returns:
- future : concurrent.futures.Future
  Future containing the result of the call
- submit(fn, locktype, *args, **kwargs)#
Submits fn(table, *args, **kwargs) within the executor, returning a Future.
- Parameters:
- fn : callable
  Function with signature fn(table, *args, **kwargs)
- locktype : {NOLOCK, READLOCK, WRITELOCK}
  Type of lock to acquire before and release after calling fn
- *args
  Arguments passed to fn
- **kwargs
  Keyword arguments passed to fn
- Returns:
- future : concurrent.futures.Future
  Future containing the result of fn(table, *args, **kwargs)
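The submission mechanism above can be sketched in a few lines of plain Python (an illustration, not dask-ms's implementation; SimpleProxy is a hypothetical name and the lock handling is omitted):

```python
from concurrent.futures import ThreadPoolExecutor


class SimpleProxy:
    """Proxies function calls against a wrapped object via an executor."""

    def __init__(self, obj):
        self._obj = obj
        # A single worker serialises access to the wrapped object
        self._pool = ThreadPoolExecutor(max_workers=1)

    def submit(self, fn, *args, **kwargs):
        # Schedule fn(obj, *args, **kwargs) on the executor, returning a Future
        return self._pool.submit(fn, self._obj, *args, **kwargs)


proxy = SimpleProxy([3, 1, 2])
future = proxy.submit(lambda obj: sorted(obj))
print(future.result())  # [1, 2, 3]
```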
Data Column Expressions#
- daskms.expressions.data_column_expr(statement, datasets)#
Produces a list of dask arrays with a variable set to the result of the supplied expression:

```python
vis = data_column_expr("DATA / (DIR1_DATA + DIR2_DATA)", datasets)
flag(vis)
```

- Parameters:
- statement : str
  For example, DATA / (DIR1_DATA + DIR2_DATA + DIR3_DATA). Can contain data column names as well as numeric literal values.
- datasets : list of Datasets or Dataset
  Datasets containing the DATA columns referenced in the statement
- Returns:
- arrays : dask.array.Array or list of dask.array.Array
  List of expression results
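The expression semantics can be illustrated with a pure-Python sketch (not dask-ms's implementation; eval_column_expr is a hypothetical helper that evaluates over scalars rather than dask arrays):

```python
def eval_column_expr(statement, columns):
    """Evaluate a column expression against a {column: value} mapping."""
    # Expose only the column names to the expression, no builtins
    return eval(statement, {"__builtins__": {}}, dict(columns))


result = eval_column_expr(
    "DATA / (DIR1_DATA + DIR2_DATA)",
    {"DATA": 6.0, "DIR1_DATA": 1.0, "DIR2_DATA": 2.0},
)
print(result)  # 2.0
```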
Patterns#
- class daskms.patterns.Multiton(*args, **kwargs)#
General Multiton metaclass
Implementation of the Multiton pattern, which always returns a unique object for a unique set of arguments provided to a class constructor. For example, in the following, only a single instance of A with argument 1 is ever created:

```python
class A(metaclass=Multiton):
    def __init__(self, *args, **kw):
        self.args = args
        self.kw = kw

assert A(1) is A(1)
assert A(1, "bob") is not A(1)
```
This is useful for ensuring that only a single instance of a heavy-weight resource such as files, sockets, thread/process pools or database connections is created in a single process, for a unique set of arguments.
Notes
Instantiation of object instances is thread-safe.
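A minimal metaclass implementing this pattern might look as follows (a sketch, not dask-ms's actual implementation; keyword arguments are assumed hashable):

```python
import threading


class Multiton(type):
    """Metaclass returning one instance per unique set of constructor args."""

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        cls._instances = {}
        cls._lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        key = (args, tuple(sorted(kwargs.items())))
        with cls._lock:  # make instantiation thread-safe
            try:
                return cls._instances[key]
            except KeyError:
                instance = super().__call__(*args, **kwargs)
                cls._instances[key] = instance
                return instance


class A(metaclass=Multiton):
    def __init__(self, *args, **kw):
        self.args = args
        self.kw = kw


assert A(1) is A(1)
assert A(1, "bob") is not A(1)
```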
- class daskms.patterns.LazyProxy(fn, *args, **kwargs)#
Lazy instantiation of a proxied object.
A LazyProxy proxies an object which is lazily instantiated on first use. It is primarily useful for embedding references to heavy-weight resources in a dask graph, so they can be pickled and sent to other workers without immediately instantiating those resources.
To this end, the proxy takes as arguments:

- a class or factory function that instantiates the desired resource.
- *args and **kwargs that should be supplied to the instantiator.

In the following example, the function and arguments for creating a file are wrapped in a LazyProxy. The file is only instantiated when f.write is called:

```python
f = LazyProxy(open, "test.txt", mode="w")
f.write("Hello World!")
f.close()
```
In addition to the class/factory function, it is possible to specify a finaliser supplied to weakref.finalize that is called to clean up the resource when the LazyProxy is garbage collected. In this case, the first argument should be a tuple of two elements: the factory and the finaliser.

```python
# LazyProxy defined with factory function and finaliser function
def finalise_file(file):
    file.close()

f2 = LazyProxy((open, finalise_file), "test.txt", mode="r")


class WrappedFile:
    def __init__(self, *args, **kwargs):
        self.handle = open(*args, **kwargs)

    def close(self):
        self.handle.close()

# LazyProxy defined with class
f1 = LazyProxy((WrappedFile, WrappedFile.close), "test.txt", mode="r")
```
LazyProxy objects are designed to be embedded in dask.array.blockwise() calls. For example:

```python
# Specify the start and length of each range
file_ranges = np.array([[0, 5], [5, 10], [15, 5], [20, 10]])
# Chunk each range individually
da_file_ranges = da.from_array(file_ranges, chunks=(1, 2))
# Reference a binary file
file_proxy = LazyProxy(open, "data.dat", "rb")

def _read(file_proxy, file_range):
    # Seek to the range start and read the length of data
    start, length = file_range
    file_proxy.seek(start)
    return np.asarray(file_proxy.read(length), np.uint8)

data = da.blockwise(_read, "x",
                    # Embed the file_proxy in the graph
                    file_proxy, None,
                    # Pass each file range to _read
                    da_file_ranges, "xy",
                    # Output chunks should have the length
                    # of each range
                    adjust_chunks={"x": tuple(file_ranges[:, 1])},
                    concatenate=True)

print(data.compute(scheduler="processes"))
```
- Parameters:
- fn : class or callable or tuple
  A callable object used to create the proxied object. In tuple form, this should consist of two callables. The first should create the proxied object and the second should be a finaliser that performs cleanup on the proxied object when the LazyProxy is garbage collected: it is passed directly to weakref.finalize.
- *args : tuple
  Positional arguments passed to the callable object specified in fn that will create the proxied object. The contents of *args should be pickleable.
- **kwargs : dict
  Keyword arguments passed to the callable object specified in fn that will create the proxied object. The contents of **kwargs should be pickleable.
Notes
Instantiation of the proxied object is thread-safe.
LazyProxies are configured to never instantiate within dask.array.blockwise() and dask.blockwise.blockwise() calls.
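The core of lazy attribute proxying can be sketched in plain Python (an illustration, not dask-ms's actual implementation; SimpleLazyProxy is a hypothetical name and pickling support is omitted):

```python
class SimpleLazyProxy:
    """Instantiates the wrapped resource on first attribute access."""

    def __init__(self, fn, *args, **kwargs):
        self._fn = fn
        self._args = args
        self._kwargs = kwargs
        self._obj = None

    def __getattr__(self, name):
        # Called only for attributes not found on the proxy itself
        if self._obj is None:
            self._obj = self._fn(*self._args, **self._kwargs)
        return getattr(self._obj, name)


calls = []
proxy = SimpleLazyProxy(lambda: calls.append("opened") or ["a", "b"])
assert calls == []             # nothing instantiated yet
assert proxy.count("a") == 1   # first use triggers instantiation
assert calls == ["opened"]     # the factory ran exactly once
```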
- class daskms.patterns.LazyProxyMultiton(*args, **kwargs)#
Combination of a LazyProxy with a Multiton.

Ensures that only a single LazyProxy is ever created for the given constructor arguments:

```python
class A:
    def __init__(self, value):
        self.value = value

assert LazyProxyMultiton("foo") is LazyProxyMultiton("foo")
```