TL;DR (Tóm tắt)
Các API của AWS EMR (Elastic MapReduce) quản lý các cụm dữ liệu lớn chạy Hadoop, Spark, Hive và Presto. Bạn tạo cụm, gửi các công việc dưới dạng các bước, tự động điều chỉnh quy mô dựa trên khối lượng công việc và chấm dứt khi hoàn thành. Xác thực sử dụng AWS IAM. Để kiểm thử, hãy dùng Apidog để xác thực cấu hình cụm, kiểm thử việc gửi công việc dựa trên cấu trúc API và ghi lại các quy trình xử lý dữ liệu của bạn.
Giới thiệu
AWS EMR là dịch vụ Hadoop/Spark được quản lý trên AWS. Nó xử lý petabyte dữ liệu cho các phân tích, học máy và các quy trình ETL. Thay vì tự quản lý cụm Hadoop của riêng bạn, bạn để AWS xử lý cơ sở hạ tầng.
EMR chạy trên các phiên bản EC2 trong một cụm. Bạn chỉ định:
- Loại phiên bản (nút master, core, task)
- Ứng dụng (Spark, Hadoop, Hive, Presto, HBase)
- Hành động khởi động (script thiết lập)
- Các bước (công việc cần chạy)
EMR API cho phép bạn tự động hóa tất cả những điều này. Bạn có thể tạo cụm theo chương trình, gửi công việc, giám sát tiến độ và tích hợp với các dịch vụ AWS khác.
button
Kiểm thử API AWS với Apidog - miễn phí
Đến cuối hướng dẫn này, bạn sẽ có thể:
- Tạo và cấu hình cụm EMR thông qua API
- Gửi các công việc dưới dạng các bước
- Quản lý tự động điều chỉnh quy mô
- Giám sát tình trạng cụm và tiến độ công việc
- Tối ưu hóa chi phí với instance fleets và spot instances
Xác thực với AWS
EMR sử dụng xác thực AWS tiêu chuẩn với IAM.
Cách tiếp cận bằng AWS SDK (được khuyến nghị)
import { EMRClient, RunJobFlowCommand } from '@aws-sdk/client-emr'
const client = new EMRClient({
region: 'us-east-1',
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
}
})
API trực tiếp với SigV4
EMR yêu cầu AWS Signature Version 4. Sử dụng SDK hoặc các công cụ như boto3, AWS CLI, hoặc tạo chữ ký thủ công.
aws emr list-clusters --region us-east-1
Quyền IAM
Chính sách tối thiểu để quản lý EMR:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:*",
"ec2:Describe*",
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": "*"
}
]
}
Tạo một cụm
Tạo cụm cơ bản
aws emr create-cluster \
--name "My Spark Cluster" \
--release-label emr-7.0.0 \
--applications Name=Spark Name=Hadoop \
--instance-type m5.xlarge \
--instance-count 3 \
--service-role EMR_DefaultRole \
--job-flow-role EMR_EC2_DefaultRole
Thông qua API (RunJobFlow)
{
"Name": "Data Processing Cluster",
"ReleaseLabel": "emr-7.0.0",
"Applications": [
{ "Name": "Spark" },
{ "Name": "Hadoop" },
{ "Name": "Hive" }
],
"Instances": {
"MasterInstanceType": "m5.xlarge",
"SlaveInstanceType": "m5.xlarge",
"InstanceCount": 3,
"KeepJobFlowAliveWhenNoSteps": true,
"TerminationProtected": false
},
"Steps": [],
"ServiceRole": "EMR_DefaultRole",
"JobFlowRole": "EMR_EC2_DefaultRole",
"LogUri": "s3://my-bucket/emr-logs/",
"Tags": [
{ "Key": "Environment", "Value": "Production" }
]
}
Phản hồi:
{
"JobFlowId": "j-ABC123DEF456"
}
Nhóm phiên bản (Instance groups) so với Hạm đội phiên bản (Instance fleets)
Nhóm phiên bản (Instance groups): Loại phiên bản cố định cho mỗi nhóm (master, core, task).
Hạm đội phiên bản (Instance fleets): Nhiều loại/tùy chọn phiên bản trên mỗi nhóm. EMR lựa chọn dựa trên tính khả dụng và giá cả.
{
"Instances": {
"InstanceFleets": [
{
"Name": "MasterFleet",
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m5.xlarge"
},
{
"InstanceType": "m4.xlarge"
}
]
},
{
"Name": "CoreFleet",
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 2,
"TargetSpotCapacity": 4,
"InstanceTypeConfigs": [
{
"InstanceType": "m5.2xlarge"
},
{
"InstanceType": "m4.2xlarge"
}
],
"LaunchSpecifications": {
"SpotSpecification": {
"TimeoutDurationMinutes": 60,
"TimeoutAction": "SWITCH_TO_ON_DEMAND"
}
}
}
]
}
}
Gửi công việc dưới dạng các bước
EMR thực thi các công việc dưới dạng "các bước" theo trình tự.
Thêm một bước Spark
aws emr add-steps \
--cluster-id j-ABC123DEF456 \
--steps '[
{
"Name": "Process Data",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode",
"cluster",
"--class",
"com.example.DataProcessor",
"s3://my-bucket/jars/processor.jar",
"s3://my-bucket/input/",
"s3://my-bucket/output/"
]
}
}
]'
Thông qua API (AddJobFlowSteps)
{
"JobFlowId": "j-ABC123DEF456",
"Steps": [
{
"Name": "Spark ETL Job",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--executor-memory",
"4g",
"--executor-cores",
"2",
"s3://my-bucket/scripts/process.py",
"--input",
"s3://my-bucket/input/",
"--output",
"s3://my-bucket/output/"
]
}
}
]
}
Các tùy chọn ActionOnFailure
TERMINATE_CLUSTER: Dừng cụm khi thất bạiCANCEL_AND_WAIT: Hủy các bước còn lại, giữ cụm đang chạyCONTINUE: Tiếp tục với bước tiếp theo
Bước Hive
{
"Name": "Hive Query",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"hive-script",
"--run-hive-script",
"--args",
"-f",
"s3://my-bucket/scripts/transform.q"
]
}
}
Tự động điều chỉnh quy mô (Auto-scaling)
EMR có thể thêm/xóa các nút task dựa trên tải.
Tạo chính sách tự động điều chỉnh quy mô
aws emr put-auto-scaling-policy \
--cluster-id j-ABC123DEF456 \
--instance-group-id ig-ABC123 \
--auto-scaling-policy '{
"Constraints": {
"MinCapacity": 2,
"MaxCapacity": 10
},
"Rules": [
{
"Name": "ScaleOut",
"Description": "Add nodes when memory is high",
"Action": {
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "CHANGE_IN_CAPACITY",
"ScalingAdjustment": 2,
"CoolDown": 300
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"ComparisonOperator": "GREATER_THAN",
"EvaluationPeriods": 3,
"MetricName": "MemoryAvailableMB",
"Namespace": "AWS/ElasticMapReduce",
"Period": 300,
"Threshold": 2000,
"Statistic": "AVERAGE"
}
}
}
]
}'
Các chỉ số để điều chỉnh quy mô
MemoryAvailableMB: Bộ nhớ trốngMemoryTotalMB: Tổng bộ nhớHDFSUtilization: Dung lượng HDFS đã sử dụngAppsRunning: Ứng dụng YARN đang chạyAppsPending: Ứng dụng YARN đang chờ
Giám sát và ghi nhật ký
Liệt kê các cụm
aws emr list-clusters --states RUNNING
Mô tả cụm
aws emr describe-cluster --cluster-id j-ABC123DEF456
Phản hồi bao gồm:
{
"Cluster": {
"Id": "j-ABC123DEF456",
"Name": "My Cluster",
"Status": {
"State": "RUNNING",
"StateChangeReason": {},
"Timeline": {
"CreationDateTime": "2026-03-24T10:00:00.000Z"
}
},
"Applications": [
{ "Name": "Spark", "Version": "3.5.0" }
],
"InstanceCollectionType": "INSTANCE_GROUP",
"LogUri": "s3://my-bucket/emr-logs/",
"MasterPublicDnsName": "ec2-12-34-56-78.compute-1.amazonaws.com"
}
}
Liệt kê các bước
aws emr list-steps --cluster-id j-ABC123DEF456
Trạng thái bước
{
"Id": "s-ABC123",
"Name": "Process Data",
"Status": {
"State": "COMPLETED",
"Timeline": {
"StartDateTime": "2026-03-24T10:05:00.000Z",
"EndDateTime": "2026-03-24T11:30:00.000Z"
}
}
}
Tích hợp CloudWatch
EMR xuất bản các chỉ số lên CloudWatch:
JobsFailed(Công việc thất bại)JobsRunning(Công việc đang chạy)MemoryAvailableMB(Bộ nhớ khả dụng theo MB)MemoryTotalMB(Tổng bộ nhớ theo MB)HDFSUtilization(Tỷ lệ sử dụng HDFS)
Tối ưu hóa chi phí
Sử dụng các phiên bản Spot
Các nút task rất phù hợp cho các phiên bản Spot. Nếu chúng bị chấm dứt, công việc vẫn tiếp tục trên các nút còn lại.
{
"Name": "TaskGroup",
"InstanceRole": "TASK",
"InstanceType": "m5.2xlarge",
"InstanceCount": 4,
"Market": "SPOT",
"BidPrice": "0.10"
}
Cụm tạm thời (Transient clusters)
Tạo cụm, chạy công việc, tự động chấm dứt:
{
"KeepJobFlowAliveWhenNoSteps": false,
"Steps": [
{ ... step 1 ... },
{ ... step 2 ... }
]
}
Cụm sẽ chấm dứt sau khi tất cả các bước hoàn thành.
Hạm đội phiên bản với nhiều tùy chọn
Để EMR chọn tùy chọn rẻ nhất có sẵn:
{
"InstanceTypeConfigs": [
{
"InstanceType": "m5.2xlarge",
"BidPrice": "0.15"
},
{
"InstanceType": "m4.2xlarge",
"BidPrice": "0.12"
},
{
"InstanceType": "c5.2xlarge",
"BidPrice": "0.10"
}
]
}
Kiểm thử với Apidog
Các cụm EMR rất tốn kém. Hãy kiểm thử cấu hình cẩn thận.

