"""Interactive price-anomaly analyzer for a CoinGecko-listed cryptocurrency.

The script loads (or downloads) roughly a month of price history, trains an
Isolation Forest on a handful of engineered price features, and then offers
two interactive analyses:

* ``sensitivity`` - for several score thresholds, estimate how far the next
  price would have to move (down or up) before it is scored as an anomaly.
* ``backtest``    - train on the older part of the history and report which
  points of the most recent part would have been flagged.
"""

import os
from datetime import datetime, timezone

import numpy as np
import pandas as pd
import requests
from sklearn.ensemble import IsolationForest

# Configuration
CRYPTO = 'solana'  # CoinGecko coin ID (not the ticker symbol)
CURRENCY = 'usd'  # Base currency
HISTORY_DAYS = 30  # Days of history requested from the API
# No explicit `interval` is sent; CoinGecko then chooses the granularity
# itself (documented as hourly for ranges of 2-90 days).
HISTORICAL_URL = (
    f'https://api.coingecko.com/api/v3/coins/{CRYPTO}/market_chart'
    f'?vs_currency={CURRENCY}&days={HISTORY_DAYS}'
)
REQUEST_TIMEOUT = 10  # Seconds before an unresponsive API call is abandoned
HISTORY_WINDOW = 1000  # Max historical points to keep (for memory efficiency)
CSV_FILE = f'{CRYPTO}_price_history.csv'  # Local cache of the price history

ROLLING_WINDOW = 5  # Look-back (in rows) of the rolling mean/std features
FEATURE_COLUMNS = ['pct_change', 'abs_diff', 'rolling_mean_5', 'rolling_std_5', 'hour']
MIN_TRAINING_POINTS = 10  # Fewer rows than this are not worth fitting on
MIN_BACKTEST_POINTS = 20  # Fewer rows than this cannot be split meaningfully

# Global state shared by the interactive commands
price_df = pd.DataFrame(columns=['timestamp', 'price'])  # Historical prices
model = None  # Isolation Forest trained on the full history (set in main)


