Commit 95e07d18 authored by Ezra Eisbrenner's avatar Ezra Eisbrenner

Merge branch 'feat/add_sverdrup_transport' into 'master'

Add Sverdrup transport

See merge request !19
parents 01acd72c ca91e02e
Pipeline #10377 passed with stage
in 1 minute and 6 seconds
Wrapper
=======
.. automodule:: windeval.wrapper
:members:
:undoc-members:
:show-inheritance:
......@@ -39,6 +39,7 @@ Planned logic:
:caption: Modules
_source/importer
_source/wrapper
_source/processing
_source/analysis
......
from . import toolbox, processing
from .wrapper import ekman, sverdrup
__all__ = ["toolbox", "processing"]
__all__ = ["toolbox", "processing", "ekman", "sverdrup"]
......@@ -69,7 +69,9 @@ class BulkFormula:
self, bulk_formula.lower()
)
def generic(self, X: xr.Dataset, component: str) -> xr.DataArray:
def generic(
self, X: xr.Dataset, component: str, extend_ranges: Optional[bool] = None
) -> xr.DataArray:
"""Definition of generic bulk formula.
.. math::
......@@ -83,6 +85,10 @@ class BulkFormula:
component : str
Name of the zonal or meridional wind component as defined by the CF_ naming
convention.
extend_ranges : bool, optional
Fills undefined areas of the bulk formulas if `True`, returns `numpy.nan`
values otherwise, defaults to defaults to wrapped function default.
Returns
-------
......@@ -90,8 +96,16 @@ class BulkFormula:
Wind stress.
"""
d = {}
for x in ["extend_ranges"]:
if locals()[x] is None:
continue
d[x] = locals()[x]
tau = (
X.air_density * self.Cd(X, component) * np.abs(X[component]) * X[component]
X.air_density
* self.Cd(X, component, **d)
* np.abs(X[component])
* X[component]
)
return tau
......@@ -385,6 +399,7 @@ def surface_downward_eastward_stress(
X: xr.Dataset,
drag_coefficient: Optional[str] = None,
bulk_formula: Optional[str] = None,
extend_ranges: Optional[bool] = None,
) -> xr.Dataset:
"""Caclulate surface downward eastward stress.
......@@ -406,7 +421,7 @@ def surface_downward_eastward_stress(
"""
X["surface_downward_eastward_stress"] = BulkFormula(
*[s for s in [drag_coefficient, bulk_formula] if s is not None]
).calculate(X, "eastward_wind")
).calculate(X, "eastward_wind", *[s for s in [extend_ranges] if s is not None])
return X
......@@ -415,6 +430,7 @@ def surface_downward_northward_stress(
X: xr.Dataset,
drag_coefficient: Optional[str] = None,
bulk_formula: Optional[str] = None,
extend_ranges: Optional[bool] = None,
) -> xr.Dataset:
"""Caclulate surface downward northward stress.
......@@ -436,7 +452,7 @@ def surface_downward_northward_stress(
"""
X["surface_downward_northward_stress"] = BulkFormula(
*[s for s in [drag_coefficient, bulk_formula] if s is not None]
).calculate(X, "northward_wind")
).calculate(X, "northward_wind", *[s for s in [extend_ranges] if s is not None])
return X
......@@ -461,7 +477,9 @@ def northward_ekman_transport(X: xr.Dataset) -> xr.Dataset:
"""
_has(X, "surface_downward_eastward_stress")
X["northward_ekman_transport"] = -X.surface_downward_eastward_stress / X.latitude
X[
"northward_ekman_transport"
] = -X.surface_downward_eastward_stress / _Coriolis().parameter(X.latitude)
return X
......@@ -486,11 +504,28 @@ def eastward_ekman_transport(X: xr.Dataset) -> xr.Dataset:
"""
_has(X, "surface_downward_northward_stress")
X["eastward_ekman_transport"] = X.surface_downward_northward_stress / X.latitude
X[
"eastward_ekman_transport"
] = X.surface_downward_northward_stress / _Coriolis().parameter(X.latitude)
return X
class _Coriolis:
def __init__(self):
self.c = 2 * 2 * np.pi / ((23 * 60 + 56) * 60 + 4.1)
def parameter(self, y: xr.DataArray) -> xr.DataArray:
f = self.c * np.sin(np.deg2rad(y))
return f
def derivative(self, y: xr.DataArray) -> xr.DataArray:
b = self.c * np.cos(np.deg2rad(y))
return b
def _has(X: xr.Dataset, v: str) -> None:
"""Calculate variable `v` if missing in `X`.
......@@ -501,3 +536,67 @@ def _has(X: xr.Dataset, v: str) -> None:
globals()[v](X)
return None
def sverdrup_transport(X: xr.Dataset) -> xr.Dataset:
"""Calculate Sverdrup transport.
.. math::
V = \\hat{\\mathbf{k}} \\cdot \\frac{\\mathbf{\\nabla}\\times\\tau}{\\beta}
Parameters
----------
X : array_like
Wind product data as Xarray-DataSet.
Returns
-------
array_like
Sverdrup transport.
"""
_has(X, "surface_downward_eastward_stress")
_has(X, "surface_downward_northward_stress")
def mesh(X, M):
dims = [x for x in list(X.coords) if x not in ["longitude", "latitude"]]
for d in dims:
M = np.multiply.outer(np.ones(len(X.coords[d])), M)
return M
X["sverdrup_transport"] = (
[x for x in reversed(list(X.coords))],
np.full(X.surface_downward_northward_stress.shape, np.nan),
)
lon = mesh(
X,
np.outer(
np.ones(-1 + len(X.coords["latitude"])),
X.longitude.values[1:] - X.longitude.values[:-1],
),
)
lat = mesh(
X,
np.outer(
X.latitude.values[1:] - X.latitude.values[:-1],
np.ones(-1 + len(X.coords["longitude"])),
),
)
X["sverdrup_transport"][..., :-1, :-1] = (
(
X.surface_downward_northward_stress[..., 1:, :-1].values
- X.surface_downward_northward_stress[..., :-1, :-1].values
)
/ lon
- (
X.surface_downward_eastward_stress[..., :-1, 1:].values
- X.surface_downward_eastward_stress[..., :-1, :-1].values
)
/ lat
) / _Coriolis().derivative(lat)
return X
import os
import pytest
import numpy as np
import xarray as xr
......@@ -15,3 +16,40 @@ def station():
s = s.rename({"lat": "latitude", "lon": "longitude", "WS_401": "wind_speed"})
s.attrs["WindProductType"] = "Station"
return s
@pytest.fixture
def X():
i, j, k, t = 4, 5, 5, 6
ds = xr.Dataset(
{
"eastward_wind": (
("time", "depth", "latitude", "longitude"),
np.reshape(
np.append(np.array([3, 1]), np.ones(i * j * k * t - 2)),
(t, k, j, i),
),
),
"northward_wind": (
("time", "depth", "latitude", "longitude"),
np.reshape(
np.append(np.array([4, np.nan]), np.ones(i * j * k * t - 2)),
(t, k, j, i),
),
),
"air_density": (
("time", "depth", "latitude", "longitude"),
np.reshape(
np.append(np.array([1, 1]), np.ones(i * j * k * t - 2)),
(t, k, j, i),
),
),
},
coords={
"longitude": xr.DataArray(np.array(range(i)), dims=("longitude",)),
"latitude": xr.DataArray(np.array(range(j)), dims=("latitude",)),
"depth": xr.DataArray(np.array(range(k)), dims=("depth",)),
"time": xr.DataArray(np.array(range(t)), dims=("time",)),
},
)
return ds
......@@ -6,23 +6,6 @@ import xarray as xr
from windeval import processing
@pytest.fixture
def X():
ds = xr.Dataset(
{
"eastward_wind": (("x", "y"), np.array([[3], [1]])),
"northward_wind": (("x", "y"), np.array([[4], [np.nan]])),
"air_density": (("x", "y"), np.array([[1], [1]])),
},
coords={
"latitude": (("x", "y"), np.array([[1], [2]])),
"longitude": (("x", "y"), np.array([[1], [2]])),
},
)
return ds
@pytest.fixture
def accuracy():
return 1e-12
......@@ -123,8 +106,8 @@ def test_BulkFormula_K00(accuracy):
def test_wind_speed(X):
processing.wind_speed(X)
assert X.data_vars["wind_speed"].values[0] == 5.0
assert np.isnan(X.data_vars["wind_speed"].values[1])
assert X.data_vars["wind_speed"].values[0, 0, 0, 0] == 5.0
assert np.isnan(X.data_vars["wind_speed"].values[0, 0, 0, 1])
def test_surface_downward_eastward_stress(X):
......@@ -132,8 +115,14 @@ def test_surface_downward_eastward_stress(X):
X, drag_coefficient="ncep_ncar_2007", bulk_formula="generic"
)
y = 1.3e-3 * X.eastward_wind ** 2
assert X.data_vars["surface_downward_eastward_stress"].values[0] == y[0]
assert X.data_vars["surface_downward_eastward_stress"].values[1] == y[1]
assert (
X.data_vars["surface_downward_eastward_stress"].values[0, 0, 0, 0]
== y[0, 0, 0, 0]
)
assert (
X.data_vars["surface_downward_eastward_stress"].values[0, 0, 0, 1]
== y[0, 0, 0, 1]
)
def test_surface_downward_northward_stress(X):
......@@ -141,8 +130,11 @@ def test_surface_downward_northward_stress(X):
X, drag_coefficient="ncep_ncar_2007", bulk_formula="generic"
)
y = 1.3e-3 * X.northward_wind ** 2
assert X.data_vars["surface_downward_northward_stress"].values[0] == y[0]
assert np.isnan(X.data_vars["surface_downward_northward_stress"].values[1])
assert (
X.data_vars["surface_downward_northward_stress"].values[0, 0, 0, 0]
== y[0, 0, 0, 0]
)
assert np.isnan(X.data_vars["surface_downward_northward_stress"].values[0, 0, 0, 1])
def test_northward_ekman_transport(X):
......@@ -150,9 +142,13 @@ def test_northward_ekman_transport(X):
X, drag_coefficient="ncep_ncar_2007", bulk_formula="generic"
)
processing.northward_ekman_transport(X)
y = -1.3e-3 * X.eastward_wind ** 2 / X.latitude
assert X.data_vars["northward_ekman_transport"].values[0] == y[0]
assert X.data_vars["northward_ekman_transport"].values[1] == y[1]
y = (
-1.3e-3
* X.eastward_wind.values[0, 0, 0, :2] ** 2
/ processing._Coriolis().parameter(X.latitude.values[0])
)
assert X.data_vars["northward_ekman_transport"].values[0, 0, 0, 0] == y[0]
assert X.data_vars["northward_ekman_transport"].values[0, 0, 0, 1] == y[1]
def test_eastward_ekman_transport(X):
......@@ -160,13 +156,29 @@ def test_eastward_ekman_transport(X):
X, drag_coefficient="ncep_ncar_2007", bulk_formula="generic"
)
processing.eastward_ekman_transport(X)
y = 1.3e-3 * X.northward_wind ** 2 / X.latitude
assert X.data_vars["eastward_ekman_transport"].values[0] == y[0]
assert np.isnan(X.data_vars["eastward_ekman_transport"].values[1])
y = (
1.3e-3
* X.northward_wind.values[0, 0, 0, 0] ** 2
/ processing._Coriolis().parameter(X.latitude.values[0])
)
assert X.data_vars["eastward_ekman_transport"].values[0, 0, 0, 0] == y
assert np.isnan(X.data_vars["eastward_ekman_transport"].values[0, 0, 0, 1])
def test_eastward_ekman_transport_v2(X):
processing.eastward_ekman_transport(X)
y = 1.3e-3 * X.northward_wind ** 2 / X.latitude
assert X.data_vars["eastward_ekman_transport"].values[0] == y[0]
assert np.isnan(X.data_vars["eastward_ekman_transport"].values[1])
y = (
1.3e-3
* X.northward_wind.values[0, 0, 0, 0] ** 2
/ processing._Coriolis().parameter(X.latitude.values[0])
)
assert X.data_vars["eastward_ekman_transport"].values[0, 0, 0, 0] == y
assert np.isnan(X.data_vars["eastward_ekman_transport"].values[0, 0, 0, 1])
def test_sverdrup_transport(X):
processing.sverdrup_transport(X)
assert math.isclose(
X.sverdrup_transport[0, 0, 0, 0].values, -62.40566775, rel_tol=1e-7
)
assert np.isnan(X.data_vars["sverdrup_transport"].values[0, 0, 0, 1])
import windeval
def test_ekman(X):
windeval.ekman(X, drag_coefficient="large_and_pond_1981", extend_ranges=True)
def test_sverdrup(X):
windeval.sverdrup(X, drag_coefficient="large_and_yeager_2004")
"""Wrapper."""
import xarray as xr
from typing import Optional
from . import processing as pr
def ekman(
X: xr.Dataset,
component: str = "eastward",
drag_coefficient: Optional[str] = None,
bulk_formula: Optional[str] = None,
extend_ranges: Optional[bool] = None,
) -> xr.Dataset:
"""Wrapper for Ekman transport calculations.
Parameters
----------
X : array_like
Wind product data set.
component : str
Component of Ekman transport to be calculated, defaults to 'eastward'.
drag_coefficient : str, optional
Name of drag coefficient method, defaults to wrapped function default.
bulk_formula : str, optional
Name of bulk formula, defaults to wrapped function default.
extend_ranges : bool, optional
Fills undefined areas of the bulk formulas if `True`, returns `numpy.nan` values
otherwise, defaults to defaults to wrapped function default.
Returns
-------
array_like
Wind product data set with added Ekman transport and required fields.
"""
d = {}
for x in ["drag_coefficient", "bulk_formula", "extend_ranges"]:
if locals()[x] is None:
continue
d[x] = locals()[x]
c = "northward" if component == "eastward" else "eastward"
getattr(pr, "surface_downward_" + c + "_stress")(X, **d)
getattr(pr, component + "_ekman_transport")(X)
return X
def sverdrup(
X: xr.Dataset,
drag_coefficient: Optional[str] = None,
bulk_formula: Optional[str] = None,
extend_ranges: Optional[bool] = None,
) -> xr.Dataset:
"""Wrapper for Sverdrup transport calculations.
Parameters
----------
X : array_like
Wind product data set.
drag_coefficient : str, optional
Name of drag coefficient method, defaults to wrapped function default.
bulk_formula : str, optional
Name of bulk formula, defaults to wrapped function default.
extend_ranges : bool, optional
Fills undefined areas of the bulk formulas if `True`, returns `numpy.nan` values
otherwise, defaults to defaults to wrapped function default.
Returns
-------
array_like
Wind product data set with added Sverdrup transport and required fields.
"""
d = {}
for x in ["drag_coefficient", "bulk_formula", "extend_ranges"]:
if locals()[x] is None:
continue
d[x] = locals()[x]
pr.surface_downward_eastward_stress(X, **d)
pr.surface_downward_northward_stress(X, **d)
pr.sverdrup_transport(X)
return X
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment