from datetime import datetime, timedelta
from itertools import combinations
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM
import yfinance as yf

warnings.filterwarnings('ignore')


class CryptoAnalyzer:
    """Download crypto price data, compare tickers and flag anomalous ones.

    Typical call order: ``download_data`` -> ``preprocess_data`` ->
    ``similarity_analysis`` -> ``build_anomaly_detection_models`` ->
    ``visualize_results`` / ``generate_report`` / ``detect_new_anomalies``.
    """

    # Column order used for every model input; ``detect_new_anomalies``
    # expects new points in exactly this order.
    NUMERICAL_FEATURES = (
        'Close_Price', 'Returns', 'Volatility',
        'Price_MA_Ratio', 'Volume_Ratio', 'High_Low_Ratio',
    )

    def __init__(self, anomaly_vote_threshold=1):
        """Set up tickers, empty result holders and unfitted models.

        Args:
            anomaly_vote_threshold: Minimum number of detectors (Isolation
                Forest, One-Class SVM) that must flag a point for it to be
                reported as an anomaly. The default of 1 means "any detector".
        """
        # Top traded cryptocurrencies (using their Yahoo Finance tickers)
        self.crypto_tickers = [
            'BTC-USD', 'ETH-USD', 'BNB-USD', 'XRP-USD', 'ADA-USD',
            'DOGE-USD', 'SOL-USD', 'DOT-USD', 'MATIC-USD', 'LTC-USD',
            'SHIB-USD', 'TRX-USD', 'AVAX-USD', 'UNI-USD', 'LINK-USD',
        ]
        self.data = {}
        self.period = '1y'
        self.anomaly_vote_threshold = anomaly_vote_threshold
        self.numerical_features = list(self.NUMERICAL_FEATURES)
        self.features_df = pd.DataFrame()
        self.returns_data = []
        self.correlation_matrix = None
        self.scaler = StandardScaler()
        self.pca = PCA(n_components=2)
        self.kmeans = None
        self.isolation_forest = None
        self.one_class_svm = None

    @staticmethod
    def _column_as_series(df, name):
        """Return column ``name`` of ``df`` as a Series.

        When the lookup yields a DataFrame (e.g. multi-level columns), the
        first sub-column is used.
        """
        column = df[name]
        if isinstance(column, pd.DataFrame):
            column = column.iloc[:, 0]
        return column

    @staticmethod
    def _period_label(period):
        """Render a period string such as '1y' or '6mo' as '1 year' / '6 months'.

        Strings that do not match ``<digits><d|wk|mo|y>`` are returned as-is.
        """
        text = str(period)
        for suffix, unit in (('mo', 'month'), ('wk', 'week'),
                             ('d', 'day'), ('y', 'year')):
            count = text[:-len(suffix)]
            if text.endswith(suffix) and count.isdigit():
                plural = '' if int(count) == 1 else 's'
                return f"{int(count)} {unit}{plural}"
        return text

    def download_data(self, period='1y', interval='1d'):
        """Download historical price data for every ticker in ``crypto_tickers``.

        Download failures are reported and skipped so that one bad ticker
        does not abort the whole run.

        Args:
            period: Look-back period passed to ``yf.download``.
            interval: Bar interval passed to ``yf.download``.

        Returns:
            dict mapping ticker -> downloaded DataFrame (non-empty ones only).
        """
        print("Downloading cryptocurrency data...")
        self.period = period  # remembered so the report states the real period

        for ticker in self.crypto_tickers:
            try:
                crypto_data = yf.download(ticker, period=period, interval=interval)
            except Exception as e:  # best-effort: report and move on
                print(f"✗ Error downloading {ticker}: {str(e)}")
                continue

            if crypto_data.empty:
                print(f"✗ No data for {ticker}")
            else:
                self.data[ticker] = crypto_data
                print(f"✓ Downloaded data for {ticker}")

        print(f"Successfully downloaded data for {len(self.data)} cryptocurrencies\n")
        return self.data

    def preprocess_data(self):
        """Derive per-ticker features and return series from the downloaded data.

        For each ticker the indicator columns (``Returns``, ``Volatility``,
        ``MA_7``, ``MA_30``, ``Price_MA_Ratio``, ``Volume_MA``,
        ``Volume_Ratio``) are added to its frame in ``self.data``, and the
        most recent value of each feature becomes one row of
        ``self.features_df``. Rows containing NaN are dropped.

        Returns:
            pandas.DataFrame with one row per ticker (``Ticker`` plus the
            columns listed in ``self.numerical_features``).
        """
        print("Preprocessing data...")

        features_list = []
        returns_list = []

        for ticker, df in self.data.items():
            if df.empty:
                continue  # nothing to take a "latest" value from

            close_prices = self._column_as_series(df, 'Close')
            high_prices = self._column_as_series(df, 'High')
            low_prices = self._column_as_series(df, 'Low')
            volume_data = self._column_as_series(df, 'Volume')

            # Compute indicators as standalone Series. Reading them back via
            # ``df.iloc[-1][...]`` can yield a Series instead of a scalar when
            # the frame has multi-level columns.
            returns = close_prices.pct_change()
            volatility = returns.rolling(window=30).std()
            ma_7 = close_prices.rolling(window=7).mean()
            ma_30 = close_prices.rolling(window=30).mean()
            price_ma_ratio = close_prices / ma_30
            volume_ma = volume_data.rolling(window=30).mean()
            volume_ratio = volume_data / volume_ma

            # Keep the indicator columns on the stored frame as before.
            df['Returns'] = returns
            df['Volatility'] = volatility
            df['MA_7'] = ma_7
            df['MA_30'] = ma_30
            df['Price_MA_Ratio'] = price_ma_ratio
            df['Volume_MA'] = volume_ma
            df['Volume_Ratio'] = volume_ratio

            features_list.append({
                'Ticker': ticker,
                'Close_Price': close_prices.iloc[-1],
                'Returns': returns.iloc[-1],
                'Volatility': volatility.iloc[-1],
                'Price_MA_Ratio': price_ma_ratio.iloc[-1],
                'Volume_Ratio': volume_ratio.iloc[-1],
                'High_Low_Ratio': high_prices.iloc[-1] / low_prices.iloc[-1],
            })

            # Store returns for similarity analysis
            returns_data = returns.dropna()
            returns_list.append({
                'ticker': ticker,
                'returns': returns_data,
                'mean_return': returns_data.mean(),
                'std_return': returns_data.std(),
            })

        # Numerical features used for clustering and anomaly detection
        self.numerical_features = list(self.NUMERICAL_FEATURES)
        self.returns_data = returns_list

        # Explicit columns keep the frame well-formed even with no tickers.
        self.features_df = pd.DataFrame(
            features_list, columns=['Ticker'] + self.numerical_features
        ).dropna()

        print("Data preprocessing completed\n")
        return self.features_df

    def similarity_analysis(self, window=252):
        """Correlate ticker returns, print the top pairs and plot a heatmap.

        Args:
            window: Number of most recent return observations per ticker to
                correlate.

        Returns:
            pandas.DataFrame holding the ticker-by-ticker correlation matrix.

        Raises:
            ValueError: If there are no returns to analyse
                (``preprocess_data`` not run, or no data downloaded).
        """
        print("Performing similarity analysis...")

        if not self.returns_data:
            raise ValueError(
                "No returns data available; run preprocess_data() first."
            )

        # Align all tickers on the union of their dates.
        returns_df = pd.concat(
            {item['ticker']: item['returns'].tail(window)
             for item in self.returns_data},
            axis=1,
        )

        # Fill gaps forward, then backward for leading gaps.
        # (``fillna(method=...)`` is deprecated in pandas 2.x.)
        returns_df = returns_df.ffill().bfill()

        self.correlation_matrix = returns_df.corr()

        print("Top similar cryptocurrency pairs (based on correlation):")
        tickers = list(self.correlation_matrix.columns)
        similar_pairs = [
            (t1, t2, self.correlation_matrix.iloc[i, j])
            for (i, t1), (j, t2) in combinations(enumerate(tickers), 2)
        ]

        # Highest correlation first
        similar_pairs.sort(key=lambda pair: pair[2], reverse=True)

        print("Top 10 most similar pairs:")
        for rank, (t1, t2, corr) in enumerate(similar_pairs[:10], start=1):
            print(f"{rank}. {t1} - {t2}: {corr:.3f}")

        # Visualize correlation matrix
        plt.figure(figsize=(12, 10))
        sns.heatmap(self.correlation_matrix, annot=True, cmap='coolwarm',
                    center=0, fmt='.2f', square=True)
        plt.title('Cryptocurrency Returns Correlation Matrix')
        plt.tight_layout()
        plt.show()

        print("\nSimilarity analysis completed\n")
        return self.correlation_matrix

    def build_anomaly_detection_models(self):
        """Fit the scaler, KMeans, Isolation Forest and One-Class SVM.

        Adds ``Cluster``, ``Isolation_Anomaly``, ``SVM_Anomaly``,
        ``Anomaly_Score`` and ``Is_Anomaly`` columns to ``self.features_df``.

        Returns:
            pandas.DataFrame: ``self.features_df`` with the new columns.

        Raises:
            ValueError: If ``self.features_df`` has no rows to train on.
        """
        print("Building anomaly detection models...")

        if self.features_df.empty:
            raise ValueError(
                "No feature rows available; run download_data() and "
                "preprocess_data() first."
            )

        X = self.features_df[self.numerical_features].values
        X_scaled = self.scaler.fit_transform(X)

        # 1. K-Means clustering (cannot have more clusters than samples)
        self.kmeans = KMeans(n_clusters=min(3, len(X_scaled)), random_state=42)
        self.features_df['Cluster'] = self.kmeans.fit_predict(X_scaled)

        # 2. Isolation Forest (-1 = anomaly, 1 = normal)
        self.isolation_forest = IsolationForest(contamination=0.1, random_state=42)
        self.features_df['Isolation_Anomaly'] = (
            self.isolation_forest.fit_predict(X_scaled)
        )

        # 3. One-Class SVM (-1 = anomaly, 1 = normal)
        self.one_class_svm = OneClassSVM(nu=0.1)
        self.features_df['SVM_Anomaly'] = self.one_class_svm.fit_predict(X_scaled)

        # Anomaly score = number of detectors that flagged the row
        self.features_df['Anomaly_Score'] = (
            (self.features_df['Isolation_Anomaly'] == -1).astype(int)
            + (self.features_df['SVM_Anomaly'] == -1).astype(int)
        )

        # A row is an anomaly when at least ``anomaly_vote_threshold``
        # detectors flag it (default 1, i.e. any detector).
        self.features_df['Is_Anomaly'] = (
            self.features_df['Anomaly_Score'] >= self.anomaly_vote_threshold
        )

        print("Anomaly detection models built successfully")

        # Display anomalies found
        anomalies = self.features_df[self.features_df['Is_Anomaly']]
        if len(anomalies) > 0:
            print(f"\nDetected {len(anomalies)} potential anomalies:")
            for _, row in anomalies.iterrows():
                print(f" • {row['Ticker']}: Anomaly Score = {row['Anomaly_Score']}")
        else:
            print("No anomalies detected in current data")

        print("\nModel building completed\n")
        return self.features_df

    def detect_new_anomalies(self, new_data_point):
        """Score a single new observation with the already-fitted models.

        Args:
            new_data_point: Sequence of feature values in the same order as
                ``self.numerical_features``.

        Returns:
            dict with keys ``Cluster``, ``Isolation_Prediction``,
            ``SVM_Prediction``, ``Anomaly_Score`` (int count of detectors
            that flagged the point) and ``Is_Anomaly`` (bool).

        Raises:
            RuntimeError: If the models have not been built yet.
        """
        print("Detecting anomalies in new data...")

        if None in (self.kmeans, self.isolation_forest, self.one_class_svm):
            raise RuntimeError(
                "Models are not fitted; call build_anomaly_detection_models() "
                "first."
            )

        # Scale with the statistics learned during training
        new_scaled = self.scaler.transform([new_data_point])

        kmeans_pred = int(self.kmeans.predict(new_scaled)[0])
        isolation_pred = self.isolation_forest.predict(new_scaled)[0]
        svm_pred = self.one_class_svm.predict(new_scaled)[0]

        # Convert each vote to int before summing: adding two NumPy booleans
        # is a logical OR, which would cap the score at True instead of 2.
        anomaly_score = int(isolation_pred == -1) + int(svm_pred == -1)
        is_anomaly = anomaly_score >= self.anomaly_vote_threshold

        result = {
            'Cluster': kmeans_pred,
            'Isolation_Prediction': 'Anomaly' if isolation_pred == -1 else 'Normal',
            'SVM_Prediction': 'Anomaly' if svm_pred == -1 else 'Normal',
            'Anomaly_Score': anomaly_score,
            'Is_Anomaly': is_anomaly,
        }

        print("New data point analysis:")
        print(f" Cluster: {result['Cluster']}")
        print(f" Isolation Forest: {result['Isolation_Prediction']}")
        print(f" One-Class SVM: {result['SVM_Prediction']}")
        print(f" Overall: {'ANOMALY' if is_anomaly else 'NORMAL'}")

        return result

    def visualize_results(self):
        """Plot clusters, anomaly flags (both in PCA space) and a feature boxplot.

        Requires ``build_anomaly_detection_models`` to have been run, since
        it reads the ``Cluster`` and ``Is_Anomaly`` columns and the fitted
        scaler.
        """
        print("Generating visualizations...")

        # Pass an ndarray, matching how the scaler was fitted.
        X_scaled = self.scaler.transform(
            self.features_df[self.numerical_features].values
        )
        X_pca = self.pca.fit_transform(X_scaled)

        fig, (ax_clusters, ax_anomalies, ax_box) = plt.subplots(
            1, 3, figsize=(15, 5)
        )

        # Plot 1: Clustering results
        scatter = ax_clusters.scatter(
            X_pca[:, 0], X_pca[:, 1],
            c=self.features_df['Cluster'], cmap='viridis', alpha=0.7,
        )
        fig.colorbar(scatter, ax=ax_clusters)
        ax_clusters.set_title('Cryptocurrency Clustering (PCA)')
        ax_clusters.set_xlabel('First Principal Component')
        ax_clusters.set_ylabel('Second Principal Component')

        # Plot 2: Anomalies (red) vs normal points (blue)
        colors = ['red' if flagged else 'blue'
                  for flagged in self.features_df['Is_Anomaly']]
        ax_anomalies.scatter(X_pca[:, 0], X_pca[:, 1], c=colors, alpha=0.7)
        ax_anomalies.set_title('Anomaly Detection Results')
        ax_anomalies.set_xlabel('First Principal Component')
        ax_anomalies.set_ylabel('Second Principal Component')

        # Plot 3: Feature distributions
        self.features_df.boxplot(column=['Returns', 'Volatility'], ax=ax_box)
        ax_box.set_title('Returns and Volatility Distribution')
        ax_box.tick_params(axis='x', rotation=45)

        fig.tight_layout()
        plt.show()

        print("Visualizations completed\n")

    def generate_report(self):
        """Print a text summary of the data, anomalies and clusters.

        Requires ``build_anomaly_detection_models`` to have been run, since
        it reads the ``Is_Anomaly`` and ``Cluster`` columns.
        """
        print("=" * 60)
        print("CRYPTOCURRENCY ANALYSIS REPORT")
        print("=" * 60)

        print("\n1. DATA SUMMARY:")
        print(f" • Total cryptocurrencies analyzed: {len(self.features_df)}")
        print(f" • Time period: {self._period_label(self.period)}")
        print(f" • Features analyzed: {', '.join(self.numerical_features)}")

        print("\n2. SIMILARITY ANALYSIS:")
        print(" • Correlation matrix generated for all pairs")
        print(" • Top similar pairs identified")

        print("\n3. ANOMALY DETECTION:")
        anomalies = self.features_df[self.features_df['Is_Anomaly']]
        print(f" • Anomalies detected: {len(anomalies)}")
        for _, row in anomalies.iterrows():
            print(f" - {row['Ticker']}")

        print("\n4. CLUSTERING:")
        cluster_counts = self.features_df['Cluster'].value_counts()
        for cluster, count in cluster_counts.items():
            print(f" • Cluster {cluster}: {count} cryptocurrencies")

        print("=" * 60)


# Example usage
def main():
    """Run the full pipeline end to end and score one example data point."""
    # Initialize the analyzer
    analyzer = CryptoAnalyzer()

    # Download data
    analyzer.download_data(period='1y', interval='1d')

    # Preprocess data
    analyzer.preprocess_data()

    # Perform similarity analysis
    analyzer.similarity_analysis()

    # Build anomaly detection models
    analyzer.build_anomaly_detection_models()

    # Visualize results
    analyzer.visualize_results()

    # Generate report
    analyzer.generate_report()

    # Example of detecting new anomalies
    print("Example: Detecting anomaly for new data point...")
    # Example new data point (features in the same order as numerical_features)
    example_new_point = [40000, 0.02, 0.05, 1.1, 1.5, 1.02]  # BTC-like features
    analyzer.detect_new_anomalies(example_new_point)


if __name__ == "__main__":
    main()