# Goodbye, Seesaw Effect: Reproducing Tencent's PLE Multi-Task Recommendation Model in PaddlePaddle, Step by Step
Recommendation systems have long outgrown the point where simple collaborative filtering or matrix factorization can satisfy business needs. When we need to optimize click-through rate, watch time, share rate, and other objectives at the same time, traditional single-task models fall short. Tencent's PLE (Progressive Layered Extraction) model mitigates the negative transfer and "seesaw" effects of multi-task learning through an innovative network design. This article walks you through implementing this RecSys Best Paper Award winner from scratch with the PaddlePaddle framework.

## 1. Environment Setup and Understanding the Data

Before building PLE, make sure your development environment is configured correctly. Python 3.7 with PaddlePaddle 2.3 is recommended; these versions have the most complete support for the dynamic-graph features PLE relies on.

```bash
pip install paddlepaddle-gpu==2.3.2.post112 -f https://www.paddlepaddle.org.cn/whl/linux/mkl/avx/stable.html
```

A multi-task dataset typically carries several label columns. Taking video recommendation as an example, the dataset may contain:

- User features: age, gender, behavior history, etc.
- Video features: category, duration, creator, etc.
- Context features: time, device, network conditions, etc.
- Multi-task labels: click (binary classification), completion rate (regression), share (binary classification)

```python
import paddle
from paddle.io import Dataset

class MultiTaskDataset(Dataset):
    def __init__(self, data_path):
        self.features = ...  # load feature data
        self.labels = ...    # load multi-task labels

    def __getitem__(self, idx):
        return {
            'features': self.features[idx],
            'click_label': self.labels[idx][0],
            'watch_label': self.labels[idx][1],
            'share_label': self.labels[idx][2],
        }

    def __len__(self):
        return len(self.features)
```

> **Tip:** In real business settings the sample distribution can differ sharply across tasks. Consider weighted sampling at the data-loading stage so every task gets enough signal to learn from.

## 2. PLE Architecture in Depth

The core innovation of PLE is its layered, progressive expert structure. Compared with MMoE, PLE makes three main changes:

1. Separation of task-specific and shared experts: each task owns its expert networks while shared experts are retained.
2. Layered gating: different levels use different gating strategies.
3. Progressive feature extraction: lower layers extract generic features, higher layers extract task-specific ones.

### 2.1 Building the Expert Networks

We start with PLE's basic building block, the expert network. PLE uses two kinds: task-specific experts and shared experts.

```python
class ExpertLayer(paddle.nn.Layer):
    def __init__(self, input_size, expert_size, num_experts):
        super(ExpertLayer, self).__init__()
        self.experts = paddle.nn.LayerList()
        for i in range(num_experts):
            expert = paddle.nn.Sequential(
                paddle.nn.Linear(input_size, expert_size),
                paddle.nn.ReLU()
            )
            self.experts.append(expert)

    def forward(self, x):
        expert_outputs = [expert(x) for expert in self.experts]
        return paddle.stack(expert_outputs, axis=1)  # shape: [batch, num_experts, expert_size]
```

### 2.2 Designing the Gate Networks

The gate network is how PLE performs adaptive feature selection. PLE gates layer by layer: the first level has N+1 gates (N task gates plus one shared gate); the second level has only the N task gates.

```python
class GateLayer(paddle.nn.Layer):
    def __init__(self, input_size, num_experts):
        super(GateLayer, self).__init__()
        self.gate = paddle.nn.Sequential(
            paddle.nn.Linear(input_size, num_experts),
            paddle.nn.Softmax(axis=-1)
        )

    def forward(self, x):
        gate_values = self.gate(x)        # shape: [batch, num_experts]
        return gate_values.unsqueeze(-1)  # shape: [batch, num_experts, 1]
```
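Before wiring these layers into the full model, it can help to see the gate arithmetic on its own. The following is a framework-free NumPy sketch (random values stand in for real expert outputs and gate logits, which are not part of the model code above) of how a softmax gate fuses stacked expert outputs into one vector per example:

```python
import numpy as np

def softmax(z):
    e = np.exp(z - z.max(axis=-1, keepdims=True))
    return e / e.sum(axis=-1, keepdims=True)

batch, num_experts, expert_size = 4, 5, 8
rng = np.random.default_rng(0)

# stand-ins for what ExpertLayer and a gate's Linear layer would produce
expert_outputs = rng.normal(size=(batch, num_experts, expert_size))
gate_logits = rng.normal(size=(batch, num_experts))

gate_values = softmax(gate_logits)[..., None]       # [batch, num_experts, 1]
fused = (expert_outputs * gate_values).sum(axis=1)  # [batch, expert_size]

print(fused.shape)  # (4, 8)
```

Broadcasting the `[batch, num_experts, 1]` gate against the stacked experts and summing over axis 1 is exactly the weighted mixture that the gated outputs in PLE's forward pass compute.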
## 3. The Complete PLE Model

Now we combine the expert and gate networks into the full PLE model. It has two key levels:

- First level (extraction network): task-specific experts, shared experts, task gates, and a shared gate.
- Second level (progressive layer): deeper expert networks with task-specific gates only.

```python
class PLE(paddle.nn.Layer):
    def __init__(self, input_size, num_tasks, experts_per_task=3,
                 shared_experts=2, expert_size=64, tower_size=32):
        super(PLE, self).__init__()
        # --- first extraction layer ---
        self.task_experts_layer1 = paddle.nn.LayerList()
        for _ in range(num_tasks):
            self.task_experts_layer1.append(
                ExpertLayer(input_size, expert_size, experts_per_task))
        self.shared_experts_layer1 = ExpertLayer(input_size, expert_size, shared_experts)

        self.task_gates_layer1 = paddle.nn.LayerList()
        for _ in range(num_tasks):
            self.task_gates_layer1.append(
                GateLayer(input_size, experts_per_task + shared_experts))
        self.shared_gate_layer1 = GateLayer(
            input_size, num_tasks * experts_per_task + shared_experts)

        # --- second (progressive) layer ---
        self.task_experts_layer2 = paddle.nn.LayerList()
        for _ in range(num_tasks):
            self.task_experts_layer2.append(
                ExpertLayer(expert_size, expert_size, experts_per_task))
        self.shared_experts_layer2 = ExpertLayer(expert_size, expert_size, shared_experts)

        self.task_gates_layer2 = paddle.nn.LayerList()
        for _ in range(num_tasks):
            self.task_gates_layer2.append(
                GateLayer(expert_size, experts_per_task + shared_experts))

        # --- task towers ---
        self.towers = paddle.nn.LayerList()
        for _ in range(num_tasks):
            tower = paddle.nn.Sequential(
                paddle.nn.Linear(expert_size, tower_size),
                paddle.nn.ReLU(),
                paddle.nn.Linear(tower_size, 1)
            )
            self.towers.append(tower)

    def forward(self, x):
        # first-layer experts
        task_expert_outputs1 = [expert(x) for expert in self.task_experts_layer1]
        shared_expert_output1 = self.shared_experts_layer1(x)

        # first-layer gating: each task gate mixes its own experts plus the shared ones
        task_outputs = []
        for i, gate in enumerate(self.task_gates_layer1):
            expert_output = paddle.concat(
                [task_expert_outputs1[i], shared_expert_output1], axis=1)
            gated_output = expert_output * gate(x)
            task_outputs.append(paddle.sum(gated_output, axis=1))

        # the shared gate mixes every expert
        shared_gate_output = self.shared_gate_layer1(x)
        all_expert_output = paddle.concat(
            task_expert_outputs1 + [shared_expert_output1], axis=1)
        shared_output = paddle.sum(all_expert_output * shared_gate_output, axis=1)

        # second-layer experts, fed by the first layer's per-task and shared outputs
        task_expert_outputs2 = [
            expert(feat) for expert, feat in zip(self.task_experts_layer2, task_outputs)]
        shared_expert_output2 = self.shared_experts_layer2(shared_output)

        # second-layer gating and task towers
        final_outputs = []
        for i, gate in enumerate(self.task_gates_layer2):
            expert_output = paddle.concat(
                [task_expert_outputs2[i], shared_expert_output2], axis=1)
            gated_output = expert_output * gate(task_outputs[i])
            task_output = paddle.sum(gated_output, axis=1)
            final_outputs.append(self.towers[i](task_output))
        return final_outputs
```
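As a sanity check on the routing logic, here is a NumPy-only sketch of the first extraction layer; random matrices play the experts and gates (the helper names `experts` and `gate` are illustrative, not part of the model code). Each task gate mixes its own experts plus the shared ones, while the shared gate sees every expert:

```python
import numpy as np

rng = np.random.default_rng(42)
batch, input_size, expert_size = 4, 16, 8
num_tasks, experts_per_task, shared_experts = 2, 3, 2

def softmax(z):
    e = np.exp(z - z.max(axis=-1, keepdims=True))
    return e / e.sum(axis=-1, keepdims=True)

def experts(x, n, out_dim):
    # n random linear experts with ReLU, stacked: [batch, n, out_dim]
    return np.stack(
        [np.maximum(x @ rng.normal(size=(x.shape[1], out_dim)), 0) for _ in range(n)],
        axis=1)

def gate(x, n):
    # softmax gate over n experts: [batch, n, 1]
    return softmax(x @ rng.normal(size=(x.shape[1], n)))[..., None]

x = rng.normal(size=(batch, input_size))
task_exp = [experts(x, experts_per_task, expert_size) for _ in range(num_tasks)]
shared_exp = experts(x, shared_experts, expert_size)

# task gate i: own experts + shared experts (experts_per_task + shared_experts of them)
task_out = [
    (np.concatenate([task_exp[i], shared_exp], axis=1)
     * gate(x, experts_per_task + shared_experts)).sum(axis=1)
    for i in range(num_tasks)]

# shared gate: every expert (num_tasks * experts_per_task + shared_experts of them)
all_exp = np.concatenate(task_exp + [shared_exp], axis=1)
shared_out = (all_exp * gate(x, num_tasks * experts_per_task + shared_experts)).sum(axis=1)

print([t.shape for t in task_out], shared_out.shape)  # [(4, 8), (4, 8)] (4, 8)
```

The shapes confirm why the gate sizes in `PLE.__init__` differ: task gates weight `experts_per_task + shared_experts` experts, while the shared gate weights all of them.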
## 4. Training Strategy and Tuning Tips

Training a multi-task model is trickier than training a single-task one. Pay particular attention to the following points.

### 4.1 Loss Design

PLE is usually trained with a weighted multi-task loss:

$$
\mathcal{L} = \sum_{i=1}^{N} w_i \, \mathcal{L}_i
$$

where the weights $w_i$ can be:

- fixed, set according to business priority, or
- learned dynamically by the model.

```python
class WeightedLoss(paddle.nn.Layer):
    def __init__(self, num_tasks):
        super(WeightedLoss, self).__init__()
        # one learnable weight per task
        # (paddle.create_parameter is used because paddle.nn has no Parameter constructor)
        self.weights = paddle.nn.ParameterList([
            paddle.create_parameter(
                shape=[1], dtype='float32',
                default_initializer=paddle.nn.initializer.Constant(1.0))
            for _ in range(num_tasks)
        ])

    def forward(self, task_losses):
        total_loss = 0
        for loss, weight in zip(task_losses, self.weights):
            # uncertainty-style weighting: large losses are down-weighted by exp(-w),
            # while the "+ weight" term keeps the weights from growing without bound
            total_loss += loss * paddle.exp(-weight) + weight
        return total_loss
```

### 4.2 Learning-Rate Strategy

Because tasks converge at different speeds, a layered learning rate is recommended:

- shared parameters: lower learning rate;
- task-specific parameters: higher learning rate;
- dynamic adjustment: adapt the rates based on how each task's loss evolves.

```python
def create_optimizer(model, base_lr=0.001, task_lr=0.01):
    shared_params = []
    task_params = []
    for name, param in model.named_parameters():
        if 'shared' in name:
            shared_params.append(param)
        else:
            task_params.append(param)

    optimizer = paddle.optimizer.Adam(
        learning_rate=base_lr,
        parameters=shared_params,
        weight_decay=0.001
    )
    task_optimizer = paddle.optimizer.Adam(
        learning_rate=task_lr,
        parameters=task_params,
        weight_decay=0.001
    )
    return optimizer, task_optimizer
```

### 4.3 Choosing Evaluation Metrics

Evaluating a multi-task model means covering every task's metrics:

| Task type | Common metrics | Business meaning |
| --- | --- | --- |
| Binary classification | AUC, LogLoss | Discriminative power, prediction accuracy |
| Regression | MAE, RMSE | Deviation of predictions from ground truth |
| Ranking | NDCG, MAP | Quality of the recommended list |

```python
def evaluate(model, dataloader):
    model.eval()
    click_auc = paddle.metric.Auc()
    share_auc = paddle.metric.Auc()
    watch_abs_err, watch_count = 0.0, 0  # MAE accumulated manually

    with paddle.no_grad():
        for batch in dataloader:
            outputs = model(batch['features'])
            click_auc.update(preds=outputs[0], labels=batch['click_label'])
            watch_abs_err += float(paddle.abs(outputs[1] - batch['watch_label']).sum())
            watch_count += len(batch['watch_label'])
            share_auc.update(preds=outputs[2], labels=batch['share_label'])

    return {
        'click_auc': click_auc.accumulate(),
        'watch_mae': watch_abs_err / watch_count,
        'share_auc': share_auc.accumulate(),
    }
```
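To build intuition for what the learnable weights in `WeightedLoss` do, treat the losses as fixed scalars and minimize each term analytically: the stationary point of L·e^(-w) + w is w = ln L, so the effective weight e^(-w) settles near 1/L and large-scale losses are automatically down-weighted. A small pure-Python sketch with illustrative loss values:

```python
import math

def weighted_total(task_losses, weights):
    # sum of L_i * exp(-w_i) + w_i, mirroring the uncertainty-style weighting above
    return sum(L * math.exp(-w) + w for L, w in zip(task_losses, weights))

losses = [4.0, 0.5]                    # hypothetical per-task losses on different scales
opt_w = [math.log(L) for L in losses]  # stationary point of each term

print([round(math.exp(-w), 3) for w in opt_w])  # [0.25, 2.0] -> effective weight ~ 1/L
print(round(weighted_total(losses, opt_w), 4))  # 2.6931
```

The large loss (4.0) ends up multiplied by 0.25 and the small one (0.5) by 2.0, which is precisely the scale-balancing behavior we want across tasks.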
## 5. Deployment and Production Practice

When shipping a PLE model to production, consider the following optimizations.

### 5.1 Quantization and Acceleration

```python
# post-training quantization
# (note: quantization entry points vary across Paddle versions;
#  PaddleSlim is the commonly used quantization toolkit)
quant_model = paddle.quantization.quantize_dynamic(
    model=model,
    qconfig=paddle.quantization.QConfig(
        activation=paddle.quantization.MovingAverageAbsMaxScale(),
        weight=paddle.quantization.AbsMaxQuantizer()
    )
)
paddle.jit.save(quant_model, 'ple_quant')
```

### 5.2 Feature Engineering

- Separate shared features from task-specific features.
- Fuse real-time and offline features.
- Analyze feature importance.

```python
import numpy as np

def feature_importance(model, dataset):
    # estimate importance by zeroing out one feature at a time
    base_pred = model.predict(dataset)
    importance = []
    for i in range(dataset.feature_dim):
        perturbed_data = dataset.copy()
        perturbed_data[:, i] = 0  # perturb the i-th feature
        perturbed_pred = model.predict(perturbed_data)
        importance.append(np.mean(np.abs(base_pred - perturbed_pred)))
    return importance
```

### 5.3 A/B Testing

A/B tests for multi-task models need extra design care:

- Metric weighting: decide how much each task's metric counts toward the business goal.
- Bucketing: make sure the user distribution is consistent across buckets.
- Long observation windows: the seesaw effect may only surface over time.

> **Tip:** In Tencent Video's practice, PLE improved on MMoE across multiple tasks simultaneously (VTR +2.57%, VCR +1.84%), a strong demonstration of its effectiveness.
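The zero-out scheme in `feature_importance` can be validated on a toy setting where the right answer is known. Below, a hypothetical linear predictor with coefficients [3.0, 0.1, 1.0] stands in for the trained network; the recovered importance ordering should match the coefficient magnitudes:

```python
import numpy as np

rng = np.random.default_rng(7)

# toy "model": a linear predictor with known coefficients (illustrative only)
coef = np.array([3.0, 0.1, 1.0])
predict = lambda X: X @ coef

X = rng.normal(size=(1000, 3))
base_pred = predict(X)

importance = []
for i in range(X.shape[1]):
    Xp = X.copy()
    Xp[:, i] = 0  # zero out feature i, as in feature_importance above
    importance.append(np.mean(np.abs(base_pred - predict(Xp))))

# the importance ordering recovers the coefficient magnitudes: 3.0 > 1.0 > 0.1
print(np.argsort(importance)[::-1])  # [0 2 1]
```

Zeroing feature i shifts the prediction by coef_i · x_i, so the mean absolute shift is proportional to |coef_i|, which is why the ranking comes out correct.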