| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- import requests
- import pandas as pd
- import numpy as np
- import time
- import schedule
- import smtplib
- from email.mime.text import MIMEText
- from email.mime.multipart import MIMEMultipart
- from datetime import datetime
- import threading
- import os
- from flask import Flask, render_template_string, jsonify
# Configuration
CRYPTO_LIST = ['solana', 'dogecoin', 'ethereum', 'ripple', 'tron']  # CoinGecko IDs
# Ticker symbol shown in logs, emails and charts for each CoinGecko ID.
CRYPTO_LABELS = {
    'solana': 'SOL',
    'dogecoin': 'DOGE',
    'ethereum': 'ETH',
    'ripple': 'XRP',
    'tron': 'TRX',
}
CURRENCY = 'usd'
API_URL = 'https://api.coingecko.com/api/v3/simple/price'
CHECK_INTERVAL_SECONDS = 60  # Check every 60 seconds
Z_SCORE_THRESHOLD = 3.0  # An absolute z-score above this value counts as an anomaly
HISTORY_WINDOW = 100  # Maximum number of samples kept per history list
EMAIL_FROM = 'vortify-lc@algometic.com'
EMAIL_TO = 'larry1chan@qq.com'
# SECURITY: credentials do not belong in source control. The password is read
# from the CRYPTO_MONITOR_EMAIL_PASSWORD environment variable when it is set;
# the literal fallback only keeps existing deployments working and should be
# rotated and removed.
EMAIL_PASSWORD = os.environ.get('CRYPTO_MONITOR_EMAIL_PASSWORD', 'g33kPoppy!')
SMTP_SERVER = 'hwsmtp.exmail.qq.com'
SMTP_PORT = 465
DATA_FILE = "crypto_history.csv"

# In-memory storage for historical prices and z-scores per crypto.
# Each crypto gets its own list; timestamp_history is shared by all of them.
price_history = {crypto: [] for crypto in CRYPTO_LIST}
zscore_history = {crypto: [] for crypto in CRYPTO_LIST}
timestamp_history = []
def fetch_prices(timeout=10):
    """Fetch current prices for all cryptos from the CoinGecko API.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A dict mapping every ID in CRYPTO_LIST to its price in CURRENCY, with
        None for an ID that is missing from the response. An empty dict is
        returned when the request or the decoding of the response fails.
    """
    params = {'ids': ','.join(CRYPTO_LIST), 'vs_currencies': CURRENCY}
    try:
        # requests waits indefinitely unless a timeout is given; without one a
        # stalled connection would block the caller forever.
        response = requests.get(API_URL, params=params, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        return {crypto: data.get(crypto, {}).get(CURRENCY) for crypto in CRYPTO_LIST}
    except Exception as e:
        # Best-effort by design: report the problem and let the caller carry on.
        print(f"Error fetching prices: {e}")
        return {}
def detect_anomaly(crypto, current_price):
    """Detect if the current price is an anomaly based on its Z-score.

    The Z-score is computed against the mean and sample standard deviation of
    the valid (non-NaN) entries of price_history[crypto].

    Args:
        crypto: Key into price_history.
        current_price: Price to score.

    Returns:
        A tuple (is_anomaly, z_score) of plain bool and float. (False, 0.0) is
        returned when fewer than two valid samples exist or when the samples
        have no spread, because no meaningful Z-score can be computed then.
    """
    # NaN entries are placeholders for missing observations; drop them so they
    # neither count towards the minimum sample size nor yield a NaN score.
    samples = pd.Series(price_history[crypto], dtype='float64').dropna()
    if len(samples) < 2:
        return False, 0.0
    mean = samples.mean()
    std_dev = samples.std()
    # "not > 0" rejects both a zero and a NaN standard deviation.
    if not std_dev > 0:
        return False, 0.0
    z_score = float((current_price - mean) / std_dev)
    return abs(z_score) > Z_SCORE_THRESHOLD, z_score
def send_email_alert(crypto, current_price, z_score):
    """Send an email alert for an anomalous price.

    Delivery is best-effort: any failure is printed and swallowed so that the
    caller keeps running.

    Args:
        crypto: Crypto ID; its label from CRYPTO_LABELS is used in the message
            (the upper-cased ID when no label exists).
        current_price: Price that triggered the alert.
        z_score: Z-score computed for that price.
    """
    import ssl  # Local import: only this function needs it.

    symbol = CRYPTO_LABELS.get(crypto, crypto.upper())
    subject = f"Anomaly Detected in {symbol} Price!"
    body = (
        "\n"
        f"Alert Time: {datetime.now()}\n"
        f"Crypto: {symbol}\n"
        f"Current Price: ${current_price:.4f}\n"
        f"Z-Score: {z_score:.2f} (Threshold: {Z_SCORE_THRESHOLD})\n"
        "This indicates a potential anomalous movement (spike or drop).\n"
    )
    msg = MIMEMultipart()
    msg['From'] = EMAIL_FROM
    msg['To'] = EMAIL_TO
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))
    try:
        tls_context = ssl.create_default_context()
        # Port 465 is implicit TLS: the TLS handshake comes before any SMTP
        # greeting, so a plain SMTP connection followed by STARTTLS cannot
        # work there. Other ports keep the connect-then-STARTTLS sequence.
        implicit_tls = SMTP_PORT == 465
        if implicit_tls:
            server = smtplib.SMTP_SSL(
                SMTP_SERVER, SMTP_PORT, timeout=30, context=tls_context
            )
        else:
            server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=30)
        # The context manager issues QUIT and closes the socket even when
        # login or sending fails.
        with server:
            if not implicit_tls:
                server.starttls(context=tls_context)
            server.login(EMAIL_FROM, EMAIL_PASSWORD)
            server.sendmail(EMAIL_FROM, EMAIL_TO, msg.as_string())
        print(f"Email alert sent for {symbol}.")
    except Exception as e:
        print(f"Error sending email for {symbol}: {e}")
