# strange_doge_rf2.py
  1. import requests
  2. import pandas as pd
  3. import numpy as np
  4. from sklearn.ensemble import IsolationForest
  5. from datetime import datetime
  6. import os
  7. # Configuration
  8. CRYPTO = 'solana' # CoinGecko ID for DOGE
  9. CURRENCY = 'usd' # Base currency
  10. HISTORICAL_URL = f'https://api.coingecko.com/api/v3/coins/{CRYPTO}/market_chart?vs_currency={CURRENCY}&days=30 #&interval=hourly'
  11. HISTORY_WINDOW = 1000 # Max historical points to keep (for memory efficiency)
  12. CSV_FILE = '%s_price_history.csv' % CRYPTO # For loading historical data
  13. # Global variables
  14. price_df = pd.DataFrame(columns=['timestamp', 'price']) # Historical prices
  15. model = None # Isolation Forest model
  16. def fetch_historical_data():
  17. """Fetch historical hourly price data from CoinGecko."""
  18. try:
  19. response = requests.get(HISTORICAL_URL)
  20. response.raise_for_status()
  21. data = response.json()
  22. prices = data['prices'] # List of [timestamp_ms, price]
  23. df = pd.DataFrame(prices, columns=['timestamp', 'price'])
  24. df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
  25. return df
  26. except Exception as e:
  27. print(f"Error fetching historical data: {e}")
  28. return pd.DataFrame()
  29. def load_or_fetch_history():
  30. """Load history from CSV if exists, else fetch from API."""
  31. global price_df
  32. if os.path.exists(CSV_FILE):
  33. price_df = pd.read_csv(CSV_FILE, parse_dates=['timestamp'])
  34. print(f"Loaded {len(price_df)} historical points from CSV.")
  35. else:
  36. price_df = fetch_historical_data()
  37. if not price_df.empty:
  38. price_df.to_csv(CSV_FILE, index=False)
  39. print(f"Fetched and saved {len(price_df)} historical points.")
  40. # Trim to window size
  41. price_df = price_df.tail(HISTORY_WINDOW)
  42. def engineer_features(df):
  43. """Engineer features for anomaly detection."""
  44. df = df.copy()
  45. df['pct_change'] = df['price'].pct_change() # Percentage change
  46. df['abs_diff'] = df['price'].diff() # Absolute difference
  47. df['rolling_mean_5'] = df['price'].rolling(window=5).mean() # Rolling mean
  48. df['rolling_std_5'] = df['price'].rolling(window=5).std() # Rolling std
  49. df['hour'] = df['timestamp'].dt.hour # Time of day
  50. df.fillna(0, inplace=True)
  51. features = ['pct_change', 'abs_diff', 'rolling_mean_5', 'rolling_std_5', 'hour']
  52. return df[features]
  53. def train_model(train_df):
  54. """Train Isolation Forest on given features."""
  55. if len(train_df) < 10:
  56. print("Insufficient data to train model.")
  57. return None
  58. features = engineer_features(train_df)
  59. model = IsolationForest(contamination=0.01, random_state=42)
  60. model.fit(features)
  61. return model
  62. def get_anomaly_score(model, test_df, index):
  63. """Get anomaly score for a specific point in the test DF."""
  64. # To simulate "appending" without modifying, create temp DF up to this index
  65. temp_df = test_df.iloc[:index + 1] # Includes all prior test points up to this one
  66. features = engineer_features(temp_df)
  67. score = model.decision_function(features.tail(1))[0]
  68. return score
  69. # Sensitivity Analysis
  70. def run_sensitivity_analysis(thresholds=[-0.2, -0.1,-0.07, -0.05, -0.03, -0.01, 0.1], precision=0.0001, max_iterations=100):
  71. """Compute upper/lower price bounds for anomalies across multiple thresholds."""
  72. if model is None:
  73. print("Model not trained yet. Cannot run sensitivity analysis.")
  74. return
  75. if price_df.empty:
  76. print("No historical data available.")
  77. return
  78. current_price = price_df['price'].iloc[-1]
  79. timestamp = datetime.now()
  80. def find_bounds(threshold):
  81. def is_anomaly_func(price):
  82. # Simulate detection with given threshold
  83. new_row = pd.DataFrame({'timestamp': [timestamp], 'price': [price]})
  84. temp_df = pd.concat([price_df, new_row], ignore_index=True)
  85. features = engineer_features(temp_df)
  86. score = model.decision_function(features.tail(1))[0]
  87. return score < threshold
  88. # Lower bound
  89. low = max(0, current_price * 0.5)
  90. high = current_price
  91. lower_bound = None
  92. for _ in range(max_iterations):
  93. mid = (low + high) / 2
  94. if is_anomaly_func(mid):
  95. lower_bound = mid
  96. high = mid
  97. else:
  98. low = mid
  99. if high - low < precision:
  100. break
  101. # Upper bound
  102. low = current_price
  103. high = current_price * 2
  104. upper_bound = None
  105. for _ in range(max_iterations):
  106. mid = (low + high) / 2
  107. if is_anomaly_func(mid):
  108. upper_bound = mid
  109. low = mid
  110. else:
  111. high = mid
  112. if high - low < precision:
  113. break
  114. return lower_bound, upper_bound
  115. print("Sensitivity Analysis: Anomaly Price Bounds for Different Thresholds")
  116. print(f"Based on last historical price: ${current_price:.4f}")
  117. for thresh in thresholds:
  118. lower, upper = find_bounds(thresh)
  119. print(f"\nThreshold {thresh}:")
  120. if lower is not None:
  121. print(f" - Prices BELOW ~${lower:.4f} would trigger anomaly.")
  122. else:
  123. print(" - No lower bound found.")
  124. if upper is not None:
  125. print(f" - Prices ABOVE ~${upper:.4f} would trigger anomaly.")
  126. else:
  127. print(" - No upper bound found.")
  128. # Backtesting
  129. def run_backtest(thresholds=[-0.7, -0.5, -0.3], test_fraction=0.2):
  130. """Backtest the model on a holdout set, reporting flagged anomalies per threshold."""
  131. if len(price_df) < 20:
  132. print("Insufficient data for backtesting.")
  133. return
  134. # Split data: Train on first (1 - test_fraction), test on last test_fraction
  135. split_idx = int(len(price_df) * (1 - test_fraction))
  136. train_df = price_df.iloc[:split_idx]
  137. test_df = price_df.iloc[split_idx:].reset_index(drop=True)
  138. backtest_model = train_model(train_df)
  139. if backtest_model is None:
  140. return
  141. print(f"Backtesting on {len(test_df)} holdout points (trained on {len(train_df)} points).")
  142. # Score each test point sequentially
  143. scores = []
  144. for i in range(len(test_df)):
  145. score = get_anomaly_score(backtest_model, test_df, i)
  146. scores.append(score)
  147. for thresh in thresholds:
  148. flagged = [i for i, score in enumerate(scores) if score < thresh]
  149. flagged_pct = (len(flagged) / len(test_df)) * 100 if len(test_df) > 0 else 0
  150. print(f"\nThreshold {thresh}: {len(flagged)} points flagged as anomalies ({flagged_pct:.2f}%)")
  151. if flagged:
  152. print("Flagged points (timestamp, price):")
  153. for idx in flagged[:10]: # Limit to first 10 for brevity
  154. ts = test_df['timestamp'].iloc[idx]
  155. price = test_df['price'].iloc[idx]
  156. print(f" - {ts}: ${price:.4f}")
  157. if len(flagged) > 10:
  158. print(" ... (more flagged points omitted)")
  159. # Main Execution
  160. load_or_fetch_history()
  161. model = train_model(price_df) # Train on full data for sensitivity analysis
  162. # Interactive CLI for analysis
  163. print(f"{CRYPTO.upper()} price anomaly analyzer loaded with Isolation Forest.")
  164. print("Enter 'sensitivity' for sensitivity analysis, 'backtest' for backtesting, or 'quit' to exit.")
  165. while True:
  166. user_input = input("> ").strip().lower()
  167. if user_input == 'quit':
  168. print("Exiting...")
  169. break
  170. elif user_input == 'sensitivity':
  171. run_sensitivity_analysis()
  172. elif user_input == 'backtest':
  173. run_backtest()
  174. else:
  175. print("Unknown command. Use 'sensitivity', 'backtest', or 'quit'.")