第15章:强化学习微调 II: RL-15.2 人类反馈的强化学习(RLHF)#

15.2 人类反馈的强化学习(RLHF)#

在前一节中,我们探讨了强化学习的基本概念及其在语言模型优化中的应用挑战。虽然传统的强化学习方法为我们提供了一个有价值的框架,但它们在语言模型微调中的直接应用面临着奖励函数设计困难、样本效率低下等问题。人类反馈的强化学习(Reinforcement Learning from Human Feedback,RLHF)应运而生,它提供了一种更加结构化和有效的方法,将人类偏好整合到语言模型的训练过程中。

RLHF已成为当前最先进的大型语言模型(如ChatGPT、Claude和Llama 2)对齐人类偏好的主要方法。在故事讲述AI的背景下,RLHF使我们能够训练模型生成不仅在技术上正确,而且真正符合人类审美和价值观的故事。本节将详细介绍RLHF的工作原理、实现流程和在故事生成中的应用。

RLHF的工作原理#

RLHF的核心思想是利用人类反馈来指导模型的优化过程。与传统的强化学习相比,RLHF不依赖于预定义的奖励函数,而是从人类偏好数据中学习奖励信号。这种方法更好地捕捉了难以形式化的人类价值观和偏好,如故事的创意性、吸引力和情感共鸣。

RLHF的基本工作流程包括三个主要阶段:

  1. 监督式微调(SFT)

    • 使用高质量的示范数据对预训练语言模型进行初步微调

    • 这一阶段与第14章讨论的SFT过程相同

    • 目标是使模型能够生成符合基本要求的输出

  2. 奖励模型训练(Reward Model Training)

    • 收集人类对模型生成内容的偏好数据

    • 训练一个奖励模型来预测人类偏好

    • 这个奖励模型将在后续的强化学习阶段提供奖励信号

  3. 强化学习优化(RL Optimization)

    • 使用奖励模型提供的信号,通过强化学习算法(通常是PPO)优化语言模型

    • 在优化过程中加入约束,防止模型偏离原始语言分布

    • 最终得到一个既保留语言能力又符合人类偏好的模型

这三个阶段形成了一个完整的RLHF流程,下面我们将详细探讨每个阶段的具体实现。

人类偏好数据的收集#

RLHF的成功很大程度上取决于高质量人类偏好数据的收集。在故事讲述的背景下,这涉及到收集人类对不同故事版本的评价和偏好。

偏好数据的类型#

人类偏好数据通常以以下形式之一收集:

  1. 比较数据(Comparative Data)

    • 人类评估者比较同一提示下生成的两个或多个故事版本

    • 选择他们更喜欢的版本或对它们进行排序

    • 这是RLHF中最常用的数据类型,因为相对判断通常比绝对评分更可靠

  2. 评分数据(Rating Data)

    • 人类评估者对单个故事进行评分(例如1-5分)

    • 评分可以是整体评分,也可以针对特定维度(如创意性、连贯性)

    • 这种数据更容易收集,但可能受到评分标准不一致的影响

  3. 多维度评估(Multi-dimensional Assessment)

    • 结合比较和评分,对故事的多个方面进行评估

    • 例如,可以让评估者比较两个故事,并说明哪个故事在创意性、教育价值等方面更好

    • 这提供了更丰富的信息,但增加了数据收集的复杂性

数据收集策略#