def save_history():
    """Write the recorded timestamps, prices and z-scores to DATA_FILE as CSV.

    One row is written per entry of timestamp_history, with a price column and
    a z-score column for every crypto in CRYPTO_LIST. A cell is left empty
    when a series has no value at that row's position. Nothing is written
    while timestamp_history is empty.
    """
    if not timestamp_history:
        return

    def cell(series, position):
        # Series shorter than the timestamp list yield an empty cell.
        return series[position] if position < len(series) else ''

    records = []
    for position, stamp in enumerate(timestamp_history):
        record = {'timestamp': stamp}
        for coin in CRYPTO_LIST:
            record[f'{coin}_price'] = cell(price_history[coin], position)
            record[f'{coin}_zscore'] = cell(zscore_history[coin], position)
        records.append(record)

    pd.DataFrame(records).to_csv(DATA_FILE, index=False)
def _trim_to_window(series):
    """Drop the oldest entries so that series holds at most HISTORY_WINDOW items."""
    excess = len(series) - HISTORY_WINDOW
    if excess > 0:
        del series[:excess]


def monitor():
    """Main monitoring function: fetch prices, check for anomalies, alert, save.

    Each successful run appends exactly one entry to timestamp_history and one
    entry per crypto to price_history and zscore_history, so all series stay
    index-aligned. A run whose price fetch fails records nothing.
    """
    current_prices = fetch_prices()
    if not current_prices:
        # fetch_prices() returns {} on failure. Recording a timestamp without
        # matching samples would shift every later row out of alignment.
        return

    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    timestamp_history.append(now)
    for crypto in CRYPTO_LIST:
        price = current_prices.get(crypto)
        prices = price_history[crypto]
        zscores = zscore_history[crypto]
        if price is None:
            # Keep a NaN placeholder so this crypto stays aligned with the
            # timestamps, and trim here too so the window is never exceeded.
            prices.append(np.nan)
            zscores.append(np.nan)
            _trim_to_window(prices)
            _trim_to_window(zscores)
            continue
        # Append to history (maintain fixed window size)
        prices.append(price)
        _trim_to_window(prices)
        # NOTE(review): the new price is already part of the history that
        # detect_anomaly() reads, so it contributes to its own baseline.
        # Confirm that this is intended.
        is_anomaly, z_score = detect_anomaly(crypto, price)
        zscores.append(z_score)
        _trim_to_window(zscores)
        symbol = CRYPTO_LABELS.get(crypto, crypto.upper())
        print(f"{now} - {symbol} Price: ${price:.4f} | Z-Score: {z_score:.2f}")
        if is_anomaly:
            print(f"ANOMALY DETECTED for {symbol}! Z-Score: {z_score:.2f}")
            send_email_alert(crypto, price, z_score)
    # Keep timestamp_history in sync
    _trim_to_window(timestamp_history)
    save_history()
