@@ -5,15 +5,22 @@ import random
 from string import ascii_lowercase


+@pytest.fixture(scope="function")
+def temp_dir(tmpdir_factory):
+    temp_dir = tmpdir_factory.mktemp('data')
+    yield temp_dir
+    temp_dir.remove()
+
+
 @pytest.mark.parametrize("chars_per_line",
                          list(2 ** n for n in range(4, 10, 2)))
 @pytest.mark.parametrize("N_lines",
                          list(2 ** n for n in range(8, 18, 2)))
-def test_random_log_file_writes(N_lines, chars_per_line, tmpdir):
+def test_random_log_file_writes(N_lines, chars_per_line, temp_dir):
     rand_state = random.getstate()
-    file_name = str(tmpdir / "log.log")
+    file_name = str(temp_dir / "log.log")

     with open(file_name, "w") as f:
         for n in range(N_lines):
@@ -36,11 +43,11 @@ def test_random_log_file_writes(N_lines, chars_per_line, tmpdir):
                          list(2 ** n for n in range(4, 16, 4)))
 @pytest.mark.parametrize("N_lines",
                          list(2 ** n for n in range(8, 20, 4)))
-def test_uniform_log_file_writes(N_lines, chars_per_line, tmpdir):
+def test_uniform_log_file_writes(N_lines, chars_per_line, temp_dir):
     rand_state = random.getstate()
-    file_name = str(tmpdir / "log.log")
+    file_name = str(temp_dir / "log.log")

     # This line will be written `N_lines` times
     log_line = "".join(random.choice(ascii_lowercase)
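Both log-file tests now take the new temp_dir fixture instead of pytest's built-in tmpdir, so the directory created for each parametrized case is removed again on teardown rather than accumulating under pytest's base temp path. A minimal sketch of how such a yield fixture behaves on its own (the test name and file contents below are illustrative, not part of this change):

import pytest


@pytest.fixture(scope="function")
def temp_dir(tmpdir_factory):
    # mktemp() creates a fresh directory under pytest's base temp path
    temp_dir = tmpdir_factory.mktemp('data')
    yield temp_dir        # the test body runs while the fixture is suspended here
    temp_dir.remove()     # teardown: delete the directory and everything in it


def test_writes_then_cleans_up(temp_dir):
    # `temp_dir` is a py.path.local object, so `/` joins child paths
    log_file = temp_dir / "example.log"
    log_file.write("hello\n")
    assert log_file.check(file=1)
    # once the test returns, the code after `yield` removes the directory

With function scope, teardown runs after every parametrized case, which matters here because the parametrize grids above generate dozens of runs per test.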
......
@@ -26,6 +26,13 @@ def _get_xr_data_array(array):
     return xr.DataArray(array, dims=dims, name="array")


+@pytest.fixture(scope="function")
+def temp_dir(tmpdir_factory):
+    temp_dir = tmpdir_factory.mktemp('data')
+    yield temp_dir
+    temp_dir.remove()
+
+
 @pytest.mark.parametrize(
     "ndim",
     [3,
@@ -37,8 +44,11 @@ def _get_xr_data_array(array):
                               for n in range(6, 8)))
 @pytest.mark.parametrize("num_chunks", [2, 8])
 @pytest.mark.parametrize("use_zarr_or_nc4", ["zarr", "nc4"])
+@pytest.mark.parametrize("only_write", [False, True])
+@pytest.mark.parametrize("threads_per_worker", [4, 1])
 @pytest.mark.timeout(_get_timeout())
-def test_large_arrays(num_chunks, N, ndim, tmpdir, use_zarr_or_nc4):
+def test_large_arrays(num_chunks, N, ndim, temp_dir, use_zarr_or_nc4,
+                      only_write, threads_per_worker):
     """Write random data sets to disk and re-read check for corrupted data.

     Random arrays are created with dask. Xarray uses either ZARR or netCDF4 as
@@ -46,7 +56,7 @@ def test_large_arrays(num_chunks, N, ndim, tmpdir, use_zarr_or_nc4):
     """

     with LocalCluster(
-            n_workers=1, threads_per_worker=4,
+            n_workers=1, threads_per_worker=threads_per_worker,
            memory_limit=500e6, silence_logs=50) as cluster:

         # determine chunk size
@@ -61,7 +71,7 @@ def test_large_arrays(num_chunks, N, ndim, tmpdir, use_zarr_or_nc4):
         print("\n", cluster, "data_size", ds.nbytes / 1e6, "MiB")

         if use_zarr_or_nc4 == "zarr":
-            output_store = tmpdir.join(
+            output_store = temp_dir.join(
                 "arr_{ndim}_{N}.zarr".format(ndim=ndim, N=N))
             output_store = str(output_store)
@@ -70,7 +80,7 @@ def test_large_arrays(num_chunks, N, ndim, tmpdir, use_zarr_or_nc4):
             ds_reread = xr.open_zarr(output_store)

         elif use_zarr_or_nc4 == "nc4":
-            output_file = tmpdir.join(
+            output_file = temp_dir.join(
                 "arr_{ndim}_{N}.nc".format(ndim=ndim, N=N))
             output_file = str(output_file)
@@ -78,5 +88,6 @@ def test_large_arrays(num_chunks, N, ndim, tmpdir, use_zarr_or_nc4):
             ds_reread = xr.open_dataset(output_file, chunks=ds.chunks)

-        for v in ds.data_vars:
-            assert (ds[v].data == ds_reread[v].data).all()
+        if not only_write:
+            for v in ds.data_vars:
+                assert (ds[v].data == ds_reread[v].data).all()
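The second file gains two extra parametrize axes: only_write, which skips the read-back comparison so the write path can be exercised on its own, and threads_per_worker, which is forwarded to the dask LocalCluster instead of the previously hard-coded value of 4. A minimal sketch of the write / optional re-read round trip the test performs, with an illustrative array shape, chunking, and output path rather than the values used by the test itself:

import numpy as np
import xarray as xr
from dask.distributed import Client, LocalCluster

only_write = False  # mirrors the new parametrize axis

with LocalCluster(n_workers=1, threads_per_worker=1,
                  memory_limit=500e6, silence_logs=50) as cluster:
    with Client(cluster):
        # small random dataset, chunked so dask writes it in several pieces
        ds = xr.Dataset(
            {"array": (("x", "y"), np.random.rand(64, 64))}
        ).chunk({"x": 32, "y": 32})

        ds.to_zarr("arr_example.zarr", mode="w")

        if not only_write:
            # lazily re-open the store and verify the round trip is lossless
            ds_reread = xr.open_zarr("arr_example.zarr")
            for v in ds.data_vars:
                assert (ds[v].data == ds_reread[v].data).all()

Parametrizing threads_per_worker over [4, 1] exercises the same I/O paths with and without thread-level parallelism inside the single dask worker, while only_write=True drops the verification pass and leaves only the write.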