# Machine Learning (27): Stacking Classification
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
from sklearn.model_selection import train_test_split, KFold, GridSearchCV, cross_val_score
from sklearn.preprocessing import RobustScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
# Base Models
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, HistGradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
# Stacking
from sklearn.ensemble import StackingClassifier
# Metrics
from sklearn.metrics import accuracy_score, classification_report, f1_score, confusion_matrix
# --- Data loading --------------------------------------------------------
# NOTE(review): this script targets Google Colab (`files.upload`,
# `display`). The upload prompt must run BEFORE pd.read_csv — on a fresh
# runtime the CSV does not exist until the user uploads it, so the
# original order (read first, upload after) fails. Reordered here.
print("Loading Heart Disease Classification Dataset...")
from google.colab import files
uploaded = files.upload()
df = pd.read_csv('heart_disease_dataset.csv')
print(f"데이터 크기: {df.shape}")
print(f"타깃 분포:\n{df['heart_disease'].value_counts()}")
print(f"\n타깃 비율:\n{df['heart_disease'].value_counts(normalize=True)}")
print(f"\n피처 목록: {list(df.columns)}")
print("\n데이터 샘플:")
display(df.head())
print("\n데이터 정보:")
# df.info() prints its report itself and returns None, so wrapping it in
# display() only renders a stray "None" — call it directly instead.
df.info()
print("\n기술 통계:")
display(df.describe())
# --- Target split and feature engineering --------------------------------
# Separate the label; 'cp' is dropped alongside it (deliberately excluded
# per the original author's note — TODO confirm the rationale).
y = df['heart_disease']
X = df.drop(columns=['heart_disease', 'cp'])
print(f"특성(Feature) 개수: {X.shape[1]}")
print(f"샘플 개수: {X.shape[0]}")

# Derived features: coarse age bucket, over-50 flag, age x sex interaction.
age_bins = [0, 39, 59, 100]
age_labels = ['under_40', '40_59', 'over_60']
X['age_group'] = pd.cut(X['age'], bins=age_bins, labels=age_labels)
X['is_over_50'] = (X['age'] >= 50).astype(int)
X['age_sex_inter'] = X['age'] * X['sex']
print("\n파생 변수 생성 완료")
print(f"전체 특성 개수: {X.shape[1]}")

# Column groups consumed by the preprocessing ColumnTransformer.
numeric_features = [
    'age', 'trestbps', 'chol', 'thalach', 'oldpeak', 'bmi', 'age_sex_inter',
]
categorical_features = [
    'sex', 'fbs', 'restecg', 'exang', 'slope', 'ca', 'thal',
    'smoking', 'diabetes', 'age_group', 'is_over_50',
]
print(f"\n수치형 변수 ({len(numeric_features)}개): {numeric_features}")
print(f"\n범주형 변수 ({len(categorical_features)}개): {categorical_features}")
# --- Preprocessing pipeline and train/test split -------------------------
# Numeric columns: mean imputation, then RobustScaler (outlier-tolerant).
numeric_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', RobustScaler()),
])
# Categorical columns: mode imputation, then one-hot encoding that
# silently ignores categories unseen during fit.
categorical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore')),
])
preprocessor = ColumnTransformer(transformers=[
    ('num', numeric_pipeline, numeric_features),
    ('cat', categorical_pipeline, categorical_features),
])

# Stratified 80/20 hold-out split with a fixed seed for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42,
)
print(f"\n학습 데이터: {X_train.shape}")
print(f"테스트 데이터: {X_test.shape}")
print(f"\n학습 데이터 타깃 분포:\n{y_train.value_counts(normalize=True)}")
print(f"\n테스트 데이터 타깃 분포:\n{y_test.value_counts(normalize=True)}")
# --- Stacking ensemble definition ----------------------------------------
# Five heterogeneous level-0 learners; their out-of-fold predictions feed
# a level-1 (meta) logistic regression.
# NOTE: `use_label_encoder` was deprecated in XGBoost 1.3 and removed in
# 1.7+ — passing it now only raises an "unused parameter" warning, so it
# is dropped; `eval_metric='logloss'` stays explicit.
base_models = [
    ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
    ('gb', GradientBoostingClassifier(n_estimators=100, random_state=42)),
    ('xgb', XGBClassifier(eval_metric='logloss', random_state=42)),
    ('lgbm', LGBMClassifier(random_state=42, verbose=-1)),
    ('svc', SVC(kernel='rbf', probability=True, random_state=42)),
]
# Meta model: simple linear combiner of the base predictions.
meta_model = LogisticRegression(max_iter=1000)
# cv=5: the meta model is trained on 5-fold out-of-fold base predictions,
# which limits leakage from base models into the combiner.
stacking_clf = StackingClassifier(
    estimators=base_models,
    final_estimator=meta_model,
    cv=5,
    n_jobs=-1,
)
# Full pipeline: shared preprocessing + stacked classifier.
stacking_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', stacking_clf),
])
print("Stacking Classifier 생성 완료")
print(f"\nBase Models: {len(base_models)}개")
for name, _ in base_models:
    print(f" - {name}")
