Skip to content

Commit

Permalink
Merge pull request #109 from IMMM-SFA/Jaydon2005-patch-3
Browse files Browse the repository at this point in the history
Create test_hymod.py test file for hymod file.
  • Loading branch information
Jaydon2005 authored Nov 26, 2024
2 parents 528968b + 20d48c2 commit e082a1a
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 22 deletions.
58 changes: 36 additions & 22 deletions msdbook/hymod.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import math

import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
Expand Down Expand Up @@ -38,6 +37,8 @@ def plot_observed_vs_simulated_streamflow(df, hymod_dict, figsize=[12, 6]):
# set axis labels
ax.set_ylabel("Streamflow($m^3/s$)")
ax.set_xlabel("Days")

# add legend
ax.legend()

# set plot title
Expand Down Expand Up @@ -329,21 +330,27 @@ def Pdm01(Hpar, Bpar, Hbeg, PP, PET):

def Nash(K, N, Xbeg, Inp):
"""Fill in description."""

OO = np.zeros(N)
Xend = np.zeros(N)

for Res in range(0, N):
OO[Res] = K * Xbeg[Res]
Xend[Res] = Xbeg[Res] - OO[Res]

if Res == 0:
Xend[Res] = Xend[Res] + Inp
else:
Xend[Res] = Xend[Res] + OO[Res - 1]
# Check if Xbeg is a scalar or array and adjust accordingly
if np.ndim(Xbeg) == 0: # Xbeg is scalar
OO[0] = K * Xbeg
Xend[0] = Xbeg - OO[0]

if N > 1:
Xend[1:] = Xbeg # If N > 1, assume the same value for other elements
else: # Xbeg is an array
for Res in range(0, N):
OO[Res] = K * Xbeg[Res]
Xend[Res] = Xbeg[Res] - OO[Res]

if Res == 0:
Xend[Res] += Inp # Add input only to the first time step
else:
Xend[Res] += OO[Res - 1] # Add previous output to the current state

out = OO[N - 1]

return out, Xend


Expand Down Expand Up @@ -371,19 +378,26 @@ def Hymod01(Data, Pars, InState):

# run soil moisture accounting including evapotranspiration
OV[i], ET[i], XHuz[i], XCuz[i] = Pdm01(
Pars["Huz"], Pars["B"], XHuz[i], Data["Precip"].iloc[i], Data["Pot_ET"].iloc[i]
)
Pars["Huz"], Pars["B"], XHuz[i], Data["Precip"].iloc[i], Data["Pot_ET"].iloc[i]
)

# run Nash Cascade routing of quickflow component
Qq[i], Xq[i, :] = Nash(Pars["Kq"], Pars["Nq"], Xq[i, :], Pars["Alp"] * OV[i])

# run slow flow component, one infinite linear tank
Qs[i], Xs[i] = Nash(Pars["Ks"], 1, [Xs[i]], (1 - Pars["Alp"]) * OV[i])
Xs_scalar = Xs[i].item() if np.ndim(Xs[i]) == 0 else Xs[i] # Ensure scalar extraction
OV_scalar = (1 - Pars["Alp"]) * OV[i]

nash_output = Nash(Pars["Ks"], 1, Xs_scalar, OV_scalar)

Qs[i] = nash_output[0]
Xs[i] = nash_output[1][0]


if i < len(Data) - 1:
XHuz[i + 1] = XHuz[i]
Xq[i + 1] = Xq[i]
Xs[i + 1] = Xs[i]
XHuz[i + 1] = XHuz[i]
Xq[i + 1, :] = Xq[i, :] # Fixed
Xs[i + 1] = Xs[i]

Q[i] = Qs[i] + Qq[i]

