EMR API の使い方

Ashley Innocent

Ashley Innocent

24 3月 2026

EMR API の使い方

Apidog エンタープライズ

オンプレミスデプロイ

SSO & RBAC

SOC 2 準拠

Apidog Enterpriseを見る

TL;DR

AWS EMR (Elastic MapReduce) APIは、Hadoop、Spark、Hive、Prestoを実行するビッグデータクラスターを管理します。クラスターを作成し、ジョブをステップとして送信し、ワークロードに基づいて自動スケーリングを行い、完了したら終了します。認証にはAWS IAMを使用します。テストにはApidogを使用して、クラスター構成の検証、API構造に対するジョブ送信のテスト、データパイプラインのドキュメント化を行います。

はじめに

AWS EMR は、AWS上のマネージドHadoop/Sparkサービスです。分析、機械学習、ETLパイプラインのためにペタバイト規模のデータを処理します。独自のHadoopクラスターを管理する代わりに、AWSにインフラストラクチャを任せることができます。

EMRは、クラスター内のEC2インスタンスで実行されます。以下を指定します。

EMR API を使用すると、これらすべてを自動化できます。プログラムでクラスターを作成し、ジョブを送信し、進捗を監視し、他のAWSサービスと統合することができます。

💡
データパイプラインを構築している場合、Apidogは、高額なデータ処理ジョブを実行する前に、クラスター構成をテストし、ジョブ定義を検証し、EMRワークフローを文書化するのに役立ちます。
ボタン

ApidogでAWS APIをテストする - 無料

このガイドを読み終えるまでに、次のことができるようになります。

AWSでの認証

EMRはIAMを使用した標準のAWS認証を使用します。

AWS SDKアプローチ (推奨)

import { EMRClient, RunJobFlowCommand } from '@aws-sdk/client-emr'

const client = new EMRClient({
  region: 'us-east-1',
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
  }
})

SigV4を使用した直接API

EMRにはAWS署名バージョン4が必要です。SDKまたはboto3、AWS CLIなどのツールを使用するか、手動で署名を生成します。

aws emr list-clusters --region us-east-1

IAM権限

EMR管理のための最小ポリシー:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:*",
        "ec2:Describe*",
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": "*"
    }
  ]
}

クラスターの作成

基本的なクラスター作成

aws emr create-cluster \
  --name "My Spark Cluster" \
  --release-label emr-7.0.0 \
  --applications Name=Spark Name=Hadoop \
  --instance-type m5.xlarge \
  --instance-count 3 \
  --service-role EMR_DefaultRole \
  --job-flow-role EMR_EC2_DefaultRole

API経由 (RunJobFlow)

{
  "Name": "Data Processing Cluster",
  "ReleaseLabel": "emr-7.0.0",
  "Applications": [
    { "Name": "Spark" },
    { "Name": "Hadoop" },
    { "Name": "Hive" }
  ],
  "Instances": {
    "MasterInstanceType": "m5.xlarge",
    "SlaveInstanceType": "m5.xlarge",
    "InstanceCount": 3,
    "KeepJobFlowAliveWhenNoSteps": true,
    "TerminationProtected": false
  },
  "Steps": [],
  "ServiceRole": "EMR_DefaultRole",
  "JobFlowRole": "EMR_EC2_DefaultRole",
  "LogUri": "s3://my-bucket/emr-logs/",
  "Tags": [
    { "Key": "Environment", "Value": "Production" }
  ]
}

レスポンス:

{
  "JobFlowId": "j-ABC123DEF456"
}

インスタンスグループとインスタンスフリート

インスタンスグループ: グループごとに固定されたインスタンスタイプ (マスター、コア、タスク)。

インスタンスフリート: グループごとに複数のインスタンスタイプ/オプション。EMRは可用性と価格に基づいて選択します。

{
  "Instances": {
    "InstanceFleets": [
      {
        "Name": "MasterFleet",
        "InstanceFleetType": "MASTER",
        "TargetOnDemandCapacity": 1,
        "InstanceTypeConfigs": [
          {
            "InstanceType": "m5.xlarge"
          },
          {
            "InstanceType": "m4.xlarge"
          }
        ]
      },
      {
        "Name": "CoreFleet",
        "InstanceFleetType": "CORE",
        "TargetOnDemandCapacity": 2,
        "TargetSpotCapacity": 4,
        "InstanceTypeConfigs": [
          {
            "InstanceType": "m5.2xlarge"
          },
          {
            "InstanceType": "m4.2xlarge"
          }
        ],
        "LaunchSpecifications": {
          "SpotSpecification": {
            "TimeoutDurationMinutes": 60,
            "TimeoutAction": "SWITCH_TO_ON_DEMAND"
          }
        }
      }
    ]
  }
}

ジョブをステップとして送信する

EMRはジョブを「ステップ」として順番に実行します。

Sparkステップを追加する

aws emr add-steps \
  --cluster-id j-ABC123DEF456 \
  --steps '[
    {
      "Name": "Process Data",
      "ActionOnFailure": "CONTINUE",
      "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
          "spark-submit",
          "--deploy-mode",
          "cluster",
          "--class",
          "com.example.DataProcessor",
          "s3://my-bucket/jars/processor.jar",
          "s3://my-bucket/input/",
          "s3://my-bucket/output/"
        ]
      }
    }
  ]'