print(f"\nMeta Model: Logistic Regression")
# --- Baseline stacking model: fit and evaluate ---------------------------
print("기본 Stacking 모델 학습 중...")
stacking_pipeline.fit(X_train, y_train)

# Score both splits to gauge overfitting; keep test metrics for later
# comparison against the tuned model.
y_pred_train = stacking_pipeline.predict(X_train)
y_pred_test = stacking_pipeline.predict(X_test)
train_acc = accuracy_score(y_train, y_pred_train)
test_acc = accuracy_score(y_test, y_pred_test)
test_f1 = f1_score(y_test, y_pred_test)
print(f"\n학습 정확도: {train_acc:.4f}")
print(f"테스트 정확도: {test_acc:.4f}")
print(f"테스트 F1-Score: {test_f1:.4f}")
print("\n분류 리포트:")
print(classification_report(y_test, y_pred_test, target_names=['No Disease', 'Disease']))

# Confusion matrix heatmap for the hold-out predictions.
class_labels = ['No Disease', 'Disease']
cm = confusion_matrix(y_test, y_pred_test)
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_labels, yticklabels=class_labels)
plt.title('Confusion Matrix - Stacking Ensemble')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.tight_layout()
plt.show()

# 5-fold cross-validation accuracy on the training split only.
print("교차 검증 수행 중 (5-Fold CV)...")
cv_scores = cross_val_score(stacking_pipeline, X_train, y_train,
                            cv=5, scoring='accuracy', n_jobs=-1)
print(f"\n교차 검증 결과:")
print(f"Fold별 정확도: {cv_scores}")
print(f"평균 정확도: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")
# --- Hyperparameter search over the stacked pipeline ---------------------
# Keys address nested estimators: classifier__<base-name>__<param> reaches
# a level-0 model, classifier__final_estimator__<param> the meta model.
param_grid = {
    'classifier__rf__n_estimators': [50, 100],           # Random Forest
    'classifier__rf__max_depth': [10, 20, None],
    'classifier__gb__n_estimators': [50, 100],           # Gradient Boosting
    'classifier__gb__learning_rate': [0.05, 0.1],
    'classifier__final_estimator__C': [0.1, 1.0, 10.0],  # meta model
}
print("GridSearchCV 설정:")
print(f"파라미터 그리드: {param_grid}")
print(f"\n예상 조합 수: {2 * 3 * 2 * 2 * 3} = 72개")

# 3-fold CV keeps the search affordable (72 candidates x 3 folds).
grid_search = GridSearchCV(
    estimator=stacking_pipeline,
    param_grid=param_grid,
    cv=3,
    scoring='accuracy',
    n_jobs=-1,
    verbose=2,
)
print("\nGridSearchCV 시작...")
print("(시간이 다소 걸릴 수 있습니다)\n")

# Time the full search for the progress report below.
import time
search_started = time.time()
grid_search.fit(X_train, y_train)
elapsed_time = time.time() - search_started
print(f"\nGridSearchCV 완료! 소요 시간: {elapsed_time:.2f}초")
# --- Tuned-model evaluation ----------------------------------------------
print("최적 하이퍼파라미터:")
for param, value in grid_search.best_params_.items():
    print(f" {param}: {value}")
print(f"\n최적 교차 검증 점수: {grid_search.best_score_:.4f}")

# The refitted best pipeline, scored on the untouched test split.
best_model = grid_search.best_estimator_
y_pred_best = best_model.predict(X_test)
test_acc_best = accuracy_score(y_test, y_pred_best)
test_f1_best = f1_score(y_test, y_pred_best)
print(f"\n최적화된 모델 테스트 정확도: {test_acc_best:.4f}")
print(f"최적화된 모델 테스트 F1-Score: {test_f1_best:.4f}")
print("\n분류 리포트 (최적화된 모델):")
print(classification_report(y_test, y_pred_best, target_names=['No Disease', 'Disease']))

# Side-by-side table: default vs tuned stacking.
comparison_df = pd.DataFrame({
    'Model': ['Basic Stacking', 'Optimized Stacking'],
    'Test Accuracy': [test_acc, test_acc_best],
    'Test F1-Score': [test_f1, test_f1_best],
})
print("\n성능 비교:")
display(comparison_df)
# --- Comparison charts ---------------------------------------------------
# One annotated bar chart per metric; same axes limits so bars compare.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
bar_colors = ['skyblue', 'orange']
metric_specs = [
    ('Test Accuracy', 'Accuracy', 'Test Accuracy Comparison'),
    ('Test F1-Score', 'F1-Score', 'Test F1-Score Comparison'),
]
for ax, (col, ylabel, title) in zip(axes, metric_specs):
    ax.bar(comparison_df['Model'], comparison_df[col], color=bar_colors)
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    ax.set_ylim([0.7, 1.0])
    # Print the exact score just above each bar.
    for i, v in enumerate(comparison_df[col]):
        ax.text(i, v + 0.01, f"{v:.4f}", ha='center', va='bottom')
