# complex_anomaly.py — multi-model anomaly detection for cryptocurrency prices.
  1. import requests
  2. from datetime import datetime, timedelta
  3. import time
  4. import numpy as np
  5. import pandas as pd
  6. import yfinance as yf
  7. from datetime import datetime, timedelta
  8. from sklearn.ensemble import IsolationForest
  9. from sklearn.covariance import EllipticEnvelope
  10. from sklearn.preprocessing import RobustScaler
  11. from statsmodels.tsa.api import VAR
  12. from pyod.models.lof import LOF
  13. import matplotlib.pyplot as plt
  14. import seaborn as sns
  15. import warnings
  16. warnings.filterwarnings('ignore')
  17. class CryptoAnomalyDetector:
  18. def __init__(self, tickers=['bitcoin', 'ethereum', 'solana']):
  19. # CoinGecko uses lowercase names or IDs instead of tickers
  20. self.tickers = tickers
  21. self.coin_ids = tickers # For simplicity, using names as IDs
  22. self.models = {}
  23. self.scalers = {}
  24. self.thresholds = {}
  25. self.correlation_matrices = []
  26. self.last_training_date = None
  27. def fetch_data(self, days=30):
  28. """Fetch historical data using CoinGecko API"""
  29. base_url = "https://api.coingecko.com/api/v3"
  30. end_date = datetime.now()
  31. start_date = end_date - timedelta(days=days)
  32. # Convert to UNIX timestamps
  33. end_timestamp = int(end_date.timestamp())
  34. start_timestamp = int(start_date.timestamp())
  35. data = {}
  36. for coin_id in self.coin_ids:
  37. url = f"{base_url}/coins/{coin_id}/market_chart/range"
  38. params = {
  39. 'vs_currency': 'usd',
  40. # 'from': start_timestamp,
  41. # 'to': end_timestamp
  42. 'days': 30
  43. }
  44. # Add delay to avoid rate limiting
  45. time.sleep(1)
  46. try:
  47. response = requests.get(url, params=params)
  48. response.raise_for_status()
  49. coin_data = response.json()
  50. # Extract prices and create DataFrame
  51. prices = pd.DataFrame(coin_data['prices'], columns=['timestamp', 'price'])
  52. prices['date'] = pd.to_datetime(prices['timestamp'], unit='ms')
  53. prices.set_index('date', inplace=True)
  54. data[coin_id] = prices['price']
  55. except requests.exceptions.RequestException as e:
  56. print(f"Error fetching data for {coin_id}: {e}")
  57. data[coin_id] = pd.Series() # Empty series if error occurs
  58. # Combine all series into a DataFrame
  59. combined_data = pd.DataFrame(data)
  60. # Resample to daily data if needed (CoinGecko might return more granular data)
  61. combined_data = combined_data.resample('D').last().ffill()
  62. return combined_data.dropna()
  63. # ... [rest of the class remains the same] ...
  64. def prepare_features(self, data):
  65. """Create features for anomaly detection"""
  66. # Calculate log returns
  67. returns = np.log(data).diff().dropna()
  68. # Calculate rolling volatilities (3-day and 7-day)
  69. features = pd.DataFrame()
  70. for ticker in self.tickers:
  71. features[f'{ticker}_return'] = returns[ticker]
  72. features[f'{ticker}_vol_3d'] = returns[ticker].rolling(3).std()
  73. features[f'{ticker}_vol_7d'] = returns[ticker].rolling(7).std()
  74. # Add correlation features
  75. rolling_corr = returns.rolling(5).corr().dropna()
  76. self.correlation_matrices = rolling_corr.groupby(level=0).apply(lambda x: x)
  77. # Add market-wide volatility (average of all volatilities)
  78. features['market_vol'] = features.filter(regex='vol_3d').mean(axis=1)
  79. return features.dropna()
  80. def train_models(self, training_data):
  81. """Train multiple anomaly detection models"""
  82. # Robust scaling (better for crypto data)
  83. scaler = RobustScaler()
  84. scaled_data = scaler.fit_transform(training_data)
  85. self.scalers['main'] = scaler
  86. # 1. Isolation Forest (for point anomalies)
  87. iso_forest = IsolationForest(
  88. n_estimators=100,
  89. contamination=0.01,
  90. random_state=42
  91. )
  92. iso_forest.fit(scaled_data)
  93. self.models['isolation_forest'] = iso_forest
  94. # 2. Mahalanobis Distance (Elliptic Envelope)
  95. elliptic = EllipticEnvelope(
  96. contamination=0.01,
  97. random_state=42
  98. )
  99. elliptic.fit(scaled_data)
  100. self.models['elliptic_envelope'] = elliptic
  101. # 3. Local Outlier Factor (for density-based anomalies)
  102. lof = LOF(
  103. n_neighbors=20,
  104. contamination=0.01
  105. )
  106. lof.fit(scaled_data)
  107. self.models['lof'] = lof
  108. # 4. Vector Autoregression (for multivariate time series)
  109. var = VAR(training_data)
  110. var_model = var.fit(maxlags=3)
  111. self.models['var'] = var_model
  112. # Store training date
  113. self.last_training_date = datetime.now()
  114. def detect_anomalies(self, current_data):
  115. """Detect anomalies in new data points"""
  116. # Prepare features for current data
  117. current_features = self.prepare_features(current_data)
  118. scaled_data = self.scalers['main'].transform(current_features)
  119. # Get predictions from all models
  120. results = pd.DataFrame(index=current_features.index)
  121. # Isolation Forest
  122. results['iso_forest'] = self.models['isolation_forest'].predict(scaled_data)
  123. # Elliptic Envelope
  124. results['elliptic_env'] = self.models['elliptic_envelope'].predict(scaled_data)
  125. # LOF
  126. lof_scores = self.models['lof'].decision_function(scaled_data)
  127. results['lof_score'] = lof_scores
  128. results['lof_anomaly'] = (lof_scores < np.percentile(lof_scores, 1)).astype(int)
  129. # VAR model residuals
  130. var_pred = self.models['var'].forecast(current_features.values, 1)
  131. residuals = current_features.iloc[-1] - var_pred[0]
  132. mahalanobis_dist = np.sqrt(np.dot(np.dot(residuals, np.linalg.inv(self.models['var'].sigma_u)), residuals))
  133. results['var_mahalanobis'] = mahalanobis_dist
  134. results['var_anomaly'] = (mahalanobis_dist > np.percentile(results['var_mahalanobis'], 99)).astype(int)
  135. # Combined anomaly score
  136. results['combined_score'] = (
  137. (results['iso_forest'] == -1).astype(int) +
  138. (results['elliptic_env'] == -1).astype(int) +
  139. results['lof_anomaly'] +
  140. results['var_anomaly']
  141. )
  142. results['final_anomaly'] = (results['combined_score'] >= 2).astype(int)
  143. return results
  144. def visualize_results(self, prices, anomalies):
  145. """Visualize prices with anomalies marked"""
  146. plt.figure(figsize=(15, 10))
  147. # Plot prices
  148. for i, ticker in enumerate(self.tickers, 1):
  149. plt.subplot(len(self.tickers), 1, i)
  150. plt.plot(prices[ticker], label=ticker)
  151. # Mark anomalies
  152. anomaly_points = prices.loc[anomalies[anomalies['final_anomaly'] == 1].index]
  153. for point in anomaly_points:
  154. plt.axvline(x=point, color='r', alpha=0.3)
  155. plt.title(f'{ticker} Price with Anomalies Marked')
  156. plt.legend()
  157. plt.tight_layout()
  158. plt.show()
  159. # Plot correlation changes if available
  160. if len(self.correlation_matrices) > 0:
  161. last_corr = self.correlation_matrices.iloc[-1]
  162. plt.figure(figsize=(10, 8))
  163. sns.heatmap(last_corr, annot=True, cmap='coolwarm', center=0)
  164. plt.title('Latest Rolling Correlation Matrix')
  165. plt.show()
  166. def run_pipeline(self):
  167. """Complete training and detection pipeline"""
  168. print("Fetching training data...")
  169. training_data = self.fetch_data(days=30)
  170. training_features = self.prepare_features(training_data)
  171. print("Training models...")
  172. self.train_models(training_features)
  173. print("Fetching latest data for detection...")
  174. # Get data since last training point (plus some overlap)
  175. current_data = self.fetch_data(days=3)
  176. current_features = self.prepare_features(current_data)
  177. print("Detecting anomalies...")
  178. anomalies = self.detect_anomalies(current_data)
  179. print("\nAnomaly Detection Results:")
  180. print(anomalies[anomalies['final_anomaly'] == 1])
  181. self.visualize_results(current_data, anomalies)
  182. return anomalies
  183. if __name__ == "__main__":
  184. # Initialize with major cryptocurrencies
  185. detector = CryptoAnomalyDetector(tickers=['solana', 'dogecoin', 'ethereum', 'ripple', 'tron'])
  186. # Run complete pipeline
  187. results = detector.run_pipeline()