Blossom Data – 面向大模型的数据处理框架

Blossom Data 是在 Blossom模型 的迭代过程中逐步设计出来的一套数据处理框架,目标是为大模型训练数据提供一个结构化、可组合、可迁移的工程化方案。

在大模型训练的过程中,数据往往比模型更“棘手”,一套完整的数据流水线,通常要经历:

  • 原始数据清洗与规范化 -> 多轮改写、翻译、合成 -> 质量验证与过滤 -> 不同规模环境下重复执行

如果这些步骤依赖的是一批一次性脚本或 Notebook,随着时间推移,很容易演变成难以复用、难以维护的复杂系统。

项目主页

使用 Blossom Data 合成的数据示例

框架设计

BlossomData核心特性如下

  • 使用 Schema 描述数据结构,避免“全是 dict 和魔法字段”的混乱状态。
  • Operator + Dataset / DataFrame 组织数据处理流程,使每一步处理具备明确边界、便于组合与复用。
  • 抽象出 Local / Ray / Spark 等不同执行后端,使同一套代码可以在不同执行引擎上运行。
  • 针对大模型训练场景,提供一批常用的内置算子(翻译、合成、蒸馏、验证、后处理等)。

Schema

在很多临时脚本中,常见写法是:

row["messages"][0]["content"]
Code language: CSS (css)

这种写法短期上手成本低,但当字段名调整、结构变复杂、或者多人协作时,维护成本会迅速上升。

BlossomData 使用一套 Schema 来对数据进行建模,例如:

  • ChatSchema:对话数据,核心字段为 messages: list[ChatMessage]
  • TextSchema:文本数据,字段为 content: str
  • RowSchema:结构化数据,字段为 data: dict
  • CustomSchema:自定义结构,字段为 data: Any

所有 Schema 还具备一组统一的元信息字段,例如:

  • id:唯一标识
  • type:Schema 类型
  • failed / failure_reason:标记该条数据在某一步处理是否失败,以及失败原因
  • metadata:附加元数据(语言、来源、打分等)

这样,数据结构是显式的,数据在流水线中的状态也可以挂在数据本身,而不是散落在日志与临时变量中。

多执行后端

BlossomData 通过统一的 DataFrame 抽象,将不同执行后端封装在一起,目前支持:本地、Ray、Spark,在上层,创建 Dataset 时只需要指定引擎类型:

dataset = create_dataset(
    data,
    engine=DatasetEngine.LOCAL,  # 调试阶段通常使用本地
)
Code language: PHP (php)

在确认逻辑正确后,如果需要扩展到分布式执行,只需将 engine 切换为 RAYSPARK,并提供相应的运行环境和配置。流水线本身不需要改动。

这种方式可以支持常见的工作流:在本地使用小数据集开发、调试 -> 在小集群上验证性能与稳定性 -> 在生产集群上对全部数据运行同一套流水线。

数据流水线

BlossomData 的核心使用模型是 Dataset + Operator

  • Dataset:承载一组 Schema 数据,并绑定一个执行后端(本地 / Ray / Spark)。
  • Operator:对数据进行一次独立的变换,如 map、filter、transform 等。

在实际使用时,通常会写出一个算子列表 ops,按顺序定义流水线:

ops = [
    # 例如:翻译
    ...,
    # 例如:验证 + 蒸馏
    ...,
    # 例如:合并推理内容
    ...,
]
Code language: PHP (php)

每个 Operator 只负责一小步逻辑,输入和输出都是 Schema。这样一来:

  • 处理流程清晰可读。
  • 每一步的副作用较小,便于调试与重用。
  • 出现问题时,可以定位到具体算子,而不是整段脚本。

针对大模型训练数据,框架内置了一批面向对话、推理、质量控制的算子,例如翻译、验证与蒸馏、推理内容合并等。

统一管理外部资源

在面向大模型数据的处理过程中,算子经常需要访问外部服务,BlossomData 引入了 Context 与 Provider 的概念,用来统一管理这些外部资源:

  • 使用 YAML 配置文件(如 config.yaml~/.blossom.yaml)描述 Provider。
  • 运行时自动加载配置,无需在代码中重复填写各种 URL 和密钥。
  • Operator 通过 Context 获取 Provider 实例,调用统一接口,而不直接依赖具体 SDK。

