TL;DR
AWS EMR (Elastic MapReduce) APIは、Hadoop、Spark、Hive、Prestoを実行するビッグデータクラスターを管理します。クラスターを作成し、ジョブをステップとして送信し、ワークロードに基づいて自動スケーリングを行い、完了したら終了します。認証にはAWS IAMを使用します。テストにはApidogを使用して、クラスター構成の検証、API構造に対するジョブ送信のテスト、データパイプラインのドキュメント化を行います。
はじめに
AWS EMR は、AWS上のマネージドHadoop/Sparkサービスです。分析、機械学習、ETLパイプラインのためにペタバイト規模のデータを処理します。独自のHadoopクラスターを管理する代わりに、AWSにインフラストラクチャを任せることができます。
EMRは、クラスター内のEC2インスタンスで実行されます。以下を指定します。
- インスタンスタイプ (マスター、コア、タスクノード)
- アプリケーション (Spark、Hadoop、Hive、Presto、HBase)
- ブートストラップアクション (セットアップスクリプト)
- ステップ (実行するジョブ)
EMR API を使用すると、これらすべてを自動化できます。プログラムでクラスターを作成し、ジョブを送信し、進捗を監視し、他のAWSサービスと統合することができます。
ApidogでAWS APIをテストする - 無料
このガイドを読み終えるまでに、次のことができるようになります。
- APIを介してEMRクラスターを作成および構成する
- ジョブをステップとして送信する
- 自動スケーリングを管理する
- クラスターの健全性とジョブの進捗を監視する
- インスタンスフリートとスポットインスタンスでコストを最適化する
AWSでの認証
EMRはIAMを使用した標準のAWS認証を使用します。
AWS SDKアプローチ (推奨)
import { EMRClient, RunJobFlowCommand } from '@aws-sdk/client-emr'
const client = new EMRClient({
region: 'us-east-1',
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
}
})
SigV4を使用した直接API
EMRにはAWS署名バージョン4が必要です。SDKまたはboto3、AWS CLIなどのツールを使用するか、手動で署名を生成します。
aws emr list-clusters --region us-east-1
IAM権限
EMR管理のための最小ポリシー:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticmapreduce:*",
"ec2:Describe*",
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": "*"
}
]
}
クラスターの作成
基本的なクラスター作成
aws emr create-cluster \
--name "My Spark Cluster" \
--release-label emr-7.0.0 \
--applications Name=Spark Name=Hadoop \
--instance-type m5.xlarge \
--instance-count 3 \
--service-role EMR_DefaultRole \
--job-flow-role EMR_EC2_DefaultRole
API経由 (RunJobFlow)
{
"Name": "Data Processing Cluster",
"ReleaseLabel": "emr-7.0.0",
"Applications": [
{ "Name": "Spark" },
{ "Name": "Hadoop" },
{ "Name": "Hive" }
],
"Instances": {
"MasterInstanceType": "m5.xlarge",
"SlaveInstanceType": "m5.xlarge",
"InstanceCount": 3,
"KeepJobFlowAliveWhenNoSteps": true,
"TerminationProtected": false
},
"Steps": [],
"ServiceRole": "EMR_DefaultRole",
"JobFlowRole": "EMR_EC2_DefaultRole",
"LogUri": "s3://my-bucket/emr-logs/",
"Tags": [
{ "Key": "Environment", "Value": "Production" }
]
}
レスポンス:
{
"JobFlowId": "j-ABC123DEF456"
}
インスタンスグループとインスタンスフリート
インスタンスグループ: グループごとに固定されたインスタンスタイプ (マスター、コア、タスク)。
インスタンスフリート: グループごとに複数のインスタンスタイプ/オプション。EMRは可用性と価格に基づいて選択します。
{
"Instances": {
"InstanceFleets": [
{
"Name": "MasterFleet",
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m5.xlarge"
},
{
"InstanceType": "m4.xlarge"
}
]
},
{
"Name": "CoreFleet",
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 2,
"TargetSpotCapacity": 4,
"InstanceTypeConfigs": [
{
"InstanceType": "m5.2xlarge"
},
{
"InstanceType": "m4.2xlarge"
}
],
"LaunchSpecifications": {
"SpotSpecification": {
"TimeoutDurationMinutes": 60,
"TimeoutAction": "SWITCH_TO_ON_DEMAND"
}
}
}
]
}
}
ジョブをステップとして送信する
EMRはジョブを「ステップ」として順番に実行します。
Sparkステップを追加する
aws emr add-steps \
--cluster-id j-ABC123DEF456 \
--steps '[
{
"Name": "Process Data",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode",
"cluster",
"--class",
"com.example.DataProcessor",
"s3://my-bucket/jars/processor.jar",
"s3://my-bucket/input/",
"s3://my-bucket/output/"
]
}
}
]'
API経由 (AddJobFlowSteps)
{
"JobFlowId": "j-ABC123DEF456",
"Steps": [
{
"Name": "Spark ETL Job",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--executor-memory",
"4g",
"--executor-cores",
"2",
"s3://my-bucket/scripts/process.py",
"--input",
"s3://my-bucket/input/",
"--output",
"s3://my-bucket/output/"
]
}
}
]
}
ActionOnFailureオプション
TERMINATE_CLUSTER: 失敗時にクラスターを停止CANCEL_AND_WAIT: 残りのステップをキャンセルし、クラスターは実行状態を維持CONTINUE: 次のステップに進む
Hiveステップ
{
"Name": "Hive Query",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"hive-script",
"--run-hive-script",
"--args",
"-f",
"s3://my-bucket/scripts/transform.q"
]
}
}
自動スケーリング
EMRは負荷に基づいてタスクノードを追加/削除できます。
自動スケーリングポリシーの作成
aws emr put-auto-scaling-policy \
--cluster-id j-ABC123DEF456 \
--instance-group-id ig-ABC123 \
--auto-scaling-policy '{
"Constraints": {
"MinCapacity": 2,
"MaxCapacity": 10
},
"Rules": [
{
"Name": "ScaleOut",
"Description": "Add nodes when memory is high",
"Action": {
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "CHANGE_IN_CAPACITY",
"ScalingAdjustment": 2,
"CoolDown": 300
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"ComparisonOperator": "GREATER_THAN",
"EvaluationPeriods": 3,
"MetricName": "MemoryAvailableMB",
"Namespace": "AWS/ElasticMapReduce",
"Period": 300,
"Threshold": 2000,
"Statistic": "AVERAGE"
}
}
}
]
}'
スケーリングのメトリクス
MemoryAvailableMB: 利用可能なメモリMemoryTotalMB: 総メモリHDFSUtilization: 使用済みHDFSスペースAppsRunning: 実行中のYARNアプリケーションAppsPending: 待機中のYARNアプリケーション
監視とログ記録
クラスターの一覧表示
aws emr list-clusters --states RUNNING
クラスターの記述
aws emr describe-cluster --cluster-id j-ABC123DEF456
レスポンスには以下が含まれます:
{
"Cluster": {
"Id": "j-ABC123DEF456",
"Name": "My Cluster",
"Status": {
"State": "RUNNING",
"StateChangeReason": {},
"Timeline": {
"CreationDateTime": "2026-03-24T10:00:00.000Z"
}
},
"Applications": [
{ "Name": "Spark", "Version": "3.5.0" }
],
"InstanceCollectionType": "INSTANCE_GROUP",
"LogUri": "s3://my-bucket/emr-logs/",
"MasterPublicDnsName": "ec2-12-34-56-78.compute-1.amazonaws.com"
}
}
ステップの一覧表示
aws emr list-steps --cluster-id j-ABC123DEF456
ステップステータス
{
"Id": "s-ABC123",
"Name": "Process Data",
"Status": {
"State": "COMPLETED",
"Timeline": {
"StartDateTime": "2026-03-24T10:05:00.000Z",
"EndDateTime": "2026-03-24T11:30:00.000Z"
}
}
}
CloudWatchとの統合
EMRはCloudWatchにメトリクスを公開します。
JobsFailedJobsRunningMemoryAvailableMBMemoryTotalMBHDFSUtilization
コスト最適化
スポットインスタンスの使用
タスクノードはスポットインスタンスに最適です。終了しても、ジョブは残りのノードで継続されます。
{
"Name": "TaskGroup",
"InstanceRole": "TASK",
"InstanceType": "m5.2xlarge",
"InstanceCount": 4,
"Market": "SPOT",
"BidPrice": "0.10"
}
一時的なクラスター
クラスターを作成し、ジョブを実行し、自動的に終了させます。
{
"KeepJobFlowAliveWhenNoSteps": false,
"Steps": [
{ ... step 1 ... },
{ ... step 2 ... }
]
}
すべてのステップが完了すると、クラスターは終了します。
複数のオプションを持つインスタンスフリート
EMRに利用可能な最も安価なものを選ばせます。
{
"InstanceTypeConfigs": [
{
"InstanceType": "m5.2xlarge",
"BidPrice": "0.15"
},
{
"InstanceType": "m4.2xlarge",
"BidPrice": "0.12"
},
{
"InstanceType": "c5.2xlarge",
"BidPrice": "0.10"
}
]
}
Apidogでのテスト
EMRクラスターは高価です。設定を慎重にテストしてください。

