import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Protocol
import numpy as np
import pandas as pd
from loguru import logger
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import KFold
from sklearn.preprocessing import PolynomialFeatures
from titanicprediction.core.algorithms import (
    gradient_descent,
    predict,
    predict_proba,
    standard_gradient_descent,
)
from titanicprediction.data.preprocessing import DataPreprocessor
from titanicprediction.entities.core import (
    Dataset,
    FeatureImpactAnalysis,
    Passenger,
    PredictionExplanation,
    TrainedModel,
)
@dataclass(frozen=True)
class TrainingConfig:
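    """Hyperparameters for gradient-descent training of the survival model."""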
    learning_rate: float = 0.01
    epochs: int = 1000
    test_size: float = 0.2
    random_state: int = 42
    convergence_tol: float = 1e-6
    lambda_reg: float = 0.01
    polynomial_degree: int = 2
    use_adam: bool = True
    beta1: float = 0.9
    beta2: float = 0.999
    early_stopping_patience: int = 50
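
# A quicker non-Adam run can be configured by overriding the defaults, e.g.
# (illustrative values, not tuned):
#   quick_config = TrainingConfig(learning_rate=0.1, epochs=200, use_adam=False)
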
@dataclass(frozen=True)
class TrainingResult:
    model: TrainedModel
    training_time: float
    final_loss: float
    metrics: dict[str, float]
    learning_curve: list[float]
    feature_importance: dict[str, float]
    config: TrainingConfig

@dataclass(frozen=True)
class EvaluationResult:
    accuracy: float
    precision: float
    recall: float
    f1_score: float
    confusion_matrix: np.ndarray
    classification_report: dict[str, Any]

@dataclass(frozen=True)
class PredictionResult:
    passenger: Passenger
    probability: float
    prediction: bool
    confidence: float
    timestamp: datetime

@dataclass(frozen=True)
class CrossValidationResult:
    fold_results: list[EvaluationResult]
    mean_accuracy: float
    mean_precision: float
    mean_recall: float
    mean_f1: float
    std_accuracy: float
    std_precision: float
    std_recall: float
    std_f1: float

@dataclass(frozen=True)
class ConfidenceInterval:
    lower_bound: float
    upper_bound: float
    confidence_level: float

class IModelTrainingService(Protocol):
    def train_model(
        self, dataset: Dataset, config: TrainingConfig
    ) -> TrainingResult: ...

    def evaluate_model(
        self, model: TrainedModel, test_data: Dataset
    ) -> EvaluationResult: ...

    def cross_validate(
        self, dataset: Dataset, config: TrainingConfig, folds: int
    ) -> CrossValidationResult: ...

class IPredictionService(Protocol):
    def predict_survival(self, passenger: Passenger) -> PredictionResult: ...

    def batch_predict(self, passengers: list[Passenger]) -> list[PredictionResult]: ...

    def get_prediction_confidence(
        self, prediction: PredictionResult
    ) -> ConfidenceInterval: ...

class ModelTrainingService:
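    """Trains, evaluates, and cross-validates the survival model."""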

    def __init__(self, preprocessor: DataPreprocessor):
        self.preprocessor = preprocessor

    def train_model(self, dataset: Dataset, config: TrainingConfig) -> TrainingResult:
        start_time = time.time()
        processed_data = self.preprocessor.fit_transform(dataset)
        numeric_features = processed_data.features.select_dtypes(include=[np.number])
        X_train = numeric_features.values.astype(np.float64)
        original_numeric_feature_names = numeric_features.columns.tolist()
        # Expand the numeric features with pairwise interaction terms.
        poly_transformer = PolynomialFeatures(
            degree=config.polynomial_degree, include_bias=False, interaction_only=True
        )
        X_train_poly = poly_transformer.fit_transform(X_train)
        poly_feature_names = poly_transformer.get_feature_names_out(
            original_numeric_feature_names
        )
        # Standardize, guarding against zero-variance columns.
        X_mean = np.mean(X_train_poly, axis=0)
        X_std = np.std(X_train_poly, axis=0)
        X_std[X_std == 0] = 1
        X_train_normalized = (X_train_poly - X_mean) / X_std
        y_train = processed_data.target.values.astype(int)
        if config.use_adam:
            result = gradient_descent(
                x=X_train_normalized,
                y=y_train,
                learning_rate=config.learning_rate,
                epochs=config.epochs,
                convergence_tol=config.convergence_tol,
                beta1=config.beta1,
                beta2=config.beta2,
                lambda_reg=config.lambda_reg,
            )
        else:
            result = standard_gradient_descent(
                x=X_train_normalized,
                y=y_train,
                learning_rate=config.learning_rate,
                epochs=config.epochs,
                convergence_tol=config.convergence_tol,
                lambda_reg=config.lambda_reg,
            )
        training_time = time.time() - start_time
        # Persist everything needed to replay the transform at predict time.
        model = TrainedModel(
            weights=result.weights,
            bias=result.bias,
            feature_names=poly_feature_names.tolist(),
            training_metrics={},
            validation_metrics={},
            training_history=result.loss_history,
            model_config=config.__dict__,
            preprocessing_artifacts={
                "poly_transformer": poly_transformer,
                "X_mean": X_mean,
                "X_std": X_std,
                "original_feature_names": original_numeric_feature_names,
                "config": {
                    "polynomial_degree": config.polynomial_degree,
                    "interaction_only": True,
                },
            },
        )
        return TrainingResult(
            model=model,
            training_time=training_time,
            final_loss=result.loss_history[-1],
            metrics={},
            learning_curve=result.loss_history,
            feature_importance=self._calculate_feature_importance(model),
            config=config,
        )

    def evaluate_model(
        self, model: TrainedModel, test_data: Dataset
    ) -> EvaluationResult:
        processed_test = self.preprocessor.transform(test_data)
        preprocessing_artifacts = model.preprocessing_artifacts
        if not preprocessing_artifacts:
            raise ValueError("Model does not have preprocessing artifacts")
        poly_transformer = preprocessing_artifacts["poly_transformer"]
        X_mean = preprocessing_artifacts["X_mean"]
        X_std = preprocessing_artifacts["X_std"]
        numeric_test_features = processed_test.features.select_dtypes(
            include=[np.number]
        )
        X_test_original = numeric_test_features.values.astype(np.float64)
        X_test_poly = poly_transformer.transform(X_test_original)
        X_test = (X_test_poly - X_mean) / X_std
        y_true = processed_test.target.values.astype(np.float64)
        # Pad or truncate columns if the test matrix width drifted from training.
        if X_test.shape[1] != len(model.feature_names):
            aligned_X_test = np.zeros((X_test.shape[0], len(model.feature_names)))
            for i, _feature_name in enumerate(model.feature_names):
                if i < X_test.shape[1]:
                    aligned_X_test[:, i] = X_test[:, i]
            X_test = aligned_X_test
        y_pred = predict(X_test, model.weights, model.bias, threshold=0.5)
        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred, zero_division=0, average="binary")
        recall = recall_score(y_true, y_pred, zero_division=0, average="binary")
        f1 = f1_score(y_true, y_pred, zero_division=0, average="binary")
        cm = confusion_matrix(y_true, y_pred)
        classification_report = {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
            "support": len(y_true),
        }
        return EvaluationResult(
            accuracy=accuracy,
            precision=precision,
            recall=recall,
            f1_score=f1,
            confusion_matrix=cm,
            classification_report=classification_report,
        )

    def _align_features_with_model(
        self, features: pd.DataFrame, model: TrainedModel
    ) -> pd.DataFrame:
        aligned_features = pd.DataFrame()
        for feature in model.feature_names:
            if feature in features.columns:
                aligned_features[feature] = features[feature]
            else:
                aligned_features[feature] = 0.0
        return aligned_features[model.feature_names]

    def cross_validate(
        self, dataset: Dataset, config: TrainingConfig, folds: int = 5
    ) -> CrossValidationResult:
        kf = KFold(n_splits=folds, shuffle=True, random_state=config.random_state)
        fold_results = []
        X = dataset.features.values
        y = dataset.target.values
        for train_index, test_index in kf.split(X):
            X_train, X_test = X[train_index], X[test_index]
            y_train, y_test = y[train_index], y[test_index]
            train_dataset = Dataset(
                features=pd.DataFrame(X_train, columns=dataset.feature_names),
                target=pd.Series(y_train),
                feature_names=dataset.feature_names,
                target_name=dataset.target_name,
            )
            test_dataset = Dataset(
                features=pd.DataFrame(X_test, columns=dataset.feature_names),
                target=pd.Series(y_test),
                feature_names=dataset.feature_names,
                target_name=dataset.target_name,
            )
            training_result = self.train_model(train_dataset, config)
            eval_result = self.evaluate_model(training_result.model, test_dataset)
            fold_results.append(eval_result)
        accuracies = [r.accuracy for r in fold_results]
        precisions = [r.precision for r in fold_results]
        recalls = [r.recall for r in fold_results]
        f1_scores = [r.f1_score for r in fold_results]
        return CrossValidationResult(
            fold_results=fold_results,
            mean_accuracy=np.mean(accuracies),
            mean_precision=np.mean(precisions),
            mean_recall=np.mean(recalls),
            mean_f1=np.mean(f1_scores),
            std_accuracy=np.std(accuracies),
            std_precision=np.std(precisions),
            std_recall=np.std(recalls),
            std_f1=np.std(f1_scores),
        )

    def _calculate_feature_importance(self, model: TrainedModel) -> dict[str, float]:
        # Importance is |weight| as a percentage of the total absolute weight mass.
        importance = {}
        total_importance = np.sum(np.abs(model.weights))
        for i, feature_name in enumerate(model.feature_names):
            if total_importance > 0:
                importance[feature_name] = float(
                    np.abs(model.weights[i]) / total_importance * 100
                )
            else:
                importance[feature_name] = 0.0
        return importance

class PredictionService:
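    """Applies a trained model and its stored preprocessing artifacts to passengers."""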

    def __init__(self, model: TrainedModel, preprocessor: DataPreprocessor):
        self.model = model
        self.preprocessor = preprocessor
        self.preprocessing_artifacts = model.preprocessing_artifacts
        if not self.preprocessing_artifacts:
            raise ValueError("Model must have preprocessing artifacts for prediction")

    def predict_survival(self, passenger: Passenger) -> PredictionResult:
        try:
            passenger_df = self._passenger_to_dataframe(passenger)
            dummy_dataset = Dataset(
                features=passenger_df,
                target=None,
                feature_names=list(passenger_df.columns),
                target_name="dummy",
            )
            processed_data = self.preprocessor.transform(dummy_dataset)
            numeric_features = processed_data.features.select_dtypes(
                include=[np.number]
            )
            if numeric_features.empty:
                raise ValueError("No numeric features after preprocessing")
            X_pred_original = numeric_features.values.astype(np.float64)
            # Replay the training-time transform: polynomial expansion, then
            # standardization with the stored mean/std.
            if "poly_transformer" in self.preprocessing_artifacts:
                poly_transformer = self.preprocessing_artifacts["poly_transformer"]
                X_pred_poly = poly_transformer.transform(X_pred_original)
                if (
                    "X_mean" in self.preprocessing_artifacts
                    and "X_std" in self.preprocessing_artifacts
                ):
                    X_mean = self.preprocessing_artifacts["X_mean"]
                    X_std = self.preprocessing_artifacts["X_std"]
                    X_pred = (X_pred_poly - X_mean) / X_std
                else:
                    X_pred = X_pred_poly
            else:
                X_pred = X_pred_original
            if X_pred.shape[1] != len(self.model.feature_names):
                X_pred = self._align_features(X_pred, self.model.feature_names)
            probability = predict_proba(X_pred, self.model.weights, self.model.bias)[0]
            # np.isfinite is False for NaN and +/-inf alike, so one check suffices.
            if not np.isfinite(probability):
                logger.warning(f"Invalid probability computed: {probability}")
                probability = 0.5
            probability = float(np.clip(probability, 0.0, 1.0))
            prediction = probability >= 0.5
            confidence = self._calculate_confidence(probability)
            return PredictionResult(
                passenger=passenger,
                probability=probability,
                prediction=bool(prediction),
                confidence=confidence,
                timestamp=datetime.now(),
            )
        except Exception as e:
            logger.error(f"Prediction error: {e}")
            raise RuntimeError(f"Failed to make prediction: {e}") from e

    def _align_features(
        self, features: np.ndarray, expected_feature_names: list[str]
    ) -> np.ndarray:
        expected_count = len(expected_feature_names)
        current_count = features.shape[1]
        if current_count == expected_count:
            return features
        aligned_features = np.zeros((features.shape[0], expected_count))
        min_features = min(current_count, expected_count)
        aligned_features[:, :min_features] = features[:, :min_features]
        return aligned_features

    def batch_predict(self, passengers: list[Passenger]) -> list[PredictionResult]:
        return [self.predict_survival(passenger) for passenger in passengers]

    def get_prediction_confidence(
        self, prediction: PredictionResult
    ) -> ConfidenceInterval:
        probability = prediction.probability
        # Wilson score interval at 95% confidence, evaluated with a fixed
        # pseudo-sample size of n=100 rather than an observed trial count.
        n = 100
        z = 1.96
        p_hat = probability
        denominator = 1 + z**2 / n
        center = (p_hat + z**2 / (2 * n)) / denominator
        margin = (z / denominator) * np.sqrt(
            (p_hat * (1 - p_hat) / n) + (z**2 / (4 * n**2))
        )
        return ConfidenceInterval(
            lower_bound=max(0.0, center - margin),
            upper_bound=min(1.0, center + margin),
            confidence_level=0.95,
        )
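
    # Sanity check for the Wilson interval above (hand-computed, illustrative):
    # probability=0.8 with n=100, z=1.96 gives bounds of roughly (0.71, 0.87).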

    def _passenger_to_dataframe(self, passenger: Passenger) -> pd.DataFrame:
        data = {
            "PassengerId": [passenger.passenger_id],
            "Pclass": [passenger.pclass],
            "Name": [passenger.name],
            "Sex": [passenger.sex],
            "Age": [passenger.age],
            "SibSp": [passenger.sibsp],
            "Parch": [passenger.parch],
            "Ticket": [passenger.ticket or "Unknown"],
            "Fare": [passenger.fare],
            "Cabin": [passenger.cabin or "Unknown"],
            "Embarked": [passenger.embarked],
        }
        return pd.DataFrame(data)

    def _calculate_confidence(self, probability: float) -> float:
        # Map distance from the 0.5 decision boundary onto [0.5, 1.0].
        distance_from_decision = abs(probability - 0.5)
        confidence = 0.5 + distance_from_decision
        return float(np.clip(confidence, 0.0, 1.0))

@dataclass
class ModelExplanationService:
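    """Breaks a prediction down into per-feature weight-times-value impacts."""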
    prediction_service: PredictionService

    def explain_prediction(self, passenger: Passenger) -> PredictionExplanation:
        prediction_result = self.prediction_service.predict_survival(passenger)
        feature_impacts = self._calculate_feature_impacts(passenger)
        decision_factors = self._extract_decision_factors(feature_impacts)
        confidence_level = self._determine_confidence_level(
            prediction_result.probability
        )
        return PredictionExplanation(
            prediction=prediction_result.prediction,
            probability=prediction_result.probability,
            feature_impacts=feature_impacts,
            decision_factors=decision_factors,
            confidence_level=confidence_level,
        )
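
    def _extract_decision_factors(
        self, feature_impacts: list[FeatureImpactAnalysis]
    ) -> list[str]:
        # Called by explain_prediction above but missing from this listing; a
        # minimal sketch that names the top three features by absolute impact.
        # The signature and list[str] return type are assumptions, since
        # feature_impacts arrives pre-sorted by |impact_score| descending.
        return [fi.feature_name for fi in feature_impacts[:3]]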

    def get_model_statistics(self, model: TrainedModel) -> dict[str, Any]:
        weights = model.weights
        return {
            "total_features": len(weights),
            "weight_magnitude": float(np.linalg.norm(weights)),
            "positive_weights": int(np.sum(weights > 0)),
            "negative_weights": int(np.sum(weights < 0)),
            "weight_range": {"min": float(weights.min()), "max": float(weights.max())},
            "bias": float(model.bias),
            "weight_mean": float(weights.mean()),
            "weight_std": float(weights.std()),
            "weights_sum": float(np.sum(weights)),
        }

    def _calculate_feature_impacts(
        self, passenger: Passenger
    ) -> list[FeatureImpactAnalysis]:
        model = self.prediction_service.model
        preprocessor = self.prediction_service.preprocessor
        passenger_df = self.prediction_service._passenger_to_dataframe(passenger)
        dummy_dataset = Dataset(
            features=passenger_df,
            target=None,
            feature_names=list(passenger_df.columns),
            target_name="dummy",
        )
        processed_data = preprocessor.transform(dummy_dataset)
        numeric_features = processed_data.features.select_dtypes(include=[np.number])
        if numeric_features.empty:
            return []
        X_original = numeric_features.values.astype(np.float64)
        preprocessing_artifacts = model.preprocessing_artifacts
        if preprocessing_artifacts and "poly_transformer" in preprocessing_artifacts:
            poly_transformer = preprocessing_artifacts["poly_transformer"]
            X_poly = poly_transformer.transform(X_original)
            if (
                "X_mean" in preprocessing_artifacts
                and "X_std" in preprocessing_artifacts
            ):
                X_mean = preprocessing_artifacts["X_mean"]
                X_std = preprocessing_artifacts["X_std"]
                X = (X_poly - X_mean) / X_std
            else:
                X = X_poly
        else:
            X = X_original
        if X.shape[1] != len(model.feature_names):
            aligned_X = np.zeros((X.shape[0], len(model.feature_names)))
            min_features = min(X.shape[1], len(model.feature_names))
            aligned_X[:, :min_features] = X[:, :min_features]
            X = aligned_X
        feature_impacts = []
        total_impact = 0.0
        # Impact of feature i is weight_i * value_i; contributions are then
        # normalized to percentages of the total absolute impact.
        for i, feature_name in enumerate(model.feature_names):
            if i < len(model.weights):
                feature_value = X[0][i]
                weight = model.weights[i]
                impact = weight * feature_value
                total_impact += abs(impact)
                feature_impacts.append(
                    FeatureImpactAnalysis(
                        feature_name=feature_name,
                        impact_score=float(impact),
                        weight=float(weight),
                        feature_value=float(feature_value),
                        contribution=0.0,
                    )
                )
        if total_impact > 0:
            for impact in feature_impacts:
                impact.contribution = abs(impact.impact_score) / total_impact * 100
        else:
            for impact in feature_impacts:
                impact.contribution = 0.0
        return sorted(feature_impacts, key=lambda x: abs(x.impact_score), reverse=True)

    def _determine_confidence_level(self, probability: float) -> str:
        distance = abs(probability - 0.5)
        if distance > 0.3:  # probability < 0.2 or > 0.8
            return "High"
        if distance > 0.15:  # probability < 0.35 or > 0.65
            return "Medium"
        return "Low"

class ServiceFactory:
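    """Factory helpers that wire together the services in this module."""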

    @staticmethod
    def create_training_service(preprocessor: DataPreprocessor) -> ModelTrainingService:
        return ModelTrainingService(preprocessor)

    @staticmethod
    def create_prediction_service(
        model: TrainedModel, preprocessor: DataPreprocessor
    ) -> PredictionService:
        return PredictionService(model, preprocessor)

    @staticmethod
    def create_explanation_service(
        prediction_service: PredictionService,
    ) -> ModelExplanationService:
        return ModelExplanationService(prediction_service)
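

if __name__ == "__main__":
    # Minimal end-to-end wiring sketch (illustrative, not part of the service
    # API). Assumptions: DataPreprocessor() needs no constructor arguments, and
    # Dataset/Passenger accept the keyword fields used below; adjust to the
    # project's real data-loading path.
    raw = pd.DataFrame(
        {
            "PassengerId": [1, 2, 3, 4],
            "Pclass": [3, 1, 3, 2],
            "Name": ["A", "B", "C", "D"],
            "Sex": ["male", "female", "female", "male"],
            "Age": [22.0, 38.0, 26.0, 35.0],
            "SibSp": [1, 1, 0, 0],
            "Parch": [0, 0, 0, 0],
            "Ticket": ["t1", "t2", "t3", "t4"],
            "Fare": [7.25, 71.28, 7.93, 13.0],
            "Cabin": [None, "C85", None, None],
            "Embarked": ["S", "C", "S", "S"],
        }
    )
    dataset = Dataset(
        features=raw,
        target=pd.Series([0, 1, 1, 0]),
        feature_names=list(raw.columns),
        target_name="Survived",
    )
    preprocessor = DataPreprocessor()
    trainer = ServiceFactory.create_training_service(preprocessor)
    training_result = trainer.train_model(dataset, TrainingConfig(epochs=100))
    predictor = ServiceFactory.create_prediction_service(
        training_result.model, preprocessor
    )
    sample = Passenger(
        passenger_id=99,
        pclass=3,
        name="Doe, Mr. John",
        sex="male",
        age=30.0,
        sibsp=0,
        parch=0,
        ticket="A/5 21171",
        fare=8.05,
        cabin=None,
        embarked="S",
    )
    print(predictor.predict_survival(sample))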