Salesforce Bulk API 大规模数据操作:数据工程师指南

背景与应用场景

作为一名 Salesforce 数据工程师,我们经常面临处理海量数据的挑战。在现代企业中,Salesforce 不仅仅是一个客户关系管理(CRM)系统,它更是业务运营的核心数据枢纽,承载着销售、服务、营销等关键数据。然而,当我们需要将数百万条甚至数千万条记录从外部系统(如数据仓库、企业资源规划系统 ERP、其他 CRM)导入 Salesforce,或从中导出用于分析、集成时,传统的基于 UI 的数据加载工具(如数据加载向导)或同步的 API(如 SOAP API、REST API)往往会显得力不从心。

标准 API 调用通常是同步的,每次只能处理一条或少量记录,这对于大规模数据操作来说效率低下,且容易触及 API 调用限制。因此,Salesforce 提供了 Bulk API(批量 API),这是一个专门为处理大量数据而设计的高度优化且异步的 RESTful API。它允许您异步地插入、更新、更新插入(upsert)、删除或查询大量记录,从而显著提高数据处理的吞吐量。

Bulk API 的主要应用场景包括:

  • 初始数据迁移 (Initial Data Migration):当企业首次实施 Salesforce 或从旧系统迁移数据时,需要将大量的历史数据一次性导入 Salesforce。
  • 定期 ETL 过程 (Regular ETL Processes):从数据仓库或其他源系统定期提取、转换并加载(Extract, Transform, Load)数据到 Salesforce,保持数据的同步和更新。
  • 大规模数据更新/删除 (Mass Data Updates/Deletions):执行数据清理、标准化或数据归档操作,需要对大量现有 Salesforce 记录进行批量修改或删除。
  • 大规模数据导出 (Large-Scale Data Exports):将 Salesforce 中的海量数据导出到外部数据湖、数据仓库进行深入分析或备份。
  • 系统集成 (System Integration):在两个或多个系统之间同步大量数据,其中 Salesforce 是参与方之一。

对于数据工程师而言,掌握 Bulk API 是构建健壮、高效、可扩展的数据集成和数据管道的关键技能。它使我们能够设计出能够应对大数据量挑战的数据解决方案。


原理说明

Bulk API 的核心设计理念是异步处理 (Asynchronous Processing)批次处理 (Batch Processing)。与传统的同步 API 调用不同,您提交一个 Bulk API Job(批量 API 作业)后,它会在 Salesforce 的后台异步执行。这种方式极大地提高了数据处理的效率和吞吐量。

Bulk API 的工作流程概述:

  1. 创建 Job (Create Job):首先,您需要创建一个 Job。这个 Job 定义了您要执行的操作(插入、更新、更新插入、删除或查询)、目标对象(例如 Account、Contact)以及数据类型(CSV 或 JSON)。
  2. 上传数据 (Upload Data):接下来,您将需要处理的数据以批次 (Batches) 的形式上传到该 Job 中。Bulk API 会将这些数据自动或手动地分割成更小的批次。
    • Bulk API 1.0 中,您需要手动管理批次,明确提交每个批次。
    • Bulk API 2.0 中,您只需将所有数据流式上传到 Job 中,Salesforce 会自动处理数据的分批,这大大简化了开发工作。
  3. 关闭 Job (Close Job):所有数据上传完成后,您需要将 Job 标记为完成上传。此时,Salesforce 会开始处理这些批次。
  4. 监控 Job 状态 (Monitor Job Status):由于是异步操作,您需要定期查询 Job 的状态,以了解其处理进度。
  5. 获取结果 (Retrieve Results):一旦 Job 完成,您可以下载处理成功的记录以及处理失败的记录及其错误信息。这对于错误处理和数据修正至关重要。

Bulk API 1.0 与 Bulk API 2.0

