1. 项目概述这不是一次简单的SageMaker演示而是一场MLOps实践者的压力测试“Data Acquisition Exploration: Exploring 5 Key MLOps Questions using AWS SageMaker”——这个标题里藏着的不是五个待回答的理论问题而是五道横亘在数据科学家和机器学习工程师日常工作中、几乎每天都要面对的现实关卡。我带过十几支从零搭建MLOps流水线的团队最常听到的抱怨不是“模型不准”而是“数据又断了”“特征版本对不上”“昨天还跑通的pipeline今天报错说S3路径不存在”“业务方说线上效果下滑但根本找不到是哪次数据变更导致的”。这五个问题就是这些抱怨背后最核心的骨架。它们分别是数据从哪儿来、数据是否可信、数据如何被追踪、数据怎样支撑迭代、数据如何与模型生命周期对齐。AWS SageMaker本身不直接回答这些问题但它提供了一套可编程、可审计、可自动化的基础设施层让这五个问题从“靠人盯、靠经验、靠Excel表格管理”的混沌状态变成可以用代码定义、用日志验证、用API调用的确定性流程。你不需要是AWS认证专家但必须理解SageMaker中FeatureStore、ProcessingJob、Experiment、Trial和Lineage这几个核心组件之间的咬合逻辑——它们不是孤立的功能按钮而是一套精密的齿轮组。比如一个ProcessingJob的输出会自动成为FeatureStore中某个FeatureGroup的输入源而这个FeatureGroup的每一次写入又会触发Lineage图谱中一条新的DataSet节点生成并关联到下游某个TrainingJob的TrialComponent。这种自动化的血缘关系才是解决“数据怎么来的”和“模型用的是哪批数据”这两个问题的技术底座。这篇文章就是我用真实项目复盘的方式把这套齿轮组拆开、上油、再装回去的过程。它不讲SageMaker控制台怎么点只讲在JupyterLab里敲下每一行Python SDK代码时背后发生了什么、为什么必须这么写、以及如果写错了系统会在哪个环节给你一个明确的错误提示而不是让你在CloudWatch日志里翻三小时。适合正在为数据混乱头疼的ML工程师也适合想把第一个模型真正推上线的数据科学家——因为上线之后90%的维护成本都藏在这五个问题的答案里。2. 核心问题拆解与SageMaker能力映射为什么这五个问题必须被结构化回答2.1 问题一数据从哪儿来——打破“数据黑箱”建立可追溯的数据供应链在传统工作流里“数据从哪儿来”往往是一句模糊的口头承诺“找数仓同事要一份ODS层的用户行为表”“爬虫组昨天更新了商品库”。这种依赖人工沟通的数据获取方式在MLOps语境下是灾难性的。它直接导致两个后果第一无法自动化第二无法回溯。当模型效果异常时你无法快速确认“上周五训练用的数据是不是和前天A/B测试用的数据来自同一个ETL任务的同一轮输出”。SageMaker对此的解法不是提供一个更漂亮的数据库连接界面而是强制将“数据来源”这个概念编码进每一个计算任务的元数据中。具体来说SageMaker通过ProcessingJob的AppSpecification和InputDataConfig两个参数把数据来源变成了一个可声明、可验证的对象。InputDataConfig里定义的S3Input其S3Uri字段不仅是一个路径更是一个指向S3对象版本Version ID的精确指针。这意味着当你在代码里写下from sagemaker.sklearn.processing import SKLearnProcessor processor SKLearnProcessor( framework_version0.23-1, rolerole, instance_typeml.m5.xlarge, instance_count1 ) processor.run( codepreprocess.py, inputs[ ProcessingInput( sources3://my-bucket/raw-data/2024-06-01/, destination/opt/ml/processing/input/raw, s3_data_typeS3Prefix, s3_input_modeFile ) ], outputs[ ProcessingOutput( output_nametrain, source/opt/ml/processing/output/train/, destinations3://my-bucket/processed-data/train-20240601/ ) ] )这段代码执行后SageMaker不仅会启动一个EC2实例运行你的preprocess.py还会在后台自动生成一条ProcessingJob的元数据记录其中inputs[0].source字段被精确地记录为s3://my-bucket/raw-data/2024-06-01/并且这个值会被持久化存储在SageMaker的元数据服务中而非仅仅存在于你的本地Jupyter Notebook里。更重要的是如果你的S3桶启用了版本控制这是强烈建议的SageMaker会自动捕获该路径下所有对象的版本ID并将其作为Lineage图谱中DataSet节点的version属性。这就意味着三个月后你想复现当时的训练过程你不需要去翻邮件问同事“那天的数据放哪儿了”你只需要查询这个ProcessingJob的元数据就能拿到一个精确到字节的S3对象版本列表。我见过太多团队因为没启用S3版本控制导致在一次误删操作后只能眼睁睁看着价值百万的标注数据永远丢失。所以回答“数据从哪儿来”这个问题的第一步不是写SQL而是检查你的S3桶配置——aws s3api get-bucket-versioning --bucket my-bucket确保返回的是{Status: Enabled}。这是整个数据供应链可追溯性的物理基石没有它后面所有的自动化都是空中楼阁。2.2 问题二数据是否可信——用特征工程的“出厂检验”替代人工抽查“数据是否可信”是数据科学家最常被业务方灵魂拷问的问题。一句“我们数据没问题”毫无说服力而一份长达50页的《数据质量报告》又没人愿意看。SageMaker的解法是把数据质量验证变成特征工程流水线中一个不可跳过的、有明确成功/失败信号的标准化步骤。这个步骤的核心是FeatureStore的IngestionJob和OfflineStore的DataQuality监控能力。FeatureStore不是一个简单的键值存储。当你创建一个FeatureGroup时你必须明确定义每个FeatureDefinition的FeatureTypeIntegral、Fractional或String和ValueType例如Integer、Float、String。这个定义本身就是一份轻量级的数据契约Data Contract。当IngestionJob开始向FeatureGroup写入数据时SageMaker会实时执行类型校验。如果某条记录的user_age字段传入了一个字符串N/A而你在FeatureDefinition里定义的是Integral那么这条记录会被直接拒绝并在IngestionJob的日志中留下清晰的错误信息ValueError: Expected integral type for feature user_age, got N/A。这比任何事后的数据质量扫描都更早、更精准地拦截了问题。但这还不够。类型校验只能保证“格式正确”不能保证“业务合理”。比如user_age字段可以是整数但值为200显然不合理。这时就需要OfflineStore的DataQuality监控。SageMaker允许你为FeatureGroup的OfflineStore配置一个DataQualityMonitoringSchedule它会定期例如每天凌晨2点扫描OfflineStore中的Parquet文件并计算一系列统计指标空值率、唯一值数量、数值型字段的均值/标准差/分位数、字符串型字段的长度分布等。关键在于这些指标的计算结果会被写入一个你指定的S3位置并且SageMaker会自动生成一个MonitoringExecution资源其状态Completed、Failed、Aborted就是数据质量的最终判决书。你可以把这个状态直接集成到你的CI/CD流水线中。例如在Jenkins或CodePipeline里添加一个DescribeMonitoringExecution的API调用步骤如果返回的状态是Failed就立即中断后续的模型训练任务并触发告警。我曾经在一个电商推荐项目里把item_price字段的p95值设为监控阈值。当某次上游ERP系统故障导致一批商品价格被错误地同步为0.01元时DataQuality监控在30分钟内就检测到p95值骤降并自动阻断了当天的特征更新和模型重训避免了一场可能影响数百万用户的资损事故。所以“数据是否可信”的答案不应该是一份静态报告而应该是一个动态的、能驱动行动的布尔值True表示可以继续False表示必须停止并调查。SageMaker提供的正是这个布尔值的生成引擎。2.3 问题三数据怎样被追踪——构建端到端的血缘图谱让每一次变更都有迹可循当模型在生产环境出现偏差Bias或漂移Drift时最耗时的环节从来不是修复模型而是定位问题根源。是上游数据源的Schema变了是特征工程脚本里一个隐藏的fillna(0)逻辑被悄悄修改了还是TrainingJob用的超参数配置文件其实引用了一个早已过期的FeatureGroup版本在缺乏血缘追踪的系统里这个问题的排查往往需要召集数据工程师、算法工程师、运维工程师开一场长达数小时的“破案会议”。SageMaker的Lineage血缘功能就是为终结这种低效协作而生的。Lineage的本质是一个由Artifact制品、Action动作和Context上下文三类节点构成的有向无环图DAG。Artifact代表一切有状态的实体一个S3路径、一个FeatureGroup、一个ModelPackage、甚至是一段Python代码的Git Commit Hash。Action代表一切改变状态的操作一个ProcessingJob、一个TrainingJob、一个TransformJob。Context则代表环境和意图一个Experiment实验、一个Trial试验、一个Project项目。SageMaker SDK在你调用estimator.fit()或processor.run()时会自动为你创建对应的Action节点并尝试将输入的Artifact如S3路径和输出的Artifact如训练好的模型S3路径与之关联。但这个“自动”是有前提的你必须使用SageMaker原生的SDK对象如SKLearnProcessor,Estimator而不是自己用boto3手动调用create_training_job。后者虽然也能完成任务但会完全绕过Lineage系统让你的血缘图谱变成一片空白。真正的威力在于Lineage的可编程性。你可以用boto3的sagemaker.lineage客户端编写脚本来查询任意一个Artifact的完整上游依赖链。例如要找出影响某个生产模型的所有数据源你可以这样写from sagemaker.lineage import Artifact, Action, Context # 首先根据模型的S3 URI找到它的Artifact model_artifact Artifact.load( artifact_arnarn:aws:sagemaker:us-east-1:123456789012:artifact/abc123-model-20240601 ) # 然后向上遍历所有关联的Action upstream_actions model_artifact.parents() for action in upstream_actions: # 找出这个Action的输入Artifact input_artifacts action.inputs() for input_art in input_artifacts: print(fInput Artifact: {input_art.artifact_name} ({input_art.source.uri})) # 如果这个输入Artifact本身也是一个FeatureGroup继续向上查 if feature-group in input_art.source.uri: fg_artifact Artifact.load(artifact_arninput_art.artifact_arn) fg_upstream fg_artifact.parents() for fg_action in fg_upstream: print(f - FeatureGroup upstream: {fg_action.action_name})这段代码的输出会是一张清晰的树状图从最顶层的原始S3数据桶一直贯穿到最终的模型文件。它不会告诉你“可能”是哪里出了问题而是用确凿的ARNAmazon Resource Name告诉你“一定”是哪些资源参与了这次构建。我在一个金融风控模型的迭代中就用这个方法在5分钟内定位到问题一个新加入的ProcessingJob其输入S3路径指向了一个尚未经过DataQuality监控的临时数据集导致模型学习到了大量异常的credit_score值。没有Lineage这个排查至少需要两天。所以“数据怎样被追踪”的答案不是一张静态的架构图而是一个随时可以执行的、返回精确ARN列表的Python函数。这才是现代MLOps团队应有的响应速度。2.4 问题四数据如何支撑迭代——用FeatureStore实现特征的“即插即用”与“版本共存”模型迭代的瓶颈常常不在算法本身而在特征。一个新想法比如“用户过去7天的平均点击率”需要经历数据工程师开发ETL脚本、测试、上线数据科学家等待数据就绪、下载、清洗、建模最后还要确保线上服务能实时计算这个新特征。整个周期动辄数周。FeatureStore的设计哲学就是把特征从“一次性计算产物”变成“可复用、可版本化、可在线/离线统一访问的服务”。FeatureStore的核心是FeatureGroup。一个FeatureGroup就像一个数据库表但它有三个独特之处第一它同时拥有OnlineStore和OfflineStore。OnlineStore是一个毫秒级响应的NoSQL存储基于DynamoDB用于实时推理OfflineStore是一个低成本、高吞吐的S3数据湖用于批量训练。第二FeatureGroup支持RecordIdentifierFeatureName和EventTimeFeatureName这使得它可以天然地处理时间序列数据和事件驱动的更新。第三也是最关键的FeatureGroup支持FeatureGroupName的版本化。你不能直接修改一个已存在的FeatureGroup的Schema但你可以创建一个名为user_features_v2的新FeatureGroup它与user_features_v1共享大部分特征但新增了7d_avg_click_rate。这两个FeatureGroup可以并存互不影响。这种设计带来的迭代效率提升是颠覆性的。数据科学家在JupyterLab里只需几行代码就能为自己的实验加载不同版本的特征from sagemaker.feature_store.feature_group import FeatureGroup # 加载v1版本用于基线模型 fg_v1 FeatureGroup(nameuser_features_v1, sagemaker_sessionsess) training_data_v1 fg_v1.athena_query().to_dataframe() # 加载v2版本用于新模型实验 fg_v2 FeatureGroup(nameuser_features_v2, sagemaker_sessionsess) training_data_v2 fg_v2.athena_query().to_dataframe() # 直接比较两个版本在相同评估集上的AUC baseline_auc evaluate_model(training_data_v1, test_data) new_auc evaluate_model(training_data_v2, test_data) print(fv1 AUC: {baseline_auc:.4f}, v2 AUC: {new_auc:.4f})而线上服务也只需修改一行配置就能切换到新特征# 在推理端点的预处理代码中 def model_fn(model_dir): # ... 加载模型 # 切换FeatureGroup名称即可 feature_group_name user_features_v2 # 从 user_features_v1 改为 user_features_v2 return model def input_fn(request_body, request_content_type): # ... 解析请求 # 使用v2的FeatureGroup进行实时特征查找 record feature_store.get_record( feature_group_nameuser_features_v2, record_identifier_value_as_stringuser_id ) return process_features(record)整个过程无需等待ETL上线无需修改任何数据管道甚至不需要重启线上服务得益于Lambda或容器的热更新能力。这就是“数据如何支撑迭代”的终极答案不是更快地造轮子而是让轮子本身具备无限组合和无缝切换的能力。FeatureStore就是那个承载所有轮子的标准化底盘。2.5 问题五数据如何与模型生命周期对齐——用Experiment Trial实现“谁在什么时候用什么数据训练了什么模型”的原子化记录最后一个也是最根本的问题“数据如何与模型生命周期对齐”它直指MLOps的核心矛盾数据是持续流动的模型是离散发布的。一次TrainingJob的启动本质上是一次对特定时刻、特定版本数据的快照Snapshot。如果这个快照没有被精确地、不可篡改地记录下来那么“模型”就只是一个孤立的二进制文件失去了其全部业务意义。SageMaker的Experiment和Trial机制就是为了解决这个快照记录问题而设计的。Experiment是一个逻辑容器代表一个宏观的研究目标比如“Q2用户留存率提升项目”。Trial则是Experiment下的一个具体实验实例代表一次独立的、可重复的模型训练尝试。而TrialComponent是Trial中最细粒度的单元它精确地记录了一次TrainingJob或ProcessingJob的全部输入、输出、参数和指标。当你调用estimator.fit()时SageMaker SDK会自动为你创建一个TrialComponent并将其与当前的Trial和Experiment关联。这个TrialComponent的元数据包含了input_artifacts: 一个字典键是输入名称如training-dataset值是S3 URIoutput_artifacts: 一个字典键是输出名称如model-artifact值是S3 URIparameters: 一个字典包含了所有传递给训练脚本的超参数--learning-rate,--batch-size等metrics: 一个字典包含了训练过程中自动捕获的指标validation:auc,train:loss等。这个结构完美地封装了“谁在什么时候用什么数据训练了什么模型”这四个要素。who是Trial的CreatedBy字段通常是IAM Role ARNwhen是TrialComponent的CreationTimewhat data是input_artifactswhat model是output_artifacts[model-artifact]。更重要的是TrialComponent是不可变的。一旦创建其内容无法被修改。如果你想调整超参数重新训练SDK会创建一个新的TrialComponent而不是覆盖旧的。这保证了历史记录的绝对可信。我曾在一个医疗影像诊断项目中严格遵循这个模式。每次模型迭代我们都创建一个新的Trial并为其命名如lung-cancer-detection-v3.2.1-20240601。在Trial内部我们会创建多个TrialComponent分别对应数据预处理、模型训练、模型评估。当监管机构要求提供“模型v3.2.1的完整训练证据链”时我们只需提供这个Trial的ARN他们就可以在SageMaker Studio里点开Trial看到所有TrialComponent的详细日志、输入输出S3路径、超参数和评估指标。整个过程不需要我们整理任何额外文档所有证据都内生于SageMaker的元数据系统。所以“数据如何与模型生命周期对齐”的答案就是放弃用Excel表格管理模型版本转而用Trial这个原生的、受AWS IAM策略保护的、可审计的云原生对象来承载每一次模型诞生的全部上下文。这是MLOps从“手工作坊”走向“现代工厂”的标志性一步。3. 实操全流程从零构建一个可验证的MLOps数据探索流水线3.1 环境准备与权限配置安全不是事后补救而是架构起点在SageMaker上构建任何MLOps流水线第一步永远不是写代码而是配置权限。我见过太多团队因为一开始用了过于宽泛的AdministratorAccess策略导致后期在生产环境加固时不得不推倒重来浪费数周时间。正确的做法是从最小权限原则Principle of Least Privilege出发为流水线的每一个组件精确地授予它所需的、仅够用的权限。首先你需要一个专门的IAM Role我们称之为SageMakerMLOpsRole。这个Role的信任策略Trust Policy必须允许sagemaker.amazonaws.com代入。然后附加以下三条自定义策略Policy每一条都对应一个核心能力策略一S3数据访问策略sagemaker-s3-access-policy{ Version: 2012-10-17, Statement: [ { Effect: Allow, Action: [ s3:GetObject, s3:ListBucket, s3:PutObject ], Resource: [ arn:aws:s3:::my-mlops-bucket, arn:aws:s3:::my-mlops-bucket/* ] } ] }注意这里ListBucket的资源是桶名本身无/*而GetObject和PutObject的资源是桶名加/*。这是S3权限的常见陷阱ListBucket操作的资源必须是桶否则会静默失败。策略二FeatureStore访问策略sagemaker-featurestore-policy{ Version: 2012-10-17, Statement: [ { Effect: Allow, Action: [ sagemaker:CreateFeatureGroup, sagemaker:DeleteFeatureGroup, sagemaker:DescribeFeatureGroup, sagemaker:PutRecord, sagemaker:GetRecord, sagemaker:StartIngestionJob, sagemaker:DescribeIngestionJob ], Resource: arn:aws:sagemaker:*:*:feature-group/my-feature-group-* } ] }这里的关键是Resource的ARN模式。my-feature-group-*允许你创建以my-feature-group-开头的所有FeatureGroup这为版本化命名my-feature-group-v1,my-feature-group-v2提供了灵活性同时又避免了*带来的过度授权。策略三Lineage与Experiment管理策略sagemaker-lineage-policy{ Version: 2012-10-17, Statement: [ { Effect: Allow, Action: [ sagemaker:CreateExperiment, sagemaker:CreateTrial, sagemaker:CreateTrialComponent, sagemaker:AssociateTrialComponent, sagemaker:DescribeTrialComponent, sagemaker:ListArtifacts, sagemaker:ListActions, sagemaker:ListContexts ], Resource: * } ] }这个策略的Resource是*因为Lineage相关的API大多不支持资源级权限Resource-level permissions。但这是安全的因为这些API本身不涉及数据读写只涉及元数据的创建和查询。配置完Role后在SageMaker Studio中你需要为你的User Profile用户配置文件显式地关联这个SageMakerMLOpsRole。这一步至关重要因为Studio的Notebook Kernel默认使用的是User Profile的Role而不是你在代码里assume_role的Role。很多初学者的Lineage图谱为空原因就是Kernel没有权限写入Lineage元数据。你可以通过在Notebook里运行!aws sts get-caller-identity来验证当前Kernel使用的Role ARN是否正确。提示在生产环境中我强烈建议为SageMakerMLOpsRole启用AWS CloudTrail日志记录并设置一个CloudWatch告警当该Role被用于CreateTrainingJob或CreateProcessingJob时发送通知。这能让你第一时间感知到任何非预期的模型训练活动是安全审计的第一道防线。3.2 数据获取与初步探索用ProcessingJob构建可复现的数据快照数据获取阶段的目标不是“把数据弄进来”而是“把数据的上下文和契约一起弄进来”。我们以一个经典的电商用户行为数据集为例原始数据存放在S3路径s3://my-raw-bucket/user-behavior/2024-06-01/下格式为JSON LinesJSONL。我们的ProcessingJob不仅要解析JSON还要做三件事第一为每条记录打上一个精确的event_time事件时间戳第二对关键字段如user_id,item_id进行非空校验第三将处理后的数据以Parquet格式写入一个带有日期后缀的S3路径作为本次数据快照的唯一标识。以下是完整的preprocess.py脚本它将被SKLearnProcessor调用import sys import json import pandas as pd from datetime import datetime import logging logger logging.getLogger(__name__) logging.basicConfig(levellogging.INFO) def main(): # 1. 从命令行参数获取输入/输出路径 input_path sys.argv[1] # /opt/ml/processing/input/raw/ output_path sys.argv[2] # /opt/ml/processing/output/train/ logger.info(fReading raw data from {input_path}) # 2. 读取所有JSONL文件 all_records [] for file_path in Path(input_path).rglob(*.jsonl): with open(file_path, r) as f: for line_num, line in enumerate(f, 1): try: record json.loads(line.strip()) # 3. 强制添加event_time使用当前处理时间确保一致性 record[event_time] datetime.utcnow().isoformat() Z all_records.append(record) except json.JSONDecodeError as e: logger.warning(fInvalid JSON on line {line_num} of {file_path}: {e}) if not all_records: raise ValueError(No valid records found in input data) # 4. 转为DataFrame并进行基础清洗 df pd.DataFrame(all_records) logger.info(fLoaded {len(df)} records) # 5. 关键字段非空校验 required_fields [user_id, item_id, event_type] missing_fields df[required_fields].isnull().any(axis1) if missing_fields.any(): logger.warning(fFound {missing_fields.sum()} records with missing required fields) df df[~missing_fields].copy() # 6. 类型转换与标准化 df[user_id] df[user_id].astype(str) df[item_id] df[item_id].astype(str) df[timestamp] pd.to_datetime(df[timestamp], units, errorscoerce) df df.dropna(subset[timestamp]) # 7. 写入Parquet分区存储 # 按照event_type分区便于后续按行为类型高效查询 df.to_parquet( f{output_path}/data/, partition_cols[event_type], indexFalse, compressionsnappy ) logger.info(fProcessed data written to {output_path}) if __name__ __main__: main()现在我们在Notebook中启动这个ProcessingJobfrom sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from pathlib import Path # 定义处理器 processor SKLearnProcessor( framework_version0.23-1, rolearn:aws:iam::123456789012:role/SageMakerMLOpsRole, instance_typeml.m5.xlarge, instance_count1, volume_size_in_gb30, max_runtime_in_seconds3600 ) # 启动处理任务 processor.run( codepreprocess.py, # 输入指向原始数据的S3路径包含日期形成快照 inputs[ ProcessingInput( sources3://my-raw-bucket/user-behavior/2024-06-01/, destination/opt/ml/processing/input/raw, s3_data_typeS3Prefix, s3_input_modeFile ) ], # 输出写入一个带有相同日期后缀的路径确保可追溯 outputs[ ProcessingOutput( output_nameprocessed-data, source/opt/ml/processing/output/train/, destinations3://my-mlops-bucket/processed-data/2024-06-01/ ) ], # 传递参数给preprocess.py arguments[/opt/ml/processing/input/raw/, /opt/ml/processing/output/train/] ) # 获取ProcessingJob的ARN用于后续Lineage关联 processing_job processor.jobs[-1] print(fProcessingJob ARN: {processing_job.describe()[ProcessingJobArn]})这个ProcessingJob的成功执行标志着我们完成了第一个可验证的数据快照。它的输出路径s3://my-mlops-bucket/processed-data/2024-06-01/就是一个精确的、不可变的数据版本。任何后续的分析、训练都必须基于这个路径而不是一个模糊的latest别名。这是MLOps可靠性的第一块基石。3.3 特征工程与FeatureStore注入从数据快照到可复用的特征服务有了ProcessingJob生成的Parquet数据下一步是将其注入FeatureStore使其从“静态数据”变为“活的特征”。这个过程分为两步首先定义FeatureGroup的Schema其次启动IngestionJob进行数据灌入。Step 1: 定义FeatureGroup Schema我们为用户行为数据定义一个名为user_behavior_features_v1的FeatureGroup。其Schema如下from sagemaker.feature_store.feature_definition import ( FeatureDefinition, FeatureTypeEnum ) feature_definitions [ FeatureDefinition(feature_nameuser_id, feature_typeFeatureTypeEnum.STRING), FeatureDefinition(feature_nameitem_id, feature_typeFeatureTypeEnum.STRING), FeatureDefinition(feature_nameevent_type, feature_typeFeatureTypeEnum.STRING), FeatureDefinition(feature_nametimestamp, feature_typeFeatureTypeEnum.FRACTIONAL), FeatureDefinition(feature_nameevent_time, feature_typeFeatureTypeEnum.STRING), # 作为EventTimeFeatureName FeatureDefinition(feature_namesession_id, feature_typeFeatureTypeEnum.STRING), FeatureDefinition(feature_nameduration_sec, feature_typeFeatureTypeEnum.INTEGRAL), ] # 创建FeatureGroup feature_group FeatureGroup( nameuser_behavior_features_v1, feature_definitionsfeature_definitions, record_identifier_nameuser_id, event_time_feature_nameevent_time, role_arnarn:aws:iam::123456789012:role/SageMakerMLOpsRole, sagemaker_sessionsess ) # 创建FeatureGroup指定OnlineStore和OfflineStore feature_group.create( s3_uris3://my-mlops-bucket/feature-store-offline/, record_identifier_nameuser_id, event_time_feature_nameevent_time, enable_online_storeTrue, online_store_security_config{ KmsKeyId: arn:aws:kms:us-east-1:123456789012:key/abc123-def456 } )这里的关键点是record_identifier_name和event_time_feature_name。user_id作为主键意味着OnlineStore会以user_id为DynamoDB的Partition Keyevent_time作为事件时间SageMaker会自动为每条记录生成一个event_time索引用于高效的时序查询。Step 2: 启动IngestionJobIngestionJob不是一次性任务而是一个长期运行的、从S3批量读取数据并写入FeatureStore的作业。我们需要告诉它从哪里读以及如何映射字段。# 从ProcessingJob的输出路径读取Parquet数据 ingestion_job feature_group.ingest( data_source_uris3://my-mlops-bucket/processed-data/2024-06-01/, # 字段映射S3 Parquet中的列名 - FeatureGroup中的feature_name # 这里假设Parquet文件的列名与FeatureGroup的feature_name完全一致 # 如果不一致需要在这里做显式映射 # feature_name_to_column_map{user_id: user_id, item_id: item_id, ...} waitTrue, # 等待作业完成再返回 timeout3600 ) print(fIngestionJob completed. Status: {ingestion_job.describe()[IngestionJobStatus]})IngestionJob完成后FeatureStore的OfflineStore中就会有这批数据的副本而OnlineStore也会被填充。此时你可以立即进行两种查询离线查询用于训练# 使用Athena查询OfflineStore query feature_group.athena_query() query.run( sqlfSELECT * FROM \{query.table_name}\ WHERE user_id u123 LIMIT 10, output_locations3://my-mlops-bucket/athena-results/ ) df query.as_dataframe()在线查询用于实时推理# 查询单个用户的最新特征 record feature_store.get_record( feature_group_nameuser_behavior_features_v1, record_identifier_value_as_stringu123 ) print(record)这个过程将数据获取、清洗、特征化、存储、服务化全部串联成一个闭环。FeatureGroup的v1版本就是我们为本次数据探索所定义的第一个、可被所有下游任务引用的“特征契约”。3.4 构建Experiment与Trial为每一次模型训练打上精确的时间戳现在我们拥有了一个干净的、可追溯的FeatureGroup。接下来我们要用它来训练一个简单的XGBoost模型并将整个过程精确地记录在Experiment和Trial中。首先创建Experiment和Trial