머신 러닝 (27) Stacking Classification

 import pandas as pd

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

from sklearn.model_selection import train_test_split, KFold, GridSearchCV, cross_val_score
from sklearn.preprocessing import RobustScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline

# Base Models
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, HistGradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier

# Stacking
from sklearn.ensemble import StackingClassifier

# Metrics
from sklearn.metrics import accuracy_score, classification_report, f1_score, confusion_matrix

print("Loading Heart Disease Classification Dataset...")
df = pd.read_csv('heart_disease_dataset.csv')

print(f"데이터 크기: {df.shape}")
print(f"타깃 분포:\n{df['heart_disease'].value_counts()}")
print(f"\n타깃 비율:\n{df['heart_disease'].value_counts(normalize=True)}")
print(f"\n피처 목록: {list(df.columns)}")

from google.colab import files
uploaded = files.upload()

print("\n데이터 샘플:")
display(df.head())

print("\n데이터 정보:")
display(df.info())

print("\n기술 통계:")
display(df.describe())

# 타깃 분리 (cp 컬럼 제외)
y = df['heart_disease']
X = df.drop(['heart_disease', 'cp'], axis=1)

print(f"특성(Feature) 개수: {X.shape[1]}")
print(f"샘플 개수: {X.shape[0]}")

# 파생 변수 생성
X['age_group'] = pd.cut(X['age'], bins=[0, 39, 59, 100], labels=['under_40', '40_59', 'over_60'])
X['is_over_50'] = (X['age'] >= 50).astype(int)
X['age_sex_inter'] = X['age'] * X['sex']

print("\n파생 변수 생성 완료")
print(f"전체 특성 개수: {X.shape[1]}")

# 수치형/범주형 변수 분리
numeric_features = ['age', 'trestbps', 'chol', 'thalach', 'oldpeak', 'bmi', 'age_sex_inter']
categorical_features = ['sex', 'fbs', 'restecg', 'exang', 'slope', 'ca', 'thal',
                        'smoking', 'diabetes', 'age_group', 'is_over_50']

print(f"\n수치형 변수 ({len(numeric_features)}개): {numeric_features}")
print(f"\n범주형 변수 ({len(categorical_features)}개): {categorical_features}")

# 전처리 파이프라인 정의
preprocessor = ColumnTransformer(
    transformers=[
        ('num', Pipeline([
            ('imputer', SimpleImputer(strategy='mean')),
            ('scaler', RobustScaler())
        ]), numeric_features),
        ('cat', Pipeline([
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('encoder', OneHotEncoder(handle_unknown='ignore'))
        ]), categorical_features)
    ])

# 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

print(f"\n학습 데이터: {X_train.shape}")
print(f"테스트 데이터: {X_test.shape}")
print(f"\n학습 데이터 타깃 분포:\n{y_train.value_counts(normalize=True)}")
print(f"\n테스트 데이터 타깃 분포:\n{y_test.value_counts(normalize=True)}")

# Base Models 정의
base_models = [
    ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
    ('gb', GradientBoostingClassifier(n_estimators=100, random_state=42)),
    ('xgb', XGBClassifier(use_label_encoder=False, eval_metric='logloss', random_state=42)),
    ('lgbm', LGBMClassifier(random_state=42, verbose=-1)),
    ('svc', SVC(kernel='rbf', probability=True, random_state=42))
]

# Meta Model 정의
meta_model = LogisticRegression(max_iter=1000)

# Stacking Classifier 생성
stacking_clf = StackingClassifier(
    estimators=base_models,
    final_estimator=meta_model,
    cv=5,
    n_jobs=-1
)

# 전체 파이프라인
stacking_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', stacking_clf)
])

print("Stacking Classifier 생성 완료")
print(f"\nBase Models: {len(base_models)}개")
for name, _ in base_models:
    print(f"  - {name}")
print(f"\nMeta Model: Logistic Regression")

print("기본 Stacking 모델 학습 중...")
stacking_pipeline.fit(X_train, y_train)

# 예측
y_pred_train = stacking_pipeline.predict(X_train)
y_pred_test = stacking_pipeline.predict(X_test)

# 성능 평가
train_acc = accuracy_score(y_train, y_pred_train)
test_acc = accuracy_score(y_test, y_pred_test)
test_f1 = f1_score(y_test, y_pred_test)