Salesforce 提供了两个版本的 Bulk API

  • Bulk API 1.0:这是较早的版本,功能强大但使用起来相对复杂。它要求开发者手动管理批次,每次提交的批次大小有明确的限制(通常最多 10,000 条记录或 10MB)。它支持 CSV 和 XML 格式。
  • Bulk API 2.0:这是推荐的新版本,它通过自动化批次管理极大地简化了开发流程。您只需将所有数据流式传输到 Job 中,Salesforce 会自动进行优化和分批处理。它支持 CSV 和 JSON 格式,并且处理失败的记录结果中包含原始数据行,方便重试。Bulk API 2.0 通常具有更好的性能和更简化的错误处理。对于新的数据集成项目,强烈建议优先考虑使用 Bulk API 2.0

作为数据工程师,我们的目标是构建高效且易于维护的数据管道,因此在大多数情况下,我们应该优先采用 Bulk API 2.0


示例代码

以下是使用 Bulk API 2.0 执行大规模数据插入操作的 cURL 示例。我们将演示如何创建一个 Job、上传数据、关闭 Job、检查状态以及获取处理结果。假设您已经通过 OAuth 2.0 流程获得了有效的 access_tokeninstance_url

1. 创建 Bulk API 2.0 Ingest Job

首先,我们需要创建一个新的 Ingest Job,指定要操作的对象、操作类型和数据内容类型。这里我们创建一个插入 Account 记录的 Job,数据格式为 CSV。

curl --request POST \
  --url "$instance_url/services/data/vXX.0/jobs/ingest" \
  --header "Authorization: Bearer $access_token" \
  --header "Content-Type: application/json; charset=UTF-8" \
  --data '{
    "object": "Account",
    "operation": "insert",
    "contentType": "CSV",
    "lineEnding": "LF"
  }'

解释:

  • --request POST:指定 HTTP POST 方法。
  • --url "$instance_url/services/data/vXX.0/jobs/ingest":Bulk API 2.0 Job 创建的端点。vXX.0 应该替换为您的 Salesforce API 版本,例如 v58.0
  • --header "Authorization: Bearer $access_token":OAuth 授权头,包含您的访问令牌。
  • --header "Content-Type: application/json; charset=UTF-8":指定请求体为 JSON 格式。
  • --data:包含 Job 配置的 JSON 请求体。
    • "object": "Account":指定要操作的 Salesforce 对象。
    • "operation": "insert":指定操作类型,可以是 insert, update, upsert, delete
    • "contentType": "CSV":指定数据格式为 CSV。
    • "lineEnding": "LF":指定 CSV 文件的行结束符,通常为 LF (Line Feed) 或 CRLF (Carriage Return Line Feed)。
成功响应会返回一个包含 id(Job ID)、operationobject 等信息的 JSON 对象。

2. 上传数据到 Job

创建 Job 后,您可以通过 HTTP PUT 请求将 CSV 数据上传到 Job。这里我们将模拟一个包含 Name 和 AnnualRevenue 字段的 CSV 数据。

curl --request PUT \
  --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId/batches" \
  --header "Authorization: Bearer $access_token" \
  --header "Content-Type: text/csv" \
  --data 'Name,AnnualRevenue
Account 1,100000
Account 2,200000
Account 3,300000'

解释:

  • --request PUT:指定 HTTP PUT 方法。
  • --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId/batches":数据上传的端点,$jobId 替换为上一步创建 Job 时返回的 Job ID。
  • --header "Content-Type: text/csv":指定请求体为 CSV 格式。
  • --data:包含要上传的 CSV 数据。第一行为字段标题,后续为数据行。
这个请求不会立即返回处理结果,而是一个 204 No Content 响应,表示数据已成功接收。

3. 关闭 Job

所有数据上传完毕后,您需要将 Job 的状态更新为 UploadComplete,告诉 Salesforce 可以开始处理数据了。