有效的偏好数据收集需要精心设计的策略:

  1. 评估者选择

    • 根据目标受众选择合适的评估者

    • 对于儿童故事,可以包括教育工作者、家长和儿童文学专家

    • 理想情况下,也应包括目标年龄段的儿童(在适当监督下)

  2. 提示设计

    • 创建多样化的提示,覆盖不同类型的故事和场景

    • 包括具有挑战性的提示,测试模型在复杂情境中的表现

    • 确保提示分布与实际应用场景相符

  3. 采样策略

    • 使用不同的采样参数(温度、top-p等)生成多样化的故事

    • 包括SFT模型和不同阶段的RL模型生成的样本

    • 主动学习方法可以帮助选择最有信息量的样本对进行评估

  4. 评估指南

    • 为评估者提供明确的指导和标准

    • 解释各评估维度的含义和重要性

    • 提供示例和校准练习,确保评估一致性

  5. 质量控制

    • 包含重复样本,检查评估者的一致性

    • 使用专家评估作为参考标准

    • 过滤掉不一致或低质量的评估

偏好数据收集实例#

以下是一个为故事讲述AI收集偏好数据的实际流程示例:

import pandas as pd
import random
from datetime import datetime

class PreferenceDataCollector:
    def __init__(self, model, tokenizer, prompts_file, output_file):
        """初始化偏好数据收集器"""
        self.model = model
        self.tokenizer = tokenizer
        self.prompts = self.load_prompts(prompts_file)
        self.output_file = output_file
        self.collected_data = []
        
    def load_prompts(self, prompts_file):
        """加载故事提示"""
        df = pd.read_csv(prompts_file)
        return df["prompt"].tolist()
    
    def generate_story_pair(self, prompt, temp_a=0.7, temp_b=1.0):
        """为同一提示生成两个不同的故事版本"""
        # 生成第一个故事(较低温度)
        inputs = self.tokenizer(prompt, return_tensors="pt")
        outputs_a = self.model.generate(
            inputs.input_ids,
            max_length=1024,
            temperature=temp_a,
            top_p=0.9,
            do_sample=True
        )
        story_a = self.tokenizer.decode(outputs_a[0], skip_special_tokens=True)
        
        # 生成第二个故事(较高温度)
        outputs_b = self.model.generate(
            inputs.input_ids,
            max_length=1024,
            temperature=temp_b,
            top_p=0.9,
            do_sample=True
        )
        story_b = self.tokenizer.decode(outputs_b[0], skip_special_tokens=True)
        
        return story_a, story_b
    
    def collect_human_preference(self, prompt, story_a, story_b, evaluator_id):
        """收集人类评估者的偏好"""
        print("\n" + "="*80)
        print(f"提示: {prompt}")
        print("-"*80)
        print(f"故事 A:\n{story_a}")
        print("-"*80)
        print(f"故事 B:\n{story_b}")
        print("-"*80)
        
        # 收集整体偏好
        while True:
            preference = input("您更喜欢哪个故事? (A/B/相等): ").upper()
            if preference in ["A", "B", "相等"]:
                break
            print("无效输入,请输入 A, B 或 相等")
        
        # 收集维度评分
        dimensions = ["创意性", "连贯性", "教育价值", "吸引力"]
        ratings_a = {}
        ratings_b = {}
        
        print("\n请为每个故事的以下维度评分 (1-5分,5分最高):")
        for dim in dimensions:
            while True:
                try:
                    rating_a = int(input(f"故事 A 的{dim}评分 (1-5): "))
                    if 1 <= rating_a <= 5:
                        ratings_a[dim] = rating_a
                        break
                    print("请输入1-5之间的整数")
                except ValueError:
                    print("请输入有效的整数")
            
            while True:
                try:
                    rating_b = int(input(f"故事 B 的{dim}评分 (1-5): "))
                    if 1 <= rating_b <= 5:
                        ratings_b[dim] = rating_b
                        break
                    print("请输入1-5之间的整数")
                except ValueError:
                    print("请输入有效的整数")
        
        # 收集评论
        comments = input("\n请提供关于您选择的简短解释 (可选): ")
        
        # 记录数据
        data_point = {
            "prompt": prompt,
            "story_a": story_a,
            "story_b": story_b,
            "preference": preference,
            "ratings_a": ratings_a,
            "ratings_b": ratings_b,
            "comments": comments,
            "evaluator_id": evaluator_id,
            "timestamp": datetime.now().isoformat()
        }
        
        self.collected_data.append(data_point)
        return data_point
    
    def run_collection_session(self, num_samples, evaluator_id):
        """运行一个完整的数据收集会话"""
        # 随机选择提示
        selected_prompts = random.sample(self.prompts, min(num_samples, len(self.prompts)))
        
        for prompt in selected_prompts:
            # 生成故事对
            story_a, story_b = self.generate_story_pair(prompt)
            
            # 收集偏好
            self.collect_human_preference(prompt, story_a, story_b, evaluator_id)
            
            # 定期保存数据
            self.save_data()
        
        print(f"\n会话完成!已收集 {len(selected_prompts)} 个偏好数据点。")
    
    def save_data(self):
        """保存收集的数据"""
        df = pd.DataFrame(self.collected_data)
        df.to_json(self.output_file, orient="records", lines=True)
        print(f"数据已保存到 {self.output_file}")

