Adding the main workflow

Merged BERRADA Mehdi requested to merge workflow into main
Workflow.py 0 → 100644
# Importing the required libraries
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import stats
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler, StandardScaler, OrdinalEncoder, OneHotEncoder, LabelEncoder
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, roc_auc_score, confusion_matrix)
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV, learning_curve
# Preprocessing the data
def load_data(data_path):
    # Load the data from a CSV file
    data = pd.read_csv(data_path)
    return data
# Detecting whether the file has proper column headers
def column_label(data_path):
    data = pd.read_csv(data_path)
    # If any header parses as a number, the first row is data rather than
    # labels, so reload the file without a header row
    for col in data.columns:
        try:
            float(col)
            return pd.read_csv(data_path, header=None)
        except ValueError:
            continue
    return data
# Cleaning the data
def preprocessing(data_path, skew_threshold=0.5, z_score_threshold=3):
    # Load the data
    data = column_label(data_path)
    # Extract and encode the target column (assumed to be the last one). The
    # raw labels contain a stray 'ckd\t' variant, so strip whitespace first
    # to keep LabelEncoder from creating a spurious third class.
    target = data.iloc[:, -1].astype(str).str.strip()
    encoder = LabelEncoder()
    target = encoder.fit_transform(target)
    # Drop the target from the feature matrix to avoid label leakage
    data = data.iloc[:, :-1].copy()
    # Drop the "id" column if present
    if 'id' in data.columns:
        data.drop(columns=['id'], inplace=True)
    # Identify numerical and categorical columns
    numerical_cols = data.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_cols = data.select_dtypes(include=['object', 'category']).columns.tolist()
    # Scale and normalize numerical columns
    if numerical_cols:
        print('####### BEFORE SCALING AND NORMALIZING ########')
        print(data[numerical_cols].describe())
        # Note: standardizing after min-max scaling is equivalent to
        # standardizing alone, so the StandardScaler determines the result
        min_max_scaler = MinMaxScaler()
        data[numerical_cols] = min_max_scaler.fit_transform(data[numerical_cols])
        standard_scaler = StandardScaler()
        data[numerical_cols] = standard_scaler.fit_transform(data[numerical_cols])
        print('####### AFTER SCALING AND NORMALIZING ########')
        print(data[numerical_cols].describe())
    # Fill and encode categorical columns
    one_hot_columns = set()
    if categorical_cols:
        # Clean up kidney-dataset quirks (guarded in case the columns differ)
        if 'dm' in data.columns:
            data['dm'] = data['dm'].replace({'\tno': 'no', '\tyes': 'yes', ' yes': 'yes'})
        if 'cad' in data.columns:
            data['cad'] = data['cad'].replace({'\tno': 'no'})
        data.replace(to_replace=r'\t', value='', regex=True, inplace=True)
        data.replace(to_replace='?', value=np.nan, inplace=True)
        # Impute missing categories by sampling from the observed frequencies
        for col in tqdm(categorical_cols, desc="Processing categorical features"):
            print(f"\nProcessing column: {col}")
            category_frequencies = data[col].value_counts(normalize=True)
            print("Possible categories and their frequencies:")
            print(category_frequencies)
            missing_mask = data[col].isnull()
            data.loc[missing_mask, col] = np.random.choice(category_frequencies.index,
                                                           size=missing_mask.sum(),
                                                           p=category_frequencies.values)
        # Encode: ordinal for binary columns, one-hot for multi-class columns
        ord_encoder = OrdinalEncoder()
        one_hot_encoder = OneHotEncoder(sparse_output=False)  # Ensure dense output
        for col in categorical_cols:
            if data[col].nunique() <= 2:
                # Ordinal encoding for binary categorical columns
                data[col] = ord_encoder.fit_transform(data[[col]]).astype(int)
            else:
                # One-hot encoding for multi-class categorical columns
                one_hot_data = one_hot_encoder.fit_transform(data[[col]])
                names_cols = one_hot_encoder.get_feature_names_out([col])
                one_hot_df = pd.DataFrame(one_hot_data, columns=names_cols, index=data.index)  # Match index
                one_hot_columns.update(names_cols)
                # Replace the original column with its one-hot expansion
                data = pd.concat([data.drop(columns=[col]), one_hot_df], axis=1)
    # Fill numerical columns (only applies when the file had a real header
    # row, i.e. the column names are strings rather than integer positions)
    if any(isinstance(col, str) for col in numerical_cols):
        for col in numerical_cols:
            is_skewed = abs(data[col].skew()) > skew_threshold
            # Z-scores over the non-missing values, used to flag outliers
            z_scores = np.abs(stats.zscore(data[col].dropna())) if not data[col].isnull().all() else np.array([])
            has_outliers = bool(np.any(z_scores > z_score_threshold))
            if is_skewed or has_outliers:
                # The median is robust to skew and outliers
                data[col] = data[col].fillna(data[col].median())
                print(f"{col}: Filled missing values with median.")
            else:
                data[col] = data[col].fillna(data[col].mean())
                print(f"{col}: Filled missing values with mean.")
    # Collapse groups of near-duplicate columns (e.g. 'rc5.8', 'rc5.9'
    # produced by a malformed header), keeping only the first column of each
    # group. Freshly created one-hot columns are excluded so that the
    # encoding from above is not undone.
    for separator in ('.', '_'):
        column_groups = {}
        for col in data.columns:
            if col in one_hot_columns:
                continue
            # Extract the prefix of the column name (e.g. 'rc' from 'rc5.8')
            prefix = str(col).split(separator)[0]
            column_groups.setdefault(prefix, []).append(col)
        # Keep only the first column from each group
        for group, cols in column_groups.items():
            if len(cols) > 1:
                data.drop(columns=cols[1:], inplace=True)
    return data, target
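# A minimal usage sketch for the preprocessing step, kept as a comment so the
# module stays importable. The file name 'kidney_disease.csv' is an
# assumption for illustration, not part of this module:
#
#     data, target = preprocessing('kidney_disease.csv')
#     print(data.shape, np.bincount(target))
#
# `data` comes back fully numeric (scaled, imputed, encoded) and `target` is
# a 0/1 array, ready for the train/test split defined below.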
# Data visualisation
# Sub-function to identify categorical columns
def get_categorical_columns(data):
    return data.select_dtypes(include=['object', 'category']).columns.tolist()
# Sub-function to identify numerical columns
def get_numerical_columns(data):
    # Drop the 'id' column on a copy, so the caller's DataFrame is not mutated
    if 'id' in data.columns:
        data = data.drop(columns=['id'])  # Not a relevant column for the classification
    # The last column is assumed to be the target, so it is excluded
    return data.iloc[:, :-1].select_dtypes(include=['int64', 'float64']).columns.tolist()
def dataviz(df, columns=None):
    if not columns:
        # Plot histograms of all numerical columns at once
        df.hist(bins=10, figsize=(15, 10))
        plt.suptitle('Distribution of Numerical Columns', y=0.92)
        plt.show()
    else:
        for c in columns:
            # Plot a histogram with a kernel density estimate
            sns.histplot(df[c], kde=True)
            plt.title(f'Histogram of the feature {c}')
            plt.xlabel(str(c))
            plt.ylabel('Frequency')
            plt.show()
    return None
# Splitting the data
def split_data(data, target, test_size=0.2, random_state=42):
    X_train, X_test, y_train, y_test = train_test_split(data, target, test_size=test_size, random_state=random_state)
    return X_train, X_test, y_train, y_test
# Training and fine-tuning the models' hyperparameters
def train_and_tune_model(X_train, y_train, model, param_grid, cv, scoring="f1", verbose=1, return_full_search=False):
    # Initialize GridSearchCV
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        cv=cv,
        scoring=scoring,
        n_jobs=-1,  # Parallelize to speed up training
        verbose=verbose,
        refit="f1" if isinstance(scoring, dict) else scoring  # Refit on the primary metric when several metrics are given
    )
    # Fit the model
    print("Starting Grid Search...")
    grid_search.fit(X_train, y_train)
    print("Grid Search Complete!")
    # Retrieve the best model and its parameters
    best_model = grid_search.best_estimator_
    best_params = grid_search.best_params_
    best_score = grid_search.best_score_
    # Display best parameters and score
    print("\nBest Parameters Found:")
    print(best_params)
    print(f"Best {scoring if isinstance(scoring, str) else 'f1'} Score: {best_score:.4f}")
    # Optionally return the full search object for inspection
    if return_full_search:
        return best_model, grid_search
    return best_model
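# Illustrative call, kept as a comment (the model choice and grid are
# assumptions, not part of this module): tuning a random forest with a small
# grid and stratified 5-fold cross-validation.
#
#     from sklearn.ensemble import RandomForestClassifier
#     cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
#     param_grid = {'n_estimators': [100, 300], 'max_depth': [None, 5, 10]}
#     best_rf = train_and_tune_model(X_train, y_train,
#                                    RandomForestClassifier(random_state=42),
#                                    param_grid, cv)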
# Testing the model
def test_model(X_test, y_test, model):
    y_pred = model.predict(X_test)
    score = f1_score(y_test, y_pred)
    return score
# Displaying results
def display_results(dict_models, X_train, y_train, X_test, y_test, cv, disp_col="F1 Score"):
    results = []
    for model_name, model_details in tqdm(dict_models.items(), desc="Evaluating models"):
        # Extract the model and its parameter grid
        model = model_details["model"]
        param_grid = model_details["param_grid"]
        # Train and search for the best hyperparameters
        best_model = train_and_tune_model(X_train, y_train, model, param_grid, cv, scoring="f1", verbose=0)
        # Evaluate the model on the test data
        y_pred = best_model.predict(X_test)
        y_prob = best_model.predict_proba(X_test)[:, 1] if hasattr(best_model, "predict_proba") else None
        metrics = {
            "Model Name": model_name,
            disp_col: np.round(f1_score(y_test, y_pred) * 100, 2),
            "Accuracy": np.round(accuracy_score(y_test, y_pred) * 100, 2),
            "Precision": np.round(precision_score(y_test, y_pred) * 100, 2),
            "Recall": np.round(recall_score(y_test, y_pred) * 100, 2),
            "ROC-AUC": np.round(roc_auc_score(y_test, y_prob) * 100, 2) if y_prob is not None else "N/A",
        }
        results.append(metrics)
        # Plot the confusion matrix
        conf_matrix = confusion_matrix(y_test, y_pred)
        plt.figure(figsize=(6, 5))
        sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False)
        plt.title(f"Confusion matrix - {model_name}")
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.show()
        # Learning curves
        train_sizes, train_scores, valid_scores = learning_curve(
            best_model, X_train, y_train, cv=cv, scoring='f1', n_jobs=-1
        )
        plt.figure(figsize=(8, 6))
        plt.plot(train_sizes, np.mean(train_scores, axis=1), label="F1 Score (Train)")
        plt.plot(train_sizes, np.mean(valid_scores, axis=1), label="F1 Score (Validation)")
        plt.xlabel("Number of training examples")
        plt.ylabel("F1 Score")
        plt.title(f"Learning curves - {model_name}")
        plt.legend()
        plt.show()
    # Build the results DataFrame, highlighting the best score per column
    df_results = pd.DataFrame(results)
    styled_df = df_results.style.highlight_max(subset=[disp_col], color='salmon', axis=0)
    return styled_df
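# End-to-end usage sketch, guarded so it only runs when the file is executed
# directly. Everything below is illustrative: the CSV path, the models, and
# their grids are assumptions, not part of the reviewed workflow. In a
# notebook, the returned Styler renders as a highlighted metrics table.
if __name__ == "__main__":
    from sklearn.linear_model import LogisticRegression
    from sklearn.ensemble import RandomForestClassifier

    # Assumed input file with the target as its last column
    data, target = preprocessing('kidney_disease.csv')
    dataviz(data)  # Histograms of the preprocessed features
    X_train, X_test, y_train, y_test = split_data(data, target)
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    dict_models = {
        "Logistic Regression": {
            "model": LogisticRegression(max_iter=1000),
            "param_grid": {"C": [0.1, 1, 10]},
        },
        "Random Forest": {
            "model": RandomForestClassifier(random_state=42),
            "param_grid": {"n_estimators": [100, 300], "max_depth": [None, 10]},
        },
    }
    results_table = display_results(dict_models, X_train, y_train, X_test, y_test, cv)
    print(results_table.data)  # Plain DataFrame view of the metrics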