"""
Lawler and Lloyd Hysteresis Index Calculator
Python implementation of the hysteresis index method by Lawler et al. (2006) and Lloyd et al. (2015)
Lawler, D. M., Petts, G. E., Foster, I. D. L., and Harper, S. (2006): Turbidity dynamics during spring
storm events in an urban headwater river system: The Upper Tame, West Midlands, UK, Sci. Total Environ.,
360, 109–126, https://doi.org/10.1016/j.scitotenv.2005.08.032
Lloyd, C. E. M., Freer, J. E., Johnes, P. J., and Collins, A. L. (2016): Using hysteresis analysis of
high-resolution water quality monitoring data, including uncertainty, to infer controls on nutrient
and sediment transfer in catchments, Sci. Total Environ., 543, 388–404,
https://doi.org/10.1016/j.scitotenv.2015.11.028
Lloyd, C. E. M., Freer, J. E., Johnes, P. J., and Collins, A. L. (2016): Technical Note: Testing an
improved index for analysing storm discharge–concentration hysteresis, Hydrol. Earth Syst. Sci., 20,
625–632, https://doi.org/10.5194/hess-20-625-2016
This module calculates hysteresis indices and classification from time series
of two variables (typically discharge and concentration or similar).
Version 2.0 (2025-12-02) - CRITICAL BUG FIX
FIXED: Column indexing bug that caused KeyError when accessing limbs DataFrame
(cc) conrad.jackisch@tbt.tu-freiberg.de
"""
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from scipy.signal import find_peaks
from sklearn.preprocessing import MinMaxScaler
import warnings
[docs]
def calculate_lawlerlloyd_metrics(df_obs, time_col='Etime', discharge_col='Q', concentration_col='C'):
"""
Calculate Lawler-Lloyd hysteresis index.
Parameters
----------
df_obs : pd.DataFrame
Observed data with time, discharge, and concentration columns
time_col, discharge_col, concentration_col : str
Column names for time, discharge, and concentration
Returns
-------
metrics_df : pd.DataFrame
Lawler-Lloyd metrics:
- mean_HI: Mean hysteresis index across percentiles
- median_HI: Median hysteresis index
- HI_range: Range of HI values (max - min)
- mean_abs_HI: Mean of absolute HI values
df_all : pd.DataFrame
Processed time series data with:
- Q, C: Original discharge and concentration values
- QS, CS: Normalized (0-1) discharge and concentration
- Qphase, Cphase: Rising/falling phase indicators
- switchpoints: Peak markers (gQ, gC, lQ, lC)
Plus HI percentile data stored in attrs:
- attrs['HI_percentiles']: DataFrame with Hi, RLi, FLi at each QS percentile
"""
# Validate input
if ((discharge_col not in df_obs.columns) | (concentration_col not in df_obs.columns) | (time_col not in df_obs.columns)):
raise ValueError(f"Columns '{discharge_col}' and/or '{concentration_col}' and/or '{time_col}' not found in dataframe")
# Check for NaN values
if df_obs[discharge_col].isna().any() or df_obs[concentration_col].isna().any():
df_obs = df_obs.dropna()
print('Input data contains NaN values. Rows dropped. Consider data preparation before processing.')
df_all = df_obs[[time_col, discharge_col, concentration_col]].copy()
df_all.columns = ['Etime', 'Q', 'C'] # Rename to standard names
# Prepare time index
if pd.api.types.is_numeric_dtype(df_obs[time_col]):
df_all.index = pd.to_timedelta(df_all['Etime'], unit='D')
else:
df_all.index = df_all['Etime'] - df_all['Etime'].iloc[0]
# Resample and interpolate
df_all = df_all.resample('1h').first().interpolate(method='linear', limit=None, limit_direction='forward')
# Create numeric Etime from index for scaling
df_all['Etime'] = df_all.index.total_seconds() / 86400.0 # Convert to days
# Min-max scaling
df_all[['EtimeS', 'QS', 'CS']] = MinMaxScaler().fit_transform(df_all[['Etime', 'Q', 'C']])
# Find peaks and mark switchpoints
df_all['switchpoints'] = ''
df_all.loc[df_all.index[find_peaks(df_all['Q'])[0]], 'switchpoints'] = 'lQ'
df_all.loc[df_all.index[find_peaks(df_all['C'])[0]], 'switchpoints'] = 'lC'
df_all.loc[df_all.index[df_all['Q'].argmax()], 'switchpoints'] = 'gQ'
df_all.loc[df_all.index[df_all['C'].argmax()], 'switchpoints'] = 'gC'
# Define phases based on discharge
df_all['Qphase'] = 'rising'
df_all.loc[df_all.index[df_all['Q'].argmax()]:, 'Qphase'] = 'falling'
df_all['Cphase'] = 'rising'
df_all.loc[df_all.index[df_all['C'].argmax()]:, 'Cphase'] = 'falling'
# Create limbs using original elegant pandas approach
# Handle duplicates by taking mean of CS values at same QS
rising_limb = df_all.loc[df_all.Qphase == 'rising', ['QS', 'CS']].groupby('QS').mean()['CS']
falling_limb = df_all.loc[df_all.Qphase == 'falling', ['QS', 'CS']].groupby('QS').mean()['CS']
limbs = pd.concat([rising_limb, falling_limb], axis=1, sort=True).interpolate(method='linear')
# *** CRITICAL FIX (v2.0): Explicitly set column names to integer indices ***
limbs.columns = [0, 1] # Use integer column names for compatibility
# Calculate both Lawler (2006) and Lloyd (2016) HI methods at QS percentiles
# Lawler et al. (2006) - Equations 1 & 2: Ratio-based method
# Lloyd et al. (2016) - Equation 5: Difference-based method (normalized)
percentiles = np.arange(0.1, 1.0, 0.1)
HI_data = pd.DataFrame(index=percentiles, columns=['HIL', 'HInew', 'RLi', 'FLi'])
for i in percentiles:
# Sample limbs at QS percentile VALUE i (not index position!)
if i in limbs.index:
C_rise = limbs.loc[i, 0]
C_fall = limbs.loc[i, 1]
else:
# Interpolate if exact percentile not in index
C_rise = np.interp(i, limbs.index, limbs.iloc[:, 0])
C_fall = np.interp(i, limbs.index, limbs.iloc[:, 1])
# Store limb values
HI_data.loc[i, 'RLi'] = C_rise
HI_data.loc[i, 'FLi'] = C_fall
# METHOD 1: Lawler et al. (2006) - Ratio method (Eq. 1 & 2 from Paper)
if C_rise > C_fall: # Clockwise
if C_fall == 0:
HI_data.loc[i, 'HIL'] = np.nan # Undefined
else:
HI_data.loc[i, 'HIL'] = (C_rise / C_fall) - 1
else: # Anticlockwise
if C_rise == 0:
HI_data.loc[i, 'HIL'] = np.nan # Undefined
else:
HI_data.loc[i, 'HIL'] = (-1 / (C_rise / C_fall)) + 1
# METHOD 2: Lloyd et al. (2016) - Difference method (Eq. 5 from Technical Note (recommended method))
C_mid = max(C_rise, C_fall)
if C_mid == 0:
HI_data.loc[i, 'HInew'] = 0
else:
HI_data.loc[i, 'HInew'] = (C_rise - C_fall) / C_mid
# Calculate summary metrics for both methods
# Suppress "Mean of empty slice" warnings when all values are NaN
with warnings.catch_warnings():
warnings.filterwarnings('ignore', message='Mean of empty slice')
warnings.filterwarnings('ignore', category=RuntimeWarning)
metric_df = pd.DataFrame({
# Lawler method (ratio-based)
'mean_HIL': [HI_data['HIL'].mean()],
'median_HIL': [HI_data['HIL'].median()],
'HIL_range': [HI_data['HIL'].max() - HI_data['HIL'].min()],
# Lloyd new method (difference-based, recommended)
'mean_HInew': [HI_data['HInew'].mean()],
'median_HInew': [HI_data['HInew'].median()],
'HInew_range': [HI_data['HInew'].max() - HI_data['HInew'].min()],
'mean_abs_HInew': [HI_data['HInew'].abs().mean()]
})
# Store percentile data in attrs for plotting
metric_df.attrs['HI_percentiles'] = HI_data
return metric_df, df_all
[docs]
def lloyd_plot(df_all, metrics):
"""
Visualize Lawler-Lloyd hysteresis analysis in a two-panel plot:
- Left: Hysteresis loop with rising/falling limbs
- Right: HI index across discharge percentiles
df_all :: Processed DataFrame from calculate_lawlerlloyd_metrics
metrics :: Metrics DataFrame from calculate_lawlerlloyd_metrics
returns :: fig -> can be shown, saved, or further be processed
"""
from plotly.subplots import make_subplots
# Extract Lloyd-specific data from attrs
HI_percentiles = metrics.attrs.get('HI_percentiles')
if HI_percentiles is None:
raise ValueError("DataFrame missing Lloyd HI data. Ensure it comes from calculate_lawlerlloyd_metrics()")
title = 'Lawler-Lloyd Hysteresis Analysis'
fig = make_subplots(rows=1, cols=2, subplot_titles=('Hysteresis Loop', 'HI Index vs Discharge Percentile'), horizontal_spacing=0.12)
# Left plot: Hysteresis loop
fig.add_trace(go.Scatter(x=df_all['QS'],y=df_all['CS'],mode='lines+markers',marker=dict(size=4, color='steelblue'),line=dict(color='steelblue'),name='Data'),row=1, col=1)
# Add percentile markers on rising and falling limb
fig.add_trace(go.Scatter(x=HI_percentiles.index,y=HI_percentiles['RLi'],mode='markers',marker=dict(size=8, symbol='circle', color='green'),name='Rising limb'),row=1, col=1)
fig.add_trace(go.Scatter(x=HI_percentiles.index,y=HI_percentiles['FLi'],mode='markers',marker=dict(size=8, symbol='square', color='red'),name='Falling limb'),row=1, col=1)
# Right plot: Both HI methods across percentiles Lloyd et al. (2016) new method (recommended)
fig.add_trace(go.Scatter(x=HI_percentiles.index,y=HI_percentiles['HInew'],mode='lines+markers',marker=dict(size=6, color='purple'),line=dict(color='purple', width=2),name='HI<sub>new</sub> (Lloyd 2016)',fill='tozeroy',fillcolor='rgba(128,0,128,0.1)'),row=1, col=2)
# Lawler et al. (2006) original method
fig.add_trace(go.Scatter(x=HI_percentiles.index,y=HI_percentiles['HIL'],mode='lines+markers',marker=dict(size=5, color='orange'),line=dict(color='orange', width=1, dash='dot'),name='HI<sub>L</sub> (Lawler 2006)'),row=1, col=2)
# Add zero line
fig.add_hline(y=0, line_dash="solid", line_color="black", line_width=1, row=1, col=2)
# Update axes
fig.update_xaxes(title_text="Discharge (normalized)", row=1, col=1)
fig.update_yaxes(title_text="Concentration (normalized)", row=1, col=1)
fig.update_xaxes(title_text="Discharge Percentile", row=1, col=2)
fig.update_yaxes(title_text="HI (-)", row=1, col=2)
# Add annotation with metrics (showing recommended method)
annotation_text = (
f"<b>Lloyd (2016) - Recommended:</b><br>"
f"Mean HI<sub>new</sub>: {metrics['mean_HInew'].values[0]:.4f}<br>"
f"Median HI<sub>new</sub>: {metrics['median_HInew'].values[0]:.4f}<br>"
f"<br><b>Lawler (2006) - Original:</b><br>"
f"Mean HI<sub>L</sub>: {metrics['mean_HIL'].values[0]:.4f}<br>"
f"Median HI<sub>L</sub>: {metrics['median_HIL'].values[0]:.4f}"
)
fig.add_annotation(text=annotation_text,xref="paper", yref="paper",x=0.98, y=0.98,xanchor='right', yanchor='top',showarrow=False,bgcolor="rgba(255,255,255,0.8)",bordercolor="black",borderwidth=1,font=dict(size=11))
fig.update_layout(title=title,template='plotly_white',height=400,showlegend=True,legend=dict(x=0.01, y=0.99, xanchor='left', yanchor='top'))
return fig