"""Real-time DOGE/USD price anomaly monitor using an Isolation Forest.

Polls CoinGecko for the current price, maintains a rolling on-disk price
history, periodically retrains an Isolation Forest, and emails an alert
whenever the latest price scores as anomalous.
"""

import os
import smtplib
import time
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import numpy as np
import pandas as pd
import requests
import schedule
from sklearn.ensemble import IsolationForest

# --- Monitoring configuration -------------------------------------------
CRYPTO = 'dogecoin'  # CoinGecko ID for DOGE
CURRENCY = 'usd'     # Base currency
CURRENT_PRICE_URL = (
    'https://api.coingecko.com/api/v3/simple/price'
    f'?ids={CRYPTO}&vs_currencies={CURRENCY}'
)
# BUG FIX: the "# &interval=hourly" comment was previously embedded inside
# the URL string literal, producing an invalid request URL
# ("...days=30 # &interval=hourly"). The comment now lives outside the string.
HISTORICAL_URL = (
    f'https://api.coingecko.com/api/v3/coins/{CRYPTO}/market_chart'
    f'?vs_currency={CURRENCY}&days=30'  # &interval=hourly
)
CHECK_INTERVAL_SECONDS = 60    # Poll the current price every 60 seconds
RETRAIN_INTERVAL_MINUTES = 60  # Retrain model every 60 minutes
ANOMALY_THRESHOLD = -0.5       # decision_function score below this => anomaly
HISTORY_WINDOW = 1000          # Max historical points kept (memory efficiency)
CSV_FILE = 'doge_price_history.csv'  # On-disk persistence of price history

# --- Email configuration ------------------------------------------------
# SECURITY NOTE(review): credentials were hard-coded in source. They are now
# overridable via environment variables (defaults preserved for backward
# compatibility), but the literal password should be rotated and removed
# from version control.
EMAIL_FROM = os.environ.get('ALERT_EMAIL_FROM', 'vortify-lc@algometic.com')
EMAIL_TO = os.environ.get('ALERT_EMAIL_TO', 'larry1chan@qq.com')
EMAIL_PASSWORD = os.environ.get('ALERT_EMAIL_PASSWORD', 'g33kPoppy!')
SMTP_SERVER = 'hwsmtp.exmail.qq.com'
SMTP_PORT = 465  # Implicit-TLS (SMTPS) port — requires SMTP_SSL, not STARTTLS

# Global state shared by the scheduled tasks
price_df = pd.DataFrame(columns=['timestamp', 'price'])  # Rolling price history
model = None  # Trained IsolationForest, or None until first training


