JustPaste
Home | Categories | About | Donate | Contact | Terms of Use | Privacy Policy
JustPaste

Free online notepad — write and share instantly

Navigate

  • Home
  • Timeline
  • Categories

Info

  • About
  • Donate
  • Contact

Legal

  • Terms of Use
  • Privacy Policy

© 2026 JustPaste.app. All rights reserved.

Made with ♥ by JustPaste

Untitled Page | JustPaste.app
2 days ago · 2 views
👨‍💻Programming
import os
import torch
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

from datasets import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
)


# =========================
# Config
# =========================

# Input CSV; the loader below reads its "email_content" and "label" columns.
CSV_PATH = r"C:\Users\pc\Desktop\final_phishing_dataset_cleaned.csv"  # must contain: email_content, label
# Root directory for checkpoints; the training loop creates one subdirectory
# per model name underneath it.
# NOTE(review): the directory name says "last1", but the training loop calls
# prepare_last_n_layer_finetuning(model, n=4) -- confirm which is intended.
OUTPUT_DIR = "./encoder_distillbert_last1_results"

# Model identifiers to fine-tune; the training loop runs once per entry.
# Commented-out entries are alternatives that are currently disabled.
MODEL_NAMES = [
    # "bert-base-uncased",
    "distilbert-base-uncased",
    # "roberta-base",
    # "microsoft/deberta-v3-base",
]

MAX_LENGTH = 512  # passed to the tokenizer as max_length (with truncation=True)
NUM_LABELS = 2  # passed as num_labels when loading the classification model
EPOCHS = 3  # num_train_epochs
LEARNING_RATE = 2e-5
BATCH_SIZE = 16  # used for both the train and eval per-device batch size
SEED = 42  # shared by train_test_split and TrainingArguments


# =========================
# Dataset class
# =========================

class EmailDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with class labels.

    ``encodings`` is a mapping from feature name to a per-example indexable
    sequence, and ``labels`` is an indexable sequence with one class id per
    example. The dataset length is taken from ``labels``.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Build one example: every encoded feature as a tensor, then the
        # label under the "labels" key as a long tensor.
        sample = {}
        for feature_name, column in self.encodings.items():
            sample[feature_name] = torch.tensor(column[idx])
        sample["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample


# =========================
# Weighted Trainer
# =========================

class WeightedTrainer(Trainer):
    """Trainer that computes cross-entropy loss with optional class weights.

    Args:
        class_weights: 1-D float tensor with one weight per class, or ``None``
            to fall back to unweighted cross-entropy.
        *args, **kwargs: forwarded unchanged to ``Trainer.__init__``.
    """

    def __init__(self, class_weights=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Return the (optionally class-weighted) cross-entropy loss.

        ``labels`` is popped from ``inputs`` so the model's forward pass does
        not compute its own loss. Extra keyword arguments passed by the
        training loop are accepted and ignored.

        Returns:
            ``loss`` or, when ``return_outputs`` is true, ``(loss, outputs)``.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits

        # class_weights defaults to None in __init__; only move it to the
        # logits' device when it was actually provided.
        weight = None
        if self.class_weights is not None:
            weight = self.class_weights.to(logits.device)

        loss_fct = torch.nn.CrossEntropyLoss(weight=weight)
        loss = loss_fct(logits, labels)

        return (loss, outputs) if return_outputs else loss


# =========================
# Metrics
# =========================

def compute_metrics(eval_pred):
    """Compute accuracy, precision, recall and F1 from ``(logits, labels)``.

    The predicted class is the argmax over the last logits axis. Precision,
    recall and F1 use ``average="binary"`` and report 0 instead of warning
    when a denominator is zero.
    """
    logits, labels = eval_pred
    predicted_classes = np.argmax(logits, axis=-1)

    # The fourth element of the result (support) is not needed here.
    scores = precision_recall_fscore_support(
        labels,
        predicted_classes,
        average="binary",
        zero_division=0,
    )
    precision, recall, f1 = scores[:3]

    return {
        "accuracy": accuracy_score(labels, predicted_classes),
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


# =========================
# Freezing logic
# =========================

def freeze_all_parameters(model):
    """Disable gradient tracking on every parameter of ``model`` in place."""
    for weight in model.parameters():
        weight.requires_grad_(False)


def unfreeze_classification_head(model):
    """
    Re-enable gradients on the model's classification head modules.

    Direct children named "classifier", "score" or "pre_classifier" are
    unfrozen, followed by the ``pre_classifier`` and ``classifier``
    attributes when present (a DistilBERT-style head has both).
    """

    head_names = {"classifier", "score", "pre_classifier"}

    # First pass: any direct child module whose name marks it as a head.
    heads = [
        child
        for child_name, child in model.named_children()
        if child_name in head_names
    ]

    # Second pass: explicit attribute lookups, as some models expose both a
    # pre_classifier and a classifier.
    for attr_name in ("pre_classifier", "classifier"):
        if hasattr(model, attr_name):
            heads.append(getattr(model, attr_name))

    for head in heads:
        for weight in head.parameters():
            weight.requires_grad = True


def get_encoder_layers(model):
    """
    Look up the stack of transformer layers on an encoder-only model.

    The backbone is detected by attribute name, checked in this order:

        bert       -> model.bert.encoder.layer
        roberta    -> model.roberta.encoder.layer
        deberta    -> model.deberta.encoder.layer
        distilbert -> model.distilbert.transformer.layer

    For DistilBERT the number of layers is also printed. A ValueError is
    raised when none of these attributes exists on the model.
    """

    # BERT, RoBERTa and DeBERTa share the same "<backbone>.encoder.layer" path.
    for backbone_attr in ("bert", "roberta", "deberta"):
        if hasattr(model, backbone_attr):
            return getattr(model, backbone_attr).encoder.layer

    if hasattr(model, "distilbert"):
        blocks = model.distilbert.transformer.layer
        print("Number of layers:", len(blocks))
        return blocks

    raise ValueError(
        f"Unsupported model architecture: {model.__class__.__name__}"
    )


def unfreeze_last_n_layers(model, n=4):
    """Re-enable gradients on the last ``n`` transformer layers of ``model``.

    Args:
        model: a model supported by ``get_encoder_layers``.
        n: number of trailing layers to unfreeze. ``0`` unfreezes nothing;
            values larger than the layer count unfreeze every layer.

    Raises:
        ValueError: if ``n`` is negative, or if the architecture is not
            supported by ``get_encoder_layers``.
    """
    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")

    layers = get_encoder_layers(model)

    # layers[-0:] is the whole sequence, so n == 0 must be handled explicitly
    # or it would unfreeze every layer instead of none.
    if n == 0:
        return

    for layer in layers[-n:]:
        for param in layer.parameters():
            param.requires_grad = True


def prepare_last_n_layer_finetuning(model, n=1):
    """
    Set up ``model`` in place for partial fine-tuning and return it.

    Every parameter is frozen first; gradients are then re-enabled for the
    classification head and for the last ``n`` transformer layers.
    """

    # Freeze first, then selectively re-enable: head, then trailing layers.
    freeze_all_parameters(model)
    unfreeze_classification_head(model)
    unfreeze_last_n_layers(model, n=n)
    return model


def print_trainable_parameters(model):
    """Print the trainable and total parameter counts of ``model``.

    Three lines are printed: the number of elements in parameters with
    ``requires_grad`` set, the number of elements in all parameters, and the
    trainable share as a percentage. A model with no parameters reports
    0.00% instead of raising ``ZeroDivisionError``.
    """
    trainable = 0
    total = 0

    for param in model.parameters():
        count = param.numel()
        total += count
        if param.requires_grad:
            trainable += count

    # Guard the division: a parameter-free model has total == 0.
    percent = 100 * trainable / total if total else 0.0

    print(f"Trainable parameters: {trainable:,}")
    print(f"Total parameters:     {total:,}")
    print(f"Trainable percent:   {percent:.2f}%")


# =========================
# Data loading
# =========================

# Load the CSV, keep only the text and label columns, drop incomplete rows,
# and normalise the column types (text -> str, label -> int).
df = pd.read_csv(CSV_PATH)
df = df[["email_content", "label"]].dropna()
df = df.astype({"email_content": str, "label": int})

# Stratified 80/20 split so both parts keep the label proportions.
train_df, test_df = train_test_split(
    df, test_size=0.2, random_state=SEED, stratify=df["label"]
)

# "balanced" class weights computed from the training labels only, stored as
# a float tensor for use by the weighted loss.
class_weights = torch.tensor(
    compute_class_weight(
        class_weight="balanced",
        classes=np.unique(train_df["label"]),
        y=train_df["label"],
    ),
    dtype=torch.float,
)

print("Class weights:", class_weights)


# =========================
# Training loop
# =========================

# One dict of final evaluation metrics per trained model.
results = []

# Fine-tune and evaluate each configured model in turn.
for model_name in MODEL_NAMES:
    print("\n" + "=" * 80)
    print(f"Training model: {model_name}")
    print("=" * 80)

    # Model ids may contain "/" (e.g. "org/name"); replace it so the id can
    # be used as a single directory name.
    safe_model_name = model_name.replace("/", "_")
    model_output_dir = os.path.join(OUTPUT_DIR, safe_model_name)

    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Tokenize each split in one call, truncating at MAX_LENGTH.
    # NOTE(review): padding=True on the whole list presumably pads every
    # example to the longest one in the split -- confirm; dynamic per-batch
    # padding may be cheaper.
    train_encodings = tokenizer(
        train_df["email_content"].tolist(),
        truncation=True,
        padding=True,
        max_length=MAX_LENGTH,
    )

    test_encodings = tokenizer(
        test_df["email_content"].tolist(),
        truncation=True,
        padding=True,
        max_length=MAX_LENGTH,
    )

    train_dataset = EmailDataset(
        train_encodings,
        train_df["label"].tolist(),
    )

    test_dataset = EmailDataset(
        test_encodings,
        test_df["label"].tolist(),
    )

    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=NUM_LABELS,
    )

    # Important part:
    # Freeze everything except classification head + last 4 encoder layers.
    # NOTE(review): OUTPUT_DIR is named "...last1..." while n=4 is used here;
    # confirm which setting is intended.
    model = prepare_last_n_layer_finetuning(model, n=4)

    print_trainable_parameters(model)

    # Evaluate and checkpoint once per epoch; after training, reload the
    # checkpoint with the highest F1 on the evaluation set.
    training_args = TrainingArguments(
        output_dir=model_output_dir,
        num_train_epochs=EPOCHS,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        learning_rate=LEARNING_RATE,
        weight_decay=0.01,

        eval_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="steps",
        logging_steps=100,

        load_best_model_at_end=True,
        metric_for_best_model="f1",
        greater_is_better=True,

        save_total_limit=2,
        report_to="none",

        seed=SEED,
        fp16=torch.cuda.is_available(),
    )

    # NOTE(review): the test split is used as eval_dataset, so the same data
    # drives best-checkpoint selection and the final reported metrics; a
    # separate validation split would avoid that.
    # NOTE(review): newer transformers releases appear to replace the
    # `tokenizer` argument with `processing_class` -- confirm against the
    # installed version.
    trainer = WeightedTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
        class_weights=class_weights,
    )

    trainer.train()

    # Final evaluation on eval_dataset (the test split).
    eval_result = trainer.evaluate()

    print(f"Evaluation result for {model_name}:")
    print(eval_result)

    # The metric names from compute_metrics are read back with an "eval_"
    # prefix; .get() yields None for any key that is missing.
    results.append({
        "model": model_name,
        "accuracy": eval_result.get("eval_accuracy"),
        "precision": eval_result.get("eval_precision"),
        "recall": eval_result.get("eval_recall"),
        "f1": eval_result.get("eval_f1"),
    })

    # Save the final model and its tokenizer side by side in the model's
    # output directory.
    trainer.save_model(model_output_dir)
    tokenizer.save_pretrained(model_output_dir)


# =========================
# Save comparison results
# =========================

# Collect the per-model metrics into one table, write it to a CSV in the
# current working directory, and echo it to stdout.
results_df = pd.DataFrame(results)
results_df.to_csv("encoder_last4_comparison_results.csv", index=False)

print("\nFinal Results:", results_df, sep="\n")
← Back to timeline