| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- import requests
- import pandas as pd
- import numpy as np
- from sklearn.ensemble import IsolationForest
- import time
- import schedule
- import smtplib
- from email.mime.text import MIMEText
- from email.mime.multipart import MIMEMultipart
- from datetime import datetime
- import os
# Configuration
CRYPTO = 'dogecoin'  # CoinGecko ID for DOGE
CURRENCY = 'usd'  # Base currency
CURRENT_PRICE_URL = f'https://api.coingecko.com/api/v3/simple/price?ids={CRYPTO}&vs_currencies={CURRENCY}'
# The optional `&interval=hourly` query parameter is deliberately left off.
# It used to sit *inside* the string literal as ` # &interval=hourly`, which
# put a stray space and a URL fragment into every request.
# NOTE(review): presumably CoinGecko chooses the sampling granularity from
# `days` when no interval is given - confirm against the API documentation.
HISTORICAL_URL = f'https://api.coingecko.com/api/v3/coins/{CRYPTO}/market_chart?vs_currency={CURRENCY}&days=30'
CHECK_INTERVAL_SECONDS = 60  # Check every 60 seconds
RETRAIN_INTERVAL_MINUTES = 60  # Retrain model every 60 minutes
# Scores below this value are reported as anomalies (higher score = more normal).
# NOTE(review): scikit-learn documents negative decision_function values as
# outliers; verify that -0.5 is actually reachable for this model, otherwise
# alerts may never fire.
ANOMALY_THRESHOLD = -0.5
HISTORY_WINDOW = 1000  # Max historical points to keep (for memory efficiency)
CSV_FILE = 'doge_price_history.csv'  # For persistence

# Email configuration (replace with your details)
EMAIL_FROM = 'vortify-lc@algometic.com'
EMAIL_TO = 'larry1chan@qq.com'
# The EMAIL_PASSWORD environment variable takes precedence when set.
# NOTE(review): the fallback below is a credential committed to source
# control - rotate it and drop the literal once deployments set the variable.
EMAIL_PASSWORD = os.environ.get('EMAIL_PASSWORD', 'g33kPoppy!')
SMTP_SERVER = 'hwsmtp.exmail.qq.com'
SMTP_PORT = 465