def fetch_historical_data():
    """Fetch ~30 days of price history from CoinGecko.

    Returns:
        DataFrame with 'timestamp' (datetime64) and 'price' columns, or an
        empty DataFrame on any request/parse failure (best-effort by design).
    """
    try:
        response = requests.get(HISTORICAL_URL)
        response.raise_for_status()
        data = response.json()
        prices = data['prices']  # List of [timestamp_ms, price] pairs
        df = pd.DataFrame(prices, columns=['timestamp', 'price'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df
    except Exception as e:
        print(f"Error fetching historical data: {e}")
        return pd.DataFrame()


def load_or_fetch_history():
    """Populate price_df from the CSV cache, or from the API on first run."""
    global price_df
    if os.path.exists(CSV_FILE):
        price_df = pd.read_csv(CSV_FILE, parse_dates=['timestamp'])
        print(f"Loaded {len(price_df)} historical points from CSV.")
    else:
        price_df = fetch_historical_data()
        if not price_df.empty:
            price_df.to_csv(CSV_FILE, index=False)
            print(f"Fetched and saved {len(price_df)} historical points.")
    # Trim to window size to bound memory use
    price_df = price_df.tail(HISTORY_WINDOW)


def fetch_current_price():
    """Fetch the current spot price from CoinGecko.

    Returns:
        (timestamp, price) tuple, or (None, None) on failure.
    """
    try:
        response = requests.get(CURRENT_PRICE_URL)
        response.raise_for_status()
        data = response.json()
        price = data[CRYPTO][CURRENCY]
        timestamp = datetime.now()
        return timestamp, price
    except Exception as e:
        print(f"Error fetching current price: {e}")
        return None, None


def engineer_features(df):
    """Engineer anomaly-detection features from a timestamp/price frame.

    Returns:
        DataFrame with columns: pct_change, abs_diff, rolling_mean_5,
        rolling_std_5, hour. Timestamp and raw price are excluded from
        the model input.
    """
    df = df.copy()
    df['pct_change'] = df['price'].pct_change()  # Relative change
    df['abs_diff'] = df['price'].diff()          # Absolute difference
    df['rolling_mean_5'] = df['price'].rolling(window=5).mean()
    df['rolling_std_5'] = df['price'].rolling(window=5).std()
    df['hour'] = df['timestamp'].dt.hour         # Time-of-day signal
    # Fill NaNs (leading rows of diff/rolling windows) with 0 for simplicity
    df.fillna(0, inplace=True)
    features = ['pct_change', 'abs_diff', 'rolling_mean_5', 'rolling_std_5', 'hour']
    return df[features]


def train_model():
    """Train or retrain the Isolation Forest on the current history window."""
    global model
    if len(price_df) < 10:  # Need sufficient data for meaningful statistics
        print("Insufficient data to train model.")
        return
    features = engineer_features(price_df)
    model = IsolationForest(contamination=0.01, random_state=42)  # Assume ~1% anomalies
    model.fit(features)
    print("Model retrained successfully.")


def detect_anomaly(timestamp, current_price):
    """Score the new price point against the trained model.

    Returns:
        (is_anomaly, score) where score comes from decision_function
        (higher = more normal) and is_anomaly is True when score falls
        below ANOMALY_THRESHOLD. Returns (False, 0.0) if no model yet.
    """
    if model is None:
        return False, 0.0
    # Append the new point so diff/rolling features see recent context
    new_row = pd.DataFrame({'timestamp': [timestamp], 'price': [current_price]})
    temp_df = pd.concat([price_df, new_row], ignore_index=True)
    features = engineer_features(temp_df)
    # Score only the newest point
    score = model.decision_function(features.tail(1))[0]
    is_anomaly = score < ANOMALY_THRESHOLD
    return is_anomaly, score


def send_email_alert(timestamp, current_price, score):
    """Send an email alert describing the detected anomaly."""
    subject = f"Anomaly Detected in {CRYPTO.upper()} Price!"
    body = f"""
Alert Time: {timestamp}
Current Price: ${current_price:.4f}
Anomaly Score: {score:.2f} (Threshold: {ANOMALY_THRESHOLD})
This indicates a sudden and sharp price fluctuation.
"""
    msg = MIMEMultipart()
    msg['From'] = EMAIL_FROM
    msg['To'] = EMAIL_TO
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))
    try:
        # BUG FIX: port 465 is implicit TLS (SMTPS). The previous
        # smtplib.SMTP(...) + starttls() combination hangs/fails on that
        # port. SMTP_SSL negotiates TLS from the start; the context manager
        # also guarantees the connection closes if sendmail raises.
        with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
            server.login(EMAIL_FROM, EMAIL_PASSWORD)
            server.sendmail(EMAIL_FROM, EMAIL_TO, msg.as_string())
        print("Email alert sent successfully.")
    except Exception as e:
        print(f"Error sending email: {e}")


def monitor():
    """Scheduled task: fetch the price, persist it, and alert on anomalies."""
    global price_df
    timestamp, current_price = fetch_current_price()
    if current_price is None:
        return
    # Append new price to history and keep the window bounded
    new_row = pd.DataFrame({'timestamp': [timestamp], 'price': [current_price]})
    price_df = pd.concat([price_df, new_row], ignore_index=True)
    price_df = price_df.tail(HISTORY_WINDOW)
    price_df.to_csv(CSV_FILE, index=False)  # Persist updates
    is_anomaly, score = detect_anomaly(timestamp, current_price)
    print(f"{timestamp} - Current {CRYPTO.upper()} Price: ${current_price:.4f} | Anomaly Score: {score:.2f}")
    if is_anomaly:
        print(f"ANOMALY DETECTED! Score: {score:.2f}")
        send_email_alert(timestamp, current_price, score)


def retrain_scheduler():
    """Scheduled task wrapper: retrain the model."""
    train_model()


def main():
    """Initialize state, register scheduled tasks, and run the polling loop."""
    load_or_fetch_history()
    train_model()  # Initial training

    schedule.every(CHECK_INTERVAL_SECONDS).seconds.do(monitor)
    schedule.every(RETRAIN_INTERVAL_MINUTES).minutes.do(retrain_scheduler)

    print(f"Starting advanced {CRYPTO.upper()} price anomaly monitor with Isolation Forest...")
    while True:
        schedule.run_pending()
        time.sleep(1)  # Avoid high CPU usage


# BUG FIX: initialization and the infinite polling loop previously ran at
# module import time, making the module impossible to import or test. The
# __main__ guard preserves script behavior exactly.
if __name__ == '__main__':
    main()