import os
import torch
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from datasets import Dataset
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
TrainingArguments,
Trainer,
)
# =========================
# Config
# =========================
# Labelled e-mail CSV; it must contain the columns `email_content` and `label`.
# The PHISHING_CSV_PATH environment variable overrides the location so the
# script can run on machines other than the original author's; when it is
# unset, the original hard-coded path is used unchanged.
CSV_PATH = os.environ.get(
    "PHISHING_CSV_PATH",
    r"C:\Users\pc\Desktop\final_phishing_dataset_cleaned.csv",
)

# Root folder for checkpoints; each model gets its own sub-folder.
# NOTE(review): the folder name says "last1", but the training loop calls
# prepare_last_n_layer_finetuning(model, n=4) -- confirm which one is intended.
OUTPUT_DIR = "./encoder_distillbert_last1_results"

# Hugging Face checkpoints to fine-tune; uncomment entries to compare models.
MODEL_NAMES = [
    # "bert-base-uncased",
    "distilbert-base-uncased",
    # "roberta-base",
    # "microsoft/deberta-v3-base",
]

MAX_LENGTH = 512       # tokenizer truncation limit, in tokens
NUM_LABELS = 2         # size of the classification head
EPOCHS = 3             # num_train_epochs
LEARNING_RATE = 2e-5
BATCH_SIZE = 16        # per-device batch size for both training and evaluation
SEED = 42              # used for the train/test split and for TrainingArguments
# =========================
# Dataset class
# =========================
class EmailDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with integer labels.

    Args:
        encodings: Mapping from feature name to an indexable batch of
            per-example values (anything exposing ``.items()``).
        labels: Indexable sequence of class ids, one per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        """Return example ``idx`` as a dict of tensors plus a ``labels`` entry."""
        # torch.as_tensor accepts plain lists as well as existing tensors;
        # torch.tensor() on a tensor triggers a copy-construct UserWarning.
        # For tensor-backed inputs the result shares memory with the source.
        item = {
            key: torch.as_tensor(value[idx])
            for key, value in self.encodings.items()
        }
        item["labels"] = torch.as_tensor(self.labels[idx], dtype=torch.long)
        return item

    def __len__(self):
        """Return the number of examples (one per label)."""
        return len(self.labels)