这一设计让“换一个模型服务”“调整超参”“修改重试策略”这类操作尽量收敛到配置层,而不需要频繁修改业务代码。

自定义 Operator

就像前两小节提到的那样,如果内置的算子不能满足我们的业务需求,也可以通过灵活的 API 来扩充自定义算子:

# 使用装饰器定义自定义算子,第一个参数为元素
@filter_operator()
def filter_short_text(item):
    return len(item.content) > 10
# 使用上下文调用模型的算子,名称以 context 开头,第一个参数为上下文,第二个参数为元素
# 传入 parallel 以指定并发数
@context_map_operator(parallel=4)
def translate_with_model(context, item):
    result = context.chat_completion(
        "gpt-4o-mini",
        [user(f"Translate to Chinese: {item.content}")]
    )
    return TextSchema(content=result)
# 继承 Operator 实现自定义算子
# 对于 map 算子,需要实现 process_item 方法,通过 self.context 访问上下文
class SelfQA(MapOperator):
    def process_item(self, item):
        self_qa_prompt = (
            "基于给定的文本,随意生成一个问题以及对应的长答案。\n"
            "你的输出应该是一个 json,包含 question、answer 两个字符串字段,不需要输出任何其他的无关解释。\n"
            f"给定的文本:{item.content}"
        )
        raw_result = self.context.chat_completion("gpt-4o-mini", [user(self_qa_prompt)])
        result = loads_markdown_first_json(raw_result)
        return ChatSchema(
            messages=[
                user(result["question"]),
                assistant(result["answer"]),
            ]
        )
Code language: CSS (css)

从数学题合成推理数据

下面通过一个简化示例说明框架的基本用法。假设我们有一条数据,由“数学题 + 参考答案”构成,希望构建一条带完整推理过程的中文训练样本

准备原始数据

data = [
    ChatSchema(
        messages=[
            user(
                "Suppose that wz = 12 - 8i and |w| = sqrt(13). "
                "What is |z|?"
            ),
            assistant("4"),  # 参考答案
        ]
    ),
]
Code language: PHP (php)

这里用 ChatSchema 来描述一条对话,其中 user(...) 是题目,assistant("4") 是给定答案。

定义处理流水线

假设我们想要做这样一条流水线:

  1. 将英文题目翻译为中文。
  2. 使用推理模型重新生成带推理过程的答案。
  3. 验证答案是否与参考答案一致。
  4. 将推理内容合并进最终内容,得到可直接用于训练的样本。

代码层面,可以通过一组 Operator 来表达这一流水线(算子名称以实际实现为准):

ops = [
    ChatTranslator(
        model="your-chat-model",
        target_language="Chinese",
    ),
    ChatVerifyDistiller(
        model="your-reasoning-model",
        mode=ChatVerifyDistiller.Mode.LLM,
        validation_model="your-chat-model",
    ),
    ChatReasoningContentMerger(),
]
Code language: JavaScript (javascript)

创建 Dataset 并执行

from blossom.dataframe import create_dataset
dataset = create_dataset(data)  # 默认使用本地引擎
result = dataset.execute(ops).collect()
print(result[0])
Code language: PHP (php)

执行完成后,result[0] 就是一条已经过翻译、推理与验证,并合并了推理内容的 ChatSchema 数据,可以直接用于后续的训练或评测流水线。

DataFrame 与 Dataset

在 BlossomData 里可以粗略理解为:

  • DataFrame:对底层数据的统一抽象,真正承载了 Local / Spark / Ray 等不同引擎实现。
  • Dataset:在 DataFrame 之上做了一层封装,是“面向用户”的主要入口,提供更友好的链式 API,并集成了算子执行、读写等能力。

绝大多数场景下你直接跟 Dataset 打交道即可。

创建 Dataset 的方式大致有两类:

# 1)从内存数据创建(适合样例、小规模调试)
data = [ChatSchema(...), ...]
dataset = create_dataset(data, engine=DatasetEngine.LOCAL)
# 2)从文件系统加载(适合规模稍大场景)
dataset = load_dataset(
    "example/data/chat.jsonl",
    engine=DatasetEngine.RAY,
    # 可选:指定 DataHandler 以适配自定义格式
    # data_handler=DefaultDataHandler(),
)
Code language: PHP (php)

