import time
import warnings
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from pyod.models.lof import LOF
from scipy.stats import chi2
from sklearn.covariance import EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import RobustScaler
from statsmodels.tsa.api import VAR

warnings.filterwarnings('ignore')


class CryptoAnomalyDetector:
    def __init__(self, tickers=('bitcoin', 'ethereum', 'solana')):
        # CoinGecko identifies coins by lowercase IDs (e.g. 'bitcoin'),
        # not by exchange ticker symbols
        self.tickers = list(tickers)
        self.coin_ids = self.tickers  # The names above double as CoinGecko IDs
        self.models = {}
        self.scalers = {}
        self.thresholds = {}
        self.correlation_matrices = []
        self.last_training_date = None

    def fetch_data(self, days=30):
        """Fetch historical price data from the CoinGecko API."""
        base_url = "https://api.coingecko.com/api/v3"
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        # The /market_chart/range endpoint expects UNIX timestamps in seconds
        end_timestamp = int(end_date.timestamp())
        start_timestamp = int(start_date.timestamp())

        data = {}

        for coin_id in self.coin_ids:
            url = f"{base_url}/coins/{coin_id}/market_chart/range"
            params = {
                'vs_currency': 'usd',
                'from': start_timestamp,
                'to': end_timestamp,
            }

            # Throttle requests to stay under CoinGecko's public rate limit
            time.sleep(1)

            try:
                response = requests.get(url, params=params)
                response.raise_for_status()
                coin_data = response.json()
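                # The response is JSON of the form (abridged):
                #   {"prices": [[<ms_timestamp>, <price>], ...],
                #    "market_caps": [...], "total_volumes": [...]}
                # Timestamps are milliseconds, hence unit='ms' below.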

                # Extract prices into a date-indexed series
                prices = pd.DataFrame(coin_data['prices'], columns=['timestamp', 'price'])
                prices['date'] = pd.to_datetime(prices['timestamp'], unit='ms')
                prices.set_index('date', inplace=True)
                data[coin_id] = prices['price']

            except requests.exceptions.RequestException as e:
                print(f"Error fetching data for {coin_id}: {e}")
                data[coin_id] = pd.Series(dtype=float)  # Empty series on error

        # Combine all series into a DataFrame
        combined_data = pd.DataFrame(data)

        # Resample to daily data (CoinGecko returns hourly granularity for
        # ranges up to 90 days)
        combined_data = combined_data.resample('D').last().ffill()

        return combined_data.dropna()

    def prepare_features(self, data):
        """Create features for anomaly detection."""
        # Daily log returns
        returns = np.log(data).diff().dropna()
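        # Log returns are used because they are time-additive:
        # r_t = ln(P_t / P_{t-1}), so a 3-day return is simply r_1 + r_2 + r_3.
        # E.g. a move from $100 to $110 gives ln(1.10) ~= 0.0953 vs. a simple
        # return of 0.10.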

        # Rolling volatilities (3-day and 7-day)
        features = pd.DataFrame()
        for ticker in self.tickers:
            features[f'{ticker}_return'] = returns[ticker]
            features[f'{ticker}_vol_3d'] = returns[ticker].rolling(3).std()
            features[f'{ticker}_vol_7d'] = returns[ticker].rolling(7).std()

        # Keep the rolling correlation matrices for later visualization;
        # the result is indexed by (date, coin), one matrix per date
        self.correlation_matrices = returns.rolling(5).corr().dropna()

        # Market-wide volatility (average of the 3-day volatilities)
        features['market_vol'] = features.filter(regex='vol_3d').mean(axis=1)

        return features.dropna()

    def train_models(self, training_data):
        """Train multiple anomaly detection models."""
        # Robust scaling (median/IQR based, so less sensitive to the fat
        # tails typical of crypto returns)
        scaler = RobustScaler()
        scaled_data = scaler.fit_transform(training_data)
        self.scalers['main'] = scaler

        # 1. Isolation Forest (for point anomalies)
        iso_forest = IsolationForest(
            n_estimators=100,
            contamination=0.01,
            random_state=42
        )
        iso_forest.fit(scaled_data)
        self.models['isolation_forest'] = iso_forest
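        # contamination=0.01 assumes roughly 1% of training points are
        # anomalous; scikit-learn uses this to set the decision threshold,
        # so predict() labels about that fraction of the data as -1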

        # 2. Elliptic Envelope (Mahalanobis-distance based)
        elliptic = EllipticEnvelope(
            contamination=0.01,
            random_state=42
        )
        elliptic.fit(scaled_data)
        self.models['elliptic_envelope'] = elliptic

        # 3. Local Outlier Factor (for density-based anomalies)
        lof = LOF(
            n_neighbors=20,
            contamination=0.01
        )
        lof.fit(scaled_data)
        self.models['lof'] = lof

        # 4. Vector Autoregression (for multivariate time-series structure)
        var = VAR(training_data)
        var_model = var.fit(maxlags=3)
        self.models['var'] = var_model
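        # With ic=None (the default), VAR.fit uses maxlags as the fixed lag
        # order; passing ic='aic' would select the order by information
        # criterion instead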

        # Record when the models were last trained
        self.last_training_date = datetime.now()

    def detect_anomalies(self, current_data):
        """Detect anomalies in new data points."""
        # Build the same feature set used in training
        current_features = self.prepare_features(current_data)
        scaled_data = self.scalers['main'].transform(current_features)

        # Collect predictions from all models
        results = pd.DataFrame(index=current_features.index)

        # Isolation Forest (-1 marks outliers)
        results['iso_forest'] = self.models['isolation_forest'].predict(scaled_data)

        # Elliptic Envelope (-1 marks outliers)
        results['elliptic_env'] = self.models['elliptic_envelope'].predict(scaled_data)

        # LOF: pyod's decision_function returns scores where HIGHER means
        # more anomalous, so flag the top 1%
        lof_scores = self.models['lof'].decision_function(scaled_data)
        results['lof_score'] = lof_scores
        results['lof_anomaly'] = (lof_scores > np.percentile(lof_scores, 99)).astype(int)

        # VAR residuals: one-step-ahead forecast from the preceding
        # observations, compared against the latest observed point
        var_model = self.models['var']
        lag = var_model.k_ar
        var_pred = var_model.forecast(current_features.values[-(lag + 1):-1], 1)
        residuals = current_features.iloc[-1].values - var_pred[0]
        mahalanobis_dist = float(np.sqrt(residuals @ np.linalg.inv(var_model.sigma_u) @ residuals))
        results['var_mahalanobis'] = mahalanobis_dist
        # Assuming roughly normal residuals, the squared distance is
        # chi-square with one degree of freedom per feature, which gives a
        # fixed 99% cutoff for this single forecast point
        threshold = np.sqrt(chi2.ppf(0.99, df=current_features.shape[1]))
        results['var_anomaly'] = int(mahalanobis_dist > threshold)

        # Ensemble vote: count how many detectors flag each row
        results['combined_score'] = (
            (results['iso_forest'] == -1).astype(int) +
            (results['elliptic_env'] == -1).astype(int) +
            results['lof_anomaly'] +
            results['var_anomaly']
        )

        # Require agreement from at least two detectors
        results['final_anomaly'] = (results['combined_score'] >= 2).astype(int)

        return results

    def visualize_results(self, prices, anomalies):
        """Plot prices with detected anomalies marked."""
        plt.figure(figsize=(15, 10))

        anomaly_dates = anomalies.index[anomalies['final_anomaly'] == 1]

        for i, ticker in enumerate(self.tickers, 1):
            plt.subplot(len(self.tickers), 1, i)
            plt.plot(prices[ticker], label=ticker)

            # Mark anomalous dates with vertical lines
            for ts in anomaly_dates:
                plt.axvline(x=ts, color='r', alpha=0.3)

            plt.title(f'{ticker} Price with Anomalies Marked')
            plt.legend()

        plt.tight_layout()
        plt.show()

        # Plot the most recent rolling correlation matrix, if available
        if len(self.correlation_matrices) > 0:
            last_date = self.correlation_matrices.index.get_level_values(0)[-1]
            last_corr = self.correlation_matrices.loc[last_date]
            plt.figure(figsize=(10, 8))
            sns.heatmap(last_corr, annot=True, cmap='coolwarm', center=0)
            plt.title('Latest Rolling Correlation Matrix')
            plt.show()

    def run_pipeline(self):
        """Complete training and detection pipeline."""
        print("Fetching training data...")
        # 90 days of daily observations keeps the VAR's parameter count
        # small relative to the sample size
        training_data = self.fetch_data(days=90)
        training_features = self.prepare_features(training_data)

        print("Training models...")
        self.train_models(training_features)

        print("Fetching latest data for detection...")
        # The 7-day rolling features need at least 8 daily observations, so
        # fetch a window with some overlap rather than only the newest days
        current_data = self.fetch_data(days=14)

        print("Detecting anomalies...")
        anomalies = self.detect_anomalies(current_data)

        print("\nAnomaly Detection Results:")
        print(anomalies[anomalies['final_anomaly'] == 1])

        self.visualize_results(current_data, anomalies)

        return anomalies


if __name__ == "__main__":
    # Initialize with major cryptocurrencies (CoinGecko IDs)
    detector = CryptoAnomalyDetector(tickers=['solana', 'dogecoin', 'ethereum', 'ripple', 'tron'])

    # Run the complete pipeline
    results = detector.run_pipeline()
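
    # A minimal follow-up sketch: persist flagged rows for later review
    # (the file name here is illustrative)
    flagged = results[results['final_anomaly'] == 1]
    if not flagged.empty:
        flagged.to_csv('crypto_anomalies.csv')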