Skip to content
Commits on Source (2)
from . import toolbox, processing, plotting
from .processing import conversions, diagnostics
from .io.api import open_product, select, info, plot, report
from . import processing, plotting
from .io import api as io
from .processing import diagnostics, conversions
from .plotting import plot
from .io.api import open_product, save_product, select, info, report
__all__ = [
# modules
"toolbox",
"api",
"io",
"processing",
"plotting",
# functions
# core functions
"open_product",
"save_product",
"info",
"select",
"conversions",
......
"""
Data IO api
Data IO api.
"""
# flake8: noqa
from .products import open_product, info, select
from ..plotting import plot
from .products import open_product, save_product, info, select
from .reports import report
import xarray as xr
from pathlib import Path
from typing import Any, Dict
def open_product(path1, path2, **kwargs: Dict[str, Any]) -> Dict[str, xr.Dataset]:
raise NotImplementedError("Import of data through Intake is not yet implemented.")
def open_product(
path0: str,
path1: str,
*args: Any,
experimental: bool = False,
**kwargs: Dict[str, Any]
) -> Dict[str, xr.Dataset]:
if not args:
names = [Path(path0).stem, Path(path1).stem]
else:
names = [*args]
if experimental:
ds = {names[0]: xr.open_dataset(path0), names[1]: xr.open_dataset(path1)}
else:
raise NotImplementedError(
"Import of data through Intake is not yet implemented."
)
return ds
def save_product(
ds: Dict[str, xr.Dataset],
path: str,
experimental: bool = False,
**kwargs: Dict[str, Any]
) -> None:
if experimental:
for k, v in ds.items():
v.to_netcdf(Path(path).joinpath(k + ".cdf"))
else:
raise NotImplementedError("Export of data is not yet implemented.")
return None
def info(wndpr: Dict[str, xr.Dataset], *args: Any, **kwargs: Dict[str, Any]) -> None:
......
......@@ -2,5 +2,5 @@ import xarray as xr
from typing import Any, Dict
def report(wndpr: Dict[str, xr.Dataset], *args: Any, **kwargs: Dict[str, Any]) -> None:
def report(wnd: Dict[str, xr.Dataset], *args: Any, **kwargs: Dict[str, Any]) -> None:
raise NotImplementedError("Generation of reports is not yet implemented.")
import xarray as xr
from typing import Dict, Any
from functools import singledispatch
from typing import Dict, Any, Union, List, Optional
import matplotlib.pyplot as plt
def plot(wndpr: Dict[str, xr.Dataset], *args: Any, **kwargs: Dict[str, Any]):
raise NotImplementedError("Plotting is not yet implemented.")
class Plot:
@staticmethod
def power_spectral_density(
wnd: Dict[str, xr.Dataset], *args: Any, **kwargs: Dict[str, Any]
) -> None:
for wndkey in wnd.keys():
plt.semilogx(wnd[wndkey].power_spectral_density)
plt.legend(wnd.keys())
plt.title("PSD: power spectral density")
plt.xlabel("Frequency")
plt.ylabel("Power")
plt.tight_layout()
plt.grid()
return None
@singledispatch
def plot(*args, **kwargs):
raise NotImplementedError("Data type not supported for conversion.")
@plot.register
def _(
wnddict: dict,
var: str,
*args: Any,
dataset: Optional[Union[str, List[str]]] = None,
**kwargs: Dict[str, Any]
) -> None:
if dataset is None:
dataset = []
else:
raise NotImplementedError(
"Specifing a dataset for plots is not implemented yet."
)
if getattr(Plot, var, None) is not None:
getattr(Plot, var)(wnddict, *dataset, *args, **kwargs)
else:
for wndkey in wnddict.keys():
wnddict[wndkey][var].plot()
return None
"""Preprocessing module."""
"""
Preprocessing module.
"""
import numpy as np
import xarray as xr
from typing import Callable, Union, Optional, Dict, Any
from functools import singledispatch
from scipy import signal
from typing import Callable, Union, Optional, Dict, Any, Iterable
class BulkFormula:
......@@ -602,13 +606,74 @@ def sverdrup_transport(X: xr.Dataset) -> xr.Dataset:
return X
def conversions(
wndpr: Dict[str, xr.Dataset], *args: Any, **kwargs: Dict[str, Any]
) -> Dict[str, xr.Dataset]:
class Conversions:
def __init__(self):
raise NotImplementedError("Conversions are not yet implemented.")
def diagnostics(
wndpr: Dict[str, xr.Dataset], *args: Any, **kwargs: Dict[str, Any]
@singledispatch
def conversions(*args, **kwargs):
raise NotImplementedError("Data type not supported for conversion.")
class Diagnostics:
@staticmethod
def welch(da: xr.DataArray, *args: Any, **kwargs: Dict[str, Any]) -> xr.Dataset:
f, psd = signal.welch(da.values.flat, *args, **kwargs)
ds = xr.Dataset(
{"power_spectral_density": ("frequency", psd)},
coords={"frequency": (["frequency"], f)},
)
return ds
@singledispatch
def diagnostics(*args, **kwargs):
raise NotImplementedError("Data type not supported.")
@diagnostics.register
def _(da: xr.DataArray, diag: str, *args: Any, **kwargs: Dict[str, Any]) -> xr.Dataset:
if getattr(Diagnostics, diag, None) is not None:
y = getattr(Diagnostics, diag)(da, *args, **kwargs)
elif getattr(da, diag, None) is not None:
y = getattr(da, diag)(*args, **kwargs)
else:
raise ValueError("Unknown diagnostic.")
return y
@diagnostics.register # type: ignore
def _(
ds: xr.Dataset, var: str, diag: str, *args: Any, **kwargs: Dict[str, Any]
) -> xr.Dataset:
ds = ds.merge(diagnostics(ds[var], diag, *args, **kwargs))
return ds
@diagnostics.register # type: ignore
def _(
wnddict: dict,
var: str,
diag: str,
*args: Any,
dataset: Optional[Iterable] = None,
**kwargs: Dict[str, Any]
) -> Dict[str, xr.Dataset]:
raise NotImplementedError("Diagnostics are not yet implemented.")
if dataset is None:
dataset = wnddict.keys()
else:
raise NotImplementedError(
"Specifing a dataset for diagnostics is not implemented yet."
)
for wndkey in dataset:
wnddict[wndkey] = wnddict[wndkey].merge(
diagnostics(wnddict[wndkey][var], diag, *args, **kwargs)
)
return wnddict
import pytest
from windeval import plotting
from windeval import plotting, processing
def test_report(X):
def test_plot(X):
plotting.plot({"ds": X}, "eastward_wind")
with pytest.raises(NotImplementedError):
plotting.plot({"ds": X})
plotting.plot(False)
with pytest.raises(NotImplementedError):
plotting.plot({"ds": X}, "eastward_wind", dataset=[])
ds = processing.diagnostics({"ds": X}, "eastward_wind", "welch")
plotting.plot(ds, "power_spectral_density")
......@@ -190,5 +190,5 @@ def test_conversions(X):
def test_diagnostics(X):
with pytest.raises(NotImplementedError):
processing.diagnostics({"ds": X})
ds = processing.diagnostics({"ds": X}, "eastward_wind", "welch")
assert isinstance(ds["ds"]["power_spectral_density"], xr.DataArray)
"""Wrapper."""
import xarray as xr
from functools import singledispatch
from typing import Optional, Any, Dict, List
from typing import Optional
from . import processing
@singledispatch
def calculate(ds, *args, **kwargs):
raise NotImplementedError("Data type not supported.")
@calculate.register
def _(
ds: xr.Dataset, var: str, diag: str, *args: Any, **kwargs: Dict[str, Any]
) -> xr.DataArray:
return calculate(ds[var], diag, *args, **kwargs)
@calculate.register # type: ignore
def _(
da: xr.DataArray, diag: str, *args: Any, **kwargs: Dict[str, Any]
) -> xr.DataArray:
f = getattr(processing, diag, None)
if f is not None:
y = f(da, *args, **kwargs)
else:
y = getattr(da, diag)(*args, **kwargs)
return y
@calculate.register # type: ignore
def _(
d: dict, keys: List[str], diag: str, *args: Any, **kwargs: Dict[str, Any]
) -> xr.DataArray:
f = getattr(processing, diag)
return f(*[d[k] for k in keys], *args, **kwargs)
def ekman(
X: xr.Dataset,
component: str = "eastward",
......