Source code for hygcs.gcs_classification

"""
GCS Classification Functions
=============================
Geochemical phase classification using percentile-based thresholds,
hysteresis indices, C-Q slopes, and high-resolution flow dynamics.

Scientific basis:
- Percentile-based thresholds for compound-agnostic classification
- C-Q slope reveals mechanistic processes:
  * Positive (b > 0): Dilution-dominated (flushing signature)
  * Negative (b < 0): Enrichment (loading signature)  
  * Near-zero (|b| < 0.1): Chemostatic buffering
- Window-scale hysteresis captures temporal dynamics
- Hierarchical rules prioritize phase detection

Implements hierarchical rule-based classification for 6 geochemical phases:
- Flushing (F): Rapid mobilization as steep concentration decline during high flow, positive C-Q slope
- Loading (L): Accumulation phase by concentration rising to maximum, negative C-Q slope
- Chemostatic (C): Low hysteresis, low-variability, stable behavior, flat C-Q slope
- Dilution (D): Post-flush recovery, declining flow
- Recession (R): Late cycle, low CVc/CVq, both declining
- Variable (V): Ambiguous/mixed patterns

Version 0.5 (2025-12-02) (cc-by) conrad.jackisch@tbt.tu-freiberg.de, antita.sanchez@mineral.tu-freiberg.de
"""

import pandas as pd
import numpy as np
from typing import List, Tuple, Dict, Optional

# Import from core module (renamed functions in v0.5)
from .gcs_core import (
    compute_cvc_cvq_windows,
    compute_cq_slope,
    analyze_segment_flow_dynamics,
    compute_change_percentiles
)