1. Xác thực cấu hình cụm
Lưu các mẫu cụm trong Apidog:
pm.test('Cluster has required applications', () => {
const config = pm.request.body.toJSON()
const apps = config.Applications.map(a => a.Name)
pm.expect(apps).to.include('Spark')
})
pm.test('Instance types are valid', () => {
const config = pm.request.body.toJSON()
const types = ['m5.xlarge', 'm5.2xlarge', 'm4.xlarge']
pm.expect(types).to.include(config.Instances.MasterInstanceType)
})
2. Kiểm thử định nghĩa bước
pm.test('Spark step has valid args', () => {
const step = pm.request.body.toJSON().Steps[0]
const args = step.HadoopJarStep.Args
pm.expect(args[0]).to.eql('spark-submit')
pm.expect(args).to.include('--deploy-mode')
})
3. Biến môi trường
AWS_REGION: us-east-1
EMR_SERVICE_ROLE: EMR_DefaultRole
EMR_EC2_ROLE: EMR_EC2_DefaultRole
S3_LOG_BUCKET: my-emr-logs
S3_SCRIPTS_BUCKET: my-emr-scripts
Kiểm thử API AWS với Apidog - miễn phí
Các lỗi thường gặp và cách khắc phục
ValidationError: ServiceRole không hợp lệ
Nguyên nhân: Vai trò IAM không tồn tại hoặc chưa được cấu hình cho EMR.
Khắc phục: Tạo vai trò dịch vụ trong bảng điều khiển IAM hoặc sử dụng vai trò mặc định của AWS: EMR_DefaultRole_V2.
Không thể cung cấp các phiên bản EC2
Nguyên nhân: Loại phiên bản không có sẵn trong AZ của bạn, hoặc đã đạt giới hạn dịch vụ.
Khắc phục:
- Sử dụng hạm đội phiên bản với nhiều loại phiên bản khác nhau
- Yêu cầu tăng giới hạn
- Thử các loại phiên bản khác
Bước thất bại với mã thoát ứng dụng 1
Nguyên nhân: Công việc Spark/Hadoop thực tế đã thất bại.
Khắc phục: Kiểm tra nhật ký trong S3 (đường dẫn LogUri). Xem stderr và stdout cho bước đó.
Cụm bị kẹt ở trạng thái STARTING (Đang khởi động)
Nguyên nhân: Các hành động khởi động bị lỗi, hoặc vấn đề về quyền.
Khắc phục: Kiểm tra đầu ra console của phiên bản EC2. Xác minh quyền truy cập S3 cho các script khởi động.
Các giải pháp thay thế và so sánh
| Tính năng | AWS EMR | Google Dataproc | Azure HDInsight | Databricks |
|---|---|---|---|---|
| Hadoop/Spark được quản lý | ✓ | ✓ | ✓ | Chỉ Spark |
| Tích hợp AWS | Tuyệt vời | Hạn chế | Hạn chế | Tốt |
| Tùy chọn Serverless | EMR Serverless | Dataproc Serverless | Hạn chế | ✓ |
| Chi phí | Hỗ trợ Spot | VM có thể bị ngắt (Preemptible VMs) | Phiên bản Spot | Tốt |
| Hỗ trợ học máy (ML) | EMR Studio | Vertex AI | Synapse | Tích hợp MLflow |
EMR có sự tích hợp AWS sâu sắc nhất. Databricks có các công cụ Spark tốt hơn. Dataproc rẻ hơn cho người dùng GCP.
Các trường hợp sử dụng thực tế
ETL hồ dữ liệu. Một công ty bán lẻ xử lý dữ liệu bán hàng hàng ngày. Các cụm EMR nhập các tệp CSV từ S3, biến đổi bằng Spark và ghi dữ liệu Parquet vào hồ dữ liệu. Các cụm chạy 2 giờ mỗi ngày và chấm dứt.
Phân tích nhật ký. Một công ty SaaS xử lý nhật ký ứng dụng. Spark đọc nhật ký từ S3, tổng hợp các chỉ số và ghi vào kho dữ liệu. Tự động điều chỉnh quy mô (Auto-scaling) thêm các nút task trong thời gian nhật ký tăng cao.
Quy trình học máy. Một nhóm khoa học dữ liệu huấn luyện mô hình trên EMR. Spark đọc các đặc trưng từ S3, huấn luyện mô hình với MLlib và xuất sang SageMaker để triển khai.
Tổng kết
Đây là những gì bạn đã học được:
- Tạo cụm bằng API RunJobFlow
- Gửi các công việc dưới dạng các bước
- Sử dụng tự động điều chỉnh quy mô để tối ưu chi phí
- Giám sát bằng CloudWatch
- Tối ưu hóa chi phí với các phiên bản Spot và cụm tạm thời
Các bước tiếp theo của bạn:
- Thiết lập vai trò IAM cho EMR
- Tạo một cụm kiểm thử
- Gửi một công việc Spark đơn giản
- Xem lại nhật ký trong S3
- Thực hiện các chiến lược tiết kiệm chi phí
Kiểm thử API AWS với Apidog - miễn phí
button
Câu hỏi thường gặp
Sự khác biệt giữa các nút master, core và task là gì?
- Master: Chạy trình quản lý cụm (YARN ResourceManager, HDFS NameNode)
- Core: Chạy xử lý dữ liệu và lưu trữ dữ liệu HDFS
- Task: Chỉ chạy xử lý dữ liệu, không lưu trữ dữ liệu HDFS (tốt cho các phiên bản Spot)
Làm cách nào để SSH vào nút master?
aws emr ssh --cluster-id j-ABC123DEF456 --key-pair-file my-key.pem
Tôi có thể chạy Jupyter notebook trên EMR không?Có. Sử dụng EMR Studio hoặc bật ứng dụng JupyterHub. Hoặc sử dụng EMR Notebooks (Jupyter được quản lý).
EMR Serverless là gì?Một tùy chọn serverless cho phép bạn gửi các công việc Spark/Hive mà không cần quản lý cụm. Thanh toán theo mỗi lần chạy công việc. Tốt cho các khối lượng công việc không thường xuyên.
Làm cách nào để đọc từ DynamoDB?Sử dụng trình kết nối DynamoDB:
spark-submit --conf spark.hadoop.dynamodb.servicename=dynamodb \
--conf spark.hadoop.dynamodb.input.tableName=MyTable \
--conf spark.hadoop.dynamodb.output.tableName=MyTable \
--conf spark.hadoop.dynamodb.region=us-east-1 \
my-job.jar
Tôi nên sử dụng nhãn phát hành (release label) nào?Bản ổn định mới nhất (emr-7.x cho Spark 3.x). Sử dụng các phiên bản nhất quán giữa các môi trường. Kiểm tra khả năng tương thích của ứng dụng trong ghi chú phát hành.
Làm cách nào để khắc phục các bước bị lỗi?
- Kiểm tra trạng thái bước:
aws emr describe-step - Xem nhật ký trong S3:
s3://your-log-bucket/logs/j-ABC123/steps/s-DEF123/ - SSH vào master và kiểm tra
/mnt/var/log/
