strange_cryptos.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. ### Python Code
  2. import requests
  3. import pandas as pd
  4. import numpy as np
  5. import time
  6. import schedule
  7. import smtplib
  8. from email.mime.text import MIMEText
  9. from email.mime.multipart import MIMEMultipart
  10. from datetime import datetime
  11. # Configuration
  12. CRYPTO_LIST = ['solana', 'dogecoin', 'ethereum', 'ripple', 'tron'] # CoinGecko IDs
  13. CRYPTO_LABELS = {
  14. 'solana': 'SOL',
  15. 'dogecoin': 'DOGE',
  16. 'ethereum': 'ETH',
  17. 'ripple': 'XRP',
  18. 'tron': 'TRX'
  19. }
  20. CURRENCY = 'usd'
  21. API_URL = 'https://api.coingecko.com/api/v3/simple/price'
  22. CHECK_INTERVAL_SECONDS = 60 # Check every 60 seconds
  23. Z_SCORE_THRESHOLD = 3.0
  24. HISTORY_WINDOW = 100
  25. EMAIL_FROM = 'vortify-lc@algometic.com'
  26. EMAIL_TO = 'larry1chan@qq.com'
  27. EMAIL_PASSWORD = 'g33kPoppy!'
  28. SMTP_SERVER = 'hwsmtp.exmail.qq.com'
  29. SMTP_PORT = 465
  30. # In-memory storage for historical prices per crypto
  31. price_history = {crypto: [] for crypto in CRYPTO_LIST}
  32. def fetch_prices():
  33. """Fetch current prices for all cryptos from CoinGecko API."""
  34. try:
  35. ids = ','.join(CRYPTO_LIST)
  36. params = {'ids': ids, 'vs_currencies': CURRENCY}
  37. response = requests.get(API_URL, params=params)
  38. response.raise_for_status()
  39. data = response.json()
  40. return {crypto: data.get(crypto, {}).get(CURRENCY) for crypto in CRYPTO_LIST}
  41. except Exception as e:
  42. print(f"Error fetching prices: {e}")
  43. return {}
  44. def detect_anomaly(crypto, current_price):
  45. """Detect if the current price is an anomaly based on Z-score."""
  46. history = price_history[crypto]
  47. if len(history) < 2:
  48. return False, 0.0
  49. df = pd.DataFrame(history, columns=['price'])
  50. mean = df['price'].mean()
  51. std_dev = df['price'].std()
  52. if std_dev == 0:
  53. return False, 0.0
  54. z_score = (current_price - mean) / std_dev
  55. is_anomaly = abs(z_score) > Z_SCORE_THRESHOLD
  56. return is_anomaly, z_score
  57. def send_email_alert(crypto, current_price, z_score):
  58. """Send email alert for anomaly."""
  59. symbol = CRYPTO_LABELS.get(crypto, crypto.upper())
  60. subject = f"Anomaly Detected in {symbol} Price!"
  61. body = f"""
  62. Alert Time: {datetime.now()}
  63. Crypto: {symbol}
  64. Current Price: ${current_price:.4f}
  65. Z-Score: {z_score:.2f} (Threshold: {Z_SCORE_THRESHOLD})
  66. This indicates a potential anomalous movement (spike or drop).
  67. """
  68. msg = MIMEMultipart()
  69. msg['From'] = EMAIL_FROM
  70. msg['To'] = EMAIL_TO
  71. msg['Subject'] = subject
  72. msg.attach(MIMEText(body, 'plain'))
  73. try:
  74. server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
  75. server.starttls()
  76. server.login(EMAIL_FROM, EMAIL_PASSWORD)
  77. server.sendmail(EMAIL_FROM, EMAIL_TO, msg.as_string())
  78. server.quit()
  79. print(f"Email alert sent for {symbol}.")
  80. except Exception as e:
  81. e.printstacktrace()
  82. print(f"Error sending email for {symbol}: {e}")
  83. def monitor():
  84. """Main monitoring function: Fetch prices, check for anomaly, alert if needed."""
  85. current_prices = fetch_prices()
  86. for crypto, price in current_prices.items():
  87. if price is None:
  88. continue
  89. # Append to history (maintain fixed window size)
  90. price_history[crypto].append(price)
  91. if len(price_history[crypto]) > HISTORY_WINDOW:
  92. price_history[crypto].pop(0)
  93. is_anomaly, z_score = detect_anomaly(crypto, price)
  94. symbol = CRYPTO_LABELS.get(crypto, crypto.upper())
  95. print(f"{datetime.now()} - {symbol} Price: ${price:.4f} | Z-Score: {z_score:.2f}")
  96. if is_anomaly:
  97. print(f"ANOMALY DETECTED for {symbol}! Z-Score: {z_score:.2f}")
  98. send_email_alert(crypto, price, z_score)
  99. # Schedule the monitor to run every CHECK_INTERVAL_SECONDS
  100. schedule.every(CHECK_INTERVAL_SECONDS).seconds.do(monitor)
  101. print("Starting multi-crypto price anomaly monitor...")
  102. while True:
  103. schedule.run_pending()
  104. time.sleep(1)