curl --request PATCH \
  --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId" \
  --header "Authorization: Bearer $access_token" \
  --header "Content-Type: application/json; charset=UTF-8" \
  --data '{
    "state": "UploadComplete"
  }'

解释:

  • --request PATCH:指定 HTTP PATCH 方法用于部分更新资源。
  • --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId":指定要更新的 Job。
  • "state": "UploadComplete":将 Job 的状态设置为 UploadComplete。此时 Salesforce 将开始处理批次。您也可以设置为 Aborted 来取消 Job。
成功响应会返回更新后的 Job 状态。

4. 检查 Job 状态

由于 Job 是异步处理的,您需要定期检查其状态,直到它变为 JobCompleteFailed

curl --request GET \
  --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId" \
  --header "Authorization: Bearer $access_token" \
  --header "Accept: application/json"

解释:

  • --request GET:指定 HTTP GET 方法。
  • 此请求将返回包含 Job 当前状态的 JSON 对象,其中 state 字段会显示当前进度(如 Open, UploadComplete, InProgress, JobComplete, Failed, Aborted)。
  • 您还需要关注 numberRecordsProcessed(已处理记录数)和 numberRecordsFailed(失败记录数)等字段。

5. 获取成功结果

当 Job 状态为 JobComplete 后,您可以获取所有成功处理的记录。

curl --request GET \
  --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId/successfulResults" \
  --header "Authorization: Bearer $access_token" \
  --header "Accept: text/csv"

解释:

  • 此请求会返回一个 CSV 格式的响应,包含处理成功的记录信息,例如 Salesforce ID、外部 ID 等。

6. 获取失败结果

同样,如果 Job 中有失败的记录,您需要获取它们以进行分析和重试。

curl --request GET \
  --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId/failedResults" \
  --header "Authorization: Bearer $access_token" \
  --header "Accept: text/csv"

解释:

  • 此请求会返回一个 CSV 格式的响应,包含所有处理失败的记录及其对应的错误消息。对于 Bulk API 2.0,通常还会包含原始数据行,这对于错误重试非常有用。

注意:上述 vXX.0 应替换为实际的 Salesforce API 版本,例如 v58.0


注意事项

在使用 Bulk API 构建数据集成解决方案时,作为数据工程师,我们需要关注以下关键事项,以确保数据处理的效率、可靠性和安全性。

权限 (Permissions)

  • API Enabled (API 已启用):执行任何 API 操作的用户或集成用户必须在其配置文件(Profile)或权限集(Permission Set)中拥有“API Enabled”权限。
  • 对象和字段级别安全性 (Object and Field-Level Security, FLS):用于执行 Bulk API 操作的用户必须对目标对象拥有适当的 CRUD(创建、读取、更新、删除)权限,并且对所有涉及的字段拥有读取或写入权限。例如,如果要插入 Account 记录并填充 Name 字段,则用户必须具有 Account 对象的创建权限和 Name 字段的编辑权限。

API 限制 (API Limits)

Salesforce 对 API 使用施加了多项限制,以确保系统的稳定性和公平使用。了解这些限制至关重要:

  • 每日 API 请求限制 (Daily API Call Limit):这是组织在 24 小时内可以发出的 API 调用总数。每次 Bulk API 提交数据(或查询)都会计入此限制。大型导入或导出项目需要仔细规划,避免触及限制。
  • 并发 Job 限制 (Concurrent Job Limits):Salesforce 对可以同时处理的活跃 Bulk API Job 数量有限制(通常为 5 个)。超过此限制的 Job 会排队等待。
  • 单个 Job 的记录限制 (Records per Job Limit)
    • Bulk API 1.0:每个批次最多 10,000 条记录或 10MB 数据,每个 Job 最多 15,000 个批次,总计可处理多达 1500 万条记录。
    • Bulk API 2.0:由于自动分批,通常对单个 Job 的记录总数没有硬性限制,但推荐的最大文件大小通常为 150MB。
  • 文件大小限制 (File Size Limit):上传的数据文件(CSV/JSON)通常最大限制为 150MB。如果数据量更大,需要将其分割成多个文件上传到同一个 Job 中。
  • 查询结果限制 (Query Results Limit):对于 Bulk Query API,单个查询结果集通常最大为 15GB。

