Commit 588d572b authored by Willi Rath

First working version

name: py3_std
channels:
  - conda-forge
dependencies:
  - python=3
  - coverage
  - dask
  - distributed
  - flake8
  - git-lfs
  - jupyterlab
  - netCDF4
  - numpy
  - pandas
  - pytest
  - pytest-cov
  - seawater
  - xarray
  - zarr
# get path to tmp directory
[ "$1" == "" ] && { echo "Need one arg: <path-to-tmp>. Aborting."; exit; }
# get mc3 target directory
# activate working env and fail if there's a problem
source ${mc3_target_dir}/bin/activate py3_std || \
{ echo "Activation of working env failed.";
echo "Please run first.";
exit; }
# run all tests using a local tmp dir and monitor
# timing of all tests
pytest -v -s --basetemp=${tmp_dir} --durations=0
# calculate timestamp
# NOTE(review): ${time_stamp} is not used later in this excerpt — presumably
# consumed further down in the full script; confirm.
time_stamp=$(date -Ins)
# target directory for the miniconda installation
# NOTE(review): the assignments of ${mc3_target_dir} and ${installer} are
# missing from this excerpt — they must be defined before this point.
# clean up
rm -rf ${mc3_target_dir}
# download miniconda
# NOTE(review): the download URL argument to wget is missing here — the
# extracted page appears to have dropped it; restore the Miniconda3
# installer URL before use.
wget \
-O ${installer}
# install miniconda and clean up
# -b: batch (no prompts), -p: install prefix
bash ${installer} -b -p ${mc3_target_dir}
rm -f ${installer}
# activate root environment of installation
source ${mc3_target_dir}/bin/activate root
# create the working environment
conda env create -f environment.yml
import pytest
import numpy as np
import xarray as xr
# make sure to always use the same state of the PRNG
# NOTE(review): no np.random.seed(...) call is visible in this excerpt —
# confirm the seeding actually happens (e.g. in a fixture or conftest.py).
def _get_xr_data_array(array):
    """Wrap a bare numpy array in a named :class:`xarray.DataArray`.

    Axes are labelled ``dim_00``, ``dim_01``, ... so that tests can
    address a known dimension name when slicing.
    """
    axis_labels = tuple(
        "dim_{:02d}".format(axis) for axis in range(array.ndim))
    return xr.DataArray(array, dims=axis_labels, name="array")
# NOTE(review): the decorators and two branches below were garbled/truncated
# in the extracted page (unterminated docstring, dangling `to_netcdf(`,
# missing `else:` arms); reconstructed from the surviving fragments and the
# docstring's "either sequentially or in one chunk" contract — confirm
# against the original file.
@pytest.mark.parametrize("sequential_read_write", [False, True])
@pytest.mark.parametrize(
    "ndim",
    [1, 2, 3,
     pytest.mark.xfail(reason="Expect out-of-memory errors")(4)])
@pytest.mark.parametrize(
    "N",
    list(2 ** n for n in range(6)) +
    list(pytest.mark.xfail(reason="Expect out-of-memory errors")(2 ** n)
         for n in range(6, 9)))
@pytest.mark.timeout(4)  # limit to 4 seconds per test
def test_random_array_write(N, ndim, sequential_read_write, tmpdir):
    """Write random data sets to disk and re-read to check for corrupted data.

    This creates an ndim dimensional array with length N in all directions
    that is written to disk either sequentially or in one chunk.  Reading
    the data back is also done either sequentially or in one chunk.
    """
    # create test data
    dims = tuple(N for m in range(ndim))
    arr = np.random.randn(*dims)
    ds = _get_xr_data_array(arr).to_dataset()
    # set up output file
    output_file = tmpdir.join(
        "arr_{ndim}_{N}.nc".format(ndim=ndim, N=N))
    output_file = str(output_file)
    # write sequentially (growing slices along dim_00) or in one chunk
    if sequential_read_write:
        for d00 in range(ds.array.shape[0]):
            ds.isel(dim_00=slice(0, d00 + 1)).to_netcdf(output_file)
    else:
        ds.to_netcdf(output_file)
    # read sequentially or in one from disk (and check for corruption)
    ds_reread = xr.open_dataset(output_file)
    for v in ds.data_vars:
        if sequential_read_write:
            for d00 in range(ds.array.shape[0]):
                assert np.array_equal(
                    ds.isel(dim_00=slice(0, d00 + 1))[v].data,
                    ds_reread.isel(dim_00=slice(0, d00 + 1))[v].data)
        else:
            assert np.array_equal(ds[v].data, ds_reread[v].data)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment