Writing Datasets

Creating, updating or appending to a Table or Measurement Set is accomplished with xds_to_table(), together with the presence or absence of the ROWID coordinate on a Dataset (see ROWID Coordinates).

The pattern for writing a dataset is as follows:

>>> import dask
>>> from daskms import xds_to_table
>>> writes = xds_to_table(datasets, "TEST.MS", ["DATA", "BITFLAG"])
>>> dask.compute(writes)

In the above example, given a list of datasets, the DATA and BITFLAG columns are written to the TEST.MS table.

Note

“ALL” can be supplied to the column argument to specify that all arrays should be written to the table. However, it is advisable to explicitly specify which columns to write, to avoid accidentally overwriting data or performing unnecessary writes.

Updating/Appending Rows

The presence of ROWID coordinates on each of the datasets provided to xds_to_table() governs whether the function will update or append rows to a table.

If the ROWID coordinate is present on a dataset, it will be used to update existing rows in the table. By contrast, the absence of ROWID will cause rows to be appended to the table.
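The rule above can be previewed before writing by checking each dataset for a ROWID coordinate. The sketch below is illustrative only: split_updates_and_appends is a hypothetical helper, not part of dask-ms, and it assumes each dataset exposes a coords mapping (as xarray-style datasets do). Simple stand-in objects are used here so the example runs without a table on disk.

```python
from types import SimpleNamespace


def split_updates_and_appends(datasets):
    """Hypothetical helper (not part of dask-ms).

    Separates datasets that carry a ROWID coordinate (expected to update
    existing rows) from those that do not (expected to append new rows).
    Assumes each dataset has a ``coords`` mapping.
    """
    updates, appends = [], []
    for ds in datasets:
        (updates if "ROWID" in ds.coords else appends).append(ds)
    return updates, appends


# Stand-ins for real datasets: only the ``coords`` attribute matters here
with_rowid = SimpleNamespace(coords={"ROWID": [0, 1, 2]})
without_rowid = SimpleNamespace(coords={})

updates, appends = split_updates_and_appends([with_rowid, without_rowid])
print(len(updates), "dataset(s) would update rows")
print(len(appends), "dataset(s) would append rows")
```

A check like this can help confirm, before calling xds_to_table(), that a dataset intended as an update has not lost its ROWID coordinate along the way.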

The following Dataset without ROWID creates a new table from scratch.

>>> import dask
>>> import dask.array as da
>>> from daskms import Dataset, xds_to_table
>>> # Create Dataset Variables
>>> data_vars = {
    'DATA_DESC_ID': (("row",), da.zeros(10, chunks=2)),
    'DATA': (("row", "chan", "corr"), da.zeros((10, 16, 4), chunks=(2, 16, 4)))
}
>>> # Write dataset to table
>>> writes = xds_to_table([Dataset(data_vars)], "test.table", "ALL")
>>> dask.compute(writes)

It is perfectly possible to combine the two operations by submitting multiple datasets, some of which contain ROWID coordinates while others do not.

>>> import dask
>>> from daskms import xds_from_ms, xds_to_table, Dataset
>>> from daskms.example_data import example_ms
>>>
>>> # Create example Measurement Set and read datasets
>>> ms = example_ms()
>>> datasets = xds_from_ms(ms)
>>> # Add last Dataset to table using variables only (no ROWID coordinate)
>>> new_ds = Dataset(datasets[-1].data_vars)
>>> datasets.append(new_ds)
>>>
>>> # Write datasets back to Measurement Set
>>> writes = xds_to_table(datasets, ms, "ALL")
>>> dask.compute(writes)

In these cases it is strongly suggested that the datasets representing updates are generated from xds_from_table() as this will ensure that the correct rows are referenced on the dataset. Data from datasets representing appends will always be added to the end of the table.

Note that it may also be desirable for appended rows to have an ordering similar to that of the updated rows, as described in Sorting. It is currently the user’s responsibility to achieve this.

Updating/Adding Columns

If a dataset array is present as a column in the table, the column will be updated. By contrast, a missing column will cause xds_to_table() to infer a CASA column descriptor, add the column to the table and then write the array to it.

>>> import dask
>>> import dask.array as da
>>> from daskms import xds_from_ms, xds_to_table
>>> from daskms.example_data import example_ms
>>>
>>> ms = example_ms()
>>> datasets = xds_from_ms(ms)
>>>
>>> # Add BITFLAG data to datasets
>>> for i, ds in enumerate(datasets):
>>>     datasets[i] = ds.assign(BITFLAG=(("row", "chan", "corr"),
                                         da.zeros_like(ds.DATA.data)))
>>>
>>> # Write data back to ms
>>> writes = xds_to_table(datasets, ms, ["BITFLAG"])
>>> dask.compute(writes)

Creating and updating the Measurement Set and its sub-tables

In the case of the Measurement Set and its sub-tables, care is taken to ensure that

  1. Required columns are added.

  2. Required columns conform to the Measurement Set v2.0 Specification.

This means that, for example, if you have a UVW array with a non-standard shape ([4]) and type (float), the UVW column will still be created with the shape ([3]) and type (double) mandated by the MSv2.0 spec.

The above also applies to the following optional columns in the MSv2.0:

DATA

MODEL_DATA

CORRECTED_DATA

WEIGHT_SPECTRUM

SIGMA_SPECTRUM

IMAGING_WEIGHTS

Other optional MSv2.0 columns can easily be supported.

This behaviour is triggered whenever table_name ends with .ms (in lower or upper case) in the case of the main Measurement Set table:

>>> xds_to_table(datasets, "test.ms", ["DATA", "BITFLAG"])

or when it ends with ::subtablename in the case of a subtable:

>>> xds_to_table(datasets, "test.ms::SPECTRAL_WINDOW", ["CHAN_FREQ"])

Respect the standard naming conventions and you’ll be fine.
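The naming rules above can be summarised in a small sketch. classify_table_name is a hypothetical helper written purely for illustration; it is not part of dask-ms, and the library's own name handling may differ in edge cases (trailing slashes, nested paths and so on).

```python
def classify_table_name(table_name):
    """Hypothetical helper (not part of dask-ms).

    Returns a (kind, subtable) pair describing how a table name would be
    interpreted under the naming convention described in the text:
      * "name::SUBTABLE"            -> ("subtable", "SUBTABLE")
      * name ending in .ms / .MS    -> ("main", None)
      * anything else               -> ("plain", None)
    """
    if "::" in table_name:
        # Everything after the last scope operator names the sub-table
        _, subtable = table_name.rsplit("::", 1)
        return ("subtable", subtable)
    if table_name.lower().endswith(".ms"):
        return ("main", None)
    return ("plain", None)


for name in ("test.ms", "TEST.MS::SPECTRAL_WINDOW", "test.table"):
    print(name, "->", classify_table_name(name))
```

Under this reading, a name such as "test.table" would be written as an ordinary table, without the Measurement Set column handling described above.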

Creating Sub-tables

It is possible for sub-tables to be added to a table. For example, the SOURCE table is an optional table that may or may not be present on the Measurement Set.

The following convention specifies that the SOURCE sub-table of TEST.MS should be created:

>>> writes = xds_to_table(source_dataset,
                          "~/data/TEST.MS::SOURCE",
                          columns="ALL")

xds_to_table() will also create the "Table: ~/data/TEST.MS/SOURCE" keyword in TEST.MS, linking it with the SOURCE sub-table.

Warning

As discussed in Opening Sub-tables, it is advisable to use the :: scope operator so that dask-ms understands the link between the main table and the sub-table. The following will create a SOURCE table but will not create a link between the table and the sub-table:

>>> writes = xds_to_table(source_dataset,
                          "~/data/TEST.MS/SOURCE",
                          columns="ALL")

Keywords

Keywords can be added to the target table and columns:

>>> xds_to_table(datasets, "test.ms", [],
                 table_keywords={"foo":"bar"},
                 column_keywords={"DATA": {"foo": "bar"}})

Writing Arrays from Array API Compatible Libraries

dask-ms supports writing arrays from libraries that implement the Python Array API standard (2021.12+), including JAX, CuPy, and PyTorch. Arrays are converted to NumPy inside each write task, so no changes to the xds_to_table() call are needed. GPU arrays are transferred to CPU automatically, one chunk at a time, avoiding a full device-to-host copy at graph-construction time.

The only practical difference between libraries is how you wrap the source array in a dask array:

  • JAX and CuPy expose NumPy-compatible dtype objects, so dask.array.from_array() works without any extra arguments.

  • PyTorch uses its own dtype objects (torch.complex64, etc.) which are not recognised by NumPy at graph-construction time, so dask.array.from_delayed() with an explicit dtype is required.
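The per-chunk conversion to NumPy described above might look something like the following. This is a minimal sketch under stated assumptions, not the dask-ms implementation: to_numpy is a hypothetical name, and the duck-typed checks (detach()/cpu() for PyTorch-style tensors, get() for CuPy-style arrays) are one plausible way to handle each library without importing it.

```python
import numpy as np


def to_numpy(chunk):
    """Hypothetical helper (not the dask-ms implementation).

    Converts a single chunk from an array library into a NumPy array.
    """
    if isinstance(chunk, np.ndarray):
        # Already NumPy: nothing to do
        return chunk
    if hasattr(chunk, "detach") and hasattr(chunk, "cpu"):
        # PyTorch-style tensor: move to host memory, then view as NumPy
        return chunk.detach().cpu().numpy()
    if callable(getattr(chunk, "get", None)):
        # CuPy-style array: get() copies device memory to the host
        return np.asarray(chunk.get())
    # JAX arrays and other array-likes can be handed to np.asarray
    return np.asarray(chunk)


class FakeDeviceArray:
    """Stand-in for a GPU array so the sketch runs without a GPU."""

    def __init__(self, host_copy):
        self._host_copy = host_copy

    def get(self):
        return self._host_copy


converted = to_numpy(FakeDeviceArray(np.ones((2, 3), dtype=np.complex64)))
print(type(converted).__name__, converted.shape, converted.dtype)
```

Because a conversion of this kind runs once per chunk inside a task, only one chunk's worth of data needs to be resident on the host at a time.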

JAX

dask.array.from_array() works directly with jax.Array objects. Each chunk will be a jax.Array; dask-ms converts it to NumPy inside the write task.

import dask
import dask.array as da
import jax.numpy as jnp
from daskms import xds_from_ms, xds_to_table

ms = "path/to/data.ms"
ds = xds_from_ms(ms, columns=["DATA"], group_cols=[],
                 chunks={"row": 100})[0]
dims, chunks = ds.sizes, ds.chunks

jax_data = jnp.zeros(
    (dims["row"], dims["chan"], dims["corr"]), dtype=jnp.complex64
)
da_data = da.from_array(
    jax_data, chunks=(chunks["row"], dims["chan"], dims["corr"])
)
new_ds = ds.assign(DATA=(("row", "chan", "corr"), da_data))
dask.compute(xds_to_table(new_ds, ms, ["DATA"]))

This pattern also applies when jax.Array objects are produced per-chunk, for example from a dask.array.map_blocks() call over a JAX-jitted function:

import jax

@jax.jit
def predict(uvw_chunk):
    ...  # returns a jax.Array of shape (row, chan, corr)

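# UVW chunks have shape (row, 3): drop the trailing axis,
# then add the new chan and corr axes produced by predict
da_data = da.map_blocks(
    predict, ds.UVW.data,
    dtype="complex64",
    drop_axis=1,
    new_axis=[1, 2],
    chunks=(chunks["row"], dims["chan"], dims["corr"]),
)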

CuPy

CuPy arrays live on the GPU. dask.array.from_array() introspects the dtype correctly, and dask-ms transfers each chunk to CPU inside the write task via cupy.ndarray.get().

import dask
import dask.array as da
import cupy as cp
from daskms import xds_from_ms, xds_to_table

ms = "path/to/data.ms"
ds = xds_from_ms(ms, columns=["DATA"], group_cols=[],
                 chunks={"row": 100})[0]
dims, chunks = ds.sizes, ds.chunks

cupy_data = cp.zeros(
    (dims["row"], dims["chan"], dims["corr"]), dtype=cp.complex64
)
da_data = da.from_array(
    cupy_data, chunks=(chunks["row"], dims["chan"], dims["corr"])
)
new_ds = ds.assign(DATA=(("row", "chan", "corr"), da_data))
dask.compute(xds_to_table(new_ds, ms, ["DATA"]))

PyTorch

PyTorch tensor dtype objects (torch.complex64, etc.) are not NumPy-compatible, so dask.array.from_array() cannot introspect the dtype at graph-construction time. Use dask.array.from_delayed() with an explicit NumPy dtype string instead, building one delayed chunk per row-chunk:

import dask
import dask.array as da
import torch
from daskms import xds_from_ms, xds_to_table

ms = "path/to/data.ms"
ds = xds_from_ms(ms, columns=["DATA"], group_cols=[],
                 chunks={"row": 100})[0]
dims, chunks = ds.sizes, ds.chunks

device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_data = torch.zeros(
    dims["row"], dims["chan"], dims["corr"],
    dtype=torch.complex64, device=device,
)

parts, row = [], 0
for rc in chunks["row"]:
    chunk = torch_data[row: row + rc]
    parts.append(
        da.from_delayed(
            dask.delayed(chunk),
            shape=(rc, dims["chan"], dims["corr"]),
            dtype="complex64",  # NumPy dtype — no torch dependency here
        )
    )
    row += rc
da_data = da.concatenate(parts, axis=0)

new_ds = ds.assign(DATA=(("row", "chan", "corr"), da_data))
dask.compute(xds_to_table(new_ds, ms, ["DATA"]))

For CUDA tensors, the device-to-host transfer (tensor.to("cpu")) is performed inside each write task. For CPU tensors no copy is made at the dask layer.