1. クラスター構成を検証する
Apidogにクラスターテンプレートを保存します。
pm.test('Cluster has required applications', () => {
const config = pm.request.body.toJSON()
const apps = config.Applications.map(a => a.Name)
pm.expect(apps).to.include('Spark')
})
pm.test('Instance types are valid', () => {
const config = pm.request.body.toJSON()
const types = ['m5.xlarge', 'm5.2xlarge', 'm4.xlarge']
pm.expect(types).to.include(config.Instances.MasterInstanceType)
})
2. ステップ定義をテストする
pm.test('Spark step has valid args', () => {
const step = pm.request.body.toJSON().Steps[0]
const args = step.HadoopJarStep.Args
pm.expect(args[0]).to.eql('spark-submit')
pm.expect(args).to.include('--deploy-mode')
})
3. 環境変数
AWS_REGION: us-east-1
EMR_SERVICE_ROLE: EMR_DefaultRole
EMR_EC2_ROLE: EMR_EC2_DefaultRole
S3_LOG_BUCKET: my-emr-logs
S3_SCRIPTS_BUCKET: my-emr-scripts
ApidogでAWS APIをテストする - 無料
一般的なエラーと修正
ValidationError: ServiceRoleが無効です
原因: IAMロールが存在しないか、EMR用に設定されていません。
修正: IAMコンソールでサービスロールを作成するか、AWSのデフォルト: EMR_DefaultRole_V2 を使用してください。
EC2インスタンスのプロビジョニングに失敗しました
原因: インスタンスタイプがAZで利用できないか、サービス制限に達しています。
修正:
- 複数のインスタンスタイプを持つインスタンスフリートを使用する
- 制限の引き上げをリクエストする
- 別のインスタンスタイプを試す
アプリケーション終了コード1でステップが失敗しました
原因: 実際のSpark/Hadoopジョブが失敗しました。
修正: S3 (LogUriパス) のログを確認してください。ステップのstderrとstdoutを見てください。
クラスターがSTARTING状態のまま動かない
原因: ブートストラップアクションの失敗、または権限の問題。
修正: EC2インスタンスのコンソール出力を確認してください。ブートストラップスクリプトのS3アクセスを検証してください。
代替案と比較
| 機能 | AWS EMR | Google Dataproc | Azure HDInsight | Databricks |
|---|---|---|---|---|
| マネージドHadoop/Spark | ✓ | ✓ | ✓ | Sparkのみ |
| AWS統合 | 優れている | 限定的 | 限定的 | 良好 |
| サーバーレスオプション | EMR Serverless | Dataproc Serverless | 限定的 | ✓ |
| コスト | スポットサポート | プリエンプティブVM | スポットインスタンス | 良好 |
| MLサポート | EMR Studio | Vertex AI | Synapse | MLflow組み込み |
EMRは最も深いAWS統合を持っています。Databricksはより良いSparkツールを備えています。DataprocはGCPユーザーにとって安価です。
実際のユースケース
データレイクETL。 小売企業は日次販売データを処理します。EMRクラスターはS3からCSVファイルを取り込み、Sparkで変換し、Parquetをデータレイクに書き込みます。クラスターは毎日2時間実行され、その後終了します。
ログ分析。 SaaS企業はアプリケーションログを処理します。SparkはS3からログを読み取り、メトリクスを集計し、データウェアハウスに書き込みます。自動スケーリングは、ログ量が多いピーク時にタスクノードを追加します。
機械学習パイプライン。 データサイエンスチームはEMRでモデルをトレーニングします。SparkはS3から特徴量を読み取り、MLlibでモデルをトレーニングし、サービスのためにSageMakerにエクスポートします。
まとめ
ここで学んだこと:
- RunJobFlow APIでクラスターを作成する
- ジョブをステップとして送信する
- コスト効率のために自動スケーリングを使用する
- CloudWatchで監視する
- スポットインスタンスと一時的なクラスターでコストを最適化する
次のステップ:
- EMR用のIAMロールを設定する
- テストクラスターを作成する
- 簡単なSparkジョブを送信する
- S3でログを確認する
- コスト削減戦略を実装する
ApidogでAWS APIをテストする - 無料
よくある質問
マスターノード、コアノード、タスクノードの違いは何ですか?
- マスター: クラスターマネージャー (YARN ResourceManager, HDFS NameNode) を実行します。
- コア: データ処理を実行し、HDFSデータを保存します。
- タスク: データ処理のみを実行し、HDFSデータは保存しません (スポットインスタンスに適しています)。
マスターノードにSSH接続するにはどうすればよいですか?
aws emr ssh --cluster-id j-ABC123DEF456 --key-pair-file my-key.pem
EMRでJupyterノートブックを実行できますか?はい。EMR Studioを使用するか、JupyterHubアプリケーションを有効にしてください。または、EMR Notebooks (マネージドJupyter) を使用することもできます。
EMR Serverlessとは何ですか?クラスターを管理することなくSpark/Hiveジョブを送信できるサーバーレスオプションです。ジョブ実行ごとに料金が発生します。散発的なワークロードに適しています。
DynamoDBから読み込むにはどうすればよいですか?DynamoDBコネクタを使用します。
spark-submit --conf spark.hadoop.dynamodb.servicename=dynamodb \
--conf spark.hadoop.dynamodb.input.tableName=MyTable \
--conf spark.hadoop.dynamodb.output.tableName=MyTable \
--conf spark.hadoop.dynamodb.region=us-east-1 \
my-job.jar
どのリリースラベルを使用すべきですか?最新の安定版 (Spark 3.xの場合はemr-7.x)。環境間で一貫したバージョンを使用してください。リリースノートでアプリケーションの互換性を確認してください。
失敗したステップのトラブルシューティングはどのように行いますか?
- ステップステータスを確認します:
aws emr describe-step - S3のログを表示します:
s3://your-log-bucket/logs/j-ABC123/steps/s-DEF123/ - マスターにSSH接続し、
/mnt/var/log/を確認します。
