API#

Reading from Tables#

daskms.xds_from_ms(ms, columns=None, index_cols=None, group_cols=None, **kwargs)#

Generator yielding a series of xarray datasets representing the contents of a Measurement Set. It defers to xds_from_table(), which should be consulted for more information.

Parameters:
ms : str

Measurement Set filename

columns : tuple or list, optional

Columns present on the resulting dataset. Defaults to all if None.

index_cols : tuple or list, optional

Sequence of indexing columns. Defaults to []

group_cols : tuple or list, optional

Sequence of grouping columns. Defaults to ['FIELD_ID', 'DATA_DESC_ID']

**kwargs : optional

Keyword arguments passed to xds_from_table().

Returns:
datasets : list of xarray.Dataset

xarray datasets for each group
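
A minimal usage sketch (the Measurement Set path and column names here are illustrative):

from daskms import xds_from_ms

# Each dataset corresponds to a FIELD_ID/DATA_DESC_ID group
datasets = xds_from_ms("WSRT.MS", columns=["TIME", "DATA"])

for ds in datasets:
    print(ds.attrs, ds.DATA.data.shape)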

daskms.xds_from_table(table_name, columns=None, index_cols=None, group_cols=None, **kwargs)#

Create multiple xarray.Dataset objects from CASA table table_name with the rows lexicographically sorted according to the columns in index_cols. If group_cols is supplied, the table data is grouped into multiple xarray.Dataset objects, each associated with a permutation of the unique values for the columns in group_cols.

Parameters:
table_name : str

CASA table

columns : list or tuple, optional

Columns present on the returned dataset. Defaults to all if None.

index_cols : list or tuple, optional

List of CASA table indexing columns. Defaults to ().

group_cols : list or tuple, optional

List of columns on which to group the CASA table. Defaults to ().

table_schema : dict or str or list of dict or str, optional

A schema dictionary defining the dimension naming scheme for each column in the table. For example:

{
    "UVW": {'dims': ('uvw',)},
    "DATA": {'dims': ('chan', 'corr')},
}

will result in the UVW and DATA arrays having dimensions ('row', 'uvw') and ('row', 'chan', 'corr') respectively.

A string can be supplied, which will be matched against existing default schemas. Examples here include MS, ANTENNA and SPECTRAL_WINDOW, corresponding to Measurement Sets, the ANTENNA subtable and the SPECTRAL_WINDOW subtable, respectively.

By default, the end of table_name will be inspected to see if it matches any default schemas.

It is also possible to supply a list of strings or dicts defining a sequence of schemas which are combined. Later elements in the list override previous elements. In the following example, the standard UVW MS component name scheme is overridden with “my-uvw”.

["MS", {"UVW": {'dims': ('my-uvw',)}}]
table_keywords{False, True}, optional

If True, returns table keywords. Changes the return type of the function to a tuple.

column_keywords : {False, True}, optional

If True, returns keywords for each column on the table. Changes the return type of the function to a tuple.

table_proxy : {False, True}, optional

If True, returns the Table Proxy associated with the Dataset.

taql_where : str, optional

TAQL where clause. For example, to exclude auto-correlations:

xds_from_table("WSRT.MS", taql_where="ANTENNA1 != ANTENNA2")

chunks : list of dicts or dict, optional

A {dim: chunk} dictionary, specifying the chunking strategy of each dimension in the schema. Defaults to {'row': 100000}, which partitions the row dimension into chunks of 100,000 rows.

  • If a dict, the chunking strategy is applied to each group.

  • If a list of dicts, each element is applied to the associated group. The last element is extended over the remaining groups if there are insufficient elements.

It’s also possible to specify the individual chunks for multiple dimensions:

{'row': (40000, 60000, 40000, 60000),
 'chan': (16, 16, 16, 16),
 'corr': (1, 2, 1)}

The above chunks a 200,000 row, 64 channel and 4 correlation space into 4 x 4 x 3 = 48 chunks, but requires prior knowledge of dimensionality, probably obtained with an initial call to xds_from_table().
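
As a sketch, a per-group chunking strategy might be supplied as follows (the table path is illustrative):

from daskms import xds_from_table

datasets = xds_from_table(
    "WSRT.MS",
    group_cols=["FIELD_ID", "DATA_DESC_ID"],
    index_cols=["TIME"],
    # First group gets 10000-row chunks; the last element
    # is extended over the remaining groups
    chunks=[{"row": 10000}, {"row": 20000}],
)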

Returns:
datasets : list of xarray.Dataset

datasets for each group, each ordered by indexing columns

table_keywords : dict, optional

Returned if table_keywords is True

column_keywords : dict, optional

Returned if column_keywords is True

table_proxy : daskms.TableProxy, optional

Returned if table_proxy is True

Notes

Both group_cols and index_cols should consist of columns that are part of the table index.

However, this may not always be possible as CASA tables may not always contain indexing columns. The ANTENNA or SPECTRAL_WINDOW Measurement Set subtables are examples in which the row id serves as the index.

Generally, calling

antds = list(xds_from_table("WSRT.MS::ANTENNA"))

is fine, since the data associated with each row of the ANTENNA table has the same shape and so a dask or numpy array can be constructed around the contents of the table.

This may not be the case for the SPECTRAL_WINDOW subtable. Here, each row defines a separate spectral window, but each spectral window may contain different numbers of frequencies. In this case, it is probably better to group the subtable by row.

There is a special group column "__row__" that can be used to group the table by row.

for spwds in xds_from_table("WSRT.MS::SPECTRAL_WINDOW",
                            group_cols="__row__"):
    ...

If "__row__" is used for grouping, then no other column may be used. It should also only be used for small tables, as the number of datasets produced, may be prohibitively large.

Writing to Tables#

daskms.xds_to_table(xds, table_name, columns='ALL', descriptor=None, table_keywords=None, column_keywords=None, table_proxy=False)#

Generates a list of Datasets representing write operations from the specified arrays in xarray.Dataset's into the CASA table columns specified by table_name and columns. This is a lazy operation – it is only executed when a dask.compute() or xarray.Dataset.compute() method is called.

Parameters:
xds : xarray.Dataset or list of xarray.Dataset

dataset(s) containing the specified columns. If a list of datasets is provided, the concatenation of the columns in sequential datasets will be written.

table_name : str

CASA table path

columns : tuple or list or "ALL"

List of column names to write to the table.

“ALL” is a special marker which specifies that all columns should be written. If you wish to write an “ALL” array to a column, use columns=['ALL']

descriptor : None or AbstractBuilderFactory or str

A class describing how CASA table descriptors and data managers are constructed. Some defaults are available, such as ms and ms_subtable.

If None, defaults are used.

table_keywords : dict, optional

Dictionary of table keywords to add to existing keywords. The operation is performed immediately, not lazily.

column_keywords : dict, optional

Dictionary of {column: keywords} to add to existing column keywords. The operation is performed immediately, not lazily.

table_proxy : {False, True}

If True, returns the table_proxy.

Returns:
write_datasets : list of xarray.Dataset

Datasets containing arrays representing write operations into a CASA Table

table_proxy : daskms.TableProxy, optional

The Table Proxy associated with the datasets
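
A minimal read-modify-write sketch, assuming an existing WSRT.MS whose main table has a CORRECTED_DATA column (the names are illustrative):

import dask
from daskms import xds_from_ms, xds_to_table

datasets = xds_from_ms("WSRT.MS")

# Copy DATA into CORRECTED_DATA on each dataset
updated = [ds.assign(CORRECTED_DATA=(("row", "chan", "corr"), ds.DATA.data))
           for ds in datasets]

# Construct the lazy write operations, then execute them
writes = xds_to_table(updated, "WSRT.MS", columns=["CORRECTED_DATA"])
dask.compute(writes)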

Variables and Datasets#

class daskms.Variable(dims, data, attrs=None)#

Replicates a minimal subset of xarray Variable's functionality. Exists to allow xarray to be an optional dask-ms dependency.

dims#

Dimension schema, e.g. ("row", "chan", "corr").

data#

Array

attrs#

Array metadata dictionary

property chunks#

Array chunks if wrapping a dask array else None

property dtype#

Array data type

property ndim#

Number of array dimensions

property shape#

Array shape

property values#

Returns actual array values
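
As a sketch, a Variable can be constructed directly around a dask array (names are illustrative):

import dask.array as da
from daskms import Variable

v = Variable(("row", "chan"), da.ones((100, 16), chunks=(50, 16)))
print(v.dims, v.shape, v.dtype, v.chunks)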

class daskms.Dataset(data_vars, coords=None, attrs=None)#

Replicates a minimal subset of xarray Dataset's functionality. Exists to allow xarray to be an optional dask-ms dependency.

__init__(data_vars, coords=None, attrs=None)#
Parameters:
data_vars : dict

Dictionary of variables of the form {name: (dims, array[, attrs])}. attrs is optional.

coords : dict, optional

Dictionary of coordinates of the form {name: (dims, array[, attrs])}. attrs is optional.

attrs : dict, optional

Dictionary of Dataset attributes
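
A minimal construction sketch, usable without xarray installed (names are illustrative):

import dask.array as da
from daskms import Dataset

data = da.zeros((10, 16, 4), chunks=(5, 16, 4), dtype="complex64")
ds = Dataset(
    {"DATA": (("row", "chan", "corr"), data)},
    attrs={"FIELD_ID": 0},
)
print(ds.sizes)  # {'row': 10, 'chan': 16, 'corr': 4}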

assign(**kwargs)#

Creates a new Dataset from existing variables combined with those supplied in **kwargs.

Returns:
Dataset

Dataset containing existing variables combined with those in **kwargs.

assign_attrs(**kwargs)#

Creates a new Dataset from existing attributes combined with those supplied in **kwargs.

Returns:
Dataset

Dataset containing existing attributes combined with those in **kwargs.

assign_coords(**kwargs)#

Creates a new Dataset from existing coordinates combined with those supplied in **kwargs.

Returns:
Dataset

Dataset containing existing coordinates combined with those in **kwargs.

property attrs#

Dataset attributes

property chunks#

A {dim: chunks} dictionary

compute(**kwargs)#

Calls dask compute on the dask arrays in this Dataset, returning a new Dataset.

Returns:
Dataset

Dataset containing computed arrays.

property coords#

Dataset coordinates

copy()#

Returns a copy of the Dataset

property data_vars#

Dataset variables

property dims#

A tuple of dimensions

drop_vars(names, *, errors)#

Drop variables from the Dataset

Parameters:
names : str or iterable of str

Variable names

errors : {"raise", "ignore"}

If “raise”, a ValueError is raised if the specified variables are missing. If “ignore”, the missing variables are ignored.

Returns:
dataset : Dataset

New dataset without the specified variables
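
For example, given a hypothetical dataset ds:

# Drop DATA if present; do nothing if it is missing
ds2 = ds.drop_vars("DATA", errors="ignore")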

property sizes#

A {dim: size} dictionary

TableProxies#

class daskms.TableProxy(*args, **kwargs)#

Proxies calls to a casacore.tables.table object via a concurrent.futures.ThreadPoolExecutor.

addcols(*args, **kwargs)#

Proxies calls to addcols() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

addrows(*args, **kwargs)#

Proxies calls to addrows() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

colnames(*args, **kwargs)#

Proxies calls to colnames() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

getcell(*args, **kwargs)#

Proxies calls to getcell() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

getcellslice(*args, **kwargs)#

Proxies calls to getcellslice() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

getcol(*args, **kwargs)#

Proxies calls to getcol() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

getcoldesc(*args, **kwargs)#

Proxies calls to getcoldesc() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

getcolkeywords(*args, **kwargs)#

Proxies calls to getcolkeywords() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

getcolnp(*args, **kwargs)#

Proxies calls to getcolnp() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

getcolslice(*args, **kwargs)#

Proxies calls to getcolslice() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

getdminfo(*args, **kwargs)#

Proxies calls to getdminfo() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

getkeywords(*args, **kwargs)#

Proxies calls to getkeywords() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

getvarcol(*args, **kwargs)#

Proxies calls to getvarcol() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

iswritable(*args, **kwargs)#

Proxies calls to iswritable() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

nrows(*args, **kwargs)#

Proxies calls to nrows() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

putcellslice(*args, **kwargs)#

Proxies calls to putcellslice() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

putcol(*args, **kwargs)#

Proxies calls to putcol() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

putcolkeywords(*args, **kwargs)#

Proxies calls to putcolkeywords() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

putcolnp(*args, **kwargs)#

Proxies calls to putcolnp() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

putcolslice(*args, **kwargs)#

Proxies calls to putcolslice() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

putkeyword(*args, **kwargs)#

Proxies calls to putkeyword() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

putkeywords(*args, **kwargs)#

Proxies calls to putkeywords() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

putvarcol(*args, **kwargs)#

Proxies calls to putvarcol() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

setmaxcachesize(*args, **kwargs)#

Proxies calls to setmaxcachesize() via a ThreadPoolExecutor

Returns:
future : concurrent.futures.Future

Future containing the result of the call

submit(fn, locktype, *args, **kwargs)#

Submits fn(table, *args, **kwargs) within the executor, returning a Future.

Parameters:
fn : callable

Function with signature fn(table, *args, **kwargs)

locktype : {NOLOCK, READLOCK, WRITELOCK}

Type of lock to acquire before and release after calling fn

*args

Arguments passed to fn

**kwargs

Keyword arguments passed to fn

Returns:
future : concurrent.futures.Future

Future containing the result of fn(table, *args, **kwargs)
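
A sketch of direct TableProxy usage, assuming python-casacore is installed and WSRT.MS exists; note that the proxied methods return futures:

import casacore.tables as ct
from daskms import TableProxy

# The first argument is the factory that creates the underlying table
proxy = TableProxy(ct.table, "WSRT.MS", ack=False)

# Proxied calls run on the ThreadPoolExecutor and return Futures
print(proxy.nrows().result())
print(proxy.colnames().result())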

Data Column Expressions#

daskms.expressions.data_column_expr(statement, datasets)#

Produces a list of dask arrays with a variable set to the result of the supplied expression:

vis = data_column_expr("DATA / (DIR1_DATA + DIR2_DATA)",
                       datasets)
flag(vis)

Parameters:
statement : str

For example, DATA / (DIR1_DATA + DIR2_DATA + DIR3_DATA). Can contain data column names as well as numeric literal values.

datasets : list of Dataset or Dataset

Datasets containing the DATA columns referenced in the statement

Returns:
arrays : dask.array.Array or list of dask.array.Array

list of expression results
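
A fuller sketch, assuming datasets read from a hypothetical Measurement Set containing DIR1_DATA and DIR2_DATA columns:

from daskms import xds_from_ms
from daskms.expressions import data_column_expr

datasets = xds_from_ms("WSRT.MS", columns=["DATA", "DIR1_DATA", "DIR2_DATA"])
vis = data_column_expr("DATA / (DIR1_DATA + DIR2_DATA)", datasets)

# One dask array per dataset
print([v.shape for v in vis])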

Patterns#

class daskms.patterns.Multiton(*args, **kwargs)#

General Multiton metaclass

Implementation of the Multiton pattern, which always returns a unique object for a unique set of arguments provided to a class constructor. For example, in the following, only a single instance of A with argument 1 is ever created.

class A(metaclass=Multiton):
    def __init__(self, *args, **kw):
        self.args = args
        self.kw = kw

assert A(1) is A(1)
assert A(1, "bob") is not A(1)

This is useful for ensuring that only a single instance of a heavy-weight resource, such as a file, socket, thread/process pool or database connection, is created in a single process for a unique set of arguments.

Notes

Instantiation of object instances is thread-safe.

class daskms.patterns.LazyProxy(fn, *args, **kwargs)#

Lazy instantiation of a proxied object.

A LazyProxy proxies an object which is lazily instantiated on first use. It is primarily useful for embedding references to heavy-weight resources in a dask graph, so they can be pickled and sent to other workers without immediately instantiating those resources.

To this end, the proxy takes as arguments:

  1. a class or factory function that instantiates the desired resource.

  2. *args and **kwargs that should be supplied to the instantiator.

In the following example, the function and arguments for creating a file are wrapped in a LazyProxy. The file is only opened when f.write is called:

f = LazyProxy(open, "test.txt", mode="w")
f.write("Hello World!")
f.close()

In addition to the class/factory function, it is possible to specify a finaliser, supplied to weakref.finalize, that is called to clean up the resource when the LazyProxy is garbage collected. In this case, the first argument should be a tuple of two elements: the factory and the finaliser.

# LazyProxy defined with factory function and finaliser function
def finalise_file(file):
    file.close()

f2 = LazyProxy((open, finalise_file), "test.txt", mode="r")

class WrappedFile:
    def __init__(self, *args, **kwargs):
        self.handle = open(*args, **kwargs)

    def close(self):
        self.handle.close()

# LazyProxy defined with class
f1 = LazyProxy((WrappedFile, WrappedFile.close), "test.txt", mode="r")

LazyProxy objects are designed to be embedded in dask.array.blockwise() calls. For example:

import dask.array as da
import numpy as np

# Specify the start and length of each range
file_ranges = np.array([[0, 5], [5, 10], [15, 5], [20, 10]])
# Chunk each range individually
da_file_ranges = da.from_array(file_ranges, chunks=(1, 2))
# Reference a binary file
file_proxy = LazyProxy(open, "data.dat", "rb")

def _read(file_proxy, file_range):
    # Each block has shape (1, 2): extract the range start and length
    start, length = file_range[0]
    # Seek to the range start and read length bytes
    file_proxy.seek(start)
    return np.frombuffer(file_proxy.read(length), np.uint8)

data = da.blockwise(_read, "x",
                    # Embed the file_proxy in the graph
                    file_proxy, None,
                    # Pass each file range to _read
                    da_file_ranges, "xy",
                    # output chunks should have the length
                    # of each range
                    adjust_chunks={"x": tuple(file_ranges[:, 1])},
                    concatenate=True,
                    dtype=np.uint8)

print(data.compute(scheduler="processes"))

Parameters:
fn : class or callable or tuple

A callable object used to create the proxied object. In tuple form, this should consist of two callables. The first should create the proxied object and the second should be a finaliser that performs cleanup on the proxied object when the LazyProxy is garbage collected: it is passed directly to weakref.finalize.

*args : tuple

Positional arguments passed to the callable object specified in fn that will create the proxied object. The contents of *args should be pickleable.

**kwargs : dict

Keyword arguments passed to the callable object specified in fn that will create the proxied object. The contents of **kwargs should be pickleable.

Notes

  • Instantiation of the proxied object is thread-safe.

  • LazyProxy objects are configured to never instantiate within dask.array.blockwise() and dask.blockwise.blockwise() calls.

class daskms.patterns.LazyProxyMultiton(*args, **kwargs)#

Combination of a LazyProxy with a Multiton

Ensures that only a single LazyProxy is ever created for the given constructor arguments.

class A:
    def __init__(self, value):
        self.value = value

assert LazyProxyMultiton(A, "foo") is LazyProxyMultiton(A, "foo")

See LazyProxy and Multiton for further details.