Writing Datasets¶
Creating, Updating or Appending to a Table or Measurement Set is accomplished
through the use of the xds_to_table() and the presence
or absence of the ROWID coordinate on a
Dataset (See ROWID Coordinates).
The pattern for writing a writing a dataset is as follows:
>>> from daskms import xds_to_table
>>> writes = xds_to_table(datasets, "TEST.MS", ["DATA", "BITFLAG"])
>>> dask.compute(writes)
In the above example, given a list of datasets, the
DATA and BITFLAG columns are written to the TEST.MS table.
Note
“ALL” can be supplied to the column argument to specify that all arrays should be written to the table. However, it is advisable to explicitly specify which columns to write to avoid accidentally overwriting data or or performing unnecessary writes.
Updating/Appending Rows¶
The presence of ROWID coordinates on each of the datasets provided
to xds_to_table() governs whether the function will
update or append rows to a table.
If the ROWID coordinate is present on a dataset, it will be used
to update existing rows in the dataset. By contrast, the absence of
ROWID will cause rows to be appended to the table.
The following Dataset without ROWID creates a new table from scratch.
>>> import dask
>>> import dask.array as da
>>> from daskms import Dataset
>>> # Create Dataset Variables
>>> data_vars = {
'DATA_DESC_ID': (("row",), da.zeros(10, chunks=2)),
'DATA': (("row", "chan", "corr"), da.zeros((10, 16, 4), chunks=(2, 16, 4))
}
>>> # Write dataset to table
>>> writes = xds_to_table([Dataset(data_vars)], "test.table", "ALL")
>>> dask.compute(writes)
It is perfectly possible to combine the two operations by submitting
multiple datasets, some of which contain ROWID coordinates
while others do not.
>>> import dask
>>> from daskms import xds_from_ms, Dataset
>>> from daskms.example_data import example_ms
>>>
>>> # Create example Measurement Set and read datasets
>>> ms = example_ms()
>>> datasets = xds_from_ms(ms)
>>> # Add last Dataset to table using variables only (no ROWID coordinate)
>>> new_ds = Dataset(datasets[-1].data_vars)
>>> datasets.append(new_ds)
>>>
>>> # Write datasets back to Measurement Set
>>> writes = xds_to_table(datasets, ms, "ALL")
>>> dask.compute(writes)
In these cases it is strongly suggested that
the datasets representing updates are generated from
xds_from_table() as this will ensure that the correct
rows are referenced on the dataset. Data from datasets representing
appends will always be added to the end of the table.
Note that it may be also be desirable for appended rows to have an ordering similar to those of the updated rows, as described in Sorting. It is currently the user’s responsibility to achieve this.
Updating/Adding Columns¶
If a dataset array is present as a column in the dataset, the column will be updated.
By contrast, a missing column will lead cause xds_to_table()
to infer a CASA column descriptor, add the column to the table and then write
the array to it.
>>> from daskms import xds_from_ms
>>> from daskms.example_data import example_ms
>>>
>>> ms = example_ms()
>>> datasets = xds_from_ms(ms)
>>>
>>> # Add BITFLAG data to datasets
>>> for i, ds in enumerate(datasets):
>>> datasets[i] = ds.assign(BITFLAG=(("row", "chan", "corr",
da.zeros_like(ds.DATA.data))))
>>>
>>> # Write data back to ms
>>> writes = xds_to_table(datasets, ms, ["BITFLAG"])
>>> dask.compute(writes)
Creating and updating the Measurement Set and it’s sub-tables¶
In the case of the Measurement Set and it’s sub-tables, care is taken to ensure that
Required columns are added.
Required columns conform to the Measurement Set v2.0 Specification.
This means that, for example, if you have a UVW array with a non-standard shape ([4]) and type (float), the UVW column will still be created the shape ([3]) and type (double) mandated by the MSv2.0 spec.
The above also applies to the following optional columns in the MSv2.0:
DATA |
MODEL_DATA |
CORRECTED_DATA |
WEIGHT_SPECTRUM |
SIGMA_SPECTRUM |
IMAGING_WEIGHTS |
Other optional MSv2.0 columns can easily be supported.
This behaviour is triggered whenever the table_name ends
with lower or uppercase .MS in the case of the main
Measurement Set table:
>>> xds_to_table(datasets, "test.ms", ["DATA", "BITFLAG"])
or when it ends with with ::subtablename in the case of a subtable:
>>> xds_to_table(datasets, "test.ms::SPECTRAL_WINDOW", ["CHAN_FREQ"])
Respect the standard naming conventions and you’ll be fine.
Creating Sub-tables¶
It is possible for sub-tables to be added to a table. For example, the SOURCE table is an optional table that may or may not be present on the Measurement Set
The following convention specifies that the SOURCE sub-table
of TEST.MS should be created:
>>> writes = xds_to_table(source_dataset,
"~/data/TEST.MS::SOURCE",
columns="ALL")
xds_to_table will also created the "Table: ~/data/TEST.MS/SOURCE"
keyword in TEST.MS linking it with the SOURCE sub-table.
Warning
As discussed in Opening Sub-tables, it is advisable to use the :: scope operator so that dask-ms understands the link between the main table and the sub-table. The following will create a SOURCE table but will not create a link between the table and the sub-table:
>>> writes = xds_to_table(source_dataset,
"~/data/TEST.MS/SOURCE",
columns="ALL")
Keywords¶
Keywords can be added to the target table and columns:
>>> xds_to_table(datasets, "test.ms", [],
table_keywords={"foo":"bar"},
column_keywords={"DATA": {"foo": "bar"}})
Writing Arrays from Array API Compatible Libraries¶
dask-ms supports writing arrays from libraries that implement the
Python Array API standard (2021.12+),
including JAX, CuPy, and PyTorch. Arrays are converted to NumPy inside
each write task, so no changes to the xds_to_table() call
are needed. GPU arrays are transferred to CPU automatically, one chunk
at a time, avoiding a full device-to-host copy at graph-construction time.
The only practical difference between libraries is how you wrap the source array in a dask array:
JAX and CuPy expose NumPy-compatible dtype objects, so
dask.array.from_array()works without any extra arguments.PyTorch uses its own dtype objects (
torch.complex64, etc.) which are not recognised by NumPy at graph-construction time, sodask.array.from_delayed()with an explicitdtypeis required.
JAX¶
dask.array.from_array() works directly with jax.Array objects.
Each chunk will be a jax.Array; dask-ms converts it to NumPy inside
the write task.
import dask
import dask.array as da
import jax.numpy as jnp
from daskms import xds_from_ms, xds_to_table
ms = "path/to/data.ms"
ds = xds_from_ms(ms, columns=["DATA"], group_cols=[],
chunks={"row": 100})[0]
dims, chunks = ds.sizes, ds.chunks
jax_data = jnp.zeros(
(dims["row"], dims["chan"], dims["corr"]), dtype=jnp.complex64
)
da_data = da.from_array(
jax_data, chunks=(chunks["row"], dims["chan"], dims["corr"])
)
new_ds = ds.assign(DATA=(("row", "chan", "corr"), da_data))
dask.compute(xds_to_table(new_ds, ms, ["DATA"]))
This pattern also applies when jax.Array objects are produced
per-chunk, for example from a dask.array.map_blocks() call over a
JAX-jitted function:
import jax
@jax.jit
def predict(uvw_chunk):
... # returns a jax.Array of shape (row, chan, corr)
da_data = da.map_blocks(
predict, ds.UVW.data,
dtype="complex64",
new_axis=[1, 2],
chunks=(chunks["row"], dims["chan"], dims["corr"]),
)
CuPy¶
CuPy arrays live on the GPU. dask.array.from_array() introspects
the dtype correctly, and dask-ms transfers each chunk to CPU inside the
write task via cupy.ndarray.get().
import dask
import dask.array as da
import cupy as cp
from daskms import xds_from_ms, xds_to_table
ms = "path/to/data.ms"
ds = xds_from_ms(ms, columns=["DATA"], group_cols=[],
chunks={"row": 100})[0]
dims, chunks = ds.sizes, ds.chunks
cupy_data = cp.zeros(
(dims["row"], dims["chan"], dims["corr"]), dtype=cp.complex64
)
da_data = da.from_array(
cupy_data, chunks=(chunks["row"], dims["chan"], dims["corr"])
)
new_ds = ds.assign(DATA=(("row", "chan", "corr"), da_data))
dask.compute(xds_to_table(new_ds, ms, ["DATA"]))
PyTorch¶
PyTorch tensor dtype objects (torch.complex64, etc.) are not
NumPy-compatible, so dask.array.from_array() cannot introspect the
dtype at graph-construction time. Use dask.array.from_delayed()
with an explicit NumPy dtype string instead, building one delayed
chunk per row-chunk:
import dask
import dask.array as da
import torch
from daskms import xds_from_ms, xds_to_table
ms = "path/to/data.ms"
ds = xds_from_ms(ms, columns=["DATA"], group_cols=[],
chunks={"row": 100})[0]
dims, chunks = ds.sizes, ds.chunks
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_data = torch.zeros(
dims["row"], dims["chan"], dims["corr"],
dtype=torch.complex64, device=device,
)
parts, row = [], 0
for rc in chunks["row"]:
chunk = torch_data[row: row + rc]
parts.append(
da.from_delayed(
dask.delayed(chunk),
shape=(rc, dims["chan"], dims["corr"]),
dtype="complex64", # NumPy dtype — no torch dependency here
)
)
row += rc
da_data = da.concatenate(parts, axis=0)
new_ds = ds.assign(DATA=(("row", "chan", "corr"), da_data))
dask.compute(xds_to_table(new_ds, ms, ["DATA"]))
For CUDA tensors, the device-to-host transfer (tensor.to("cpu")) is
performed inside each write task. For CPU tensors no copy is made at the
dask layer.