Blossom Data – 面向大模型的数据处理框架
Blossom Data 是在 Blossom模型 的迭代过程中逐步设计出来的一套数据处理框架,目标是为大模型训练数据提供一个结构化、可组合、可迁移的工程化方案。
在大模型训练的过程中,数据往往比模型更“棘手”,一套完整的数据流水线,通常要经历:
- 原始数据清洗与规范化 -> 多轮改写、翻译、合成 -> 质量验证与过滤 -> 不同规模环境下重复执行
如果这些步骤依赖的是一批一次性脚本或 Notebook,随着时间推移,很容易演变成难以复用、难以维护的复杂系统。
项目主页:
使用 Blossom Data 合成的数据示例:
- NuminaMath-1.5-Pro:一个经过验证的高难度、高质量的混合推理数据集。
- BlossomLM 训练数据:性能强大的通用模型,训练数据已开源。
框架设计
BlossomData核心特性如下:
- 使用 Schema 描述数据结构,避免“全是 dict 和魔法字段”的混乱状态。
- 用 Operator + Dataset / DataFrame 组织数据处理流程,使每一步处理具备明确边界、便于组合与复用。
- 抽象出 Local / Ray / Spark 等不同执行后端,使同一套代码可以在不同执行引擎上运行。
- 针对大模型训练场景,提供一批常用的内置算子(翻译、合成、蒸馏、验证、后处理等)。
Schema
在很多临时脚本中,常见写法是:
row["messages"][0]["content"]
Code language: CSS (css)这种写法短期上手成本低,但当字段名调整、结构变复杂、或者多人协作时,维护成本会迅速上升。
BlossomData 使用一套 Schema 来对数据进行建模,例如:
ChatSchema:对话数据,核心字段为messages: list[ChatMessage]TextSchema:文本数据,字段为content: strRowSchema:结构化数据,字段为data: dictCustomSchema:自定义结构,字段为data: Any
所有 Schema 还具备一组统一的元信息字段,例如:
id:唯一标识type:Schema 类型failed/failure_reason:标记该条数据在某一步处理是否失败,以及失败原因metadata:附加元数据(语言、来源、打分等)
这样,数据结构是显式的,数据在流水线中的状态也可以挂在数据本身,而不是散落在日志与临时变量中。
多执行后端
BlossomData 通过统一的 DataFrame 抽象,将不同执行后端封装在一起,目前支持:本地、Ray、Spark,在上层,创建 Dataset 时只需要指定引擎类型:
dataset = create_dataset(
data,
engine=DatasetEngine.LOCAL, # 调试阶段通常使用本地
)
Code language: PHP (php)在确认逻辑正确后,如果需要扩展到分布式执行,只需将 engine 切换为 RAY 或 SPARK,并提供相应的运行环境和配置。流水线本身不需要改动。
这种方式可以支持常见的工作流:在本地使用小数据集开发、调试 -> 在小集群上验证性能与稳定性 -> 在生产集群上对全部数据运行同一套流水线。
数据流水线
BlossomData 的核心使用模型是 Dataset + Operator。
- Dataset:承载一组 Schema 数据,并绑定一个执行后端(本地 / Ray / Spark)。
- Operator:对数据进行一次独立的变换,如 map、filter、transform 等。
在实际使用时,通常会写出一个算子列表 ops,按顺序定义流水线:
ops = [
# 例如:翻译
...,
# 例如:验证 + 蒸馏
...,
# 例如:合并推理内容
...,
]
Code language: PHP (php)每个 Operator 只负责一小步逻辑,输入和输出都是 Schema。这样一来:
- 处理流程清晰可读。
- 每一步的副作用较小,便于调试与重用。
- 出现问题时,可以定位到具体算子,而不是整段脚本。
针对大模型训练数据,框架内置了一批面向对话、推理、质量控制的算子,例如翻译、验证与蒸馏、推理内容合并等。
统一管理外部资源
在面向大模型数据的处理过程中,算子经常需要访问外部服务,BlossomData 引入了 Context 与 Provider 的概念,用来统一管理这些外部资源:
- 使用 YAML 配置文件(如
config.yaml或~/.blossom.yaml)描述 Provider。 - 运行时自动加载配置,无需在代码中重复填写各种 URL 和密钥。
- Operator 通过 Context 获取 Provider 实例,调用统一接口,而不直接依赖具体 SDK。
这一设计让“换一个模型服务”“调整超参”“修改重试策略”这类操作尽量收敛到配置层,而不需要频繁修改业务代码。
自定义 Operator
就像前两小节提到的那样,如果内置的算子不能满足我们的业务需求,也可以通过灵活的 API 来扩充自定义算子:
# 使用装饰器定义自定义算子,第一个参数为元素
@filter_operator()
def filter_short_text(item):
return len(item.content) > 10
# 使用上下文调用模型的算子,名称以 context 开头,第一个参数为上下文,第二个参数为元素
# 传入 parallel 以指定并发数
@context_map_operator(parallel=4)
def translate_with_model(context, item):
result = context.chat_completion(
"gpt-4o-mini",
[user(f"Translate to Chinese: {item.content}")]
)
return TextSchema(content=result)
# 继承 Operator 实现自定义算子
# 对于 map 算子,需要实现 process_item 方法,通过 self.context 访问上下文
class SelfQA(MapOperator):
def process_item(self, item):
self_qa_prompt = (
"基于给定的文本,随意生成一个问题以及对应的长答案。\n"
"你的输出应该是一个 json,包含 question、answer 两个字符串字段,不需要输出任何其他的无关解释。\n"
f"给定的文本:{item.content}"
)
raw_result = self.context.chat_completion("gpt-4o-mini", [user(self_qa_prompt)])
result = loads_markdown_first_json(raw_result)
return ChatSchema(
messages=[
user(result["question"]),
assistant(result["answer"]),
]
)
Code language: CSS (css)从数学题合成推理数据
下面通过一个简化示例说明框架的基本用法。假设我们有一条数据,由“数学题 + 参考答案”构成,希望构建一条带完整推理过程的中文训练样本。
准备原始数据
data = [
ChatSchema(
messages=[
user(
"Suppose that wz = 12 - 8i and |w| = sqrt(13). "
"What is |z|?"
),
assistant("4"), # 参考答案
]
),
]
Code language: PHP (php)这里用 ChatSchema 来描述一条对话,其中 user(...) 是题目,assistant("4") 是给定答案。
定义处理流水线
假设我们想要做这样一条流水线:
- 将英文题目翻译为中文。
- 使用推理模型重新生成带推理过程的答案。
- 验证答案是否与参考答案一致。
- 将推理内容合并进最终内容,得到可直接用于训练的样本。
代码层面,可以通过一组 Operator 来表达这一流水线(算子名称以实际实现为准):
ops = [
ChatTranslator(
model="your-chat-model",
target_language="Chinese",
),
ChatVerifyDistiller(
model="your-reasoning-model",
mode=ChatVerifyDistiller.Mode.LLM,
validation_model="your-chat-model",
),
ChatReasoningContentMerger(),
]
Code language: JavaScript (javascript)创建 Dataset 并执行
from blossom.dataframe import create_dataset
dataset = create_dataset(data) # 默认使用本地引擎
result = dataset.execute(ops).collect()
print(result[0])
Code language: PHP (php)执行完成后,result[0] 就是一条已经过翻译、推理与验证,并合并了推理内容的 ChatSchema 数据,可以直接用于后续的训练或评测流水线。
DataFrame 与 Dataset
在 BlossomData 里可以粗略理解为:
- DataFrame:对底层数据的统一抽象,真正承载了 Local / Spark / Ray 等不同引擎实现。
- Dataset:在 DataFrame 之上做了一层封装,是“面向用户”的主要入口,提供更友好的链式 API,并集成了算子执行、读写等能力。
绝大多数场景下你直接跟 Dataset 打交道即可。
创建 Dataset 的方式大致有两类:
# 1)从内存数据创建(适合样例、小规模调试)
data = [ChatSchema(...), ...]
dataset = create_dataset(data, engine=DatasetEngine.LOCAL)
# 2)从文件系统加载(适合规模稍大场景)
dataset = load_dataset(
"example/data/chat.jsonl",
engine=DatasetEngine.RAY,
# 可选:指定 DataHandler 以适配自定义格式
# data_handler=DefaultDataHandler(),
)
Code language: PHP (php)基础链式操作
Dataset 的 API 风格和 Spark、RayData 类似,但保留了强类型 Schema 的好处:
dataset = (
dataset.repartition(2)
.filter(lambda x: x.metadata["language"] == "en")
.sort(lambda x: x.metadata["feedback"], ascending=False)
.add_metadata(
lambda x: {
"context_length": sum(len(m.content) for m in x.messages),
}
)
.limit(4)
.execute([...]) # 执行若干算子
.cache()
)
results = dataset.collect()
dataset.write_json("output.jsonl")
Code language: PHP (php)统计与聚合 API
除了“处理数据”,Dataset 也提供了一套轻量的分析 / 聚合接口:
- 常用方法:
count()、min(key_fn)/max(key_fn)、sum(key_fn)、unique(key_fn)等。 - 高级聚合:
aggregate(*agg_funcs)和group_by(key_fn, name=None)。
统计 + 分组 + 自定义聚合
下面这个例子展示了 Dataset 在数据分析场景下的各种用法:
example_data = [
RowSchema(
data={
"country": random.choice(["US", "CN", "JP", "KR", "TW", "HK"]),
"score": random.randint(1, 100),
}
)
for _ in range(1024)
]
dataset = create_dataset(example_data)
statistics = {
"count": dataset.count(),
"min": dataset.min(lambda x: x["score"]),
"max": dataset.max(lambda x: x["score"]),
"unique": dataset.unique(lambda x: x["country"]),
"agg": dataset.aggregate(
Count(),
Sum(lambda x: x["score"]),
Sum(lambda x: x["score"] * 2, name="score_x2"),
Mean(lambda x: x["score"]),
),
"bin_score_count": [
agg.data
for agg in dataset.execute([EqualWidthBinner(lambda x: x["score"])])
.group_by(lambda x: x.metadata["bin_label"], name="bin")
.count()
.sort(lambda x: x["bin"])
.collect()
],
"group_by_country_agg": [
agg.data
for agg in dataset.group_by(lambda x: x["country"])
.aggregate(
Count(),
Sum(lambda x: x["score"]),
Sum(lambda x: x["score"] * 2, name="score_x2"),
Mean(lambda x: x["score"]),
)
.collect()
],
"custom_aggregate_func": dataset.aggregate(
RowAggregateFunc(
initial_value={"cn_count": 0},
accumulate=lambda x, y: {
"cn_count": (
x["cn_count"] + 1 if y["country"] == "CN" else x["cn_count"]
),
},
merge=lambda x, y: {
"cn_count": x["cn_count"] + y["cn_count"],
},
finalize=lambda x: x["cn_count"],
),
),
}
print(statistics)
Code language: PHP (php)这个例子里可以看到几件事:
- Dataset 上可以直接做全局统计:
count、min、max、unique、aggregate。 - 按列(这里是
country)分组后再做聚合分析:group_by(...).aggregate(...).collect()。 - 使用
RowAggregateFunc写出完全自定义的聚合逻辑(上面统计的是country == "CN"的样本数)。(PyPI) - 通过
EqualWidthBinner这样的算子先做“等宽分箱”,分箱结果写在metadata["bin_label"],再配合group_by+count做简易分布统计。
换句话说,在 BlossomData 里,“处理 + 分析”可以共用一套 Dataset / DataFrame API,不需要在多个框架之间切来切去。
小结
BlossomData 的核心诉求并不是“做一个又一套大而全的框架”,而是尽量解决大模型训练数据处理中一些反复出现的工程问题:数据结构不清晰、流水线无法复用、环境迁移成本高、外部配置散落在各处等等。框架专注于大模型场景,解决了这些常见痛点,进而提升数据工程效率。