触及这些限制会导致 Job 失败,因此,在设计数据集成解决方案时,务必考虑这些限制,并实施相应的流量控制和重试策略。

错误处理 (Error Handling)

健壮的错误处理机制是任何数据管道的核心:

  • 解析失败结果 (Parsing Failed Results):务必下载并解析 failedResults 文件。它包含原始数据行、Salesforce 错误码和错误消息,如 REQUIRED_FIELD_MISSINGDUPLICATE_VALUEFIELD_CUSTOM_VALIDATION_EXCEPTIONUNABLE_TO_LOCK_ROW 等。
  • 重试策略 (Retry Strategy)
    • 瞬时错误 (Transient Errors):对于由并发锁争用(如 UNABLE_TO_LOCK_ROW)、API 限制暂时达到或瞬时网络问题导致的错误,应实施带指数退避(Exponential Backoff)的重试机制。
    • 数据错误 (Data Errors):对于由数据质量问题(如必填字段缺失、数据格式不正确)或业务逻辑验证规则导致的错误,重试无济于事。需要人工干预修正数据或调整映射关系。
  • 日志记录 (Logging):详细记录每个 Job 的状态、处理的记录数、失败的记录数以及具体的错误信息,以便于审计、故障排除和问题解决。

数据质量与验证 (Data Quality and Validation)

  • 触发器和自动化 (Triggers and Automations):Bulk API 操作默认会触发 Salesforce 中的所有自动化,包括 Apex 触发器(Apex Triggers)、工作流规则(Workflow Rules)、流(Flows)、验证规则(Validation Rules)、以及重复规则(Duplicate Rules)。这对于保持数据完整性至关重要,但也可能影响性能。
  • 外部 ID (External ID):对于 upsert(更新插入)操作,使用外部 ID 至关重要。外部 ID 是一个自定义字段,带有“外部 ID”属性,用于将 Salesforce 外部的记录与 Salesforce 内部的记录进行匹配。它能够防止在执行 upsert 操作时创建重复记录。
  • 绕过自动化 (Bypassing Automations):在某些极端情况下,为了提高性能或避免不必要的业务逻辑执行,可以考虑暂时禁用某些非关键的自动化。但这种做法极度不推荐,因为它可能导致数据不一致或绕过重要的业务规则,必须在严格控制和充分理解风险的情况下进行。更好的方法是优化自动化逻辑或利用批处理感知的触发器。

性能优化 (Performance Optimization)

  • 批次大小 (Batch Size):虽然 Bulk API 2.0 自动管理批次,但了解其内部机制仍然有益。对于 Bulk API 1.0,最佳批次大小通常在 2,000 到 10,000 条记录之间,具体取决于数据复杂性和自动化程度。过大或过小的批次都可能影响性能。
  • 并行处理 (Parallel Processing):Salesforce 会并行处理多个批次。然而,如果您的 Job 对同一组记录进行频繁操作,过高的并行度可能导致数据库锁争用(Row Lock Contention),从而降低整体性能,甚至导致 UNABLE_TO_LOCK_ROW 错误。设计数据加载策略时,尽量避免对同一父记录的子记录进行高度并发操作。
  • 字段索引 (Field Indexing):确保在用于匹配或过滤的字段(特别是外部 ID)上创建了索引,以加快查找速度。
  • 减少计算和自动化 (Minimize Computations and Automations):在导入大量数据时,如果可能,优化或暂时禁用非关键的 Apex 触发器、流和工作流,可以显著提高导入速度。但必须权衡性能与数据完整性及业务逻辑的遵循。

总结与最佳实践