# 使用示例
# collector = PreferenceDataCollector(model, tokenizer, "story_prompts.csv", "preference_data.jsonl")
# collector.run_collection_session(10, "evaluator_001")

这个示例展示了一个交互式的偏好数据收集过程,包括整体偏好和多维度评分。在实际应用中,这可以扩展为一个web界面,使评估者能够更方便地参与。

奖励模型的训练#

一旦收集了足够的人类偏好数据,下一步是训练一个奖励模型(Reward Model,RM),该模型学习预测人类对故事的评价。奖励模型将在后续的强化学习阶段提供奖励信号。

奖励模型架构#

奖励模型通常基于与策略模型相同或相似的预训练语言模型,但输出层被修改为产生单一的标量奖励值。常见的架构选择包括:

  1. 基于编码器的奖励模型

    • 使用BERT、RoBERTa等编码器模型

    • 将[CLS]标记的最终表示通过一个线性层映射到标量奖励

    • 优点:计算效率高,适合处理固定长度的输入

    • 缺点:可能不如解码器模型那样擅长捕捉长文本的细微差别

  2. 基于解码器的奖励模型

    • 使用GPT、Llama等解码器模型

    • 将最后一个标记的隐藏状态通过一个线性层映射到标量奖励

    • 优点:与生成模型架构一致,可能更好地理解生成内容的质量

    • 缺点:计算成本较高

  3. 混合架构

    • 结合编码器和解码器的优势

    • 例如,使用T5等编码器-解码器模型

    • 可以更灵活地处理不同类型的输入

奖励模型训练目标#

奖励模型的训练目标是学习一个函数,该函数能够准确预测人类对故事的偏好。对于比较数据,常用的训练目标是Bradley-Terry模型,它假设人类选择故事A而非故事B的概率与两个故事的奖励差异成正比:

$$P(A \succ B) = \sigma(r(A) - r(B))$$

其中,$\sigma$是sigmoid函数,$r(A)$和$r(B)$是奖励模型对故事A和B的预测奖励。

相应的损失函数通常是二元交叉熵:

$$\mathcal{L} = -y \log(P(A \succ B)) - (1-y) \log(1 - P(A \succ B))$$

其中,$y$是人类偏好的标签(如果人类偏好A则为1,偏好B则为0)。

奖励模型训练实现#

以下是一个基于PyTorch的奖励模型训练实现示例:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments
import pandas as pd
import json
import numpy as np