print(f"\n학습 정확도: {train_acc:.4f}")
print(f"테스트 정확도: {test_acc:.4f}")
print(f"테스트 F1-Score: {test_f1:.4f}")

print("\n분류 리포트:")
print(classification_report(y_test, y_pred_test, target_names=['No Disease', 'Disease']))

# 혼동 행렬
cm = confusion_matrix(y_test, y_pred_test)

plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['No Disease', 'Disease'],
            yticklabels=['No Disease', 'Disease'])
plt.title('Confusion Matrix - Stacking Ensemble')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.tight_layout()
plt.show()

print("교차 검증 수행 중 (5-Fold CV)...")
cv_scores = cross_val_score(stacking_pipeline, X_train, y_train, cv=5, scoring='accuracy', n_jobs=-1)

print(f"\n교차 검증 결과:")
print(f"Fold별 정확도: {cv_scores}")
print(f"평균 정확도: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")

# 하이퍼파라미터 그리드 정의
param_grid = {
    # Random Forest 파라미터
    'classifier__rf__n_estimators': [50, 100],
    'classifier__rf__max_depth': [10, 20, None],

    # Gradient Boosting 파라미터
    'classifier__gb__n_estimators': [50, 100],
    'classifier__gb__learning_rate': [0.05, 0.1],

    # Meta Model 파라미터
    'classifier__final_estimator__C': [0.1, 1.0, 10.0]
}

print("GridSearchCV 설정:")
print(f"파라미터 그리드: {param_grid}")
print(f"\n예상 조합 수: {2 * 3 * 2 * 2 * 3} = 72개")

# GridSearchCV 객체 생성
grid_search = GridSearchCV(
    estimator=stacking_pipeline,
    param_grid=param_grid,
    cv=3,  # 3-Fold CV
    scoring='accuracy',
    n_jobs=-1,
    verbose=2
)

print("\nGridSearchCV 시작...")
print("(시간이 다소 걸릴 수 있습니다)\n")

import time
start_time = time.time()

grid_search.fit(X_train, y_train)

elapsed_time = time.time() - start_time
print(f"\nGridSearchCV 완료! 소요 시간: {elapsed_time:.2f}초")

# 최적 파라미터 출력
print("최적 하이퍼파라미터:")
for param, value in grid_search.best_params_.items():
    print(f"  {param}: {value}")

print(f"\n최적 교차 검증 점수: {grid_search.best_score_:.4f}")

# 최적 모델 저장
best_model = grid_search.best_estimator_

# 테스트 데이터 예측
y_pred_best = best_model.predict(X_test)

# 성능 평가
test_acc_best = accuracy_score(y_test, y_pred_best)
test_f1_best = f1_score(y_test, y_pred_best)

print(f"\n최적화된 모델 테스트 정확도: {test_acc_best:.4f}")
print(f"최적화된 모델 테스트 F1-Score: {test_f1_best:.4f}")

print("\n분류 리포트 (최적화된 모델):")
print(classification_report(y_test, y_pred_best, target_names=['No Disease', 'Disease']))

# 성능 비교 데이터프레임
comparison_df = pd.DataFrame({
    'Model': ['Basic Stacking', 'Optimized Stacking'],
    'Test Accuracy': [test_acc, test_acc_best],
    'Test F1-Score': [test_f1, test_f1_best]
})

print("\n성능 비교:")
display(comparison_df)

# 시각화
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Accuracy 비교
axes[0].bar(comparison_df['Model'], comparison_df['Test Accuracy'], color=['skyblue', 'orange'])
axes[0].set_ylabel('Accuracy')
axes[0].set_title('Test Accuracy Comparison')
axes[0].set_ylim([0.7, 1.0])
for i, v in enumerate(comparison_df['Test Accuracy']):
    axes[0].text(i, v + 0.01, f"{v:.4f}", ha='center', va='bottom')

# F1-Score 비교
axes[1].bar(comparison_df['Model'], comparison_df['Test F1-Score'], color=['skyblue', 'orange'])
axes[1].set_ylabel('F1-Score')
axes[1].set_title('Test F1-Score Comparison')
axes[1].set_ylim([0.7, 1.0])
for i, v in enumerate(comparison_df['Test F1-Score']):
    axes[1].text(i, v + 0.01, f"{v:.4f}", ha='center', va='bottom')

