Source code for hygcs.gcs_core

"""
GCS Core Analysis Functions
============================
Core functions for C-Q analysis including hysteresis metrics,
CVc/CVq ratios, C-Q slope calculations, and preparatory statistics.

(cc-by) Version 0.5 (2025-12-02) conrad.jackisch@tbt.tu-freiberg.de, antita.sanchez@mineral.tu-freiberg.de
"""

import pandas as pd
import numpy as np
import plotly.express as px
from typing import Tuple, Dict, List, Optional
import warnings

# Import individual hysteresis analysis modules
from .harp import calculate_harp_metrics
from .zuecco import calculate_zuecco_metrics
from .lloyd import calculate_lawlerlloyd_metrics


# =============================================================================
# HYSTERESIS METRICS
# =============================================================================

[docs] def calculate_all_hysteresis_metrics( df: pd.DataFrame, time_col: str = 'date', discharge_col: str = 'Q', concentration_col: str = 'C' ) -> Dict[str, any]: """ Calculate hysteresis metrics using all three methods (HARP, Zuecco, Lloyd). This function applies scientifically validated hysteresis analysis methods to time series data (typically a moving window around each segment). Parameters ---------- df : pd.DataFrame Time series data containing discharge and concentration (length >= 5 recommended) time_col : str Column name for time values discharge_col : str Column name for discharge/flow values concentration_col : str Column name for concentration values Returns ------- dict Dictionary containing: - harp_metrics: dict from calculate_harp_metrics() - zuecco_metrics: dict from calculate_zuecco_metrics() - lloyd_metrics: dict from calculate_lawlerlloyd_metrics() - classifications: dict with hysteresis direction classifications - processed_data: dict with processed DataFrames from each method - error: str if calculation failed """ if len(df) < 3: return { 'harp_metrics': {}, 'zuecco_metrics': {}, 'lloyd_metrics': {}, 'classifications': {}, 'processed_data': {}, 'error': 'Insufficient data points (need >= 3)' } try: # Calculate HARP metrics harp_metrics, harp_df = calculate_harp_metrics( df, time_col=time_col, discharge_col=discharge_col, concentration_col=concentration_col ) # Calculate Zuecco metrics zuecco_metrics, zuecco_df = calculate_zuecco_metrics( df, time_col=time_col, discharge_col=discharge_col, concentration_col=concentration_col ) # Calculate Lloyd metrics lloyd_metrics, lloyd_df = calculate_lawlerlloyd_metrics( df, time_col=time_col, discharge_col=discharge_col, concentration_col=concentration_col ) # Create classifications based on each method classifications = {} # HARP classification (if available) if 'hyst_class' in harp_metrics.columns: classifications['harp_class'] = harp_metrics['hyst_class'].values[0] # Zuecco classification if 'hyst_class' in zuecco_metrics.columns: classifications['zuecco_class'] = int(zuecco_metrics['hyst_class'].values[0]) # Lloyd classification (based on HInew recommended method) if 'mean_HInew' in lloyd_metrics.columns: mean_hinew = lloyd_metrics['mean_HInew'].values[0] if not np.isnan(mean_hinew): if mean_hinew > 0.1: classifications['lloyd_direction'] = 'clockwise' elif mean_hinew < -0.1: classifications['lloyd_direction'] = 'counter-clockwise' else: classifications['lloyd_direction'] = 'weak' else: classifications['lloyd_direction'] = 'unknown' # Also classify Lawler method for comparison if 'mean_HIL' in lloyd_metrics.columns: mean_hil = lloyd_metrics['mean_HIL'].values[0] if not np.isnan(mean_hil): if mean_hil > 0.1: classifications['lawler_direction'] = 'clockwise' elif mean_hil < -0.1: classifications['lawler_direction'] = 'counter-clockwise' else: classifications['lawler_direction'] = 'weak' else: classifications['lawler_direction'] = 'unknown' return { 'harp_metrics': harp_metrics.to_dict('records')[0] if len(harp_metrics) > 0 else {}, 'zuecco_metrics': zuecco_metrics.to_dict('records')[0] if len(zuecco_metrics) > 0 else {}, 'lloyd_metrics': lloyd_metrics.to_dict('records')[0] if len(lloyd_metrics) > 0 else {}, 'classifications': classifications, 'processed_data': { 'harp': harp_df, 'zuecco': zuecco_df, 'lloyd': lloyd_df } } except Exception as e: return { 'harp_metrics': {}, 'zuecco_metrics': {}, 'lloyd_metrics': {}, 'classifications': {}, 'processed_data': {}, 'error': f'Calculation failed: {str(e)}' }
# ============================================================================= # CVc/CVq AND C-Q SLOPE ANALYSIS # =============================================================================
[docs] def compute_cvc_cvq_windows(df, qcol="Q_mLs", ccol="Zn_mgL", window=5): """ Compute CVc/CVq ratios and C-Q slopes over rolling windows. Based on Musolff et al. (2015) for chemostatic vs chemodynamic behavior detection. https://doi.org/10.1016/j.advwatres.2015.09.026 Parameters ---------- df : pd.DataFrame Input dataframe with time series of concentration and flow measurements qcol : str Name of the flow column ccol : str Name of the concentration column window : int Number of measurements to use for rolling window analysis Returns ------- pd.DataFrame DataFrame with CVc/CVq analysis for each window, including: - CVc, CVq: Coefficients of variation - CVc_CVq: Ratio (>1 = chemodynamic, <1 = chemostatic) - cq_slope_loglog: C-Q slope in log-log space (power-law exponent b) """ df_sorted = df.sort_values(["site_id", "date"]).copy() rows = [] for site, g in df_sorted.groupby("site_id"): g = g.reset_index(drop=True) for i in range(window - 1, len(g)): block = g.iloc[i - window + 1 : i + 1] mu_q = block[qcol].mean() sd_q = block[qcol].std(ddof=1) mu_c = block[ccol].mean() sd_c = block[ccol].std(ddof=1) CVq = sd_q / mu_q if mu_q != 0 else np.nan CVc = sd_c / mu_c if mu_c != 0 else np.nan ratio = CVc / CVq if (CVq not in [0, np.nan]) else np.nan # Calculate C-Q slope using log-log regression with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning) try: figkx = px.scatter(y=np.log(block[ccol]), x=np.log(block[qcol]), trendline='ols', template='none') results = px.get_trendline_results(figkx) slope = results.loc[0].px_fit_results.params[1] except: slope = np.nan rows.append({ "site_id": site, "compound": ccol, "window_id": i - window + 1, "start_date": block["date"].min(), "end_date": block["date"].max(), "n_samples": len(block), "CVq": CVq, "CVc": CVc, "CVc_CVq": ratio, "cq_slope_loglog": slope }) return pd.DataFrame(rows)
[docs] def compute_cq_slope(q1: float, q2: float, c1: float, c2: float, kind: str = 'loglog') -> float: """ Compute the local C-Q slope for a segment between two consecutive points. Parameters ---------- q1, q2 : float Discharge values at segment start and end. c1, c2 : float Concentration values at segment start and end. kind : {'loglog','linear'}, optional - 'loglog' returns Δlog(C) / Δlog(Q) (i.e., power-law slope b) - 'linear' returns ΔC / ΔQ Returns ------- float The requested slope, or NaN if undefined. """ if kind == 'linear': dq = q2 - q1 if abs(dq) < 1e-12: return np.nan return (c2 - c1) / dq # default: log-log slope # guard against non-positive values (log undefined) if q1 <= 0 or q2 <= 0 or c1 <= 0 or c2 <= 0: return np.nan with warnings.catch_warnings(): warnings.filterwarnings('ignore', category=RuntimeWarning) dlog10q = np.log10(q2) - np.log10(q1) if abs(dlog10q) < 1e-12: return np.nan dlog10c = np.log10(c2) - np.log10(c1) return dlog10c / dlog10q
# ============================================================================= # SEGMENT FLOW DYNAMICS ANALYSIS # =============================================================================
[docs] def analyze_segment_flow_dynamics( segment_data: pd.DataFrame, percentiles: Dict, ccol: Optional[str] = None, qcol: Optional[str] = None ) -> Dict: """ Analyze high-resolution Q dynamics within a segment, with window-scale hysteresis analysis. Integrates hourly discharge data to capture within-segment flow dynamics that may not be apparent from sampling-point averages. Calculates hysteresis metrics on the time window (not event-scale or point-to-point). Parameters ---------- segment_data : pd.DataFrame High-resolution time series for the segment window percentiles : dict Global percentile thresholds for Q levels ccol : str, optional Name of concentration column qcol : str, optional Name of flow column Returns ------- dict or None Dictionary containing: - peak_q: Maximum Q in segment - peak_time: When peak occurred - days_since_peak: Days from peak to segment end - days_to_peak: Days from segment start to peak - flow_phase: 'rising' | 'at_peak' | 'early_decline' | 'late_decline' | 'low' - peak_position: 'early' | 'middle' | 'late' in segment - q_trend: slope of Q over segment - q_acceleration: change in Q rate - q_level: 'low' | 'medium' | 'high' - q_range: Q variability within segment - window_HI_* : Window-scale hysteresis metrics (if ccol provided) Returns None if no data available """ # Extract segment data if qcol is None: qcol = segment_data.columns[0] ccol = segment_data.columns[1] if len(segment_data) == 0 or segment_data[qcol].isna().all(): return None segment_data = segment_data.dropna() if len(segment_data) < 2: return None q_values = segment_data[qcol].values c_values = segment_data[ccol].values if ccol else None times = segment_data.index.values # Find peak peak_idx = np.argmax(q_values) peak_q = q_values[peak_idx] peak_time_idx = segment_data.index[peak_idx] peak_time = times[peak_idx] # Calculate temporal position of peak segment_duration = pd.Timedelta(times[-1] - times[0]).total_seconds() / 86400 # days days_to_peak = pd.Timedelta(peak_time - times[0]).total_seconds() / 86400 days_since_peak = pd.Timedelta(times[-1] - peak_time).total_seconds() / 86400 if segment_duration > 0: peak_position_frac = days_to_peak / segment_duration if peak_position_frac < 0.33: peak_position = 'early' elif peak_position_frac > 0.67: peak_position = 'late' else: peak_position = 'middle' else: peak_position = 'middle' # Determine flow phase based on peak location and current position end_q = q_values[-1] start_q = q_values[0] # Compare end_q to percentiles if end_q > percentiles['Q_p75']: q_level = 'high' elif end_q < percentiles['Q_p25']: q_level = 'low' else: q_level = 'medium' # Flow phase logic if days_since_peak < 1: # Peak within last day flow_phase = 'at_peak' elif days_since_peak < segment_duration * 0.3: # Recent peak if end_q > percentiles['Q_p50']: flow_phase = 'early_decline' # Declining but still high else: flow_phase = 'late_decline' elif days_to_peak < segment_duration * 0.3: # Peak was early flow_phase = 'post_peak' elif peak_position == 'late' and end_q > start_q: flow_phase = 'rising' elif q_level == 'low': flow_phase = 'low' else: # Use trend if end_q > start_q * 1.1: flow_phase = 'rising' elif end_q < start_q * 0.9: if q_level == 'high': flow_phase = 'early_decline' else: flow_phase = 'late_decline' else: flow_phase = 'stable' # Calculate trend (linear fit) if len(q_values) > 2: x = np.arange(len(q_values)) q_trend = np.polyfit(x, q_values, 1)[0] # slope per hour q_trend_daily = q_trend * 24 # slope per day # Acceleration (change in trend) mid = len(q_values) // 2 trend1 = np.polyfit(x[:mid], q_values[:mid], 1)[0] if mid > 1 else 0 trend2 = np.polyfit(x[mid:], q_values[mid:], 1)[0] if len(x) - mid > 1 else 0 q_acceleration = (trend2 - trend1) * 24 # per day else: q_trend_daily = (end_q - start_q) / max(1, segment_duration) q_acceleration = 0 result = { 'peak_q': peak_q, 'peak_time': peak_time, 'days_since_peak': days_since_peak, 'days_to_peak': days_to_peak, 'flow_phase': flow_phase, 'peak_position': peak_position, 'q_trend': q_trend_daily, 'q_acceleration': q_acceleration, 'q_level': q_level, 'q_range': q_values.max() - q_values.min(), 'start_q': start_q, 'end_q': end_q } # Calculate HI on the time window (not event-scale) if ccol is not None and c_values is not None: segment_data_copy = segment_data.copy() segment_data_copy['date'] = segment_data_copy.index try: if len(segment_data_copy) >= 5: # Calculate WINDOW hysteresis metrics window_hyst = calculate_all_hysteresis_metrics( segment_data_copy, time_col='date', discharge_col=qcol, concentration_col=ccol ) # Extract window-scale metrics harp_win = window_hyst.get('harp_metrics', {}) zuecco_win = window_hyst.get('zuecco_metrics', {}) lloyd_win = window_hyst.get('lloyd_metrics', {}) class_win = window_hyst.get('classifications', {}) # Store essential metrics result.update({ 'window_HI_harp': harp_win.get('area', np.nan), 'window_HI_zuecco': zuecco_win.get('h_index', np.nan), 'window_HI_lloyd': lloyd_win.get('mean_HInew', np.nan), 'window_direction': class_win.get('lloyd_direction', 'unknown') }) except Exception as e: # Silently skip if hysteresis calculation fails print(f'Window hysteresis analysis failed: {str(e)}') pass return result
# ============================================================================= # STATISTICAL THRESHOLDS # =============================================================================
[docs] def compute_change_percentiles( data: pd.DataFrame, sites: List[str], ccol: str, qcol: str ) -> Dict[str, float]: """ Calculate percentiles for concentration and flow changes (ΔC and ΔQ). These percentile-based thresholds make the classification compound-agnostic by adapting to the specific distribution of each compound's dynamics. Parameters ---------- data : pd.DataFrame Time series data sites : list of str Sites to include ccol : str Concentration column qcol : str Flow column Returns ------- dict Percentile thresholds for dC and dQ at various levels (p01, p05, p08, p10, p25, p50, p75, p90, p95) """ changes = [] for site in sites: site_data = data[data['site_id'] == site].sort_values('date') if len(site_data) > 1: conc_diffs = site_data[ccol].diff().dropna() flow_diffs = site_data[qcol].diff().dropna() changes.extend([ {'conc_diff': cd, 'flow_diff': fd} for cd, fd in zip(conc_diffs, flow_diffs) ]) changes_df = pd.DataFrame(changes) percentiles = { # Concentration change percentiles 'dC_p01': changes_df['conc_diff'].quantile(0.01), 'dC_p05': changes_df['conc_diff'].quantile(0.05), 'dC_p08': changes_df['conc_diff'].quantile(0.08), # Optimized for flushing detection 'dC_p10': changes_df['conc_diff'].quantile(0.10), 'dC_p25': changes_df['conc_diff'].quantile(0.25), 'dC_p50': changes_df['conc_diff'].quantile(0.50), 'dC_p75': changes_df['conc_diff'].quantile(0.75), 'dC_p90': changes_df['conc_diff'].quantile(0.90), # Large increase threshold 'dC_p95': changes_df['conc_diff'].quantile(0.95), # Flow change percentiles 'dQ_p05': changes_df['flow_diff'].quantile(0.05), 'dQ_p10': changes_df['flow_diff'].quantile(0.10), 'dQ_p25': changes_df['flow_diff'].quantile(0.25), 'dQ_p50': changes_df['flow_diff'].quantile(0.50), 'dQ_p75': changes_df['flow_diff'].quantile(0.75), 'dQ_p90': changes_df['flow_diff'].quantile(0.90), # Absolute change percentiles (for stability detection) 'abs_dC_p50': changes_df['conc_diff'].abs().quantile(0.50), 'abs_dC_p75': changes_df['conc_diff'].abs().quantile(0.75), 'abs_dQ_p50': changes_df['flow_diff'].abs().quantile(0.50), 'abs_dQ_p75': changes_df['flow_diff'].abs().quantile(0.75), } return percentiles
# ============================================================================= # MODULE EXPORTS # ============================================================================= __all__ = [ 'calculate_all_hysteresis_metrics', 'compute_cvc_cvq_windows', 'compute_cq_slope', 'analyze_segment_flow_dynamics', 'compute_change_percentiles' ] print("gcs_core.py: core metrics and preparatory functions loaded")