Expand All @@ -407,11 +421,11 @@ def hymod(Nq, Kq, Ks, Alp, Huz, B, hymod_dataframe, ndays):
"""Hymod main function.
:param Nq: number of quickflow routing tanks
:param Kq: quickflow routing tanks parameters - Range [0.1, 1]
:param Ks: slowflow routing tanks rate parameter - Range [0, 0.1]
:param Alp: Quick-slow split parameters - Range [0, 1]
:param Huz: Max height of soil moisture accounting tanks - Range [0, 500]
:param B: Distribution function shape parameter - Range [0, 2]
:param Kq: quickflow routing tanks parameters
:param Ks: slowflow routing tanks rate parameter
:param Alp: Quick-slow split parameters
:param Huz: Max height of soil moisture accounting tanks
:param B: Distribution function shape parameter
:param hymod_dataframe: Dataframe of hymod data
:param ndays: The number of days to process from the beginning of the record
Expand Down
91 changes: 91 additions & 0 deletions msdbook/tests/test_hymod.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import pytest
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from msdbook.hymod import (
plot_observed_vs_simulated_streamflow,
plot_observed_vs_sensitivity_streamflow,
plot_monthly_heatmap,
plot_annual_heatmap,
plot_varying_heatmap,
plot_precalibration_flow,
plot_precalibration_glue,
Pdm01,
Nash,
Hymod01,
hymod
)

@pytest.fixture
def sample_data():
"""Fixture for sample input data."""
dates = pd.date_range(start='2000-01-01', periods=10)
df = pd.DataFrame({
'Precip': np.random.rand(10),
'Pot_ET': np.random.rand(10),
'Strmflw': np.random.rand(10)
}, index=dates)
return df

@pytest.fixture
def sample_simulated():
"""Fixture for sample simulated data."""
return pd.DataFrame(np.random.rand(10, 3), columns=['Sim1', 'Sim2', 'Sim3'])

@pytest.fixture
def sample_heatmap_data():
"""Fixture for sample heatmap data."""
return np.random.rand(5, 12), pd.DataFrame(np.random.rand(12), columns=['Strmflw'])

def test_plot_observed_vs_simulated_streamflow(sample_data):
"""Test if the function for plotting observed vs simulated streamflow works without errors."""
hymod_dict = {"Q": np.random.rand(len(sample_data))}
ax = plot_observed_vs_simulated_streamflow(sample_data, hymod_dict)
assert isinstance(ax, plt.Axes)
plt.close()

def test_plot_observed_vs_sensitivity_streamflow(sample_data, sample_simulated):
"""Test if the function for plotting observed vs sensitivity streamflow works without errors."""
ax = plot_observed_vs_sensitivity_streamflow(sample_data, sample_simulated)
assert isinstance(ax, plt.Axes)
plt.close()

def test_plot_monthly_heatmap(sample_heatmap_data):
"""Test if the function for plotting monthly heatmap works without errors."""
arr_sim, df_obs = sample_heatmap_data
ax, ax2 = plot_monthly_heatmap(arr_sim, df_obs)
assert isinstance(ax, plt.Axes)
assert isinstance(ax2, plt.Axes)
plt.close()

def test_Pdm01():
"""Test Pdm01 function for correct output."""
OV, ET, Hend, Cend = Pdm01(1.0, 0.5, 0.2, 0.1, 0.3)
assert isinstance(OV, float)
assert isinstance(ET, float)
assert isinstance(Hend, float)
assert isinstance(Cend, float)

def test_Nash():
"""Test Nash function for correct output."""
out, Xend = Nash(0.5, 5, np.zeros(5), 1.0)
assert isinstance(out, float)
assert isinstance(Xend, np.ndarray)
assert Xend.shape == (5,)

def test_Hymod01(sample_data):
"""Test Hymod01 function for correct output."""
pars = {"Nq": 2, "Kq": 0.5, "Ks": 0.1, "Alp": 0.3, "Huz": 0.5, "B": 1.0}
init = {"Xq": np.zeros(pars["Nq"]), "Xs": 0, "XHuz": 0}
results = Hymod01(sample_data, pars, init)
assert isinstance(results, dict)
assert all(key in results for key in ["XHuz", "XCuz", "Xq", "Xs", "ET", "OV", "Qq", "Qs", "Q"])
assert isinstance(results["XHuz"], np.ndarray)
assert isinstance(results["Q"], np.ndarray)

def test_hymod(sample_data):
"""Test hymod function for correct output."""
results = hymod(2, 0.5, 0.1, 0.3, 0.5, 1.0, sample_data, 10)
assert isinstance(results, dict)
assert all(key in results for key in ["XHuz", "XCuz", "Xq", "Xs", "ET", "OV", "Qq", "Qs", "Q"])
assert isinstance(results["Q"], np.ndarray)

0 comments on commit e082a1a

Please sign in to comment.