API経由 (AddJobFlowSteps)

{
  "JobFlowId": "j-ABC123DEF456",
  "Steps": [
    {
      "Name": "Spark ETL Job",
      "ActionOnFailure": "CONTINUE",
      "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
          "spark-submit",
          "--executor-memory",
          "4g",
          "--executor-cores",
          "2",
          "s3://my-bucket/scripts/process.py",
          "--input",
          "s3://my-bucket/input/",
          "--output",
          "s3://my-bucket/output/"
        ]
      }
    }
  ]
}

ActionOnFailureオプション

Hiveステップ

{
  "Name": "Hive Query",
  "HadoopJarStep": {
    "Jar": "command-runner.jar",
    "Args": [
      "hive-script",
      "--run-hive-script",
      "--args",
      "-f",
      "s3://my-bucket/scripts/transform.q"
    ]
  }
}

自動スケーリング

EMRは負荷に基づいてタスクノードを追加/削除できます。

自動スケーリングポリシーの作成

aws emr put-auto-scaling-policy \
  --cluster-id j-ABC123DEF456 \
  --instance-group-id ig-ABC123 \
  --auto-scaling-policy '{
    "Constraints": {
      "MinCapacity": 2,
      "MaxCapacity": 10
    },
    "Rules": [
      {
        "Name": "ScaleOut",
        "Description": "Add nodes when memory is high",
        "Action": {
          "SimpleScalingPolicyConfiguration": {
            "AdjustmentType": "CHANGE_IN_CAPACITY",
            "ScalingAdjustment": 2,
            "CoolDown": 300
          }
        },
        "Trigger": {
          "CloudWatchAlarmDefinition": {
            "ComparisonOperator": "GREATER_THAN",
            "EvaluationPeriods": 3,
            "MetricName": "MemoryAvailableMB",
            "Namespace": "AWS/ElasticMapReduce",
            "Period": 300,
            "Threshold": 2000,
            "Statistic": "AVERAGE"
          }
        }
      }
    ]
  }'

スケーリングのメトリクス

監視とログ記録

クラスターの一覧表示

aws emr list-clusters --states RUNNING

クラスターの記述

aws emr describe-cluster --cluster-id j-ABC123DEF456

レスポンスには以下が含まれます:

{
  "Cluster": {
    "Id": "j-ABC123DEF456",
    "Name": "My Cluster",
    "Status": {
      "State": "RUNNING",
      "StateChangeReason": {},
      "Timeline": {
        "CreationDateTime": "2026-03-24T10:00:00.000Z"
      }
    },
    "Applications": [
      { "Name": "Spark", "Version": "3.5.0" }
    ],
    "InstanceCollectionType": "INSTANCE_GROUP",
    "LogUri": "s3://my-bucket/emr-logs/",
    "MasterPublicDnsName": "ec2-12-34-56-78.compute-1.amazonaws.com"
  }
}

ステップの一覧表示

aws emr list-steps --cluster-id j-ABC123DEF456

ステップステータス

{
  "Id": "s-ABC123",
  "Name": "Process Data",
  "Status": {
    "State": "COMPLETED",
    "Timeline": {
      "StartDateTime": "2026-03-24T10:05:00.000Z",
      "EndDateTime": "2026-03-24T11:30:00.000Z"
    }
  }
}

CloudWatchとの統合

EMRはCloudWatchにメトリクスを公開します。

コスト最適化

スポットインスタンスの使用

タスクノードはスポットインスタンスに最適です。終了しても、ジョブは残りのノードで継続されます。

{
  "Name": "TaskGroup",
  "InstanceRole": "TASK",
  "InstanceType": "m5.2xlarge",
  "InstanceCount": 4,
  "Market": "SPOT",
  "BidPrice": "0.10"
}

一時的なクラスター

クラスターを作成し、ジョブを実行し、自動的に終了させます。

{
  "KeepJobFlowAliveWhenNoSteps": false,
  "Steps": [
    { ... step 1 ... },
    { ... step 2 ... }
  ]
}

すべてのステップが完了すると、クラスターは終了します。

複数のオプションを持つインスタンスフリート

EMRに利用可能な最も安価なものを選ばせます。

{
  "InstanceTypeConfigs": [
    {
      "InstanceType": "m5.2xlarge",
      "BidPrice": "0.15"
    },
    {
      "InstanceType": "m4.2xlarge",
      "BidPrice": "0.12"
    },
    {
      "InstanceType": "c5.2xlarge",
      "BidPrice": "0.10"
    }
  ]
}

Apidogでのテスト

EMRクラスターは高価です。設定を慎重にテストしてください。

EMR APIテストツール

1. クラスター構成を検証する

Apidogにクラスターテンプレートを保存します。

pm.test('Cluster has required applications', () => {
  const config = pm.request.body.toJSON()
  const apps = config.Applications.map(a => a.Name)
  pm.expect(apps).to.include('Spark')
})