def fetch_historical_data():
    """Fetch historical price data from CoinGecko.

    Returns:
        A DataFrame with a datetime ``timestamp`` column (naive, derived from
        the API's epoch milliseconds) and a ``price`` column. On any network,
        HTTP or payload error the problem is printed and an empty DataFrame
        with the same two columns is returned, so callers can test ``.empty``.
    """
    try:
        response = requests.get(HISTORICAL_URL, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        prices = response.json()['prices']  # List of [timestamp_ms, price]
        df = pd.DataFrame(prices, columns=['timestamp', 'price'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    except (requests.RequestException, KeyError, ValueError, TypeError) as e:
        print(f"Error fetching historical data: {e}")
        return pd.DataFrame(columns=['timestamp', 'price'])
    return df


def load_or_fetch_history():
    """Populate the global ``price_df`` from the CSV cache or from the API.

    The CSV cache is preferred when it exists and can be parsed; otherwise the
    history is downloaded and, if non-empty, written to the cache. The result
    is trimmed to the most recent ``HISTORY_WINDOW`` rows.
    """
    global price_df

    history = None
    if os.path.exists(CSV_FILE):
        try:
            history = pd.read_csv(CSV_FILE, parse_dates=['timestamp'])
        except ValueError as e:
            # pandas' empty-file and parser errors are ValueError subclasses,
            # as is a missing 'timestamp' column; fall back to the API.
            print(f"Could not read {CSV_FILE} ({e}); fetching from API instead.")
        else:
            print(f"Loaded {len(history)} historical points from CSV.")

    if history is None:
        history = fetch_historical_data()
        if not history.empty:
            history.to_csv(CSV_FILE, index=False)
            print(f"Fetched and saved {len(history)} historical points.")

    # Trim to window size
    price_df = history.tail(HISTORY_WINDOW)


def engineer_features(df):
    """Build the model's feature matrix from a price history.

    Args:
        df: DataFrame with a numeric ``price`` column and a datetime
            ``timestamp`` column, ordered oldest to newest.

    Returns:
        A new DataFrame (``df`` is not modified) with one row per input row
        and the columns listed in ``FEATURE_COLUMNS``. Every feature of a row
        depends only on that row and the rows before it.
    """
    df = df.copy()
    price = df['price']

    df['pct_change'] = price.pct_change()  # Relative move vs. previous row
    df['abs_diff'] = price.diff()  # Absolute move vs. previous row

    # min_periods=1 lets the first rows use a shorter window. Leaving them NaN
    # and zero-filling would report a rolling mean of 0 for a non-zero price,
    # i.e. manufacture outliers at the start of every frame.
    rolling = price.rolling(window=ROLLING_WINDOW, min_periods=1)
    df['rolling_mean_5'] = rolling.mean()
    df['rolling_std_5'] = rolling.std()

    df['hour'] = df['timestamp'].dt.hour  # Time of day

    # pct_change yields +/-inf after a zero price; the estimator rejects inf
    # and NaN alike, so both are mapped to 0.
    return df[FEATURE_COLUMNS].replace([np.inf, -np.inf], np.nan).fillna(0)


def train_model(train_df):
    """Fit an Isolation Forest on the engineered features of ``train_df``.

    Args:
        train_df: Price history with ``timestamp`` and ``price`` columns.

    Returns:
        The fitted ``IsolationForest``, or ``None`` (after printing a notice)
        when fewer than ``MIN_TRAINING_POINTS`` rows are available.
    """
    if len(train_df) < MIN_TRAINING_POINTS:
        print("Insufficient data to train model.")
        return None

    forest = IsolationForest(contamination=0.01, random_state=42)
    forest.fit(engineer_features(train_df))
    return forest


def get_anomaly_score(model, test_df, index, history_df=None):
    """Score a single row of ``test_df`` as if it had just been observed.

    Only rows up to and including ``index`` are visible to the feature
    computation, so later rows cannot leak into the score.

    Args:
        model: Fitted estimator exposing ``decision_function``.
        test_df: Price history containing the row to score.
        index: Zero-based position of the row within ``test_df``.
        history_df: Optional rows that chronologically precede ``test_df``.
            When given, they supply the look-back context for the first rows
            of ``test_df``, which otherwise have no previous price to compare
            against.

    Returns:
        The ``decision_function`` value of the row; lower means more anomalous.
    """
    # Features of the last row depend on at most the last ROLLING_WINDOW rows,
    # so trimming keeps each call cheap without changing the score.
    window = test_df.iloc[:index + 1].tail(ROLLING_WINDOW)

    missing = ROLLING_WINDOW - len(window)
    if history_df is not None and missing > 0 and len(history_df) > 0:
        window = pd.concat([history_df.tail(missing), window], ignore_index=True)

    features = engineer_features(window)
    return model.decision_function(features.tail(1))[0]


def _find_anomaly_boundary(is_anomaly, normal_price, extreme_price, precision, max_iterations):
    """Bisect for the anomalous price closest to ``normal_price``.

    Assumes prices become "more anomalous" the further they move from
    ``normal_price`` towards ``extreme_price``.

    Returns:
        The anomalous probe nearest to ``normal_price``, or ``None`` if no
        probed price was anomalous.
    """
    boundary = None
    for _ in range(max_iterations):
        mid = (normal_price + extreme_price) / 2
        if is_anomaly(mid):
            # Anomalous: the boundary lies between mid and the normal side.
            boundary = mid
            extreme_price = mid
        else:
            # Still normal: the boundary lies further out.
            normal_price = mid
        if abs(extreme_price - normal_price) < precision:
            break
    return boundary


# Sensitivity Analysis
def run_sensitivity_analysis(thresholds=(-0.2, -0.1, -0.07, -0.05, -0.03, -0.01, 0.1),
                             precision=0.0001, max_iterations=100):
    """Print, per threshold, the price bounds beyond which the next point is anomalous.

    A hypothetical next price is appended to the history, scored with the
    global ``model``, and called anomalous when its score is below the
    threshold. The search covers half to double the last known price.

    Args:
        thresholds: Score cut-offs to evaluate.
        precision: Bisection stops once the bracket is narrower than this.
        max_iterations: Hard cap on bisection steps per bound.
    """
    if model is None:
        print("Model not trained yet. Cannot run sensitivity analysis.")
        return
    if price_df.empty:
        print("No historical data available.")
        return

    current_price = price_df['price'].iloc[-1]
    # Stored timestamps are naive datetimes built from epoch milliseconds, so
    # the probe row uses naive UTC to keep the `hour` feature comparable.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None)
    # Only the most recent rows influence the features of an appended row.
    recent = price_df.tail(ROLLING_WINDOW)

    def score_price(price):
        """Score ``price`` as if it were the next observation."""
        new_row = pd.DataFrame({'timestamp': [timestamp], 'price': [price]})
        temp_df = pd.concat([recent, new_row], ignore_index=True)
        features = engineer_features(temp_df)
        return model.decision_function(features.tail(1))[0]

    def find_bounds(threshold):
        """Return ``(lower_bound, upper_bound)`` for one threshold."""
        def is_anomaly_func(price):
            return score_price(price) < threshold

        lower_bound = _find_anomaly_boundary(
            is_anomaly_func, current_price, max(0, current_price * 0.5),
            precision, max_iterations)
        upper_bound = _find_anomaly_boundary(
            is_anomaly_func, current_price, current_price * 2,
            precision, max_iterations)
        return lower_bound, upper_bound

    print("Sensitivity Analysis: Anomaly Price Bounds for Different Thresholds")
    print(f"Based on last historical price: ${current_price:.4f}")

    for thresh in thresholds:
        lower, upper = find_bounds(thresh)
        print(f"\nThreshold {thresh}:")
        if lower is not None:
            print(f" - Prices BELOW ~${lower:.4f} would trigger anomaly.")
        else:
            print(" - No lower bound found.")
        if upper is not None:
            print(f" - Prices ABOVE ~${upper:.4f} would trigger anomaly.")
        else:
            print(" - No upper bound found.")


# Backtesting
def run_backtest(thresholds=(-0.7, -0.5, -0.3), test_fraction=0.2):
    """Backtest the model on a holdout set, reporting flagged anomalies per threshold.

    The oldest ``1 - test_fraction`` of the global ``price_df`` trains a fresh
    model; each remaining point is then scored in chronological order.

    Args:
        thresholds: Score cut-offs; a point is flagged when its score is lower.
            NOTE(review): the defaults are far stricter than those used by the
            sensitivity analysis - confirm they are intended.
        test_fraction: Share of the history held out for testing.
    """
    if len(price_df) < MIN_BACKTEST_POINTS:
        print("Insufficient data for backtesting.")
        return

    # Split data: Train on first (1 - test_fraction), test on last test_fraction
    split_idx = int(len(price_df) * (1 - test_fraction))
    train_df = price_df.iloc[:split_idx]
    test_df = price_df.iloc[split_idx:].reset_index(drop=True)

    backtest_model = train_model(train_df)
    if backtest_model is None:
        return

    print(f"Backtesting on {len(test_df)} holdout points (trained on {len(train_df)} points).")

    # Score each test point sequentially; the training rows provide look-back
    # context so the first holdout points are not scored in isolation.
    scores = [
        get_anomaly_score(backtest_model, test_df, i, history_df=train_df)
        for i in range(len(test_df))
    ]

    for thresh in thresholds:
        flagged = [i for i, score in enumerate(scores) if score < thresh]
        flagged_pct = (len(flagged) / len(test_df)) * 100 if len(test_df) > 0 else 0
        print(f"\nThreshold {thresh}: {len(flagged)} points flagged as anomalies ({flagged_pct:.2f}%)")
        if flagged:
            print("Flagged points (timestamp, price):")
            for idx in flagged[:10]:  # Limit to first 10 for brevity
                ts = test_df['timestamp'].iloc[idx]
                price = test_df['price'].iloc[idx]
                print(f" - {ts}: ${price:.4f}")
            if len(flagged) > 10:
                print(" ... (more flagged points omitted)")


# Main Execution
def main():
    """Load the history, train the global model and run the interactive CLI."""
    global model

    load_or_fetch_history()
    model = train_model(price_df)  # Train on full data for sensitivity analysis

    # Interactive CLI for analysis
    print(f"{CRYPTO.upper()} price anomaly analyzer loaded with Isolation Forest.")
    print("Enter 'sensitivity' for sensitivity analysis, 'backtest' for backtesting, or 'quit' to exit.")

    commands = {
        'sensitivity': run_sensitivity_analysis,
        'backtest': run_backtest,
    }

    while True:
        try:
            user_input = input("> ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C is treated like 'quit' instead of a traceback.
            print("\nExiting...")
            break

        if user_input == 'quit':
            print("Exiting...")
            break

        command = commands.get(user_input)
        if command is None:
            print("Unknown command. Use 'sensitivity', 'backtest', or 'quit'.")
        else:
            command()


if __name__ == "__main__":
    main()