Salesforce Bulk API 大规模数据操作:数据工程师指南
背景与应用场景
作为一名 Salesforce 数据工程师,我们经常面临处理海量数据的挑战。在现代企业中,Salesforce 不仅仅是一个客户关系管理(CRM)系统,它更是业务运营的核心数据枢纽,承载着销售、服务、营销等关键数据。然而,当我们需要将数百万条甚至数千万条记录从外部系统(如数据仓库、企业资源规划系统 ERP、其他 CRM)导入 Salesforce,或从中导出用于分析、集成时,传统的基于 UI 的数据加载工具(如数据加载向导)或同步的 API(如 SOAP API、REST API)往往会显得力不从心。
标准 API 调用通常是同步的,每次只能处理一条或少量记录,这对于大规模数据操作来说效率低下,且容易触及 API 调用限制。因此,Salesforce 提供了 Bulk API(批量 API),这是一个专门为处理大量数据而设计的高度优化且异步的 RESTful API。它允许您异步地插入、更新、更新插入(upsert)、删除或查询大量记录,从而显著提高数据处理的吞吐量。
Bulk API 的主要应用场景包括:
- 初始数据迁移 (Initial Data Migration):当企业首次实施 Salesforce 或从旧系统迁移数据时,需要将大量的历史数据一次性导入 Salesforce。
- 定期 ETL 过程 (Regular ETL Processes):从数据仓库或其他源系统定期提取、转换并加载(Extract, Transform, Load)数据到 Salesforce,保持数据的同步和更新。
- 大规模数据更新/删除 (Mass Data Updates/Deletions):执行数据清理、标准化或数据归档操作,需要对大量现有 Salesforce 记录进行批量修改或删除。
- 大规模数据导出 (Large-Scale Data Exports):将 Salesforce 中的海量数据导出到外部数据湖、数据仓库进行深入分析或备份。
- 系统集成 (System Integration):在两个或多个系统之间同步大量数据,其中 Salesforce 是参与方之一。
对于数据工程师而言,掌握 Bulk API 是构建健壮、高效、可扩展的数据集成和数据管道的关键技能。它使我们能够设计出能够应对大数据量挑战的数据解决方案。
原理说明
Bulk API 的核心设计理念是异步处理 (Asynchronous Processing) 和批次处理 (Batch Processing)。与传统的同步 API 调用不同,您提交一个 Bulk API Job(批量 API 作业)后,它会在 Salesforce 的后台异步执行。这种方式极大地提高了数据处理的效率和吞吐量。
Bulk API 的工作流程概述:
- 创建 Job (Create Job):首先,您需要创建一个 Job。这个 Job 定义了您要执行的操作(插入、更新、更新插入、删除或查询)、目标对象(例如 Account、Contact)以及数据类型(CSV 或 JSON)。
- 上传数据 (Upload Data):接下来,您将需要处理的数据以批次 (Batches) 的形式上传到该 Job 中。Bulk API 会将这些数据自动或手动地分割成更小的批次。
- 在 Bulk API 1.0 中,您需要手动管理批次,明确提交每个批次。
- 在 Bulk API 2.0 中,您只需将所有数据流式上传到 Job 中,Salesforce 会自动处理数据的分批,这大大简化了开发工作。
- 关闭 Job (Close Job):所有数据上传完成后,您需要将 Job 标记为完成上传。此时,Salesforce 会开始处理这些批次。
- 监控 Job 状态 (Monitor Job Status):由于是异步操作,您需要定期查询 Job 的状态,以了解其处理进度。
- 获取结果 (Retrieve Results):一旦 Job 完成,您可以下载处理成功的记录以及处理失败的记录及其错误信息。这对于错误处理和数据修正至关重要。
Bulk API 1.0 与 Bulk API 2.0
Salesforce 提供了两个版本的 Bulk API:
- Bulk API 1.0:这是较早的版本,功能强大但使用起来相对复杂。它要求开发者手动管理批次,每次提交的批次大小有明确的限制(通常最多 10,000 条记录或 10MB)。它支持 CSV 和 XML 格式。
- Bulk API 2.0:这是推荐的新版本,它通过自动化批次管理极大地简化了开发流程。您只需将所有数据流式传输到 Job 中,Salesforce 会自动进行优化和分批处理。它支持 CSV 和 JSON 格式,并且处理失败的记录结果中包含原始数据行,方便重试。Bulk API 2.0 通常具有更好的性能和更简化的错误处理。对于新的数据集成项目,强烈建议优先考虑使用 Bulk API 2.0。
作为数据工程师,我们的目标是构建高效且易于维护的数据管道,因此在大多数情况下,我们应该优先采用 Bulk API 2.0。
示例代码
以下是使用 Bulk API 2.0 执行大规模数据插入操作的 cURL 示例。我们将演示如何创建一个 Job、上传数据、关闭 Job、检查状态以及获取处理结果。假设您已经通过 OAuth 2.0 流程获得了有效的 access_token
和 instance_url
。
1. 创建 Bulk API 2.0 Ingest Job
首先,我们需要创建一个新的 Ingest Job,指定要操作的对象、操作类型和数据内容类型。这里我们创建一个插入 Account 记录的 Job,数据格式为 CSV。
curl --request POST \ --url "$instance_url/services/data/vXX.0/jobs/ingest" \ --header "Authorization: Bearer $access_token" \ --header "Content-Type: application/json; charset=UTF-8" \ --data '{ "object": "Account", "operation": "insert", "contentType": "CSV", "lineEnding": "LF" }'
解释:
--request POST
:指定 HTTP POST 方法。--url "$instance_url/services/data/vXX.0/jobs/ingest"
:Bulk API 2.0 Job 创建的端点。vXX.0
应该替换为您的 Salesforce API 版本,例如v58.0
。--header "Authorization: Bearer $access_token"
:OAuth 授权头,包含您的访问令牌。--header "Content-Type: application/json; charset=UTF-8"
:指定请求体为 JSON 格式。--data
:包含 Job 配置的 JSON 请求体。"object": "Account"
:指定要操作的 Salesforce 对象。"operation": "insert"
:指定操作类型,可以是insert
,update
,upsert
,delete
。"contentType": "CSV"
:指定数据格式为 CSV。"lineEnding": "LF"
:指定 CSV 文件的行结束符,通常为 LF (Line Feed) 或 CRLF (Carriage Return Line Feed)。
id
(Job ID)、operation
、object
等信息的 JSON 对象。
2. 上传数据到 Job
创建 Job 后,您可以通过 HTTP PUT 请求将 CSV 数据上传到 Job。这里我们将模拟一个包含 Name 和 AnnualRevenue 字段的 CSV 数据。
curl --request PUT \ --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId/batches" \ --header "Authorization: Bearer $access_token" \ --header "Content-Type: text/csv" \ --data 'Name,AnnualRevenue Account 1,100000 Account 2,200000 Account 3,300000'
解释:
--request PUT
:指定 HTTP PUT 方法。--url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId/batches"
:数据上传的端点,$jobId
替换为上一步创建 Job 时返回的 Job ID。--header "Content-Type: text/csv"
:指定请求体为 CSV 格式。--data
:包含要上传的 CSV 数据。第一行为字段标题,后续为数据行。
3. 关闭 Job
所有数据上传完毕后,您需要将 Job 的状态更新为 UploadComplete
,告诉 Salesforce 可以开始处理数据了。
curl --request PATCH \ --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId" \ --header "Authorization: Bearer $access_token" \ --header "Content-Type: application/json; charset=UTF-8" \ --data '{ "state": "UploadComplete" }'
解释:
--request PATCH
:指定 HTTP PATCH 方法用于部分更新资源。--url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId"
:指定要更新的 Job。"state": "UploadComplete"
:将 Job 的状态设置为UploadComplete
。此时 Salesforce 将开始处理批次。您也可以设置为Aborted
来取消 Job。
4. 检查 Job 状态
由于 Job 是异步处理的,您需要定期检查其状态,直到它变为 JobComplete
或 Failed
。
curl --request GET \ --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId" \ --header "Authorization: Bearer $access_token" \ --header "Accept: application/json"
解释:
--request GET
:指定 HTTP GET 方法。- 此请求将返回包含 Job 当前状态的 JSON 对象,其中
state
字段会显示当前进度(如Open
,UploadComplete
,InProgress
,JobComplete
,Failed
,Aborted
)。 - 您还需要关注
numberRecordsProcessed
(已处理记录数)和numberRecordsFailed
(失败记录数)等字段。
5. 获取成功结果
当 Job 状态为 JobComplete
后,您可以获取所有成功处理的记录。
curl --request GET \ --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId/successfulResults" \ --header "Authorization: Bearer $access_token" \ --header "Accept: text/csv"
解释:
- 此请求会返回一个 CSV 格式的响应,包含处理成功的记录信息,例如 Salesforce ID、外部 ID 等。
6. 获取失败结果
同样,如果 Job 中有失败的记录,您需要获取它们以进行分析和重试。
curl --request GET \ --url "$instance_url/services/data/vXX.0/jobs/ingest/$jobId/failedResults" \ --header "Authorization: Bearer $access_token" \ --header "Accept: text/csv"
解释:
- 此请求会返回一个 CSV 格式的响应,包含所有处理失败的记录及其对应的错误消息。对于 Bulk API 2.0,通常还会包含原始数据行,这对于错误重试非常有用。
注意:上述 vXX.0
应替换为实际的 Salesforce API 版本,例如 v58.0
。
注意事项
在使用 Bulk API 构建数据集成解决方案时,作为数据工程师,我们需要关注以下关键事项,以确保数据处理的效率、可靠性和安全性。
权限 (Permissions)
- API Enabled (API 已启用):执行任何 API 操作的用户或集成用户必须在其配置文件(Profile)或权限集(Permission Set)中拥有“API Enabled”权限。
- 对象和字段级别安全性 (Object and Field-Level Security, FLS):用于执行 Bulk API 操作的用户必须对目标对象拥有适当的 CRUD(创建、读取、更新、删除)权限,并且对所有涉及的字段拥有读取或写入权限。例如,如果要插入 Account 记录并填充 Name 字段,则用户必须具有 Account 对象的创建权限和 Name 字段的编辑权限。
API 限制 (API Limits)
Salesforce 对 API 使用施加了多项限制,以确保系统的稳定性和公平使用。了解这些限制至关重要:
- 每日 API 请求限制 (Daily API Call Limit):这是组织在 24 小时内可以发出的 API 调用总数。每次 Bulk API 提交数据(或查询)都会计入此限制。大型导入或导出项目需要仔细规划,避免触及限制。
- 并发 Job 限制 (Concurrent Job Limits):Salesforce 对可以同时处理的活跃 Bulk API Job 数量有限制(通常为 5 个)。超过此限制的 Job 会排队等待。
- 单个 Job 的记录限制 (Records per Job Limit):
- Bulk API 1.0:每个批次最多 10,000 条记录或 10MB 数据,每个 Job 最多 15,000 个批次,总计可处理多达 1500 万条记录。
- Bulk API 2.0:由于自动分批,通常对单个 Job 的记录总数没有硬性限制,但推荐的最大文件大小通常为 150MB。
- 文件大小限制 (File Size Limit):上传的数据文件(CSV/JSON)通常最大限制为 150MB。如果数据量更大,需要将其分割成多个文件上传到同一个 Job 中。
- 查询结果限制 (Query Results Limit):对于 Bulk Query API,单个查询结果集通常最大为 15GB。
触及这些限制会导致 Job 失败,因此,在设计数据集成解决方案时,务必考虑这些限制,并实施相应的流量控制和重试策略。
错误处理 (Error Handling)
健壮的错误处理机制是任何数据管道的核心:
- 解析失败结果 (Parsing Failed Results):务必下载并解析
failedResults
文件。它包含原始数据行、Salesforce 错误码和错误消息,如REQUIRED_FIELD_MISSING
、DUPLICATE_VALUE
、FIELD_CUSTOM_VALIDATION_EXCEPTION
、UNABLE_TO_LOCK_ROW
等。 - 重试策略 (Retry Strategy):
- 瞬时错误 (Transient Errors):对于由并发锁争用(如
UNABLE_TO_LOCK_ROW
)、API 限制暂时达到或瞬时网络问题导致的错误,应实施带指数退避(Exponential Backoff)的重试机制。 - 数据错误 (Data Errors):对于由数据质量问题(如必填字段缺失、数据格式不正确)或业务逻辑验证规则导致的错误,重试无济于事。需要人工干预修正数据或调整映射关系。
- 瞬时错误 (Transient Errors):对于由并发锁争用(如
- 日志记录 (Logging):详细记录每个 Job 的状态、处理的记录数、失败的记录数以及具体的错误信息,以便于审计、故障排除和问题解决。
数据质量与验证 (Data Quality and Validation)
- 触发器和自动化 (Triggers and Automations):Bulk API 操作默认会触发 Salesforce 中的所有自动化,包括 Apex 触发器(Apex Triggers)、工作流规则(Workflow Rules)、流(Flows)、验证规则(Validation Rules)、以及重复规则(Duplicate Rules)。这对于保持数据完整性至关重要,但也可能影响性能。
- 外部 ID (External ID):对于
upsert
(更新插入)操作,使用外部 ID 至关重要。外部 ID 是一个自定义字段,带有“外部 ID”属性,用于将 Salesforce 外部的记录与 Salesforce 内部的记录进行匹配。它能够防止在执行upsert
操作时创建重复记录。 - 绕过自动化 (Bypassing Automations):在某些极端情况下,为了提高性能或避免不必要的业务逻辑执行,可以考虑暂时禁用某些非关键的自动化。但这种做法极度不推荐,因为它可能导致数据不一致或绕过重要的业务规则,必须在严格控制和充分理解风险的情况下进行。更好的方法是优化自动化逻辑或利用批处理感知的触发器。
性能优化 (Performance Optimization)
- 批次大小 (Batch Size):虽然 Bulk API 2.0 自动管理批次,但了解其内部机制仍然有益。对于 Bulk API 1.0,最佳批次大小通常在 2,000 到 10,000 条记录之间,具体取决于数据复杂性和自动化程度。过大或过小的批次都可能影响性能。
- 并行处理 (Parallel Processing):Salesforce 会并行处理多个批次。然而,如果您的 Job 对同一组记录进行频繁操作,过高的并行度可能导致数据库锁争用(Row Lock Contention),从而降低整体性能,甚至导致
UNABLE_TO_LOCK_ROW
错误。设计数据加载策略时,尽量避免对同一父记录的子记录进行高度并发操作。 - 字段索引 (Field Indexing):确保在用于匹配或过滤的字段(特别是外部 ID)上创建了索引,以加快查找速度。
- 减少计算和自动化 (Minimize Computations and Automations):在导入大量数据时,如果可能,优化或暂时禁用非关键的 Apex 触发器、流和工作流,可以显著提高导入速度。但必须权衡性能与数据完整性及业务逻辑的遵循。
总结与最佳实践
Bulk API 是 Salesforce 平台处理大规模数据操作的基石,是数据工程师构建高效、可靠数据管道不可或缺的工具。它通过异步处理和批次优化,解决了传统同步 API 在处理海量数据时的性能瓶颈。
总结要点:
- 高效性:Bulk API 专为处理大量数据而设计,通过并行处理批次,显著提高了数据加载和查询的吞吐量。
- 异步性:操作在后台执行,允许客户端提交 Job 后继续执行其他任务,提高了集成系统的响应能力。
- 健壮性:内置的错误报告机制有助于识别和处理失败的记录,为数据清理和重试提供了依据。
- 灵活性:支持插入、更新、更新插入、删除和查询等多种操作,适用于各种数据管理场景。
最佳实践:
- 优先使用 Bulk API 2.0 (Prefer Bulk API 2.0):对于所有新的数据集成项目,始终优先选择 Bulk API 2.0。它提供了更简化的开发模型、自动化的批次管理和更友好的错误报告,极大地降低了开发和维护成本。
- 设计健壮的错误处理机制 (Design Robust Error Handling):
- 始终下载并分析
failedResults
文件。 - 区分瞬时错误和数据错误,并实施相应的重试逻辑。对于瞬时错误,采用指数退避策略进行重试;对于数据错误,记录并通知相关人员进行数据修正。
- 确保您的数据管道可以处理部分成功的情况,并能从失败中恢复。
- 始终下载并分析
- 充分利用外部 ID (Leverage External IDs):对于
upsert
操作,外部 ID 是匹配外部系统记录和 Salesforce 记录的关键。合理规划和使用外部 ID 可以有效避免数据重复和提高数据匹配准确性。 - 监控与日志记录 (Monitor and Log):
- 密切监控 Bulk API Job 的状态和性能指标。
- 详细记录每次数据加载或导出操作的结果,包括 Job ID、处理时间、成功记录数、失败记录数以及详细的错误信息。这些日志对于审计、性能分析和故障排除至关重要。
- 理解并遵守 Salesforce 限制 (Understand and Adhere to Salesforce Limits):
- 清楚了解每日 API 调用限制、并发 Job 限制以及文件大小限制。在设计数据管道时,考虑这些限制,并实施流量控制机制。
- 对于大型数据集,可能需要将数据分割成多个 Job 或在更长的时间内分批处理。
- 在沙盒中充分测试 (Thoroughly Test in Sandboxes):在将数据加载到生产环境之前,务必在全量沙盒(Full Sandbox)中进行充分的性能和功能测试。模拟生产环境的数据量和复杂度,验证您的数据管道能够稳定、高效地运行。
- 数据质量为先 (Prioritize Data Quality):虽然 Bulk API 能够快速处理数据,但它不能替代良好的数据质量策略。在数据进入 Salesforce 之前,尽可能在源系统或 ETL 过程中进行数据清洗和验证,以减少 Salesforce 端的验证失败。
- 权衡性能与自动化 (Balance Performance and Automation):Bulk API 操作会触发 Salesforce 的自动化(触发器、流、验证规则等)。在某些对性能要求极高的场景下,可能需要与业务分析师和管理员协作,暂时性地禁用或优化部分自动化。但这应视为最后手段,并确保在数据加载完成后立即恢复所有自动化,以维护数据完整性和业务逻辑。
作为数据工程师,我们的目标是构建可伸缩、可靠且高效的数据集成解决方案。通过深入理解和熟练运用 Salesforce Bulk API,我们可以有效地应对大规模数据挑战,确保企业关键数据的顺畅流动和高质量维护。
评论
发表评论