plt.tight_layout()
plt.show()
# --- Individual base-model benchmarks ------------------------------------
print("Base Models 개별 성능 평가...\n")
base_results = []
for name, model in base_models:
    # Each base model gets the exact same preprocessing as the stack,
    # so the comparison isolates the estimator itself.
    candidate = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model),
    ])
    candidate.fit(X_train, y_train)
    preds = candidate.predict(X_test)
    acc = accuracy_score(y_test, preds)
    f1 = f1_score(y_test, preds)
    base_results.append({
        'Model': name.upper(),
        'Test Accuracy': acc,
        'Test F1-Score': f1,
    })
    print(f"{name.upper():<10} | Accuracy: {acc:.4f} | F1-Score: {f1:.4f}")
base_results_df = pd.DataFrame(base_results)
print(f"\nStacking Ensemble | Accuracy: {test_acc_best:.4f} | F1-Score: {test_f1_best:.4f}")
print("\n→ Stacking은 Base Models의 강점을 결합하여 성능을 향상시킵니다.")

# Append the stacking row to build one combined comparison table.
stacking_row = pd.DataFrame([{
    'Model': 'STACKING',
    'Test Accuracy': test_acc_best,
    'Test F1-Score': test_f1_best,
}])
all_results = pd.concat([base_results_df, stacking_row], ignore_index=True)
# --- Base models vs stacking: horizontal bar charts ----------------------
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# All base models skyblue; the final stacking row highlighted in red.
colors = ['skyblue'] * len(base_results) + ['red']
chart_specs = [
    ('Test Accuracy', 'Model Accuracy Comparison', test_acc_best),
    ('Test F1-Score', 'Model F1-Score Comparison', test_f1_best),
]
for ax, (col, title, stacking_score) in zip(axes, chart_specs):
    ax.barh(all_results['Model'], all_results[col], color=colors)
    ax.set_xlabel(col)
    ax.set_title(title)
    ax.set_xlim([0.7, 1.0])
    # Dashed reference line at the stacking score.
    ax.axvline(x=stacking_score, color='red', linestyle='--', alpha=0.5, label='Stacking')
    ax.legend()
plt.tight_layout()
plt.show()
# --- Feature importance from the tuned Random Forest base learner --------
# BUG FIX: StackingClassifier.estimators_ is a flat list of *fitted
# estimators*, not (name, estimator) tuples. The original
# `estimators_[0][1]` therefore indexed INTO the fitted RandomForest
# (ensemble __getitem__ returns individual trees) and silently pulled out
# a single DecisionTree, plotting one tree's importances under a
# "Random Forest" title. Use named_estimators_ to fetch the forest itself.
rf_model = best_model.named_steps['classifier'].named_estimators_['rf']

# Rebuild post-transform feature names: the ColumnTransformer emits the
# numeric columns first (in order), then the one-hot expansion of the
# categorical columns.
preprocessor_fitted = best_model.named_steps['preprocessor']
encoder = preprocessor_fitted.named_transformers_['cat']['encoder']
ohe_features = encoder.get_feature_names_out(categorical_features)
feature_names = numeric_features + list(ohe_features)

# Rank importances (descending) and keep the top 20.
top_n = 20
importances = rf_model.feature_importances_
top_indices = np.argsort(importances)[::-1][:top_n]
importance_df = pd.DataFrame({
    'Feature': [feature_names[i] for i in top_indices],
    'Importance': importances[top_indices],
})
print(f"\n피처 중요도 순위 (Top {top_n}):")
display(importance_df)

# Horizontal bar chart of the top-N importances.
plt.figure(figsize=(10, 8))
sns.barplot(x=importance_df['Importance'], y=importance_df['Feature'], palette='viridis')
plt.title(f"Top {top_n} Feature Importances (Random Forest in Stacking)", fontsize=14, fontweight='bold')
plt.xlabel("Importance", fontsize=12)
plt.ylabel("Features", fontsize=12)
plt.tight_layout()
plt.show()
# --- Final summary banner ------------------------------------------------
divider = "=" * 60
print(divider)
print("Stacking Ensemble 분석 완료")
print(divider)
print("\n📊 최종 성능 지표:")
print(f" - 테스트 정확도: {test_acc_best:.4f}")
print(f" - 테스트 F1-Score: {test_f1_best:.4f}")
print(f" - 교차 검증 평균 정확도: {grid_search.best_score_:.4f}")
print("\n🎯 Stacking의 장점:")
print(" 1. 다양한 모델의 강점을 결합하여 일반화 성능 향상")
print(" 2. 개별 모델보다 안정적인 예측")
print(" 3. 과적합 위험 감소")
print("\n✅ Base Models:")
for name, _ in base_models:
    print(f" - {name.upper()}")
print(f"\n✅ Meta Model: Logistic Regression")
print("\n" + divider)
# (end of post — blog comment-section text removed so the file parses as Python)