pm.test('Instance types are valid', () => {
  const config = pm.request.body.toJSON()
  const types = ['m5.xlarge', 'm5.2xlarge', 'm4.xlarge']
  pm.expect(types).to.include(config.Instances.MasterInstanceType)
})

2. ステップ定義をテストする

pm.test('Spark step has valid args', () => {
  const step = pm.request.body.toJSON().Steps[0]
  const args = step.HadoopJarStep.Args
  pm.expect(args[0]).to.eql('spark-submit')
  pm.expect(args).to.include('--deploy-mode')
})

3. 環境変数

AWS_REGION: us-east-1
EMR_SERVICE_ROLE: EMR_DefaultRole
EMR_EC2_ROLE: EMR_EC2_DefaultRole
S3_LOG_BUCKET: my-emr-logs
S3_SCRIPTS_BUCKET: my-emr-scripts

ApidogでAWS APIをテストする - 無料

一般的なエラーと修正

ValidationError: ServiceRoleが無効です

原因: IAMロールが存在しないか、EMR用に設定されていません。

修正: IAMコンソールでサービスロールを作成するか、AWSのデフォルト: EMR_DefaultRole_V2 を使用してください。

EC2インスタンスのプロビジョニングに失敗しました

原因: インスタンスタイプがAZで利用できないか、サービス制限に達しています。

修正:

アプリケーション終了コード1でステップが失敗しました

原因: 実際のSpark/Hadoopジョブが失敗しました。

修正: S3 (LogUriパス) のログを確認してください。ステップのstderrstdoutを見てください。

クラスターがSTARTING状態のまま動かない

原因: ブートストラップアクションの失敗、または権限の問題。

修正: EC2インスタンスのコンソール出力を確認してください。ブートストラップスクリプトのS3アクセスを検証してください。

代替案と比較

機能 AWS EMR Google Dataproc Azure HDInsight Databricks
マネージドHadoop/Spark Sparkのみ
AWS統合 優れている 限定的 限定的 良好
サーバーレスオプション EMR Serverless Dataproc Serverless 限定的
コスト スポットサポート プリエンプティブVM スポットインスタンス 良好
MLサポート EMR Studio Vertex AI Synapse MLflow組み込み

EMRは最も深いAWS統合を持っています。Databricksはより良いSparkツールを備えています。DataprocはGCPユーザーにとって安価です。

実際のユースケース

データレイクETL。 小売企業は日次販売データを処理します。EMRクラスターはS3からCSVファイルを取り込み、Sparkで変換し、Parquetをデータレイクに書き込みます。クラスターは毎日2時間実行され、その後終了します。

ログ分析。 SaaS企業はアプリケーションログを処理します。SparkはS3からログを読み取り、メトリクスを集計し、データウェアハウスに書き込みます。自動スケーリングは、ログ量が多いピーク時にタスクノードを追加します。

機械学習パイプライン。 データサイエンスチームはEMRでモデルをトレーニングします。SparkはS3から特徴量を読み取り、MLlibでモデルをトレーニングし、サービスのためにSageMakerにエクスポートします。

まとめ

ここで学んだこと:

次のステップ:

  1. EMR用のIAMロールを設定する
  2. テストクラスターを作成する
  3. 簡単なSparkジョブを送信する
  4. S3でログを確認する
  5. コスト削減戦略を実装する

ApidogでAWS APIをテストする - 無料

ボタン

よくある質問

マスターノード、コアノード、タスクノードの違いは何ですか?

マスターノードにSSH接続するにはどうすればよいですか?

aws emr ssh --cluster-id j-ABC123DEF456 --key-pair-file my-key.pem

EMRでJupyterノートブックを実行できますか?はい。EMR Studioを使用するか、JupyterHubアプリケーションを有効にしてください。または、EMR Notebooks (マネージドJupyter) を使用することもできます。

EMR Serverlessとは何ですか?クラスターを管理することなくSpark/Hiveジョブを送信できるサーバーレスオプションです。ジョブ実行ごとに料金が発生します。散発的なワークロードに適しています。

DynamoDBから読み込むにはどうすればよいですか?DynamoDBコネクタを使用します。

spark-submit --conf spark.hadoop.dynamodb.servicename=dynamodb \
  --conf spark.hadoop.dynamodb.input.tableName=MyTable \
  --conf spark.hadoop.dynamodb.output.tableName=MyTable \
  --conf spark.hadoop.dynamodb.region=us-east-1 \
  my-job.jar

どのリリースラベルを使用すべきですか?最新の安定版 (Spark 3.xの場合はemr-7.x)。環境間で一貫したバージョンを使用してください。リリースノートでアプリケーションの互換性を確認してください。

失敗したステップのトラブルシューティングはどのように行いますか?

  1. ステップステータスを確認します: aws emr describe-step
  2. S3のログを表示します: s3://your-log-bucket/logs/j-ABC123/steps/s-DEF123/
  3. マスターにSSH接続し、/mnt/var/log/を確認します。

ApidogでAPIデザイン中心のアプローチを取る

APIの開発と利用をよりシンプルなことにする方法を発見できる