"""
HARP (Hysteresis Analysis of Residence time and Production) Metrics Calculator
Python translation of the original R code version 2.2 (July 2024)
by Melanie E. Roberts et al. https://github.com/MelanieEmmajade/HARP
Roberts, M.E. (2023). HARP Hysteresis Metrics - an R implementation (v1.0.0).
Zenodo. https://doi.org/10.5281/zenodo.8409091
Roberts, M.E., Kim, D., Lu, J. & Hamilton, D.P., HARP: A suite of parameters
to describe the hysteresis of streamflow and water quality constituents,
Journal of Hydrology, 2023, https://doi.org/10.1016/j.jhydrol.2023.130262
This module calculates hysteresis metrics from time series of discharge (Q)
and concentration (C) data for hydrochemical analysis.
(cc) conrad.jackisch@tbt.tu-freiberg.de
"""
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from sklearn.preprocessing import MinMaxScaler
from scipy.signal import find_peaks
# Optional intersection libraries
try:
from shapely.geometry import LineString
HAS_SHAPELY = True
except ImportError:
HAS_SHAPELY = False
try:
import poly_point_isect as ppi
HAS_PPI = True
except ImportError:
HAS_PPI = False
[docs]
def calculate_harp_metrics(df_obs, time_col='Etime', discharge_col='Q', concentration_col='C', intersection_method='auto'):
"""
Calculate HARP metrics from discharge and concentration time series.
Parameters
----------
df_obs : pd.DataFrame
Observed data with time, discharge, and concentration columns
time_col, discharge_col, concentration_col : str
Column names for time, discharge, and concentration
intersection_method : str, default 'auto'
'auto', 'shapely', 'bentley-ottmann', or 'simple'
Returns
-------
metric_df : pd.DataFrame
HARP metrics:
- area: Total hysteresis area (magnitude) with sign of dominant portion
Positive = diluting, Negative = enriching
- residual: Change in scaled concentration (end - start)
Positive = enriching, Negative = diluting
- area_lower/area_upper: Areas of lower/upper portions (if intersection found)
- peak_Q/peak_C: Scaled peak timings (0-1)
- peaktime_Q/peaktime_C: Absolute peak timings (days)
- radius_equiv: Equivalent circle radius
df_all : pd.DataFrame
Processed data with scaled values and phase information
"""
df_all = df_obs[[time_col, discharge_col, concentration_col]].copy()
df_all.columns = ['Etime', 'Q', 'C'] # Rename to standard names
# Prepare time index
if pd.api.types.is_numeric_dtype(df_obs[time_col]):
df_all.index = pd.to_timedelta(df_all['Etime'], unit='D')
else:
df_all.index = df_all['Etime'] - df_all['Etime'].iloc[0]
# Resample and interpolate
df_all = df_all.resample('1h').first().interpolate(method='linear', limit=None, limit_direction='forward')
# Create numeric Etime from index for scaling
df_all['Etime'] = df_all.index.total_seconds() / 86400.0 # Convert to days
# Min-max scaling
df_all[['EtimeS', 'QS', 'CS']] = MinMaxScaler().fit_transform(df_all[['Etime', 'Q', 'C']])
# Find peaks and mark switchpoints
df_all['switchpointsQ'] = ''
df_all['switchpointsC'] = ''
df_all.loc[df_all.index[find_peaks(df_all['Q'])[0]], 'switchpointsQ'] = 'lQ'
df_all.loc[df_all.index[df_all['Q'].argmax()], 'switchpointsQ'] = 'gQ'
df_all.loc[df_all.index[find_peaks(df_all['C'])[0]], 'switchpointsC'] = 'lC'
df_all.loc[df_all.index[df_all['C'].argmax()], 'switchpointsC'] = 'gC'
# Define phases based on discharge
df_all['Qphase'] = 'rising'
df_all.loc[df_all.index[df_all['Q'].argmax()]:, 'Qphase'] = 'falling'
df_all['Cphase'] = 'rising'
df_all.loc[df_all.index[df_all['C'].argmax()]:, 'Cphase'] = 'falling'
# Create limbs using pandas approach (handle duplicates with groupby().mean())
rising_limb = df_all.loc[df_all.Qphase == 'rising', ['QS', 'CS']].groupby('QS').mean()['CS']
falling_limb = df_all.loc[df_all.Qphase == 'falling', ['QS', 'CS']].groupby('QS').mean()['CS']
limbs = pd.concat([rising_limb, falling_limb], axis=1, sort=True).interpolate(method='linear')
limbs.columns = [0, 1] # Use integer column names for compatibility
# Find intersection point
xQS = _find_intersection(limbs, method=intersection_method)
if xQS is not None:
# Mark intersection
xID = df_all.index[abs(df_all.QS - xQS).argmin()]
df_all.loc[xID, 'switchpointsQ'] = 'X'
df_all.loc[xID, 'switchpointsC'] = 'X'
# Calculate areas for lower and upper portions (preserve signs)
rising1 = df_all.loc[(df_all.QS <= xQS) & (df_all.Qphase == 'rising'), ['QS', 'CS']].groupby('QS').mean()['CS']
falling1 = df_all.loc[(df_all.QS <= xQS) & (df_all.Qphase == 'falling'), ['QS', 'CS']].groupby('QS').mean()['CS']
limbs1 = pd.concat([rising1, falling1], axis=1, sort=True).interpolate(method='linear')
limbs1.columns = [0, 1]
rising2 = df_all.loc[(df_all.QS > xQS) & (df_all.Qphase == 'rising'), ['QS', 'CS']].groupby('QS').mean()['CS']
falling2 = df_all.loc[(df_all.QS > xQS) & (df_all.Qphase == 'falling'), ['QS', 'CS']].groupby('QS').mean()['CS']
limbs2 = pd.concat([rising2, falling2], axis=1, sort=True).interpolate(method='linear')
limbs2.columns = [0, 1]
area1 = ((limbs1[0] - limbs1[1]) * limbs1.index.diff()).sum()
area2 = ((limbs2[0] - limbs2[1]) * limbs2.index.diff()).sum()
# Total area: magnitude sum with sign of larger portion
total_area = abs(area1) + abs(area2)
area_sign = np.sign(area1) if abs(area1) > abs(area2) else np.sign(area2)
area = area_sign * total_area
else:
# No intersection found - calculate total area
area = ((limbs[0] - limbs[1]) * limbs.index.diff()).sum()
area1 = np.nan
area2 = np.nan
# Residual: change in concentration from start to finish
residual = df_all['CS'].iloc[-1] - df_all['CS'].iloc[0]
# Calculate equivalent circle radius
radius = np.sqrt(abs(area) / np.pi)
# Extract peak timing metrics
peak_q_time = df_all.loc[df_all.switchpointsQ == 'gQ'].index[0]
peak_c_time = df_all.loc[df_all.switchpointsC == 'gC'].index[0]
metric_df = pd.DataFrame({
'area': [area],
'residual': [residual],
'area_lower': [area1],
'area_upper': [area2],
'peak_Q': [df_all.loc[df_all.switchpointsQ == 'gQ', 'EtimeS'].values[0]],
'peak_C': [df_all.loc[df_all.switchpointsC == 'gC', 'EtimeS'].values[0]],
'peaktime_Q': [peak_q_time.days + peak_q_time.seconds / 86400.0],
'peaktime_C': [peak_c_time.days + peak_c_time.seconds / 86400.0],
'radius_equiv': [radius]
})
metric_df['dQpeak'] = metric_df['peak_Q']/metric_df['peaktime_Q']
metric_df['dCpeak'] = metric_df['peak_C']/metric_df['peaktime_C']
return metric_df, df_all
def _find_intersection(limbs, method='auto'):
"""Find intersection between rising and falling limbs."""
# Auto-select method
if method == 'auto':
if HAS_SHAPELY:
method = 'shapely'
elif HAS_PPI:
method = 'bentley-ottmann'
else:
method = 'simple'
limbs_clean = limbs.dropna()
if len(limbs_clean) < 2:
return None
if method == 'shapely':
if not HAS_SHAPELY:
raise ImportError("shapely not available. Install: pip install shapely")
coords1 = np.column_stack([limbs_clean.index, limbs_clean[0]])
coords2 = np.column_stack([limbs_clean.index, limbs_clean[1]])
line1 = LineString(coords1)
line2 = LineString(coords2)
intersection = line1.intersection(line2)
if intersection.is_empty:
return None
elif intersection.geom_type == 'Point':
return intersection.x
elif intersection.geom_type == 'MultiPoint':
return np.median([pt.x for pt in intersection.geoms])
return None
elif method == 'bentley-ottmann':
if not HAS_PPI:
raise ImportError("poly_point_isect not available")
try:
result = ppi.isect_polygon(limbs_clean.values)
if len(result) > 0:
# Find closest point on the index
return limbs_clean.index[abs(limbs_clean.mean(axis=1) - result[0][0]).argmin()]
except:
return None
elif method == 'simple':
# Find where limbs cross by checking sign changes
diff = limbs_clean[0] - limbs_clean[1]
sign_changes = np.where(np.diff(np.sign(diff)))[0]
if len(sign_changes) > 0:
# Return first crossing point
return limbs_clean.index[sign_changes[0]]
return None
else:
raise ValueError(f"Unknown method: {method}")
def draw_circle(radius, center_x=0.5, center_y=0.5, n_points=100):
'''Generate circle coordinates for visualization.
radius :: equiv radius from harp calculation
center_x/center_y :: optional to center the circle in the plot
npoints :: points on the circle
returns :: x, y as coordinates'''
t = np.linspace(0, 2 * np.pi, n_points)
x = center_x + radius * np.cos(t)
y = center_y + radius * np.sin(t)
return x, y
[docs]
def harp_plot(df_processed,metrics):
'''Visualize hysteresis loop
df_processed :: df_all output from calculate_harp_metrics
metrics :: metrics output from calculate_harp_metrics
returns :: fig -> can be shown, saved, or further be processed
'''
fig = go.Figure()
# Plot data colored by time
fig.add_trace(go.Scatter(x=df_processed['QS'],y=df_processed['CS'],mode='markers+lines',
marker=dict(color=df_processed['Etime'], colorscale='Viridis', showscale=True,colorbar=dict(title='Time (days)')),
name='Hysteresis loop'))
# Add equivalent area circle
circ_x, circ_y = draw_circle(metrics['radius_equiv'].values[0])
fig.add_trace(go.Scatter(x=circ_x, y=circ_y,mode='lines',line=dict(color='pink', width=2, dash='dot'),
name=f'Equiv. area (r={metrics["radius_equiv"].values[0]:.3f})'))
# Mark intersection point if exists
if 'X' in df_processed['switchpointsQ'].values:
x_point = df_processed.loc[df_processed['switchpointsQ'] == 'X', 'QS']
y_point = df_processed.loc[df_processed['switchpointsC'] == 'X', 'CS']
fig.add_trace(go.Scatter(x=x_point, y=y_point,mode='markers',
marker=dict(color='pink', size=12, symbol='x'),name='Intersection'))
fig.update_layout(template='none',xaxis_title='Discharge (scaled)',yaxis_title='Concentration (scaled)',title='C-Q Hysteresis Loop')
return fig