# Global variables
price_df = pd.DataFrame(columns=['timestamp', 'price'])  # Historical prices
model = None  # Isolation Forest model (None until the first successful training)
def fetch_historical_data(timeout=10):
    """Fetch the price history for CRYPTO from the CoinGecko market-chart endpoint.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        A DataFrame with a datetime ``timestamp`` column (converted from the
        epoch-millisecond values in the response) and a ``price`` column.
        On any failure the error is printed and an empty DataFrame with the
        same two columns is returned.
    """
    try:
        response = requests.get(HISTORICAL_URL, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        prices = data['prices']  # List of [timestamp_ms, price]
        df = pd.DataFrame(prices, columns=['timestamp', 'price'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df
    except Exception as e:
        # Best-effort by design: callers test `.empty` instead of catching.
        print(f"Error fetching historical data: {e}")
        # Keep the same schema as the module-level history frame so callers
        # can concatenate or index columns without special-casing failure.
        return pd.DataFrame(columns=['timestamp', 'price'])
def load_or_fetch_history():
    """Populate the global ``price_df`` from the CSV cache or, failing that, the API.

    If CSV_FILE exists and parses, it is used as-is. If it is missing, or it
    is empty/corrupt/lacks a ``timestamp`` column, the history is fetched via
    fetch_historical_data() and (when non-empty) written back to CSV_FILE.
    In every case the result is trimmed to the last HISTORY_WINDOW rows.
    """
    global price_df

    history = None
    if os.path.exists(CSV_FILE):
        try:
            history = pd.read_csv(CSV_FILE, parse_dates=['timestamp'])
        except (pd.errors.EmptyDataError, pd.errors.ParserError, ValueError) as e:
            # An unreadable cache must not stop the monitor from starting;
            # fall through to the API fetch below instead.
            print(f"Could not read {CSV_FILE} ({e}); fetching history from API instead.")
        else:
            print(f"Loaded {len(history)} historical points from CSV.")

    if history is None:
        history = fetch_historical_data()
        if not history.empty:
            history.to_csv(CSV_FILE, index=False)
            print(f"Fetched and saved {len(history)} historical points.")

    # Trim to window size
    price_df = history.tail(HISTORY_WINDOW)
def fetch_current_price(timeout=10):
    """Fetch the current CRYPTO price in CURRENCY from the CoinGecko API.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot freeze the scheduler loop.

    Returns:
        A ``(timestamp, price)`` tuple, where ``timestamp`` is a naive
        datetime expressed in UTC. On any failure the error is printed and
        ``(None, None)`` is returned.
    """
    from datetime import timezone

    try:
        response = requests.get(CURRENT_PRICE_URL, timeout=timeout)
        response.raise_for_status()
        data = response.json()
        price = data[CRYPTO][CURRENCY]
        # The historical series is built from epoch milliseconds, i.e. naive
        # UTC. Stamp live samples the same way so the time-of-day feature and
        # the persisted CSV stay on a single clock regardless of host timezone.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None)
        return timestamp, price
    except Exception as e:
        # Best-effort by design: the caller skips the cycle on (None, None).
        print(f"Error fetching current price: {e}")
        return None, None
def engineer_features(df):
    """Build the model input matrix from a price history frame.

    Args:
        df: DataFrame with a datetime ``timestamp`` column and a numeric
            ``price`` column. It is not modified.

    Returns:
        A DataFrame with the columns ``pct_change``, ``abs_diff``,
        ``rolling_mean_5``, ``rolling_std_5`` and ``hour`` (in that order),
        one row per input row, with missing values replaced by 0.
    """
    price = df['price']
    window_5 = price.rolling(window=5)

    enriched = df.assign(
        pct_change=price.pct_change(),    # relative move vs. previous sample
        abs_diff=price.diff(),            # absolute move vs. previous sample
        rolling_mean_5=window_5.mean(),   # 5-sample rolling mean
        rolling_std_5=window_5.std(),     # 5-sample rolling standard deviation
        hour=df['timestamp'].dt.hour,     # time of day
    )
    # Leading rows have no predecessor / incomplete window; zero them out.
    enriched = enriched.fillna(0)

    # Timestamp and raw price are deliberately excluded from the model input.
    feature_columns = ['pct_change', 'abs_diff', 'rolling_mean_5', 'rolling_std_5', 'hour']
    return enriched[feature_columns]
def train_model():
    """Fit a fresh Isolation Forest on the current history and store it in ``model``.

    Does nothing (beyond printing a notice) when the global ``price_df``
    holds fewer than 10 rows.
    """
    global model

    # Need sufficient data before a fit is meaningful.
    if price_df.shape[0] < 10:
        print("Insufficient data to train model.")
        return

    training_matrix = engineer_features(price_df)
    # contamination=0.01: assume roughly 1% of the samples are anomalies.
    model = IsolationForest(
        contamination=0.01,
        random_state=42,
    )
    model.fit(training_matrix)
    print("Model retrained successfully.")
def detect_anomaly(timestamp, current_price):
    """Score a new price observation against the trained model.

    The observation is appended to a temporary copy of the global history so
    that the difference and rolling features of the last row can be computed;
    the global ``price_df`` itself is not modified here.

    Args:
        timestamp: Time of the observation.
        current_price: Observed price.

    Returns:
        ``(is_anomaly, score)``. ``score`` is the model's decision-function
        value for the new point (higher = more normal) and ``is_anomaly`` is
        true when it falls below ANOMALY_THRESHOLD. When no model has been
        trained yet, ``(False, 0.0)`` is returned.
    """
    if model is None:
        return False, 0.0

    # Temporary frame: existing history plus the candidate point.
    candidate = pd.DataFrame({'timestamp': [timestamp], 'price': [current_price]})
    extended_history = pd.concat([price_df, candidate], ignore_index=True)

    # Only the feature row of the newest point is scored.
    latest_features = engineer_features(extended_history).tail(1)
    score = model.decision_function(latest_features)[0]

    return score < ANOMALY_THRESHOLD, score
def send_email_alert(timestamp, current_price, score):
    """Send a plain-text email alert describing a detected anomaly.

    Args:
        timestamp: Time of the anomalous observation.
        current_price: Price that triggered the alert.
        score: Anomaly score reported by the model.

    Delivery is best-effort: any failure is printed and swallowed so that a
    mail problem never interrupts the monitoring loop.
    """
    subject = f"Anomaly Detected in {CRYPTO.upper()} Price!"
    body = f"""
Alert Time: {timestamp}
Current Price: ${current_price:.4f}
Anomaly Score: {score:.2f} (Threshold: {ANOMALY_THRESHOLD})
This indicates a sudden and sharp price fluctuation.
"""

    msg = MIMEMultipart()
    msg['From'] = EMAIL_FROM
    msg['To'] = EMAIL_TO
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    # Port 465 expects TLS from the first byte (implicit TLS), which needs
    # SMTP_SSL; a plain SMTP connection followed by STARTTLS only works on
    # ports that begin unencrypted (e.g. 587).
    implicit_tls = SMTP_PORT == 465
    smtp_factory = smtplib.SMTP_SSL if implicit_tls else smtplib.SMTP

    try:
        # The context manager issues QUIT and closes the socket even when
        # login or sending fails; the timeout keeps a dead mail server from
        # hanging the scheduler.
        with smtp_factory(SMTP_SERVER, SMTP_PORT, timeout=30) as server:
            if not implicit_tls:
                server.starttls()
            server.login(EMAIL_FROM, EMAIL_PASSWORD)
            server.sendmail(EMAIL_FROM, EMAIL_TO, msg.as_string())
        print("Email alert sent successfully.")
    except Exception as e:
        print(f"Error sending email: {e}")
def monitor():
    """Run one monitoring cycle: fetch the price, score it, persist it, alert.

    Skips the cycle silently when the price could not be fetched. Otherwise
    the observation is scored against the current model, appended to the
    global ``price_df`` (trimmed to HISTORY_WINDOW rows), written to
    CSV_FILE, logged to stdout, and - when flagged - reported by email.
    """
    global price_df
    timestamp, current_price = fetch_current_price()
    if current_price is None:
        return

    # Score BEFORE appending to the history. detect_anomaly() appends the new
    # point to its own temporary copy of price_df; if the point were already
    # in price_df it would appear twice in a row there, forcing the scored
    # row's pct_change and abs_diff to 0 and hiding every sudden move.
    is_anomaly, score = detect_anomaly(timestamp, current_price)

    # Append new price to history
    new_row = pd.DataFrame({'timestamp': [timestamp], 'price': [current_price]})
    price_df = pd.concat([price_df, new_row], ignore_index=True)
    price_df = price_df.tail(HISTORY_WINDOW)  # Keep window size
    price_df.to_csv(CSV_FILE, index=False)  # Save updates

    print(f"{timestamp} - Current {CRYPTO.upper()} Price: ${current_price:.4f} | Anomaly Score: {score:.2f}")

    if is_anomaly:
        print(f"ANOMALY DETECTED! Score: {score:.2f}")
        send_email_alert(timestamp, current_price, score)
def retrain_scheduler():
    """Periodic job entry point: refit the model on the current history."""
    # Kept as a separate callable so the scheduler has a dedicated job target.
    train_model()
# Bootstrap: load (or download) the price history, then fit the first model.
load_or_fetch_history()
train_model()

# Register the two periodic jobs: price check and model retraining.
price_check_every = schedule.every(CHECK_INTERVAL_SECONDS).seconds
price_check_every.do(monitor)
retrain_every = schedule.every(RETRAIN_INTERVAL_MINUTES).minutes
retrain_every.do(retrain_scheduler)

# Drive the scheduler forever, polling once per second to keep CPU usage low.
print(f"Starting advanced {CRYPTO.upper()} price anomaly monitor with Isolation Forest...")
while True:
    schedule.run_pending()
    time.sleep(1)
|