plt.tight_layout()
plt.show()

print("Base Models 개별 성능 평가...\n")

base_results = []

for name, model in base_models:
    # 파이프라인 생성
    pipe = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])

    # 학습
    pipe.fit(X_train, y_train)

    # 예측 및 평가
    y_pred = pipe.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    base_results.append({
        'Model': name.upper(),
        'Test Accuracy': acc,
        'Test F1-Score': f1
    })

    print(f"{name.upper():<10} | Accuracy: {acc:.4f} | F1-Score: {f1:.4f}")

base_results_df = pd.DataFrame(base_results)

print(f"\nStacking Ensemble | Accuracy: {test_acc_best:.4f} | F1-Score: {test_f1_best:.4f}")
print("\n→ Stacking은 Base Models의 강점을 결합하여 성능을 향상시킵니다.")

# Base Models vs Stacking 시각화
all_results = pd.concat([base_results_df, pd.DataFrame([{
    'Model': 'STACKING',
    'Test Accuracy': test_acc_best,
    'Test F1-Score': test_f1_best
}])], ignore_index=True)

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Accuracy 비교
colors = ['skyblue'] * len(base_results) + ['red']
axes[0].barh(all_results['Model'], all_results['Test Accuracy'], color=colors)
axes[0].set_xlabel('Test Accuracy')
axes[0].set_title('Model Accuracy Comparison')
axes[0].set_xlim([0.7, 1.0])
axes[0].axvline(x=test_acc_best, color='red', linestyle='--', alpha=0.5, label='Stacking')
axes[0].legend()

# F1-Score 비교
axes[1].barh(all_results['Model'], all_results['Test F1-Score'], color=colors)
axes[1].set_xlabel('Test F1-Score')
axes[1].set_title('Model F1-Score Comparison')
axes[1].set_xlim([0.7, 1.0])
axes[1].axvline(x=test_f1_best, color='red', linestyle='--', alpha=0.5, label='Stacking')
axes[1].legend()

plt.tight_layout()
plt.show()

# Random Forest 모델 추출
rf_model = best_model.named_steps['classifier'].estimators_[0][1]

# 전처리된 특성명 추출
preprocessor_fitted = best_model.named_steps['preprocessor']
ohe_features = preprocessor_fitted.named_transformers_['cat']['encoder'].get_feature_names_out(categorical_features)
feature_names = numeric_features + list(ohe_features)

# 피처 중요도 추출
importances = rf_model.feature_importances_
indices = np.argsort(importances)[::-1]

# 상위 20개 특성
top_n = 20
top_indices = indices[:top_n]

importance_df = pd.DataFrame({
    'Feature': [feature_names[i] for i in top_indices],
    'Importance': importances[top_indices]
})

print(f"\n피처 중요도 순위 (Top {top_n}):")
display(importance_df)

# 피처 중요도 시각화
plt.figure(figsize=(10, 8))
sns.barplot(x=importance_df['Importance'], y=importance_df['Feature'], palette='viridis')
plt.title(f"Top {top_n} Feature Importances (Random Forest in Stacking)", fontsize=14, fontweight='bold')
plt.xlabel("Importance", fontsize=12)
plt.ylabel("Features", fontsize=12)
plt.tight_layout()
plt.show()

print("=" * 60)
print("Stacking Ensemble 분석 완료")
print("=" * 60)

print("\n📊 최종 성능 지표:")
print(f"  - 테스트 정확도: {test_acc_best:.4f}")
print(f"  - 테스트 F1-Score: {test_f1_best:.4f}")
print(f"  - 교차 검증 평균 정확도: {grid_search.best_score_:.4f}")

print("\n🎯 Stacking의 장점:")
print("  1. 다양한 모델의 강점을 결합하여 일반화 성능 향상")
print("  2. 개별 모델보다 안정적인 예측")
print("  3. 과적합 위험 감소")

print("\n✅ Base Models:")
for name, _ in base_models:
    print(f"  - {name.upper()}")
print(f"\n✅ Meta Model: Logistic Regression")

print("\n" + "=" * 60)

댓글

이 블로그의 인기 게시물

베이스 캠프에서 (1)

베이스 캠프에서 (2)

Database 분석 (4)