基础链式操作

Dataset 的 API 风格和 Spark、RayData 类似,但保留了强类型 Schema 的好处:

dataset = (
    dataset.repartition(2)
    .filter(lambda x: x.metadata["language"] == "en")
    .sort(lambda x: x.metadata["feedback"], ascending=False)
    .add_metadata(
        lambda x: {
            "context_length": sum(len(m.content) for m in x.messages),
        }
    )
    .limit(4)
    .execute([...])  # 执行若干算子
    .cache()
)

results = dataset.collect()
dataset.write_json("output.jsonl")
Code language: PHP (php)

统计与聚合 API

除了“处理数据”,Dataset 也提供了一套轻量的分析 / 聚合接口:

  • 常用方法count()min(key_fn) / max(key_fn)sum(key_fn)unique(key_fn) 等。
  • 高级聚合aggregate(*agg_funcs)group_by(key_fn, name=None)

统计 + 分组 + 自定义聚合

下面这个例子展示了 Dataset 在数据分析场景下的各种用法:

example_data = [
    RowSchema(
        data={
            "country": random.choice(["US", "CN", "JP", "KR", "TW", "HK"]),
            "score": random.randint(1, 100),
        }
    )
    for _ in range(1024)
]


dataset = create_dataset(example_data)
statistics = {
    "count": dataset.count(),
    "min": dataset.min(lambda x: x["score"]),
    "max": dataset.max(lambda x: x["score"]),
    "unique": dataset.unique(lambda x: x["country"]),
    "agg": dataset.aggregate(
        Count(),
        Sum(lambda x: x["score"]),
        Sum(lambda x: x["score"] * 2, name="score_x2"),
        Mean(lambda x: x["score"]),
    ),
    "bin_score_count": [
        agg.data
        for agg in dataset.execute([EqualWidthBinner(lambda x: x["score"])])
        .group_by(lambda x: x.metadata["bin_label"], name="bin")
        .count()
        .sort(lambda x: x["bin"])
        .collect()
    ],
    "group_by_country_agg": [
        agg.data
        for agg in dataset.group_by(lambda x: x["country"])
        .aggregate(
            Count(),
            Sum(lambda x: x["score"]),
            Sum(lambda x: x["score"] * 2, name="score_x2"),
            Mean(lambda x: x["score"]),
        )
        .collect()
    ],
    "custom_aggregate_func": dataset.aggregate(
        RowAggregateFunc(
            initial_value={"cn_count": 0},
            accumulate=lambda x, y: {
                "cn_count": (
                    x["cn_count"] + 1 if y["country"] == "CN" else x["cn_count"]
                ),
            },
            merge=lambda x, y: {
                "cn_count": x["cn_count"] + y["cn_count"],
            },
            finalize=lambda x: x["cn_count"],
        ),
    ),
}
print(statistics)
Code language: PHP (php)

这个例子里可以看到几件事:

  • Dataset 上可以直接做全局统计:countminmaxuniqueaggregate
  • 按列(这里是 country)分组后再做聚合分析:group_by(...).aggregate(...).collect()
  • 使用 RowAggregateFunc 写出完全自定义的聚合逻辑(上面统计的是 country == "CN" 的样本数)。(PyPI)
  • 通过 EqualWidthBinner 这样的算子先做“等宽分箱”,分箱结果写在 metadata["bin_label"],再配合 group_by + count 做简易分布统计。

换句话说,在 BlossomData 里,“处理 + 分析”可以共用一套 Dataset / DataFrame API,不需要在多个框架之间切来切去。

小结

BlossomData 的核心诉求并不是“做一个又一套大而全的框架”,而是尽量解决大模型训练数据处理中一些反复出现的工程问题:数据结构不清晰、流水线无法复用、环境迁移成本高、外部配置散落在各处等等。框架专注于大模型场景,解决了这些常见痛点,进而提升数据工程效率。

Azure99

底层码农,休闲音游玩家,偶尔写写代码

看看这些?

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注