import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.metrics import silhouette_score
from datetime import datetime, timedelta
import warnings

warnings.filterwarnings('ignore')


class CryptoAnalyzer:
    def __init__(self):
        # Top traded cryptocurrencies (using their Yahoo Finance tickers)
        self.crypto_tickers = [
            'BTC-USD', 'ETH-USD', 'BNB-USD', 'XRP-USD', 'ADA-USD',
            'DOGE-USD', 'SOL-USD', 'DOT-USD', 'MATIC-USD', 'LTC-USD',
            'SHIB-USD', 'TRX-USD', 'AVAX-USD', 'UNI-USD', 'LINK-USD'
        ]
        self.data = {}
        self.scaler = StandardScaler()
        self.pca = PCA(n_components=2)
        self.kmeans = None
        self.isolation_forest = None
        self.one_class_svm = None

    def download_data(self, period='1y', interval='1d'):
        """
        Download historical price data for the tracked cryptocurrencies
        """
        print("Downloading cryptocurrency data...")

        for ticker in self.crypto_tickers:
            try:
                crypto_data = yf.download(ticker, period=period, interval=interval)
                if not crypto_data.empty:
                    self.data[ticker] = crypto_data
                    print(f"✓ Downloaded data for {ticker}")
                else:
                    print(f"✗ No data for {ticker}")
            except Exception as e:
                print(f"✗ Error downloading {ticker}: {str(e)}")

        print(f"Successfully downloaded data for {len(self.data)} cryptocurrencies\n")
        return self.data

    def preprocess_data(self):
        """
        Preprocess the downloaded data for analysis
        """
        print("Preprocessing data...")

        def get_series(df, column):
            # yfinance may return single-level or MultiIndex columns; squeeze
            # each price field down to a 1-D Series either way
            col = df[column]
            return col.iloc[:, 0] if isinstance(col, pd.DataFrame) else col

        # Create a feature row per cryptocurrency
        features_list = []
        returns_list = []

        for ticker, df in self.data.items():
            close_prices = get_series(df, 'Close')
            high_prices = get_series(df, 'High')
            low_prices = get_series(df, 'Low')
            volume_data = get_series(df, 'Volume')

            # Calculate returns and technical indicators as plain Series,
            # so nothing is written back into a (possibly MultiIndex) frame
            returns = close_prices.pct_change()
            volatility = returns.rolling(window=30).std()
            ma_7 = close_prices.rolling(window=7).mean()  # short-term trend (not part of the feature vector)
            ma_30 = close_prices.rolling(window=30).mean()
            price_ma_ratio = close_prices / ma_30
            volume_ma = volume_data.rolling(window=30).mean()
            volume_ratio = volume_data / volume_ma

            # Use the latest value of each indicator as the feature vector
            features = {
                'Ticker': ticker,
                'Close_Price': close_prices.iloc[-1],
                'Returns': returns.iloc[-1],
                'Volatility': volatility.iloc[-1],
                'Price_MA_Ratio': price_ma_ratio.iloc[-1],
                'Volume_Ratio': volume_ratio.iloc[-1],
                'High_Low_Ratio': high_prices.iloc[-1] / low_prices.iloc[-1]
            }

            features_list.append(features)

            # Store returns for similarity analysis
            returns_data = returns.dropna()
            returns_list.append({
                'ticker': ticker,
                'returns': returns_data,
                'mean_return': returns_data.mean(),
                'std_return': returns_data.std()
            })

        self.features_df = pd.DataFrame(features_list)
        self.returns_data = returns_list

        # Numerical features used for clustering and anomaly detection
        self.numerical_features = ['Close_Price', 'Returns', 'Volatility',
                                   'Price_MA_Ratio', 'Volume_Ratio', 'High_Low_Ratio']

        # Remove rows with NaN values
        self.features_df = self.features_df.dropna()

        print("Data preprocessing completed\n")
        return self.features_df

    def similarity_analysis(self):
        """
        Perform similarity analysis between cryptocurrencies
        """
        print("Performing similarity analysis...")

        # Build a returns matrix (one column per cryptocurrency)
        returns_df = pd.DataFrame()
        for item in self.returns_data:
            returns_df[item['ticker']] = item['returns'].tail(252)  # roughly the last year of daily data

        # Handle missing values by forward/backward filling
        returns_df = returns_df.ffill().bfill()

        # Calculate correlation matrix
        self.correlation_matrix = returns_df.corr()

        # Collect all unique pairs with their correlations
        similar_pairs = []
        for i in range(len(self.correlation_matrix.columns)):
            for j in range(i + 1, len(self.correlation_matrix.columns)):
                ticker1 = self.correlation_matrix.columns[i]
                ticker2 = self.correlation_matrix.columns[j]
                correlation = self.correlation_matrix.iloc[i, j]
                similar_pairs.append((ticker1, ticker2, correlation))

        # Sort by correlation (highest first)
        similar_pairs.sort(key=lambda x: x[2], reverse=True)

        print("Top 10 most similar cryptocurrency pairs (based on correlation):")
        for i, (t1, t2, corr) in enumerate(similar_pairs[:10]):
            print(f"{i + 1}. {t1} - {t2}: {corr:.3f}")

        # Visualize correlation matrix
        plt.figure(figsize=(12, 10))
        sns.heatmap(self.correlation_matrix, annot=True, cmap='coolwarm', center=0,
                    fmt='.2f', square=True)
        plt.title('Cryptocurrency Returns Correlation Matrix')
        plt.tight_layout()
        plt.show()

        print("\nSimilarity analysis completed\n")
        return self.correlation_matrix

    def build_anomaly_detection_models(self):
        """
        Build and train anomaly detection models
        """
        print("Building anomaly detection models...")

        # Prepare data for anomaly detection
        X = self.features_df[self.numerical_features].values
        X_scaled = self.scaler.fit_transform(X)

        # 1. K-Means clustering to group the cryptocurrencies
        self.kmeans = KMeans(n_clusters=3, n_init=10, random_state=42)
        cluster_labels = self.kmeans.fit_predict(X_scaled)
        self.features_df['Cluster'] = cluster_labels

        # 2. Isolation Forest
        self.isolation_forest = IsolationForest(contamination=0.1, random_state=42)
        isolation_predictions = self.isolation_forest.fit_predict(X_scaled)
        self.features_df['Isolation_Anomaly'] = isolation_predictions

        # 3. One-Class SVM
        self.one_class_svm = OneClassSVM(nu=0.1)
        svm_predictions = self.one_class_svm.fit_predict(X_scaled)
        self.features_df['SVM_Anomaly'] = svm_predictions

        # Combine results: count how many detectors flag each row
        self.features_df['Anomaly_Score'] = (
            (self.features_df['Isolation_Anomaly'] == -1).astype(int) +
            (self.features_df['SVM_Anomaly'] == -1).astype(int)
        )

        # Flag as an anomaly if at least one detector marks it
        self.features_df['Is_Anomaly'] = self.features_df['Anomaly_Score'] >= 1

        print("Anomaly detection models built successfully")

        # Display anomalies found
        anomalies = self.features_df[self.features_df['Is_Anomaly']]
        if len(anomalies) > 0:
            print(f"\nDetected {len(anomalies)} potential anomalies:")
            for _, row in anomalies.iterrows():
                print(f"  • {row['Ticker']}: Anomaly Score = {row['Anomaly_Score']}")
        else:
            print("No anomalies detected in current data")

        print("\nModel building completed\n")
        return self.features_df

    def detect_new_anomalies(self, new_data_point):
        """
        Detect anomalies in new incoming data
        """
        print("Detecting anomalies in new data...")

        # Scale the new data point (features in the same order as numerical_features)
        new_scaled = self.scaler.transform(np.array(new_data_point).reshape(1, -1))

        # Apply all models
        kmeans_pred = self.kmeans.predict(new_scaled)[0]
        isolation_pred = self.isolation_forest.predict(new_scaled)[0]
        svm_pred = self.one_class_svm.predict(new_scaled)[0]

        # Calculate anomaly score (number of detectors that flag the point)
        anomaly_score = int(isolation_pred == -1) + int(svm_pred == -1)
        is_anomaly = anomaly_score >= 1

        result = {
            'Cluster': kmeans_pred,
            'Isolation_Prediction': 'Anomaly' if isolation_pred == -1 else 'Normal',
            'SVM_Prediction': 'Anomaly' if svm_pred == -1 else 'Normal',
            'Anomaly_Score': anomaly_score,
            'Is_Anomaly': is_anomaly
        }

        print("New data point analysis:")
        print(f"  Cluster: {result['Cluster']}")
        print(f"  Isolation Forest: {result['Isolation_Prediction']}")
        print(f"  One-Class SVM: {result['SVM_Prediction']}")
        print(f"  Overall: {'ANOMALY' if is_anomaly else 'NORMAL'}")

        return result

    def visualize_results(self):
        """
        Visualize the results of the analysis
        """
        print("Generating visualizations...")

        # Project the scaled features onto two principal components
        X_scaled = self.scaler.transform(self.features_df[self.numerical_features].values)
        X_pca = self.pca.fit_transform(X_scaled)

        plt.figure(figsize=(15, 5))

        # Plot 1: Clustering results
        plt.subplot(1, 3, 1)
        scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=self.features_df['Cluster'],
                              cmap='viridis', alpha=0.7)
        plt.colorbar(scatter)
        plt.title('Cryptocurrency Clustering (PCA)')
        plt.xlabel('First Principal Component')
        plt.ylabel('Second Principal Component')

        # Plot 2: Anomalies vs normal points
        plt.subplot(1, 3, 2)
        colors = ['red' if x else 'blue' for x in self.features_df['Is_Anomaly']]
        plt.scatter(X_pca[:, 0], X_pca[:, 1], c=colors, alpha=0.7)
        plt.title('Anomaly Detection Results')
        plt.xlabel('First Principal Component')
        plt.ylabel('Second Principal Component')

        # Plot 3: Feature distributions
        plt.subplot(1, 3, 3)
        self.features_df.boxplot(column=['Returns', 'Volatility'], ax=plt.gca())
        plt.title('Returns and Volatility Distribution')
        plt.xticks(rotation=45)

        plt.tight_layout()
        plt.show()

        print("Visualizations completed\n")

    def generate_report(self):
        """
        Generate a comprehensive report of the analysis
        """
        print("=" * 60)
        print("CRYPTOCURRENCY ANALYSIS REPORT")
        print("=" * 60)

        print("\n1. DATA SUMMARY:")
        print(f"  • Total cryptocurrencies analyzed: {len(self.features_df)}")
        print("  • Time period: 1 year")
        print(f"  • Features analyzed: {', '.join(self.numerical_features)}")

        print("\n2. SIMILARITY ANALYSIS:")
        print("  • Correlation matrix generated for all pairs")
        print("  • Top similar pairs identified")

        print("\n3. ANOMALY DETECTION:")
        anomalies = self.features_df[self.features_df['Is_Anomaly']]
        print(f"  • Anomalies detected: {len(anomalies)}")
        if len(anomalies) > 0:
            for _, row in anomalies.iterrows():
                print(f"    - {row['Ticker']}")

        print("\n4. CLUSTERING:")
        cluster_counts = self.features_df['Cluster'].value_counts()
        for cluster, count in cluster_counts.items():
            print(f"  • Cluster {cluster}: {count} cryptocurrencies")

        print("=" * 60)


# Example usage
def main():
    # Initialize the analyzer
    analyzer = CryptoAnalyzer()

    # Download data
    analyzer.download_data(period='1y', interval='1d')

    # Preprocess data
    analyzer.preprocess_data()

    # Perform similarity analysis
    analyzer.similarity_analysis()

    # Build anomaly detection models
    analyzer.build_anomaly_detection_models()

    # Visualize results
    analyzer.visualize_results()

    # Generate report
    analyzer.generate_report()

    # Example of detecting new anomalies
    print("Example: Detecting anomaly for new data point...")
    # Example new data point (features in the same order as numerical_features)
    example_new_point = [40000, 0.02, 0.05, 1.1, 1.5, 1.02]  # BTC-like features
    analyzer.detect_new_anomalies(example_new_point)


if __name__ == "__main__":
    main()