[docs] def classify_segment_phase( row: pd.Series, percentiles: Dict ) -> Tuple[str, float, List[str]]: """ Classify a single segment into one of 6 geochemical phases. Classification logic with high-resolution Q dynamics using percentile-based thresholds. Implements hierarchical rule-based classification for 6 geochemical phases: - Flushing (F): Steep concentration decline during high flow, positive C-Q slope - Loading (L): Concentration rising to maximum, negative C-Q slope - Chemostatic (C): Low hysteresis, stable behavior, flat C-Q slope - Dilution (D): Post-flush recovery, declining flow - Recession (R): Late cycle, low CVc/CVq, both declining - Variable (V): Ambiguous/mixed patterns row :: pd.Series Segment data with hysteresis, flow, temporal context, and C-Q slope percentiles :: dict Percentile thresholds for classification Returns :: tuple (phase, confidence, rules_triggered) - phase: str, one of 'F', 'L', 'C', 'D', 'R', 'V' - confidence: float, 0.0-1.0 - rules_triggered: list of str, diagnostic information """ # Extract variables hi_zuecco = row.get('window_HI_zuecco', 0.0) hi_lloyd = row.get('window_HI_lloyd', 0.0) hi_harp = row.get('window_HI_harp', 0.0) # Use Zuecco as primary (most robust), with fallback if not np.isnan(hi_zuecco): hi = hi_zuecco elif not np.isnan(hi_lloyd): hi = hi_lloyd elif not np.isnan(hi_harp): hi = hi_harp else: hi = 0.0 q_pos = row.get('Q_position', 'medium') c_pos = row.get('C_position', 'medium') flow_diff = row.get('flow_diff', 0) conc_diff = row.get('conc_diff', 0) cvc_cvq = row.get('CVc_CVq', np.nan) # Extract C-Q slope cq_slope = row.get('cq_slope_loglog', np.nan) # High-res dynamics flow_phase = row.get('highres_flow_phase', 'unknown') days_since_peak = row.get('highres_days_since_peak', np.nan) q_level = row.get('highres_q_level', q_pos) # Temporal context hi_transition = row.get('HI_transition', 'stable') prev_c_pos = row.get('prev_C_position', 'none') prev_conc_diff = row.get('prev_conc_diff', 0) prev2_conc_diff = row.get('prev2_conc_diff', 0) c_trajectory = row.get('C_trajectory', 'stable') wai = row.get('WAI', np.nan) rules = [] # ================================================================================== # PRIORITY 1: FLUSHING # Criteria: Steep C decline + High Q + Positive C-Q slope (dilution signature) # ================================================================================== current_steep = conc_diff < percentiles['dC_p08'] prev_steep = prev_conc_diff < percentiles['dC_p08'] prev2_steep = prev2_conc_diff < percentiles['dC_p08'] q_at_peak = flow_phase in ['at_peak', 'rising', 'early_decline'] q_high = q_level in ['high', 'medium'] or q_pos == 'high' c_was_high = prev_c_pos == 'high' or c_pos == 'high' or c_trajectory in ['steep_decline_from_high', 'steep_decline'] # C-Q slope checks for flushing (positive = dilution) cq_slope_positive = not np.isnan(cq_slope) and cq_slope > 0.15 # Current steep decline during high Q if current_steep and (q_at_peak or q_high): rules.extend(['current_steep_decline', 'q_high_or_peak']) # C-Q slope confirms dilution mechanism if cq_slope_positive: rules.append('positive_cq_slope_dilution') confidence = 0.95 if c_was_high else 0.90 else: confidence = 0.92 if c_was_high else 0.85 if c_was_high: rules.append('c_was_high') # WAI (water availability index) can boost confidence if not np.isnan(wai) and wai > 1.0: rules.append('high_wai_wet') confidence = min(0.98, confidence + 0.03) return 'F', confidence, rules # Aftermath: previous steep decline, Q still elevated if prev_steep and q_high and not np.isnan(days_since_peak) and days_since_peak < 15: rules.extend(['prev_steep_decline', 'q_high', 'recent_peak']) if abs(conc_diff) < percentiles['abs_dC_p75']: rules.append('aftermath_stable') confidence = 0.85 if cq_slope_positive else 0.80 if cq_slope_positive: rules.append('positive_cq_slope_confirms') return 'F', confidence, rules # Extended aftermath: steep decline 2 segments ago if prev2_steep and q_at_peak: rules.extend(['prev2_steep_decline', 'q_at_peak']) confidence = 0.75 if cq_slope_positive else 0.70 if cq_slope_positive: rules.append('positive_cq_slope') return 'F', confidence, rules # Very large (extreme) decline with any Q elevation if conc_diff < percentiles['dC_p01']: rules.append('extreme_decline') # Prioritize C-Q slope evidence if cq_slope_positive: rules.append('positive_cq_slope_strong') return 'F', 0.92, rules elif flow_diff > 0 or q_pos in ['high', 'medium']: rules.append('q_elevated') return 'F', 0.88, rules # ================================================================================== # PRIORITY 2: LOADING # Criteria: C rising to max + Negative C-Q slope (enrichment signature) # ================================================================================== c_high_rising = c_pos == 'high' and conc_diff > 0 q_not_peaked = flow_phase not in ['at_peak', 'early_decline'] # C-Q slope checks for loading (negative = enrichment) cq_slope_negative = not np.isnan(cq_slope) and cq_slope < -0.15 if c_high_rising: rules.extend(['c_high', 'c_rising']) # Negative C-Q slope confirms enrichment/mobilization if cq_slope_negative: rules.append('negative_cq_slope_enrichment') if q_not_peaked: rules.append('q_not_peaked') confidence = 0.95 else: confidence = 0.92 else: # Without C-Q slope confirmation if q_not_peaked: rules.append('q_not_peaked') confidence = 0.90 else: confidence = 0.80 # WAI during dry conditions boosts loading confidence if not np.isnan(wai) and wai < -1.0: rules.append('low_wai_accumulation') confidence = min(0.98, confidence + 0.03) return 'L', confidence, rules # Large concentration increase if conc_diff > percentiles['dC_p90']: rules.append('large_c_increase') if flow_diff <= percentiles['dQ_p75']: rules.append('q_stable') confidence = 0.90 if cq_slope_negative else 0.85 if cq_slope_negative: rules.append('negative_cq_slope_confirms') return 'L', confidence, rules # ================================================================================== # PRIORITY 3: CHEMOSTATIC # Criteria: Low HI + Stable + Flat C-Q slope (buffered system) # ================================================================================== in_post_flush = (prev_conc_diff < percentiles['dC_p25']) or \ (row.get('prev2_conc_diff', 0) < percentiles['dC_p08']) post_peak = flow_phase in ['post_peak', 'late_decline'] or \ (not np.isnan(days_since_peak) and days_since_peak > 5 and days_since_peak < 30) # Check C-Q slope is near-zero (chemostatic signature) cq_slope_flat = np.isnan(cq_slope) or abs(cq_slope) < 0.1 if abs(hi) < 0.12 and hi_transition == 'stable': rules.extend(['low_hi', 'stable_hi']) # Flat C-Q slope confirms chemostatic behavior if cq_slope_flat: rules.append('flat_cq_slope_chemostatic') if not (c_high_rising or (q_high and abs(conc_diff) > percentiles['abs_dC_p75'])): # Exclude post-flush recovery (that's Dilution, not Chemostatic) if not (in_post_flush and post_peak): rules.append('not_dynamic') # Higher confidence with flat C-Q slope confidence = 0.90 if cq_slope_flat else 0.85 return 'C', confidence, rules # ================================================================================== # PRIORITY 4: DILUTION # Criteria: Post-flush recovery, Q declining, C stabilizing # ================================================================================== post_peak_phase = flow_phase in ['post_peak', 'late_decline', 'stable'] or \ (not np.isnan(days_since_peak) and days_since_peak > 5) q_declining_moderate = flow_diff < 0 c_stable_or_recovering = abs(conc_diff) < percentiles['abs_dC_p75'] prev_c_declining = prev_conc_diff < percentiles['dC_p25'] prev2_c_declining = row.get('prev2_conc_diff', 0) < percentiles['dC_p08'] c_depleted = c_pos in ['low', 'medium'] # Post-flush recovery: Q declining, C stabilizing after recent flush if post_peak_phase and q_declining_moderate and c_stable_or_recovering: if prev_c_declining or prev2_c_declining: rules.extend(['post_peak', 'q_declining', 'c_stable', 'recent_flush']) if c_depleted: rules.append('c_depleted') return 'D', 0.85, rules return 'D', 0.75, rules # Alternative: Large Q drop with small C change post-peak if post_peak_phase and flow_diff < percentiles['dQ_p10'] and abs(conc_diff) < percentiles['abs_dC_p75']: rules.extend(['post_peak', 'large_q_drop', 'c_not_changing']) if prev_c_declining or prev2_c_declining: rules.append('recent_flush') return 'D', 0.80, rules # ================================================================================== # PRIORITY 5: RECESSION # Criteria: Late cycle, low CVc/CVq, both declining # ================================================================================== late_cycle = flow_phase in ['low', 'late_decline'] or q_level == 'low' both_declining = flow_diff < percentiles['dQ_p25'] and conc_diff < percentiles['dC_p25'] if not np.isnan(cvc_cvq) and cvc_cvq < 0.8: rules.append('low_cvc_cvq') if flow_diff < percentiles['dQ_p25']: rules.append('q_declining') if late_cycle: rules.append('late_cycle') return 'R', 0.85, rules return 'R', 0.75, rules if both_declining and late_cycle: rules.extend(['both_declining', 'late_cycle']) return 'R', 0.70, rules # ================================================================================== # FALLBACK: VARIABLE # ================================================================================== rules.append('fallback_variable') if abs(conc_diff) < 30 and abs(flow_diff) < 30: conf = 0.60 else: conf = 0.40 return 'V', conf, rules
[docs] def classify_geochemical_phase( data: pd.DataFrame, sites: List[str], flow_highres: Optional[pd.DataFrame] = None, qcol: str = 'Q_mLs', ccol: str = 'PLI', water_avail_col: Optional[str] = 'scPDSI', window: int = 5, use_highres: bool = True, headex: float = 0.4, tailex: float = 0.2 ) -> pd.DataFrame: """ Classify geochemical phases using percentile-based thresholds and high-resolution Q data. This function integrates hysteresis analysis, CVc/CVq ratios, C-Q slopes, and high-resolution flow dynamics to classify segments into 6 geochemical phases: - Flushing (F): Rapid mobilization during high flow - Loading (L): Accumulation phase - Chemostatic (C): Stable, low-variability behavior - Dilution (D): Post-flush recovery - Recession (R): Late-cycle decline - Variable (V): Mixed/ambiguous behavior data :: pd.DataFrame Time series data with columns:: site_id, date, [qcol], [ccol] sites :: list of str Site IDs to analyze flow_highres :: pd.DataFrame, optional High-resolution (hourly) flow data with 'time' column and site columns qcol :: str Flow/discharge column name ccol :: str Concentration column name water_avail_col :: str, optional Water availability index column (e.g., scPDSI) window :: int Window size for CVc/CVq calculation use_highres :: bool Whether to use high-resolution flow dynamics (requires flow_highres) headex/tailex :: float Percentage of segment length extending segment before/after for window hysteresis Returns :: pd.DataFrame Classification results with columns: - Segment metadata (dates, flows, concentrations) - Behavior classification - C-Q slopes - CVc, CVq, CVc_CVq: Variability metrics - Window-scale hysteresis (window_HI_*) - geochemical_phase: 'F', 'L', 'C', 'D', 'R', 'V' - phase_confidence: 0.0-1.0 - rules_triggered: Diagnostic information - highres_*: High-resolution flow metrics (if use_highres=True) """ # Legacy compatibility of site naming SITE_MAPPING = { 'B3': 'Site 3B', 'B4': 'Site 3A', 'A12': 'Site 2', 'C4': 'Site 1' } print("[1/6] Building segment-wise data...") # analyze_segments only creates segment metadata (NO event-scale hysteresis) segments_df = _build_segments(data, sites, ccol, qcol) print(f" Generated {len(segments_df)} segments") print("[2/6] Computing CVc/CVq and C-Q slopes...") cvc_cvq_df = compute_cvc_cvq_windows(data, qcol=qcol, ccol=ccol, window=window) print("[3/6] Merging data...") # Note renamed column cq_slope_loglog merged_df = pd.merge( segments_df, cvc_cvq_df[['site_id', 'end_date', 'CVc', 'CVq', 'CVc_CVq', 'cq_slope_loglog']], on=['site_id', 'end_date'], how='left' ) # Handle duplicate cq_slope_loglog column from merge if 'cq_slope_loglog_x' in merged_df.columns and 'cq_slope_loglog_y' in merged_df.columns: # Prefer segment-based calculation (_x), fallback to window calculation (_y) merged_df['cq_slope_loglog'] = merged_df['cq_slope_loglog_x'].fillna(merged_df['cq_slope_loglog_y']) merged_df = merged_df.drop(columns=['cq_slope_loglog_x', 'cq_slope_loglog_y']) # Add WAI if water_avail_col and water_avail_col in data.columns: wai_map = data.set_index('date')[water_avail_col].to_dict() merged_df['WAI'] = merged_df['end_date'].map(wai_map) # Global percentiles for absolute Q and C levels q_quantiles = data[qcol].quantile([0.25, 0.50, 0.75]) c_quantiles = data[ccol].quantile([0.25, 0.50, 0.75]) percentiles = { 'Q_p25': q_quantiles[0.25], 'Q_p50': q_quantiles[0.50], 'Q_p75': q_quantiles[0.75], 'C_p25': c_quantiles[0.25], 'C_p50': c_quantiles[0.50], 'C_p75': c_quantiles[0.75] } # Calculate percentiles for CHANGES (ΔC and ΔQ) print(" Computing change distributions...") change_percentiles = compute_change_percentiles(data, sites, ccol, qcol) percentiles.update(change_percentiles) print(f" ΔC thresholds: steep decline < {percentiles['dC_p08']:.1f} (p08), large increase > {percentiles['dC_p90']:.1f} (p90)") print(f" ΔQ thresholds: moderate decline < {percentiles['dQ_p25']:.1f} (p25), increase > {percentiles['dQ_p75']:.1f} (p75)") merged_df['Q_position'] = merged_df['end_flow'].apply( lambda x: 'low' if x < percentiles['Q_p25'] else ('high' if x > percentiles['Q_p75'] else 'medium') ) merged_df['C_position'] = merged_df['end_conc'].apply( lambda x: 'low' if x < percentiles['C_p25'] else ('high' if x > percentiles['C_p75'] else 'medium') ) # High-resolution flow analysis (if enabled) if use_highres and flow_highres is not None: print("[4/6] Analyzing high-resolution Q dynamics and window hysteresis...") flow_highres['time'] = pd.to_datetime(flow_highres['time']) results = [] for site in sites: site_df = merged_df[merged_df['site_id'] == site].sort_values('end_date').reset_index(drop=True) # High-res subset try: hres_dummy = flow_highres[site] qxcol = site except: hres_dummy = flow_highres[SITE_MAPPING[site]] qxcol = SITE_MAPPING[site] cdummy = data.loc[(data.site_id == site), ccol] cdummy.index = data.loc[(data.site_id == site), 'date'] site_dyn = pd.concat([hres_dummy, cdummy], axis=1).interpolate( method='spline', order=2, limit=None, limit_direction='forward' ).dropna() if len(site_df) == 0: continue for i in range(len(site_df)): row = site_df.iloc[i].copy() # Define window for hysteresis calculation t_extent = pd.Timedelta(row['end_date'] - row['start_date']).days segment_start = pd.to_datetime(row['start_date']) - pd.Timedelta(days=int(t_extent * headex)) segment_end = pd.to_datetime(row['end_date']) + pd.Timedelta(days=int(t_extent * tailex)) try: # Window-scale hysteresis calculated here flow_dynamics = analyze_segment_flow_dynamics( site_dyn.loc[segment_start:segment_end], percentiles, ccol, qxcol ) except: flow_dynamics = False if flow_dynamics: for key, val in flow_dynamics.items(): row[f'highres_{key}'] = val else: row['highres_flow_phase'] = 'unknown' row['highres_days_since_peak'] = np.nan # Extended temporal context if i > 0: prev = site_df.iloc[i - 1] row['prev_behavior'] = prev['behavior'] row['prev_CVc_CVq'] = prev.get('CVc_CVq', np.nan) row['prev_Q_position'] = prev['Q_position'] row['prev_C_position'] = prev['C_position'] row['prev_conc_diff'] = prev['conc_diff'] row['prev_flow_diff'] = prev['flow_diff'] # Simplified HI transition tracking if flow_dynamics: current_hi = flow_dynamics.get('window_HI_zuecco', np.nan) prev_hi = prev.get('window_HI_zuecco', np.nan) if not np.isnan(prev_hi) and not np.isnan(current_hi): if prev_hi > 0.01 and current_hi < -0.01: row['HI_transition'] = 'pos_to_neg' elif prev_hi < -0.01 and current_hi > 0.01: row['HI_transition'] = 'neg_to_pos' else: row['HI_transition'] = 'stable' else: row['HI_transition'] = 'stable' else: row['HI_transition'] = 'stable' # Store current window HI values for next iteration if flow_dynamics: row['window_HI_zuecco'] = flow_dynamics.get('window_HI_zuecco', np.nan) row['window_HI_lloyd'] = flow_dynamics.get('window_HI_lloyd', np.nan) row['window_HI_harp'] = flow_dynamics.get('window_HI_harp', np.nan) else: # First segment row['prev_behavior'] = 'none' row['prev_CVc_CVq'] = np.nan row['prev_Q_position'] = 'none' row['prev_C_position'] = 'none' row['prev_conc_diff'] = 0 row['prev_flow_diff'] = 0 row['HI_transition'] = 'first' if flow_dynamics: row['window_HI_zuecco'] = flow_dynamics.get('window_HI_zuecco', np.nan) row['window_HI_lloyd'] = flow_dynamics.get('window_HI_lloyd', np.nan) row['window_HI_harp'] = flow_dynamics.get('window_HI_harp', np.nan) # Two segments back if i > 1: prev2 = site_df.iloc[i - 2] row['prev2_conc_diff'] = prev2['conc_diff'] row['prev2_C_position'] = prev2['C_position'] else: row['prev2_conc_diff'] = 0 row['prev2_C_position'] = 'none' # Next segment if i < len(site_df) - 1: next_seg = site_df.iloc[i + 1] row['next_C_position'] = next_seg['C_position'] else: row['next_C_position'] = 'none' # C trajectory - using percentile-based thresholds conc_diff = row['conc_diff'] c_pos = row['C_position'] prev_c_pos = row.get('prev_C_position', 'none') if conc_diff < percentiles['dC_p08']: if prev_c_pos == 'high': row['C_trajectory'] = 'steep_decline_from_high' else: row['C_trajectory'] = 'steep_decline' elif conc_diff < percentiles['dC_p25']: row['C_trajectory'] = 'gradual_decline' elif conc_diff > percentiles['dC_p90']: row['C_trajectory'] = 'rising_to_max' if c_pos == 'high' else 'large_increase' elif conc_diff > percentiles['dC_p75']: row['C_trajectory'] = 'moderate_increase' elif c_pos == 'high' and abs(conc_diff) < percentiles['abs_dC_p50']: row['C_trajectory'] = 'at_maximum' else: row['C_trajectory'] = 'stable' results.append(row) temporal_df = pd.DataFrame(results) else: print("[4/6] Skipping high-resolution Q analysis (use_highres=False or no data)") temporal_df = merged_df.copy() # Add minimal temporal context without highres for site in sites: site_mask = temporal_df['site_id'] == site site_df = temporal_df[site_mask].sort_values('end_date').reset_index(drop=True) for i in range(len(site_df)): idx = site_df.index[i] if i > 0: prev_idx = site_df.index[i - 1] temporal_df.loc[idx, 'prev_conc_diff'] = temporal_df.loc[prev_idx, 'conc_diff'] temporal_df.loc[idx, 'prev_C_position'] = temporal_df.loc[prev_idx, 'C_position'] else: temporal_df.loc[idx, 'prev_conc_diff'] = 0 temporal_df.loc[idx, 'prev_C_position'] = 'none' if i > 1: prev2_idx = site_df.index[i - 2] temporal_df.loc[idx, 'prev2_conc_diff'] = temporal_df.loc[prev2_idx, 'conc_diff'] else: temporal_df.loc[idx, 'prev2_conc_diff'] = 0 # Set defaults for highres columns temporal_df.loc[idx, 'highres_flow_phase'] = 'unknown' temporal_df.loc[idx, 'highres_days_since_peak'] = np.nan temporal_df.loc[idx, 'HI_transition'] = 'stable' print("[5/6] Classifying with percentile-based logic + C-Q slopes...") phases = [] confidences = [] rules_list = [] for idx, row in temporal_df.iterrows(): # classify_segment_phase no longer needs hi_method parameter phase, conf, rules = classify_segment_phase(row, percentiles) phases.append(phase) confidences.append(conf) rules_list.append('|'.join(rules)) temporal_df['geochemical_phase'] = phases temporal_df['phase_confidence'] = confidences temporal_df['rules_triggered'] = rules_list print(f"\n[6/6] Complete!") print(f"\nPhase distribution:") for phase, count in temporal_df['geochemical_phase'].value_counts().items(): pct = count / len(temporal_df) * 100 print(f" {phase}: {count} ({pct:.1f}%)") return temporal_df
# ============================================================================= # SIMPLE C-Q BEHAVIOR CLASSIFIER (Williams 1989 / Evans & Davies 1998) # =============================================================================
[docs] def classify_cq_behavior_simple( flow_diff: float, conc_diff: float, flow_range: Tuple[float, float], conc_range: Tuple[float, float], threshold_factor: float = 0.01 ) -> str: """ Simple C-Q behavior classifier based on segment changes. Based on Williams (1989) and Evans & Davies (1998). This is a simpler classification than the main GCS phase classifier above. Parameters ---------- flow_diff : float Change in flow between points conc_diff : float Change in concentration between points flow_range : tuple (min, max) flow values for significance testing conc_range : tuple (min, max) concentration values for significance testing threshold_factor : float Relative threshold for significant change Returns ------- str Behavior classification: - 'connectivity': Q↑ C↑ (mobilization) - 'dispersion': Q↑ C↓ (dilution dominates) - 'accumulation': Q↓ C↑ (evaporation/point sources) - 'recovery': Q↓ C↓ (system recovery) - 'quasi-chemostatic': Q changes, C stable - 'source variation': C changes, Q stable - 'static': No significant changes """ flow_delta = flow_range[1] - flow_range[0] conc_delta = conc_range[1] - conc_range[0] # Determine if changes are significant is_flow_changing = abs(flow_diff) > (threshold_factor * flow_delta) if flow_delta > 1e-10 else False is_conc_changing = abs(conc_diff) > (threshold_factor * conc_delta) if conc_delta > 1e-10 else False if not is_flow_changing and not is_conc_changing: return 'static' elif is_flow_changing and not is_conc_changing: return 'quasi-chemostatic' elif not is_flow_changing and is_conc_changing: return 'source variation' else: # Both changing - determine relationship if flow_diff > 0 and conc_diff > 0: return 'connectivity' elif flow_diff < 0 and conc_diff < 0: return 'recovery' elif flow_diff > 0 and conc_diff < 0: return 'dispersion' else: return 'accumulation'
# ============================================================================= # PRIVATE HELPERS # ============================================================================= def _build_segments( data: pd.DataFrame, sites: List[str], ccol: str, qcol: str ) -> pd.DataFrame: """ V5 NEW FUNCTION: Build segment-wise data WITHOUT event-scale hysteresis. Replaces the incorrect analyze_hysteresis approach that duplicated event-scale metrics across all segments. Returns segment metadata only: - Dates, flows, concentrations - Changes (ΔQ, ΔC) - Behavior classification - C-Q slopes NO hysteresis metrics here - those are calculated at window-scale in the high-res analysis. """ analysis_data = data[data['site_id'].isin(sites)].copy() analysis_data = analysis_data.dropna(subset=[qcol, ccol, 'date']) if len(analysis_data) < 2: return pd.DataFrame() analysis_data = analysis_data.sort_values(['site_id', 'date']) # Calculate compound-specific ranges for behavior classification flow_range = (analysis_data[qcol].min(), analysis_data[qcol].max()) conc_range = (analysis_data[ccol].min(), analysis_data[ccol].max()) results = [] for site in sites: site_data = analysis_data[analysis_data['site_id'] == site].reset_index(drop=True) if len(site_data) < 2: continue # Analyze segments (point-to-point) for i in range(len(site_data) - 1): p1, p2 = site_data.iloc[i], site_data.iloc[i + 1] # Calculate changes flow_diff = p2[qcol] - p1[qcol] conc_diff = p2[ccol] - p1[ccol] # Classify behavior (using simple Williams 1989 classifier) behavior = classify_cq_behavior_simple(flow_diff, conc_diff, flow_range, conc_range) # Build result - ONLY segment metadata result = { 'site_id': site, 'compound': ccol, 'segment_id': i, 'start_date': p1['date'], 'end_date': p2['date'], 'start_flow': p1[qcol], 'end_flow': p2[qcol], 'start_conc': p1[ccol], 'end_conc': p2[ccol], 'flow_diff': flow_diff, 'conc_diff': conc_diff, 'behavior': behavior, 'cq_slope_loglog': compute_cq_slope(p1[qcol], p2[qcol], p1[ccol], p2[ccol], kind="loglog"), 'cq_slope_linear': compute_cq_slope(p1[qcol], p2[qcol], p1[ccol], p2[ccol], kind="linear") } # Add HydPhase if available if 'HydPhase' in site_data.columns: result['start_hyphase'] = p1.get('HydPhase', 'unknown') result['end_hyphase'] = p2.get('HydPhase', 'unknown') results.append(result) return pd.DataFrame(results) # ============================================================================= # MODULE EXPORTS # ============================================================================= __all__ = [ 'classify_segment_phase', 'classify_geochemical_phase', 'classify_cq_behavior_simple' ] print("gcs_classification.py: geochemical phase classification loaded")