"""Ensemble anomaly detection for cryptocurrency prices using CoinGecko data.

Combines four detectors — IsolationForest, EllipticEnvelope, pyod LOF and a
VAR time-series model — and flags a data point as anomalous when at least two
of them agree.
"""

import time
import warnings
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from pyod.models.lof import LOF
from sklearn.covariance import EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import RobustScaler
from statsmodels.tsa.api import VAR

warnings.filterwarnings('ignore')


class CryptoAnomalyDetector:
    """Detects anomalies in daily crypto price series.

    Parameters
    ----------
    tickers : list[str] | None
        CoinGecko coin ids (lowercase names such as 'bitcoin'), NOT exchange
        tickers like 'BTC'. Defaults to ['bitcoin', 'ethereum', 'solana'].
    """

    def __init__(self, tickers=None):
        # Avoid a mutable default argument; copy the caller's list.
        if tickers is None:
            tickers = ['bitcoin', 'ethereum', 'solana']
        self.tickers = list(tickers)
        # CoinGecko addresses coins by id; the ids double as column names.
        self.coin_ids = self.tickers
        self.models = {}            # name -> fitted detector
        self.scalers = {}           # 'main' -> fitted RobustScaler
        self.thresholds = {}        # reserved for per-model cutoffs
        self.correlation_matrices = []  # rolling correlations (set in prepare_features)
        self.last_training_date = None

    def fetch_data(self, days=30):
        """Fetch `days` of daily USD prices for every coin.

        Uses the plain ``/market_chart`` endpoint, which accepts a ``days``
        window directly (the ``/range`` variant would require explicit
        from/to UNIX timestamps instead).

        Returns
        -------
        pd.DataFrame
            Daily prices indexed by date, one column per coin id. Coins
            whose request fails contribute an all-NaN column that is removed
            by the final ``dropna``.
        """
        base_url = "https://api.coingecko.com/api/v3"
        data = {}
        for coin_id in self.coin_ids:
            url = f"{base_url}/coins/{coin_id}/market_chart"
            params = {'vs_currency': 'usd', 'days': days}
            # Stay under CoinGecko's free-tier rate limit.
            time.sleep(1)
            try:
                response = requests.get(url, params=params)
                response.raise_for_status()
                coin_data = response.json()
                # 'prices' is a list of [epoch_ms, price] pairs.
                prices = pd.DataFrame(coin_data['prices'],
                                      columns=['timestamp', 'price'])
                prices['date'] = pd.to_datetime(prices['timestamp'], unit='ms')
                prices.set_index('date', inplace=True)
                data[coin_id] = prices['price']
            except requests.exceptions.RequestException as e:
                print(f"Error fetching data for {coin_id}: {e}")
                data[coin_id] = pd.Series(dtype=float)  # empty series on error

        combined_data = pd.DataFrame(data)
        # CoinGecko may return hourly granularity; collapse to daily closes
        # and forward-fill gaps before dropping incomplete rows.
        combined_data = combined_data.resample('D').last().ffill()
        return combined_data.dropna()

    def prepare_features(self, data):
        """Build the anomaly-detection feature matrix from daily prices.

        Features per coin: log return, 3-day and 7-day rolling volatility;
        plus a market-wide volatility column (mean of all 3-day vols).
        Also stores the 5-day rolling correlation matrices for plotting.
        """
        # Log returns are additive and better behaved than raw returns.
        returns = np.log(data).diff().dropna()

        features = pd.DataFrame()
        for ticker in self.tickers:
            features[f'{ticker}_return'] = returns[ticker]
            features[f'{ticker}_vol_3d'] = returns[ticker].rolling(3).std()
            features[f'{ticker}_vol_7d'] = returns[ticker].rolling(7).std()

        # Rolling correlations, kept as a (date, ticker) MultiIndex frame.
        self.correlation_matrices = returns.rolling(5).corr().dropna()

        # Market-wide volatility: average of the 3-day vols across coins.
        features['market_vol'] = features.filter(regex='vol_3d').mean(axis=1)

        # The 7-day rolling window leaves NaNs at the start; drop them.
        return features.dropna()

    def train_models(self, training_data):
        """Fit the four anomaly-detection models on the feature matrix."""
        # RobustScaler uses median/IQR, so fat-tailed crypto returns do not
        # distort the scaling the way mean/std would.
        scaler = RobustScaler()
        scaled_data = scaler.fit_transform(training_data)
        self.scalers['main'] = scaler

        # 1. Isolation Forest (point anomalies).
        iso_forest = IsolationForest(
            n_estimators=100,
            contamination=0.01,
            random_state=42
        )
        iso_forest.fit(scaled_data)
        self.models['isolation_forest'] = iso_forest

        # 2. Elliptic Envelope (Mahalanobis-distance based).
        elliptic = EllipticEnvelope(
            contamination=0.01,
            random_state=42
        )
        elliptic.fit(scaled_data)
        self.models['elliptic_envelope'] = elliptic

        # 3. Local Outlier Factor (density-based, pyod implementation).
        lof = LOF(
            n_neighbors=20,
            contamination=0.01
        )
        lof.fit(scaled_data)
        self.models['lof'] = lof

        # 4. Vector Autoregression on the (unscaled) features for
        # multivariate time-series residual analysis.
        var = VAR(training_data)
        self.models['var'] = var.fit(maxlags=3)

        self.last_training_date = datetime.now()

    def detect_anomalies(self, current_data):
        """Score new price data and flag ensemble anomalies.

        Returns a DataFrame (indexed like the feature matrix) with one
        column per detector plus 'combined_score' and 'final_anomaly'
        (1 when at least two detectors agree).
        """
        current_features = self.prepare_features(current_data)
        scaled_data = self.scalers['main'].transform(current_features)

        results = pd.DataFrame(index=current_features.index)

        # sklearn convention: predict() returns -1 for anomalies, +1 normal.
        results['iso_forest'] = self.models['isolation_forest'].predict(scaled_data)
        results['elliptic_env'] = self.models['elliptic_envelope'].predict(scaled_data)

        # pyod convention: HIGHER decision_function score = more abnormal,
        # so flag the top 1% (the original '< 1st percentile' was the
        # sklearn sign convention, inverted for pyod).
        lof_scores = self.models['lof'].decision_function(scaled_data)
        results['lof_score'] = lof_scores
        results['lof_anomaly'] = (lof_scores > np.percentile(lof_scores, 99)).astype(int)

        # VAR residuals: for every row t (past the lag order), make a
        # one-step-ahead forecast from the preceding k_ar rows and measure
        # the Mahalanobis distance of the residual under the fitted noise
        # covariance sigma_u. (The original computed a single scalar for
        # the last row only, so 'var_anomaly' could never fire.)
        var_model = self.models['var']
        k_ar = var_model.k_ar
        sigma_inv = np.linalg.inv(var_model.sigma_u)
        values = current_features.values
        distances = np.full(len(values), np.nan)
        for t in range(k_ar, len(values)):
            pred = var_model.forecast(values[t - k_ar:t], 1)[0]
            resid = values[t] - pred
            distances[t] = np.sqrt(resid @ sigma_inv @ resid)
        results['var_mahalanobis'] = distances
        if np.isfinite(distances).any():
            var_threshold = np.nanpercentile(distances, 99)
        else:
            var_threshold = np.inf  # too few rows for any VAR forecast
        results['var_anomaly'] = (results['var_mahalanobis'] > var_threshold).astype(int)

        # Majority-style vote: flag when at least 2 of 4 detectors agree.
        results['combined_score'] = (
            (results['iso_forest'] == -1).astype(int) +
            (results['elliptic_env'] == -1).astype(int) +
            results['lof_anomaly'] +
            results['var_anomaly']
        )
        results['final_anomaly'] = (results['combined_score'] >= 2).astype(int)

        return results

    def visualize_results(self, prices, anomalies):
        """Plot each coin's price with anomaly dates marked, plus the
        latest rolling correlation matrix if available."""
        plt.figure(figsize=(15, 10))

        # Dates flagged by the ensemble (iterate dates, not columns).
        anomaly_dates = anomalies.index[anomalies['final_anomaly'] == 1]

        for i, ticker in enumerate(self.tickers, 1):
            plt.subplot(len(self.tickers), 1, i)
            plt.plot(prices[ticker], label=ticker)
            for date in anomaly_dates:
                plt.axvline(x=date, color='r', alpha=0.3)
            plt.title(f'{ticker} Price with Anomalies Marked')
            plt.legend()

        plt.tight_layout()
        plt.show()

        # correlation_matrices is a (date, ticker) MultiIndex frame; select
        # the full matrix for the most recent date rather than a single row.
        if len(self.correlation_matrices) > 0:
            last_ts = self.correlation_matrices.index.get_level_values(0)[-1]
            last_corr = self.correlation_matrices.loc[last_ts]
            plt.figure(figsize=(10, 8))
            sns.heatmap(last_corr, annot=True, cmap='coolwarm', center=0)
            plt.title('Latest Rolling Correlation Matrix')
            plt.show()

    def run_pipeline(self):
        """Complete training and detection pipeline."""
        print("Fetching training data...")
        training_data = self.fetch_data(days=30)
        training_features = self.prepare_features(training_data)

        print("Training models...")
        self.train_models(training_features)

        print("Fetching latest data for detection...")
        # Need enough history for the 7-day rolling vol plus the VAR lag
        # order; 3 days would leave an empty feature frame after dropna.
        current_data = self.fetch_data(days=14)

        print("Detecting anomalies...")
        anomalies = self.detect_anomalies(current_data)

        print("\nAnomaly Detection Results:")
        print(anomalies[anomalies['final_anomaly'] == 1])

        self.visualize_results(current_data, anomalies)
        return anomalies


if __name__ == "__main__":
    # Initialize with major cryptocurrencies (CoinGecko ids).
    detector = CryptoAnomalyDetector(
        tickers=['solana', 'dogecoin', 'ethereum', 'ripple', 'tron']
    )
    # Run complete pipeline.
    results = detector.run_pipeline()