"""
Zuecco Hysteresis Index Calculator
Python implementation of the hysteresis index method by Zuecco et al. (2016)
Original code by Florian Ulrich Jehn: https://github.com/florianjehn/Hysteresis-Index-Zuecco
Please note that the code is not exactly the same and h results deviate slightly from each other.
The classification appears to be consistent.
Florian Ulrich Jehn. (2019). zutn/Hysteresis-Index-Zuecco: First public version (1.0). Zenodo. https://doi.org/10.5281/zenodo.3441882
Zuecco, G., Penna, D., Borga, M., & van Meerveld, H. J. (2016). A versatile index to characterize hysteresis between hydrological
variables in the frequency domain. Hydrological Processes, 30(9), 1554-1572. https://doi.org/10.1002/hyp.10681
This module calculates hysteresis indices and classification from time series
of two variables (typically discharge and concentration or soil moisture).
The method computes the difference between rising and falling limb areas
and classifies hysteresis patterns into 9 classes.
(cc) conrad.jackisch@tbt.tu-freiberg.de
"""
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from scipy.signal import find_peaks
from sklearn.preprocessing import MinMaxScaler
[docs]
def calculate_zuecco_metrics(df_obs, time_col='Etime', discharge_col='Q', concentration_col='C'):
"""
Calculate Zuecco hysteresis index and classification.
Parameters
----------
df_obs : pd.DataFrame
Observed data with time, discharge, and concentration columns
time_col, discharge_col, concentration_col : str
Column names for time, discharge, and concentration
Returns
-------
metrics_df : pd.DataFrame
Zuecco metrics:
- h_index: Hysteresis index (sum of differential areas)
- hyst_class: Hysteresis class (0-8)
- min_diff_area: Minimum differential area
- max_diff_area: Maximum differential area
df_all : pd.DataFrame
Processed time series data with:
- Q, C: Original discharge and concentration values
- QS, CS: Normalized (0-1) discharge and concentration
- Qphase, Cphase: Rising/falling phase indicators
- switchpoints: Peak markers (gQ, gC, lQ, lC)
Plus x_fixed interpolation stored in attrs:
- attrs['x_fixed_interp']: DataFrame with x_fixed, y_rise, y_fall, diff_area
"""
# Validate input
if ((discharge_col not in df_obs.columns) | (concentration_col not in df_obs.columns) | (time_col not in df_obs.columns)):
raise ValueError(f"Columns '{discharge_col}' and/or '{concentration_col}' and/or '{time_col}' not found in dataframe")
# Check for NaN values
if df_obs[discharge_col].isna().any() or df_obs[concentration_col].isna().any():
df_obs = df_obs.dropna()
print('Input data contains NaN values. Rows dropped. Consider data preparation before processing.')
df_all = df_obs[[time_col, discharge_col, concentration_col]].copy()
df_all.columns = ['Etime', 'Q', 'C'] # Rename to standard names
# Prepare time index
if pd.api.types.is_numeric_dtype(df_obs[time_col]):
df_all.index = pd.to_timedelta(df_all['Etime'], unit='D')
else:
df_all.index = df_all['Etime'] - df_all['Etime'].iloc[0]
# Resample and interpolate
df_all = df_all.resample('1h').first().interpolate(method='linear', limit=None, limit_direction='forward')
# Create numeric Etime from index for scaling
df_all['Etime'] = df_all.index.total_seconds() / 86400.0 # Convert to days
# Min-max scaling
df_all[['EtimeS', 'QS', 'CS']] = MinMaxScaler().fit_transform(df_all[['Etime', 'Q', 'C']])
# Find peaks and mark switchpoints
df_all['switchpoints'] = ''
df_all.loc[df_all.index[find_peaks(df_all['Q'])[0]], 'switchpoints'] = 'lQ'
df_all.loc[df_all.index[find_peaks(df_all['C'])[0]], 'switchpoints'] = 'lC'
df_all.loc[df_all.index[df_all['Q'].argmax()], 'switchpoints'] = 'gQ'
df_all.loc[df_all.index[df_all['C'].argmax()], 'switchpoints'] = 'gC'
# Define phases based on discharge
df_all['Qphase'] = 'rising'
df_all.loc[df_all.index[df_all['Q'].argmax()]:, 'Qphase'] = 'falling'
df_all['Cphase'] = 'rising'
df_all.loc[df_all.index[df_all['C'].argmax()]:, 'Cphase'] = 'falling'
# Create limbs using original elegant pandas approach
# Handle duplicates by taking mean of CS values at same QS
rising_limb = df_all.loc[df_all.Qphase == 'rising', ['QS', 'CS']].groupby('QS').mean()['CS']
falling_limb = df_all.loc[df_all.Qphase == 'falling', ['QS', 'CS']].groupby('QS').mean()['CS']
limbs = pd.concat([rising_limb, falling_limb], axis=1, sort=True).interpolate(method='linear')
limbs.columns = [0, 1] # Use integer column names for compatibility
# Define x_fixed points for Zuecco integration (default from original: 0.15 to 1.0)
x_fixed = pd.Series(np.linspace(0.15, 1.0, 18))
# Sample limbs at x_fixed points - limbs are already indexed by QS and interpolated
limbs_fixed = limbs.reindex(x_fixed, method='nearest', tolerance=0.01, limit=1)
limbs_fixed = limbs_fixed.interpolate(method='linear')
limbs_fixed.columns = [0, 1] # Ensure integer column names
# Check if we have valid data at fixed points
if limbs_fixed.isna().all().any():
print("Warning: Some x_fixed points could not be interpolated. Using fallback method.")
# Fallback: use simple interpolation across full range
limbs_fixed = pd.DataFrame({
0: np.interp(x_fixed, limbs.index, limbs[0]),
1: np.interp(x_fixed, limbs.index, limbs[1])
}, index=x_fixed)
# Calculate differential areas between rising and falling limbs
# Area of each trapezoid: (y1 + y2) * dx / 2
diff_area = pd.Series(index=range(len(x_fixed) - 1), dtype=float)
for j in range(len(x_fixed) - 1):
dx = x_fixed.iloc[j + 1] - x_fixed.iloc[j]
rise_trap = (limbs_fixed.iloc[j + 1, 0] + limbs_fixed.iloc[j, 0]) * dx / 2
fall_trap = (limbs_fixed.iloc[j + 1, 1] + limbs_fixed.iloc[j, 1]) * dx / 2
diff_area.iloc[j] = rise_trap - fall_trap
# Hysteresis index: sum of differential areas
h_index = diff_area.sum()
h_index = h_index if np.isfinite(h_index) else 0
# Get min/max for classification
min_diff_area = diff_area.min() if len(diff_area) > 0 else np.nan
max_diff_area = diff_area.max() if len(diff_area) > 0 else np.nan
# Classify hysteresis pattern (use renamed columns)
hyst_class = _find_hysteresis_class(
df_all['Q'], df_all['C'],
min_diff_area, max_diff_area, h_index
)
# Store results
metric_df = pd.DataFrame({
'h_index': [h_index],
'hyst_class': [hyst_class],
'min_diff_area': [min_diff_area],
'max_diff_area': [max_diff_area]
})
# Store limbs_fixed data in attrs for plotting
df_all.attrs['limbs_fixed'] = limbs_fixed
df_all.attrs['diff_area'] = diff_area
df_all.attrs['x_fixed'] = x_fixed
return metric_df, df_all
def _find_hysteresis_class(
x: pd.Series,
y: pd.Series,
min_diff_area: float,
max_diff_area: float,
h_index: float
) -> int:
"""
Classify hysteresis pattern into one of 9 classes (0-8).
Classes based on Zuecco et al. (2016):
0: Linear (no hysteresis)
1-4: Clockwise patterns
5-8: Counter-clockwise patterns
Returns
-------
hyst_class : int
Hysteresis class (0-8)
"""
pos_max_x = x.index.get_loc(x.idxmax())
# Check rising limb
min_y_rise = y.iloc[:pos_max_x + 1].min()
max_y_rise = y.iloc[:pos_max_x + 1].max()
change_max_y_rise = abs(max_y_rise - y.iloc[0])
change_min_y_rise = abs(min_y_rise - y.iloc[0])
if change_max_y_rise != change_min_y_rise:
hyst_class = _determine_class(
min_diff_area, max_diff_area, h_index,
change_max_y_rise, change_min_y_rise
)
else:
# Check falling limb
min_y_fall = y.iloc[pos_max_x:].min()
max_y_fall = y.iloc[pos_max_x:].max()
change_max_y_fall = abs(max_y_fall - y.iloc[0])
change_min_y_fall = abs(min_y_fall - y.iloc[0])
if change_min_y_fall != change_max_y_fall:
hyst_class = _determine_class(
min_diff_area, max_diff_area, h_index,
change_max_y_fall, change_min_y_fall
)
else:
hyst_class = 0
return hyst_class
def _determine_class(
min_diff_area: float,
max_diff_area: float,
h_index: float,
change_max_y: float,
change_min_y: float
) -> int:
"""
Determine specific hysteresis class based on area metrics.
Returns
-------
hyst_class : int
Hysteresis class (0-8)
"""
if change_max_y > change_min_y:
if min_diff_area > 0 and max_diff_area > 0:
return 1
elif min_diff_area < 0 and max_diff_area < 0:
return 4
elif min_diff_area <= 0 and max_diff_area > 0 and h_index >= 0:
return 2
elif min_diff_area < 0 and max_diff_area >= 0 and h_index < 0:
return 3
else:
return 0 # linearity
else: # change_max_y < change_min_y
if min_diff_area > 0 and max_diff_area > 0:
return 5
elif min_diff_area < 0 and max_diff_area < 0:
return 8
elif min_diff_area <= 0 and max_diff_area > 0 and h_index >= 0:
return 6
elif min_diff_area < 0 and max_diff_area >= 0 and h_index < 0:
return 7
else:
return 0 # linearity
[docs]
def zuecco_plot(df_all, metrics):
"""
Visualize Zuecco hysteresis analysis results in a two-panel plot:
- Left: Hysteresis loop with rising/falling limbs
- Right: Differential area plot
df_all :: Processed DataFrame from calculate_zuecco_metrics
metrics :: Metrics DataFrame from calculate_zuecco_metrics
returns :: fig -> can be shown, saved, or further be processed
"""
from plotly.subplots import make_subplots
# Extract Zuecco-specific data from attrs
limbs_fixed = df_all.attrs.get('limbs_fixed')
diff_area = df_all.attrs.get('diff_area')
x_fixed = df_all.attrs.get('x_fixed')
if limbs_fixed is None or diff_area is None:
raise ValueError(
"DataFrame missing Zuecco data. "
"Ensure it comes from calculate_zuecco_metrics()"
)
title='Zuecco Hysteresis Analysis'
fig = make_subplots(rows=1, cols=2, subplot_titles=('Hysteresis Loop', 'Differential Area'), horizontal_spacing=0.12)
# Left plot: Hysteresis loop
fig.add_trace(go.Scatter(x=df_all['QS'],y=df_all['CS'],mode='lines+markers',marker=dict(size=4, color='steelblue'),line=dict(color='steelblue'),name='Data'),row=1, col=1)
# Add rising limb (limbs_fixed[0])
fig.add_trace(go.Scatter(x=x_fixed,y=limbs_fixed[0],mode='lines+markers',marker=dict(size=6, symbol='circle', color='green'),line=dict(color='green', dash='dot'),name='Rising limb'),row=1, col=1)
# Add falling limb (limbs_fixed[1])
fig.add_trace(go.Scatter(x=x_fixed,y=limbs_fixed[1],mode='lines+markers',marker=dict(size=6, symbol='square', color='red'),line=dict(color='red', dash='dot'),name='Falling limb'),row=1, col=1)
# Right plot: Differential area
x_fixed_plot = x_fixed[:len(diff_area)]
fig.add_trace(go.Scatter(x=x_fixed_plot,y=diff_area.values,mode='lines+markers',marker=dict(size=6, color='red'),line=dict(color='red'),name='ΔA',fill='tozeroy',fillcolor='rgba(255,0,0,0.1)'),row=1, col=2)
# Add zero line
fig.add_hline(y=0, line_dash="solid", line_color="black", line_width=1,row=1, col=2)
# Update axes
fig.update_xaxes(title_text="x (normalized)", row=1, col=1)
fig.update_yaxes(title_text="y (normalized)", row=1, col=1)
fig.update_xaxes(title_text="x (normalized)", row=1, col=2)
fig.update_yaxes(title_text="ΔA (-)", row=1, col=2)
# Add annotation with metrics
annotation_text = (
f"<b>Hysteresis Index:</b> {metrics['h_index'].values[0]:.4f}<br>"
f"<b>Class:</b> {int(metrics['hyst_class'].values[0])}"
)
fig.add_annotation(text=annotation_text,xref="paper", yref="paper",x=0.98, y=0.98,
xanchor='right', yanchor='top',showarrow=False,
bgcolor="rgba(255,255,255,0.8)",bordercolor="black",borderwidth=1,font=dict(size=11)
)
fig.update_layout(title=title,template='plotly_white',height=400,showlegend=True,legend=dict(x=0.01, y=0.99, xanchor='left', yanchor='top'))
return fig