# --- Web server for visualization ---
app = Flask(__name__)

# Single-page dashboard. The page polls /data and draws one price chart and
# one z-score chart per crypto with Chart.js. Chart instances from the
# previous refresh are destroyed before new ones are created, because
# removing a canvas from the DOM does not release its Chart object.
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
    <title>Crypto Z-Score & Price Monitor</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
    <h2>Crypto Z-Score & Price Monitor</h2>
    <div id="charts"></div>
    <script>
        let activeCharts = [];
        async function fetchData() {
            const resp = await fetch('/data');
            return await resp.json();
        }
        function renderCharts(data) {
            // Release the charts of the previous refresh before redrawing.
            activeCharts.forEach(chart => chart.destroy());
            activeCharts = [];
            const container = document.getElementById('charts');
            container.innerHTML = '';
            for (const crypto of data.cryptos) {
                const label = data.labels[crypto];
                const canvasPrice = document.createElement('canvas');
                const canvasZ = document.createElement('canvas');
                container.appendChild(document.createElement('hr'));
                container.appendChild(document.createTextNode(label + " Price"));
                container.appendChild(canvasPrice);
                container.appendChild(document.createTextNode(label + " Z-Score"));
                container.appendChild(canvasZ);
                activeCharts.push(new Chart(canvasPrice, {
                    type: 'line',
                    data: {
                        labels: data.timestamps,
                        datasets: [{
                            label: label + ' Price',
                            data: data.prices[crypto],
                            borderColor: 'blue',
                            fill: false
                        }]
                    }
                }));
                activeCharts.push(new Chart(canvasZ, {
                    type: 'line',
                    data: {
                        labels: data.timestamps,
                        datasets: [{
                            label: label + ' Z-Score',
                            data: data.zscores[crypto],
                            borderColor: 'red',
                            fill: false
                        }]
                    }
                }));
            }
        }
        function refresh() {
            fetchData()
                .then(renderCharts)
                .catch(err => console.error('Failed to refresh charts:', err));
        }
        refresh();
        setInterval(refresh, 60000);
    </script>
</body>
</html>
"""
@app.route('/')
def index():
    """Serve the dashboard page rendered from HTML_TEMPLATE."""
    page = render_template_string(HTML_TEMPLATE)
    return page
def _nan_to_none(values):
    """Return a copy of values with float NaN entries replaced by None."""
    return [None if isinstance(v, float) and np.isnan(v) else v for v in values]


@app.route('/data')
def data():
    """Return the recorded history as JSON for the dashboard charts.

    The payload holds the crypto IDs, their labels, the timestamps, and the
    price and z-score series per crypto. NaN placeholders are emitted as null:
    a bare NaN is not valid JSON and makes the browser's JSON parser reject
    the whole response.
    """
    # Prepare data for chart. Copies are sent rather than the live lists,
    # which other code keeps appending to and trimming.
    return jsonify({
        'cryptos': CRYPTO_LIST,
        'labels': CRYPTO_LABELS,
        'timestamps': list(timestamp_history),
        'prices': {
            crypto: _nan_to_none(price_history[crypto]) for crypto in CRYPTO_LIST
        },
        'zscores': {
            crypto: _nan_to_none(zscore_history[crypto]) for crypto in CRYPTO_LIST
        },
    })
def start_flask():
    """Run the Flask app on port 5000, listening on all network interfaces."""
    server_options = {
        'host': '0.0.0.0',
        'port': 5000,
        'debug': False,
        'use_reloader': False,
    }
    app.run(**server_options)
# --- Main ---
def main():
    """Start the web server thread, then run the scheduled monitor forever."""
    # The dashboard runs in a daemon thread so it never keeps the process alive.
    web_thread = threading.Thread(target=start_flask, daemon=True)
    web_thread.start()

    # Register monitor() to run once every CHECK_INTERVAL_SECONDS.
    interval = schedule.every(CHECK_INTERVAL_SECONDS)
    interval.seconds.do(monitor)

    print("Starting multi-crypto price anomaly monitor and web server on http://localhost:5000 ...")

    # Poll the scheduler once per second; due jobs run in this thread.
    while True:
        schedule.run_pending()
        time.sleep(1)
# Run the monitor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|