# =========================
# Weighted Trainer
# =========================
class WeightedTrainer(Trainer):
    """Trainer whose loss is cross-entropy with optional per-class weights.

    Args:
        class_weights: 1-D float tensor with one weight per class, or ``None``
            for an unweighted loss. It is the first positional parameter, so
            pass the regular ``Trainer`` arguments by keyword.
        *args, **kwargs: Forwarded unchanged to ``Trainer.__init__``.
    """

    def __init__(self, class_weights=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the (optionally class-weighted) cross-entropy loss.

        Args:
            model: Model called as ``model(**inputs)``; its output must expose
                ``.logits``.
            inputs: Batch dict; the ``"labels"`` entry is popped before the
                forward pass.
            return_outputs: When true, return ``(loss, outputs)``.
            **kwargs: Accepted for signature compatibility and ignored here.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs``.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits

        # class_weights defaults to None; calling .to() on it unconditionally
        # raised AttributeError. Without weights, use plain cross-entropy.
        weight = None
        if self.class_weights is not None:
            weight = self.class_weights.to(logits.device)

        loss_fct = torch.nn.CrossEntropyLoss(weight=weight)
        loss = loss_fct(logits, labels)
        return (loss, outputs) if return_outputs else loss
# =========================
# Metrics
# =========================
def compute_metrics(eval_pred):
    """Turn a ``(logits, labels)`` pair into accuracy, precision, recall and F1.

    The predicted class is the arg-max over the last logits axis. Precision,
    recall and F1 use scikit-learn's ``average="binary"`` mode, with
    ``zero_division=0`` so an undefined score is reported as 0.
    """
    logits, labels = eval_pred
    predicted = np.argmax(logits, axis=-1)

    # The helper returns (precision, recall, f1, support); zip() stops after
    # the three names, so the support entry is dropped.
    prf_support = precision_recall_fscore_support(
        labels,
        predicted,
        average="binary",
        zero_division=0,
    )
    scores = dict(zip(("precision", "recall", "f1"), prf_support))

    return {"accuracy": accuracy_score(labels, predicted), **scores}
# =========================
# Freezing logic
# =========================
def freeze_all_parameters(model):
    """Turn off gradient tracking for every parameter of ``model`` (in place)."""
    for weight in model.parameters():
        weight.requires_grad_(False)
def unfreeze_classification_head(model):
    """
    Re-enable gradients for the task head that sits on top of the encoder.

    Every direct child module of ``model`` whose name is one of
    ``classifier``, ``score``, ``pre_classifier`` or ``pooler`` has all of
    its parameters set to ``requires_grad = True``. Other children are left
    untouched.

    ``pooler`` is included because DeBERTa-style sequence classifiers keep a
    pooling layer next to ``classifier`` at the top level of the model.
    NOTE(review): that pooler is presumably initialised from scratch for a
    base checkpoint and therefore needs training -- confirm for the
    checkpoints you use. Poolers nested inside the backbone (e.g. under
    ``model.bert``) are not direct children and are not affected.
    """
    head_names = {"classifier", "score", "pre_classifier", "pooler"}

    # One pass over the direct children handles every head variant, including
    # DistilBERT's pre_classifier + classifier pair.
    for name, module in model.named_children():
        if name in head_names:
            for param in module.parameters():
                param.requires_grad = True
def get_encoder_layers(model):
    """
    Locate the stack of transformer layers inside an encoder-only classifier.

    Lookup order and attribute paths:
        bert       -> model.bert.encoder.layer
        roberta    -> model.roberta.encoder.layer
        deberta    -> model.deberta.encoder.layer
        distilbert -> model.distilbert.transformer.layer

    The DistilBERT branch also prints the number of layers it found.

    Raises:
        ValueError: if ``model`` has none of the backbone attributes above.
    """
    # These three backbones share the same `<backbone>.encoder.layer` layout.
    for backbone_attr in ("bert", "roberta", "deberta"):
        if hasattr(model, backbone_attr):
            return getattr(model, backbone_attr).encoder.layer

    if not hasattr(model, "distilbert"):
        raise ValueError(
            f"Unsupported model architecture: {model.__class__.__name__}"
        )

    blocks = model.distilbert.transformer.layer
    print("Number of layers:", len(blocks))
    return blocks
def unfreeze_last_n_layers(model, n=4):
    """Re-enable gradients for the last ``n`` transformer layers of ``model``.

    Args:
        model: Model accepted by ``get_encoder_layers``.
        n: Number of trailing layers to unfreeze. ``0`` unfreezes nothing; a
            value larger than the layer count unfreezes every layer.

    Raises:
        ValueError: if ``n`` is negative, or (from ``get_encoder_layers``) if
            the architecture is not supported.
    """
    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")

    # Resolve the layers first so an unsupported architecture is still reported.
    layers = get_encoder_layers(model)

    # Guard n == 0 explicitly: layers[-0:] is layers[0:], i.e. *every* layer,
    # which would silently unfreeze the whole encoder.
    if n == 0:
        return

    for layer in layers[-n:]:
        for param in layer.parameters():
            param.requires_grad = True
def prepare_last_n_layer_finetuning(model, n=1):
    """
    Set up partial fine-tuning on ``model`` and hand the same object back.

    Steps, in order:
      1. freeze every parameter
      2. unfreeze the classification head
      3. unfreeze the last ``n`` transformer layers
    """
    # Freezing must run first; the later steps selectively switch gradients
    # back on.
    for step in (freeze_all_parameters, unfreeze_classification_head):
        step(model)
    unfreeze_last_n_layers(model, n=n)
    return model
def print_trainable_parameters(model):
    """Print the trainable and total parameter counts and their ratio.

    Counts elements (``numel``) over ``model.named_parameters()``; a parameter
    is trainable when its ``requires_grad`` flag is set. A model with no
    parameters reports 0.00% instead of raising ``ZeroDivisionError``.
    """
    sizes = [
        (param.numel(), param.requires_grad)
        for _, param in model.named_parameters()
    ]
    total = sum(count for count, _ in sizes)
    trainable = sum(count for count, is_trainable in sizes if is_trainable)

    # Guard the division: a parameter-free module has total == 0.
    percent = 100 * trainable / total if total else 0.0

    print(f"Trainable parameters: {trainable:,}")
    print(f"Total parameters: {total:,}")
    print(f"Trainable percent: {percent:.2f}%")
# =========================
# Data loading
# =========================
# Keep only the two columns the pipeline uses, drop rows with a missing value
# in either, then coerce text to str and labels to int.
df = (
    pd.read_csv(CSV_PATH)[["email_content", "label"]]
    .dropna()
    .astype({"email_content": str, "label": int})
)

# Stratified 80/20 split, so both parts keep the label proportions of `df`.
train_df, test_df = train_test_split(
    df,
    test_size=0.2,
    random_state=SEED,
    stratify=df["label"],
)

# "balanced" class weights are derived from the training labels only and then
# converted to a float tensor for the weighted loss.
train_labels = train_df["label"]
balanced_weights = compute_class_weight(
    class_weight="balanced",
    classes=np.unique(train_labels),
    y=train_labels,
)
class_weights = torch.tensor(balanced_weights, dtype=torch.float)
print("Class weights:", class_weights)
# =========================
# Training loop
# =========================
# One summary row per fine-tuned model.
results = []

for model_name in MODEL_NAMES:
    banner = "=" * 80
    print("\n" + banner)
    print(f"Training model: {model_name}")
    print(banner)

    # Hub ids may contain "/" (e.g. "microsoft/..."); flatten it so every
    # model gets a single-level output folder.
    safe_model_name = model_name.replace("/", "_")
    model_output_dir = os.path.join(OUTPUT_DIR, safe_model_name)

    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Both splits are tokenized with identical settings.
    tokenize_kwargs = dict(
        truncation=True,
        padding=True,
        max_length=MAX_LENGTH,
    )
    train_encodings = tokenizer(
        train_df["email_content"].tolist(), **tokenize_kwargs
    )
    test_encodings = tokenizer(
        test_df["email_content"].tolist(), **tokenize_kwargs
    )

    train_dataset = EmailDataset(train_encodings, train_df["label"].tolist())
    test_dataset = EmailDataset(test_encodings, test_df["label"].tolist())

    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=NUM_LABELS,
    )

    # Important part:
    # Freeze everything except classification head + last 4 encoder layers.
    model = prepare_last_n_layer_finetuning(model, n=4)
    print_trainable_parameters(model)

    training_args = TrainingArguments(
        output_dir=model_output_dir,
        seed=SEED,
        # optimisation
        num_train_epochs=EPOCHS,
        learning_rate=LEARNING_RATE,
        weight_decay=0.01,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        fp16=torch.cuda.is_available(),
        # evaluate and checkpoint once per epoch, then reload the best F1
        eval_strategy="epoch",
        save_strategy="epoch",
        save_total_limit=2,
        load_best_model_at_end=True,
        metric_for_best_model="f1",
        greater_is_better=True,
        # logging
        logging_strategy="steps",
        logging_steps=100,
        report_to="none",
    )

    # NOTE(review): test_dataset serves both for per-epoch evaluation (and so
    # best-checkpoint selection) and for the final reported scores below.
    trainer = WeightedTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
        class_weights=class_weights,
    )
    trainer.train()

    eval_result = trainer.evaluate()
    print(f"Evaluation result for {model_name}:")
    print(eval_result)

    # Copy the "eval_"-prefixed metrics into the summary row; a missing key
    # becomes None.
    summary_row = {"model": model_name}
    for metric in ("accuracy", "precision", "recall", "f1"):
        summary_row[metric] = eval_result.get(f"eval_{metric}")
    results.append(summary_row)

    # Persist the model and its tokenizer side by side.
    trainer.save_model(model_output_dir)
    tokenizer.save_pretrained(model_output_dir)
# =========================
# Save comparison results
# =========================
# Collect the per-model rows into a table, write it to a CSV in the current
# working directory, and echo it to the console.
# (Removed the stray "2 views" text that followed the final print call; it
# made the last line a syntax error.)
results_df = pd.DataFrame(results)
results_df.to_csv("encoder_last4_comparison_results.csv", index=False)
print("\nFinal Results:")
print(results_df)