| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- import requests
- import pandas as pd
- import numpy as np
- from sklearn.ensemble import IsolationForest
- from datetime import datetime
- import os
# Configuration
CRYPTO = 'solana'  # CoinGecko asset ID (e.g. 'solana', 'dogecoin')
CURRENCY = 'usd'  # Base currency for prices
# 30 days of market data; CoinGecko chooses hourly granularity for this range.
# NOTE: the '&interval=hourly' note was previously embedded INSIDE the string
# literal (as ' #&interval=hourly'), producing a malformed request URL.
HISTORICAL_URL = (
    f'https://api.coingecko.com/api/v3/coins/{CRYPTO}/market_chart'
    f'?vs_currency={CURRENCY}&days=30'
)
HISTORY_WINDOW = 1000  # Max historical points to keep (for memory efficiency)
CSV_FILE = f'{CRYPTO}_price_history.csv'  # Local cache of historical data

# Global variables
price_df = pd.DataFrame(columns=['timestamp', 'price'])  # Historical prices
model = None  # Isolation Forest model (trained in the main script)
def fetch_historical_data():
    """Fetch ~30 days of historical price data for CRYPTO from CoinGecko.

    Returns:
        DataFrame with 'timestamp' (datetime64) and 'price' columns, or an
        empty DataFrame if the request or response parsing fails.
    """
    try:
        # timeout keeps the script from hanging forever on a dead connection
        response = requests.get(HISTORICAL_URL, timeout=30)
        response.raise_for_status()
        data = response.json()
        prices = data['prices']  # List of [timestamp_ms, price] pairs
        df = pd.DataFrame(prices, columns=['timestamp', 'price'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df
    except (requests.RequestException, KeyError, ValueError) as e:
        # Narrowed catch: network/HTTP failures, missing 'prices' key, bad JSON.
        print(f"Error fetching historical data: {e}")
        return pd.DataFrame()
def load_or_fetch_history():
    """Populate the global price_df from the CSV cache, or fetch it fresh.

    A freshly fetched history is written back to CSV_FILE so subsequent runs
    skip the network call. The result is trimmed to HISTORY_WINDOW points.
    """
    global price_df
    try:
        # EAFP: attempt the cache read and fall back to the API on a miss.
        price_df = pd.read_csv(CSV_FILE, parse_dates=['timestamp'])
        print(f"Loaded {len(price_df)} historical points from CSV.")
    except FileNotFoundError:
        price_df = fetch_historical_data()
        if not price_df.empty:
            price_df.to_csv(CSV_FILE, index=False)
            print(f"Fetched and saved {len(price_df)} historical points.")
    # Keep only the most recent points to bound memory use.
    price_df = price_df.tail(HISTORY_WINDOW)
def engineer_features(df):
    """Build the feature matrix used for anomaly detection.

    Derives momentum (pct/abs change), local level/volatility (5-point
    rolling mean/std) and time-of-day from the price series. Leading NaNs
    produced by the differencing/rolling windows are replaced with 0.

    Args:
        df: DataFrame with 'timestamp' (datetime64) and 'price' columns.

    Returns:
        DataFrame containing only the engineered feature columns.
    """
    frame = df.copy()  # never mutate the caller's data
    price = frame['price']
    frame['pct_change'] = price.pct_change()
    frame['abs_diff'] = price.diff()
    rolling = price.rolling(window=5)
    frame['rolling_mean_5'] = rolling.mean()
    frame['rolling_std_5'] = rolling.std()
    frame['hour'] = frame['timestamp'].dt.hour
    feature_cols = ['pct_change', 'abs_diff', 'rolling_mean_5', 'rolling_std_5', 'hour']
    return frame[feature_cols].fillna(0)
def train_model(train_df, contamination=0.01, random_state=42):
    """Train an Isolation Forest on features engineered from train_df.

    Args:
        train_df: DataFrame with 'timestamp' and 'price' columns.
        contamination: Expected fraction of anomalies (was hard-coded; now a
            backward-compatible parameter, default unchanged).
        random_state: Seed for reproducible forests (default unchanged).

    Returns:
        A fitted IsolationForest, or None if there is too little data.
    """
    if len(train_df) < 10:
        print("Insufficient data to train model.")
        return None

    features = engineer_features(train_df)
    # Local name 'forest' avoids shadowing the module-level 'model' global.
    forest = IsolationForest(contamination=contamination, random_state=random_state)
    forest.fit(features)
    return forest
def get_anomaly_score(model, test_df, index):
    """Return the Isolation Forest decision score for one test point.

    Features for the point are computed over the prefix test_df[:index+1]
    so backward-looking features (diffs, rolling stats) only see data that
    was available at that moment — no look-ahead.

    Args:
        model: Fitted IsolationForest.
        test_df: Holdout DataFrame with 'timestamp' and 'price' columns.
        index: Positional index of the point to score.

    Returns:
        Decision-function score (lower = more anomalous).
    """
    window = test_df.iloc[:index + 1]
    last_point = engineer_features(window).tail(1)
    return model.decision_function(last_point)[0]
# Sensitivity Analysis
def run_sensitivity_analysis(thresholds=(-0.2, -0.1, -0.07, -0.05, -0.03, -0.01, 0.1),
                             precision=0.0001, max_iterations=100):
    """Compute upper/lower price bounds for anomalies across multiple thresholds.

    For each threshold, bisects the price axis on both sides of the last
    historical price to locate where the model's score crosses the threshold.

    Args:
        thresholds: Score cutoffs to analyse (score < threshold => anomaly).
            Was a mutable list default; now an immutable tuple (same values).
        precision: Stop bisecting once the bracket is narrower than this.
        max_iterations: Safety cap on bisection steps per bound.
    """
    if model is None:
        print("Model not trained yet. Cannot run sensitivity analysis.")
        return

    if price_df.empty:
        print("No historical data available.")
        return

    current_price = price_df['price'].iloc[-1]
    timestamp = datetime.now()

    def find_bounds(threshold):
        """Return (lower_bound, upper_bound) anomaly prices for one threshold."""
        def is_anomaly_func(price):
            # Simulate appending a hypothetical observation and score it.
            new_row = pd.DataFrame({'timestamp': [timestamp], 'price': [price]})
            temp_df = pd.concat([price_df, new_row], ignore_index=True)
            features = engineer_features(temp_df)
            score = model.decision_function(features.tail(1))[0]
            return score < threshold

        # Lower bound: search between 50% of the current price and the price
        # itself; anomalous mids tighten the bracket upward (toward normal).
        low = max(0, current_price * 0.5)
        high = current_price
        lower_bound = None
        for _ in range(max_iterations):
            mid = (low + high) / 2
            if is_anomaly_func(mid):
                lower_bound = mid
                high = mid
            else:
                low = mid
            if high - low < precision:
                break

        # Upper bound: mirror search between the current price and twice it.
        low = current_price
        high = current_price * 2
        upper_bound = None
        for _ in range(max_iterations):
            mid = (low + high) / 2
            if is_anomaly_func(mid):
                upper_bound = mid
                low = mid
            else:
                high = mid
            if high - low < precision:
                break

        return lower_bound, upper_bound

    print("Sensitivity Analysis: Anomaly Price Bounds for Different Thresholds")
    print(f"Based on last historical price: ${current_price:.4f}")
    for thresh in thresholds:
        lower, upper = find_bounds(thresh)
        print(f"\nThreshold {thresh}:")
        if lower is not None:
            print(f" - Prices BELOW ~${lower:.4f} would trigger anomaly.")
        else:
            print(" - No lower bound found.")
        if upper is not None:
            print(f" - Prices ABOVE ~${upper:.4f} would trigger anomaly.")
        else:
            print(" - No upper bound found.")
# Backtesting
def run_backtest(thresholds=(-0.7, -0.5, -0.3), test_fraction=0.2):
    """Backtest the model on a holdout set, reporting flagged anomalies per threshold.

    Trains a fresh Isolation Forest on the oldest (1 - test_fraction) of the
    history, scores the newest test_fraction point-by-point, then reports how
    many holdout points each threshold would have flagged.

    Args:
        thresholds: Score cutoffs to evaluate. Was a mutable list default;
            now an immutable tuple (same values).
        test_fraction: Fraction of the history reserved as the holdout set.
    """
    if len(price_df) < 20:
        print("Insufficient data for backtesting.")
        return

    # Chronological split: train on the earliest points, test on the latest.
    split_idx = int(len(price_df) * (1 - test_fraction))
    train_df = price_df.iloc[:split_idx]
    test_df = price_df.iloc[split_idx:].reset_index(drop=True)

    backtest_model = train_model(train_df)
    if backtest_model is None:
        return

    print(f"Backtesting on {len(test_df)} holdout points (trained on {len(train_df)} points).")

    # Score each test point sequentially; each score only sees prior data.
    scores = [get_anomaly_score(backtest_model, test_df, i) for i in range(len(test_df))]

    for thresh in thresholds:
        flagged = [i for i, score in enumerate(scores) if score < thresh]
        flagged_pct = (len(flagged) / len(test_df)) * 100 if len(test_df) > 0 else 0
        print(f"\nThreshold {thresh}: {len(flagged)} points flagged as anomalies ({flagged_pct:.2f}%)")
        if flagged:
            print("Flagged points (timestamp, price):")
            for idx in flagged[:10]:  # Limit to first 10 for brevity
                ts = test_df['timestamp'].iloc[idx]
                price = test_df['price'].iloc[idx]
                print(f" - {ts}: ${price:.4f}")
            if len(flagged) > 10:
                print(" ... (more flagged points omitted)")
# Main Execution
load_or_fetch_history()
model = train_model(price_df)  # Train on full data for sensitivity analysis

# Interactive CLI for analysis
print(f"{CRYPTO.upper()} price anomaly analyzer loaded with Isolation Forest.")
print("Enter 'sensitivity' for sensitivity analysis, 'backtest' for backtesting, or 'quit' to exit.")
while True:
    try:
        user_input = input("> ").strip().lower()
    except (EOFError, KeyboardInterrupt):
        # Ctrl-D / Ctrl-C previously crashed with a traceback; exit cleanly.
        print("Exiting...")
        break
    if user_input == 'quit':
        print("Exiting...")
        break
    elif user_input == 'sensitivity':
        run_sensitivity_analysis()
    elif user_input == 'backtest':
        run_backtest()
    else:
        print("Unknown command. Use 'sensitivity', 'backtest', or 'quit'.")
|