class PreferenceDataset(Dataset):
    """人类偏好数据集"""
    def __init__(self, data_file, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.data = []
        
        # 加载数据
        with open(data_file, 'r') as f:
            for line in f:
                item = json.loads(line)
                # 只使用明确偏好A或B的数据
                if item["preference"] in ["A", "B"]:
                    self.data.append(item)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        item = self.data[idx]
        
        # 编码故事A
        encoding_a = self.tokenizer(
            item["story_a"],
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        
        # 编码故事B
        encoding_b = self.tokenizer(
            item["story_b"],
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        
        # 准备标签(1表示偏好A,0表示偏好B)
        label = 1.0 if item["preference"] == "A" else 0.0
        
        return {
            "input_ids_a": encoding_a["input_ids"].squeeze(),
            "attention_mask_a": encoding_a["attention_mask"].squeeze(),
            "input_ids_b": encoding_b["input_ids"].squeeze(),
            "attention_mask_b": encoding_b["attention_mask"].squeeze(),
            "label": torch.tensor(label, dtype=torch.float)
        }

class RewardModel(nn.Module):
    """奖励模型"""
    def __init__(self, model_name):
        super(RewardModel, self).__init__()
        # 加载预训练模型
        self.backbone = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=1  # 单一奖励分数
        )
    
    def forward(self, input_ids, attention_mask):
        outputs = self.backbone(input_ids=input_ids, attention_mask=attention_mask)
        return outputs.logits  # 形状为 [batch_size, 1]

def train_reward_model(data_file, model_name, output_dir, batch_size=8, epochs=3):
    """训练奖励模型"""
    # 初始化分词器和模型
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = RewardModel(model_name)
    
    # 准备数据集
    dataset = PreferenceDataset(data_file, tokenizer)
    
    # 80-20分割训练集和验证集
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
    
    # 数据加载器
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    
    # 优化器
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
    
    # 训练循环
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    for epoch in range(epochs):
        # 训练阶段
        model.train()
        train_loss = 0
        for batch in train_loader:
            # 将数据移到设备
            input_ids_a = batch["input_ids_a"].to(device)
            attention_mask_a = batch["attention_mask_a"].to(device)
            input_ids_b = batch["input_ids_b"].to(device)
            attention_mask_b = batch["attention_mask_b"].to(device)
            labels = batch["label"].to(device)
            
            # 前向传播
            rewards_a = model(input_ids_a, attention_mask_a).squeeze()
            rewards_b = model(input_ids_b, attention_mask_b).squeeze()
            
            # 计算概率和损失
            probs = torch.sigmoid(rewards_a - rewards_b)
            loss = F.binary_cross_entropy(probs, labels)
            
            # 反向传播
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        # 验证阶段
        model.eval()
        val_loss = 0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for batch in val_loader:
                # 将数据移到设备
                input_ids_a = batch["input_ids_a"].to(device)
                attention_mask_a = batch["attention_mask_a"].to(device)
                input_ids_b = batch["input_ids_b"].to(device)
                attention_mask_b = batch["attention_mask_b"].to(device)
                labels = batch["label"].to(device)
                
                # 前向传播
                rewards_a = model(input_ids_a, attention_mask_a).squeeze()
                rewards_b = model(input_ids_b, attention_mask_b).squeeze()
                
                # 计算概率和损失
                probs = torch.sigmoid(rewards_a - rewards_b)
                loss = F.binary_cross_entropy(probs, labels)
                
                val_loss += loss.item()
                
                # 计算准确率
                predictions = (probs >= 0.5).float()
                correct += (predictions == labels).sum().item()
                total += labels.size(0)
        
        # 打印统计信息
        print(f"Epoch {epoch+1}/{epochs}")
        print(f"Train Loss: {train_loss/len(train_loader):.4f}")
        print(f"Val Loss: {val_loss/len(val_loader):.4f}")
        print(f"Val Accuracy: {correct/total:.4f}")
    
    # 保存模型
    model.backbone.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)
    
    return model, tokenizer

# 使用示例
# train_reward_model("preference_data.jsonl", "roberta-base", "story_reward_model")

这个实现展示了如何使用比较数据训练奖励模型。在实际应用中,可能需要更复杂的数据处理和模型架构,以及更多的超参数调优。

奖励模型评估#

训练奖励模型后,需要评估其性能以确保它能够准确预测人类偏好。常用的评估指标包括:

  1. 偏好预测准确率

    • 模型正确预测人类偏好的比例

    • 在保留的测试集上计算

    • 基准线是50%(随机猜测)

  2. 奖励分布分析

    • 检查奖励分数的分布是否合理

    • 确保分数能够区分不同质量的故事

    • 可以通过直方图或箱线图可视化

  3. 与人类评分的相关性

    • 如果有单独的评分数据,计算模型预测与人类评分的相关性

    • 使用皮尔逊或斯皮尔曼相关系数

  4. 维度一致性

    • 检查模型是否与人类在不同评估维度上的判断一致

    • 例如,模型应该给予创意性高的故事更高的奖励

以下是一个评估奖励模型的示例代码:

def evaluate_reward_model(model, tokenizer, test_file, max_length=512):
    """评估奖励模型性能"""
    # 加载测试数据
    test_data = []
    with open(test_file, 'r') as f:
        for line in f:
            item = json.loads(line)
            if item["preference"] in ["A", "B"]:
                test_data.append(item)
    
    # 设置设备
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    
    # 评估指标
    correct = 0
    total = 0
    rewards_a = []
    rewards_b = []
    
    with torch.no_grad():
        for item in test_data:
            # 编码故事
            encoding_a = tokenizer(
                item["story_a"],
                max_length=max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            ).to(device)
            
            encoding_b = tokenizer(
                item["story_b"],
                max_length=max_length,
                padding="max_length",
                truncation=True,
                return_tensors="pt"
            ).to(device)
            
            # 获取奖励分数
            reward_a = model(encoding_a["input_ids"], encoding_a["attention_mask"]).item()
            reward_b = model(encoding_b["input_ids"], encoding_b["attention_mask"]).item()
            
            # 记录奖励分数
            rewards_a.append(reward_a)
            rewards_b.append(reward_b)
            
            # 检查预测是否与人类偏好一致
            predicted_preference = "A" if reward_a > reward_b else "B"
            if predicted_preference == item["preference"]:
                correct += 1
            total += 1
    
    # 计算准确率
    accuracy = correct / total
    
    # 分析奖励分布
    rewards_a = np.array(rewards_a)
    rewards_b = np.array(rewards_b)
    reward_diff = rewards_a - rewards_b
    
    # 打印统计信息
    print(f"测试集大小: {total}")
    print(f"准确率: {accuracy:.4f}")
    print(f"奖励A均值: {rewards_a.mean():.4f}, 标准差: {rewards_a.std():.4f}")
    print(f"奖励B均值: {rewards_b.mean():.4f}, 标准差: {rewards_b.std():.4f}")
    print(f"奖励差异均值: {reward_diff.mean():.4f}, 标准差: {reward_diff.std():.4f}")
    
    # 可以添加更多分析,如绘制分布图等
    
    return {
        "accuracy": accuracy,
        "rewards_a": rewards_a.tolist(),
        "rewards_b": rewards_b.tolist(),
        "reward_diff": reward_diff.tolist()
    }

一个好的奖励模型应该在测试集上达到显著高于随机猜测的准确率(通常至少70-80%),并且能够为不同质量的故事分配明显不同的奖励分数。

RLHF的实现流程#

在训练好奖励模型后,我们可以进入RLHF的最后阶段:使用强化学习优化语言模型。这一阶段通常使用近端策略优化(Proximal Policy Optimization,PPO)算法,我们将在下一节详细讨论。这里,我们先概述RLHF的完整实现流程。

RLHF流程概述#

完整的RLHF实现流程包括以下步骤:

  1. 准备基础模型

    • 选择一个预训练的语言模型作为起点

    • 确保模型具有足够的基础能力,如语言理解和生成

  2. 监督式微调(SFT): -To save on context only part of this file has been shown to you. You should retry this tool after you have searched inside the file with grep -n in order to find the line numbers of what you are looking for.