Bulk API 是 Salesforce 平台处理大规模数据操作的基石,是数据工程师构建高效、可靠数据管道不可或缺的工具。它通过异步处理和批次优化,解决了传统同步 API 在处理海量数据时的性能瓶颈。

总结要点:

  • 高效性:Bulk API 专为处理大量数据而设计,通过并行处理批次,显著提高了数据加载和查询的吞吐量。
  • 异步性:操作在后台执行,允许客户端提交 Job 后继续执行其他任务,提高了集成系统的响应能力。
  • 健壮性:内置的错误报告机制有助于识别和处理失败的记录,为数据清理和重试提供了依据。
  • 灵活性:支持插入、更新、更新插入、删除和查询等多种操作,适用于各种数据管理场景。

最佳实践:

  1. 优先使用 Bulk API 2.0 (Prefer Bulk API 2.0):对于所有新的数据集成项目,始终优先选择 Bulk API 2.0。它提供了更简化的开发模型、自动化的批次管理和更友好的错误报告,极大地降低了开发和维护成本。
  2. 设计健壮的错误处理机制 (Design Robust Error Handling)
    • 始终下载并分析 failedResults 文件。
    • 区分瞬时错误和数据错误,并实施相应的重试逻辑。对于瞬时错误,采用指数退避策略进行重试;对于数据错误,记录并通知相关人员进行数据修正。
    • 确保您的数据管道可以处理部分成功的情况,并能从失败中恢复。
  3. 充分利用外部 ID (Leverage External IDs):对于 upsert 操作,外部 ID 是匹配外部系统记录和 Salesforce 记录的关键。合理规划和使用外部 ID 可以有效避免数据重复和提高数据匹配准确性。
  4. 监控与日志记录 (Monitor and Log)
    • 密切监控 Bulk API Job 的状态和性能指标。
    • 详细记录每次数据加载或导出操作的结果,包括 Job ID、处理时间、成功记录数、失败记录数以及详细的错误信息。这些日志对于审计、性能分析和故障排除至关重要。
  5. 理解并遵守 Salesforce 限制 (Understand and Adhere to Salesforce Limits)
    • 清楚了解每日 API 调用限制、并发 Job 限制以及文件大小限制。在设计数据管道时,考虑这些限制,并实施流量控制机制。
    • 对于大型数据集,可能需要将数据分割成多个 Job 或在更长的时间内分批处理。
  6. 在沙盒中充分测试 (Thoroughly Test in Sandboxes):在将数据加载到生产环境之前,务必在全量沙盒(Full Sandbox)中进行充分的性能和功能测试。模拟生产环境的数据量和复杂度,验证您的数据管道能够稳定、高效地运行。
  7. 数据质量为先 (Prioritize Data Quality):虽然 Bulk API 能够快速处理数据,但它不能替代良好的数据质量策略。在数据进入 Salesforce 之前,尽可能在源系统或 ETL 过程中进行数据清洗和验证,以减少 Salesforce 端的验证失败。
  8. 权衡性能与自动化 (Balance Performance and Automation):Bulk API 操作会触发 Salesforce 的自动化(触发器、流、验证规则等)。在某些对性能要求极高的场景下,可能需要与业务分析师和管理员协作,暂时性地禁用或优化部分自动化。但这应视为最后手段,并确保在数据加载完成后立即恢复所有自动化,以维护数据完整性和业务逻辑。

作为数据工程师,我们的目标是构建可伸缩、可靠且高效的数据集成解决方案。通过深入理解和熟练运用 Salesforce Bulk API,我们可以有效地应对大规模数据挑战,确保企业关键数据的顺畅流动和高质量维护。

评论

此博客中的热门博文

Salesforce 登录取证:深入解析用户访问监控与安全

Salesforce Experience Cloud 技术深度解析:构建社区站点 (Community Sites)

Salesforce Data Loader 全方位指南:数据迁移与管理的最佳实践