从日志文件到数据集:用Python把JSONL批量转成JSON,喂给大模型做微调
从日志文件到数据集用Python实现JSONL到JSON的高效转换与LLM微调实战JSONLJSON Lines作为一种轻量级的日志存储格式在大规模机器学习数据预处理中扮演着重要角色。当我们需要将模型推理日志或标注结果转换为标准JSON数据集时往往会遇到编码混乱、批量处理效率低下等问题。本文将深入探讨如何用Python实现工业级JSONL到JSON的转换特别针对大语言模型微调场景提供完整解决方案。1. JSONL与JSON在AI工程中的核心差异JSONL每行一个JSON对象与标准JSON整体一个对象或数组在数据处理流程中各有优劣。JSONL特别适合流式处理和增量写入而标准JSON更适合作为模型的输入格式。关键对比特征特性JSONLJSON文件结构每行独立JSON对象整体单一结构内存效率可逐行处理内存友好需整体加载内存消耗大错误恢复单行错误不影响其他数据单个错误导致整个文件失效适用场景日志记录、流式数据配置、完整数据集在实际项目中我们经常需要处理类似KoRC_outputs.jsonl这样的模型输出日志。这类文件通常包含大量独立预测结果每个结果以JSON对象形式存储在一行中。2. 基础转换方法与编码陷阱最基本的转换思路是逐行读取JSONL文件并将所有对象合并为一个字典或数组。以下是一个健壮的转换函数实现import json from pathlib import Path def convert_jsonl_to_json( input_path: str, output_path: str, output_format: str object ) - None: 将JSONL文件转换为JSON格式 参数: input_path: 输入JSONL文件路径 output_path: 输出JSON文件路径 output_format: 输出格式object或array input_path Path(input_path) output_path Path(output_path) data [] with input_path.open(r, encodingutf-8) as f: for line in f: line line.strip() if not line: continue try: data.append(json.loads(line)) except json.JSONDecodeError as e: print(f解码错误跳过行: {line[:50]}... 错误: {e}) with output_path.open(w, encodingutf-8) as f: if output_format object: result {} for item in data: if isinstance(item, dict): result.update(item) json.dump(result, f, ensure_asciiFalse, indent2) else: json.dump(data, f, ensure_asciiFalse, indent2)常见编码问题解决方案统一编码处理始终明确指定utf-8编码避免不同系统默认编码差异非ASCII字符处理设置ensure_asciiFalse保留原始字符大文件内存优化对于超大文件考虑分块处理或使用流式转换3. 面向LLM微调的高级数据处理大语言模型微调通常需要特定格式的训练数据。假设我们需要将问答日志转换为标准的指令微调格式可以这样处理def transform_for_llm_finetuning( input_jsonl: str, output_json: str, template: dict None ) - None: 将原始JSONL转换为LLM微调所需的格式 参数: input_jsonl: 输入JSONL路径 output_json: 输出JSON路径 template: 可选的数据转换模板 if template is None: template { instruction: 根据问题生成回答, input: {question}, output: {answer} } transformed [] with open(input_jsonl, r, encodingutf-8) as f: for line in f: try: item json.loads(line) # 假设原始数据包含question和answer字段 new_item { instruction: template[instruction], input: template[input].format(questionitem[question]), output: template[output].format(answeritem[answer]) } transformed.append(new_item) except (json.JSONDecodeError, KeyError) as e: print(f转换失败: {line[:50]}... 错误: {e}) with open(output_json, w, encodingutf-8) as f: json.dump(transformed, f, ensure_asciiFalse, indent2)处理复杂答案的实用技巧当答案包含多个选项时如answer: A, B, C可以使用以下方法进行标准化def normalize_answers(answers_str): 将逗号分隔的答案转换为列表 return [a.strip() for a in answers_str.split(,) if a.strip()]4. 工业级数据处理管道实现在实际生产环境中我们需要考虑更多工程化因素import logging from concurrent.futures import ThreadPoolExecutor from tqdm import tqdm class JsonlToJsonProcessor: def __init__(self, max_workers4): self.logger logging.getLogger(self.__class__.__name__) self.max_workers max_workers def process_directory(self, input_dir, output_dir): 批量处理目录下的所有JSONL文件 input_dir Path(input_dir) output_dir Path(output_dir) output_dir.mkdir(exist_okTrue) jsonl_files list(input_dir.glob(*.jsonl)) with ThreadPoolExecutor(max_workersself.max_workers) as executor: list(tqdm( executor.map(self.process_file, jsonl_files), totallen(jsonl_files), descProcessing files )) def process_file(self, input_path): 处理单个文件 try: output_path Path(output_dir) / f{input_path.stem}.json convert_jsonl_to_json(input_path, output_path) self.logger.info(f成功处理: {input_path}) except Exception as e: self.logger.error(f处理失败 {input_path}: {str(e)})性能优化技巧并行处理使用线程池加速多个文件的转换内存映射对于超大文件考虑使用mmap进行内存高效读取增量写入边读取边写入避免内存中保存全部数据压缩处理直接处理gzip压缩的JSONL文件5. 错误处理与数据质量保障健壮的数据处理管道需要完善的错误处理机制def safe_json_loads(line): 带错误处理的JSON解析 try: return json.loads(line) except json.JSONDecodeError: # 尝试修复常见的JSONL格式问题 line line.strip() if line.startswith() and line.endswith(): line f{line[1:-1]} try: return json.loads(line) except json.JSONDecodeError as e: raise ValueError(f无效的JSON格式: {line[:100]}) from e def validate_schema(item, schema): 验证数据是否符合预期模式 from jsonschema import validate try: validate(instanceitem, schemaschema) return True except Exception as e: return False数据验证策略模式验证使用JSON Schema定义数据格式规范类型检查确保字段类型符合预期值域验证检查数值范围或枚举值关系验证验证跨字段的逻辑一致性6. 实战构建端到端数据处理流水线将上述技术组合起来我们可以构建完整的LLM数据预处理流水线def build_llm_data_pipeline( input_path, output_path, schemaNone, transform_fnNone, batch_size1000 ): 端到端LLM数据预处理流水线 processed [] with open(input_path, r, encodingutf-8) as f: for i, line in enumerate(f): try: item safe_json_loads(line) if schema and not validate_schema(item, schema): continue if transform_fn: item transform_fn(item) processed.append(item) # 分批写入减少内存压力 if len(processed) batch_size: write_batch(output_path, processed, modea) processed [] except Exception as e: logging.warning(f跳过无效数据行 {i}: {str(e)}) # 写入剩余数据 if processed: write_batch(output_path, processed, modea) def write_batch(output_path, batch, modew): 批量写入数据 with open(output_path, mode, encodingutf-8) as f: json.dump(batch, f, ensure_asciiFalse) f.write(\n) # 添加换行便于后续追加流水线优化建议分阶段处理将清洗、转换、验证等步骤分离中间存储使用临时文件存储中间结果进度监控添加进度条和日志记录断点续传记录处理位置以便恢复在实际项目中处理类似KoRC_outputs.jsonl这样的文件时这套方法可以确保数据转换的高效和可靠。我曾在一个对话系统项目中应用这种技术成功将每天数GB的对话日志实时转换为训练数据显著提升了模型迭代速度。