AWS Glueのジョブ監視

先日、CloudWatch Eventを確認するとGlueも追加されていたのでジョブの監視をやってみました。

検証構成

f:id:cloudfish:20180814085941p:plain
CloudWatch EventでGlueのJobステータスがSucceededもしくはFailedになればSlackに通知しています。
現状、特定ジョブやイベント(Failedのみなど)のみ通知できないので、フィルタをかけれるようにLambdaを利用することにしました。特定イベントのみ通知できるようになればSNSも使えそうです。
今回はLambdaを利用しているので、Slackだけでなく他のAPIを呼び出すなども可能です。

手順

Slack通知用Lambdaの作成

事前にKMSのキーとSlackのチャンネルに通知するためのIncoming Webhookのエンドポイントを作成しておいてください。
「Lambda」→「関数の作成」をクリックし「一から作成」を選択して以下を入力します。
名前:GlueMonitorSlackNotification
ランタイム:Python3.6
ロール:テンプレートから新しいロールを作成
ロール名:lambda-glue-role
ポリシーテンプレート:KMSの復号化アクセス権限
f:id:cloudfish:20180814092416p:plain

プログラムは以下に置き換えます。

#coding:utf-8
from __future__ import print_function

import json
import urllib.parse
import boto3
import logging
import os
import sys

from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

logger = logging.getLogger()
logger.setLevel(logging.INFO)
SLACK_CONFIG = {}

def decrypt_environment(encrypt_environment_key):
    encrypted_value = os.environ[encrypt_environment_key]
    return boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted_value))['Plaintext'].decode()

def init():
    global SLACK_CONFIG
    SLACK_CONFIG = {
        "END_POINT": decrypt_environment('SlACK_END_POINT'),
        "ICON": ":aws:",
        "USERNAME": 'GlueJobBot'
    }
    
def post_message(event):
    post_message = {}
    post_message["icon_emoji"] = SLACK_CONFIG['ICON']
    post_message["username"] = SLACK_CONFIG['USERNAME']
    post_message["attachments"]= [0] * 1
    post_message["attachments"][0]= {}
    post_message["attachments"][0]["fallback"] = 'Glue Jobが実行しました'
    post_message["attachments"][0]["color"] = '#36a64f'
    if event['detail']['state'] == 'FAILED':
       post_message["attachments"][0]["color"] = '#E51616'
    post_message["attachments"][0]["pretext"] = ''
    post_message["attachments"][0]["author_name"] = ''
    post_message["attachments"][0]["author_link"] = ''
    post_message["attachments"][0]["title"] = 'Glue Job ' + event['detail']['jobName'] + ":" + event['detail']['state']
    post_message["attachments"][0]["text"] = 'JobID:' + event['detail']['jobRunId']

    payload_json = json.dumps(post_message)
    data = urllib.parse.urlencode({"payload": payload_json})

    req = Request(SLACK_CONFIG['END_POINT'], data.encode('utf-8'))
    try:
        response = urlopen(req)
        response.read()
        logger.info("Message posted!! %s", post_message["attachments"][0]["title"])
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)

def lambda_handler(event, context):
    init()
    post_message(event)
    

関数を作成後、環境変数にSlACK_END_POINTとIncoming Webhookのエンドポイントを入力します。
「伝送中の暗号化のためのヘルパーの有効化」をチェックし、「伝送中に暗号化する KMS キー」に作成したKMSキーを選択して、環境変数欄にある「暗号化」をクリックします。
f:id:cloudfish:20180814124215p:plain

CloudWatch Eventの作成

サービス名:Glue
イベントタイプ:Glue Job State Change
ターゲット:Lambda関数
機能:GlueMonitorSlackNotification ←作成したLambda関数を選択
f:id:cloudfish:20180814125109p:plain

Glue Jobの作成

作成済みのジョブがあればそれを利用してください。
なければ、左メニュー下のチュートリアルのジョブの追加に従ってジョブを追加してください。

実行確認

準備ができればジョブを実行し、Slackの該当チャンネルに通知されているか確認します。
ジョブが成功した時
f:id:cloudfish:20180814130323p:plain
ジョブが失敗した時
f:id:cloudfish:20180814130330p:plain
正しく設定できていれば上記のように通知されるはずです。
現時点でCloudWatch Eventではジョブの指定や失敗時のみなどの細かい設定ができないため、Lambda側でフィルタして通知する必要があります。
これでジョブの成功失敗を監視することができそうです。

Glueのメトリクス

また、Glueにジョブのプロファイリング機能が追加されています。
ジョブのプロファイリング機能を有効にすると以下のメトリクスが取得できます。以下は簡単ですが日本語に訳した内容となります(By Google翻訳
各メトリクスの詳細については、以下を参照してください。
Monitoring AWS Glue Using CloudWatch Metrics - AWS Glue
これらのメトリクスはジョブの中長期的な処理傾向に問題がないかどうかを確認できると思うので、ジョブの内容に合わせて必要なメトリクスを定期的にモニタリングもしくは監視していけばいいかと思います。

* メトリクス * 詳細
glue.driver.aggregate.bytesRead すべてのエグゼキュータで実行されているすべての完了したSparkタスクによって、すべてのデータソースから読み取られたバイト数。
glue.driver.aggregate.elapsedTime ETL経過時間(ミリ秒単位)(ジョブのブートストラップ時間は含まれません)
glue.driver.aggregate.numCompletedStages ジョブの完了段階の数
glue.driver.aggregate.numCompletedTasks ジョブ内で完了したタスクの数
glue.driver.aggregate.numFailedTasks 失敗したタスクの数
glue.driver.aggregate.numKilledTasks killされたタスクの数
glue.driver.aggregate.recordsRead すべてのエグゼキュータで実行されているすべての完了したSparkタスクによって、すべてのデータソースから読み取られたレコードの数
glue.driver.aggregate.shuffleBytesWritten 以前のレポート(AWS Glue Metrics Dashboardによって集計されたデータで、前の1分間にこの目的で書き込まれたバイト数として集計されたもの)以降、すべてのエグゼキュータがデータをシャッフルするために書き込んだバイト数
glue.driver.aggregate.shuffleLocalBytesRead 以前のレポート(AWS Glue Metrics Dashboardによって集計されたバイト数)で、すべてのエグゼキュータがそれらの間でデータをシャッフルするために読み込んだバイト数
glue.driver.BlockManager.disk.diskSpaceUsed_MB すべてのエグゼキュータで使用されるディスク容量のメガバイト
glue.driver.ExecutorAllocationManager.executors.numberAllExecutors 実行中のジョブ実行者の数
glue.driver.ExecutorAllocationManager.executors.numberMaxNeededExecutors 現在の負荷を満たすために必要な最大(実行中のジョブおよび実行中のジョブ)エグゼキュータの数
glue.driver.jvm.heap.usage
glue.executorId.jvm.heap.usage
glue.ALL.jvm.heap.usage
ドライバ、executorIdによって識別されるエグゼキュータ、またはすべてのエグゼキュータのJVMヒープによって使用されるメモリの割合(スケール:0-1)
glue.driver.jvm.heap.used
glue.executorId.jvm.heap.used
glue.ALL.jvm.heap.used
ドライバのJVMヒープで使用されるメモリバイト数、executorIdによって識別されるエグゼキュータ、またはすべてのエグゼキュータ
glue.driver.s3.filesystem.read_bytes
glue.executorId.s3.filesystem.read_bytes
glue.ALL.s3.filesystem.read_bytes
ドライバによってAmazon S3から読み込まれたバイト数、executorIdによって識別されたエグゼキュータ、または前回のレポート以降のすべてのエグゼキュータ(AWS Glue Metrics Dashboardによって前の1分間に読み取られたバイト数として集計)
glue.driver.s3.filesystem.write_bytes
glue.executorId.s3.filesystem.write_bytes
glue.ALL.s3.filesystem.write_bytes
以前のレポート(AWS Glue Metrics Dashboardによって前の1分間に書き込まれたバイト数で集計されたもの)以来、ドライバ、ExecutorIdによって識別されたエグゼキュータ、またはすべてのエグゼキュータによってAmazon S3に書き込まれたバイト数
glue.driver.system.cpuSystemLoad
glue.executorId.system.cpuSystemLoad
glue.ALL.system.cpuSystemLoad
ドライバによって使用されたCPUシステム負荷の割合(スケール:0-1)、executorIdによって識別される実行プログラム、またはすべての実行プログラム

まとめ

Glueが出た当初は、CloudWatch EventからGlueのステータス変更は取得できなかったと思うので、ステータス取得用のLambdaで別途作る必要がありましたが、CloudWatch Eventから拾えるようになったのでジョブの成功・失敗が拾いやすくなりました。もう少しCloudWatch Event側でジョブやステータスの指定など細かく制御できると失敗のみSNSで通知するなどができるので手軽にできるのですが今後の改善に期待です。
また、CloudWatchメトリクスも提供されましたので、これらも含めてGlueのジョブ監視を行なうことができそうですね。

AWS GlueでAurora Serverlessを利用する

Aurora Serverlessが一般リリースされましたね。みなさん色々ブログを書かれていますので流行りにのってGlueからもAurora Serverlessを利用できるか検証してみました。

検証構成

f:id:cloudfish:20180814150635p:plain
今回は上記のような構成で、Auroraからデータ取得してS3に出力してみました。
Aurora ServerlessにはMySQLで提供されているサンプルデータのworldデータベースを取り込んで試してみたいと思います。

Aurora Serverlessの準備

起動

Aurora Serverlessの起動方法は、以下のブログなどを参照して作成してください。
また、他にも色々と情報がでていますので、ググってみてください。
aws.amazon.com

データ準備

Aurora Serverlessはパブリック接続ができないようなので、VPC内にEC2を立ててそこからサンプルデータを取り込みます。

wget http://downloads.mysql.com/docs/world.sql.zip
unzip world.sql.zip
mysql -u ユーザー名 -p -h クラスターエンドポイント < world.sql

#データが作成されているか確認。以下のテーブルが作成されていればOKです。
mysql -u ユーザー名 -p -h クラスターエンドポイント < world.sql
mysql>  use world;
mysql>  show tables;
+-----------------+
| Tables_in_world |
+-----------------+
| city            |
| country         |
| countrylanguage |
+-----------------+

Glueの設定

接続設定

左メニューの「接続」→「接続の追加」をクリックし、以下の設定で接続を作成します。

接続名 任意(aurora-jdbc)
接続タイプ JDBC

Aurora Serverlessは接続タイプ「Amazon RDS」からはまだ選択できないのでJDBCを利用します。
「次へ」をクリック

JDBC URL jdbc:mysql://[クラスターエンドポイント]:3306/world
ユーザー名 Auroraで設定したユーザー名
パスワード Auroraで設定したパスワード
VPC Auroraと同じVPCを設定
サブネット Auroraと通信が可能なサブネットを設定
セキュリティグループ JDBC用のセキュリティグループを作成して割当

確認画面で設定内容が問題ないか確認し「完了」ボタンをクリックしてください。
※上記のセキュリティグループからAurora Serverlessへ3306ポートで通信が可能なようにAurora Serverless側のセキュリティグループを設定しておいてください。

クローラー設定

左メニューの「クローラー」→「クローラの追加」をクリックし、以下の設定でクローラーを追加してください。

クローラの名前 任意(aurora-serverless-crawler)

「次へ」をクリック

Choose a data store JDBC
接続 上記で設定した接続名を選択(aurora-jdbc)
インクルードパス world/%

「次へ」をクリック
「別のデータストアの追加」はそのままで次へ

IAMロール Glue用のロールを指定

※事前に作成していなければ、 こちらを参照し作成してください。
「このクローラのスケジュールを設定する」もそのままで次へ
「データベースの追加」をクリックし

データベース名 任意の名前(world_out)
テーブルに追加されたプレフィックス 入力しない

入力内容を確認し「完了」をクリックします。

クローラーを実行

クローラーが作成されたらクローラーを実行します。
正常終了すると作成したデータベースに以下のテーブルが追加されているはずです。

  • world_city
  • world_country
  • world_countrylanguage

エラーとなる場合は、Auroraへの疎通設定がうまくできていないためだと思いますのでセキュリティグループなどの設定を見直して再実行してください。

ジョブ設定

左メニューの「ジョブ」→「ジョブの追加」をクリックし、以下の設定でジョブを追加してください。

名前 任意(aurora-serverless-job)
IAMロール Glue用のロールを指定

※事前に作成していなければ、 こちらを参照し作成してください。
他はデフォルトのままで「次へ」をクリック

データソース world_city

「次へ」をクリック

「データターゲットでテーブルを作成する」をチェックし

データストア Amazon S3
形式 CSV
ターゲットパス 任意のS3のパス

「次へ」をクリック
「ソース列をターゲット列にマッピングします」もデフォルトのまま「次へ」をクリック
確認画面で設定内容を確認し「ジョブを保存してスクリプトを編集する」をクリックします。
スクリプトについては特に編集は行いません。

実行確認

左メニューの「ジョブ」を選択し、上記で作成したジョブを選択します。
選択後に「アクション」メニューから「ジョブの実行」をクリックしジョブが終了するのを待ちます。
正常に実行されれば実行ステータスが「Succeeded」となりますので、S3にファイルが出力されていることを確認してください。

まとめ

実際のケースではGlueからAurora Serverlessを利用することはあまりないかもしれませんが、問題なく利用できることは分かりましたので何か機会があれば利用してみたいと思います。

ELBがある時・ない時

AWSにおいてELBは気軽に利用できるサービスになっていますが、意外と意識されていないのではないでしょうか?。ELBはロードバランサとしての役割だけでなくSSL証明書のターミネーションやアクセス状況など各種メトリクス確認できることのほかにEC2間を疎結合にできるなど様々なメリットがあり、AWSでインフラ構成を検討する際には必ずといっていいほど出てくるものとなります。
今回はELBがある時とない時を比較し、ELBを使うことでどのようなメリットがあるのかみていきたいと思います。

前提

http(s)でアクセスする一般的なwebシステムを前提とします。
ここで使うELBとはClassic Load Balancerを想定しています。

ケース1

EC2が1台のみのWebサーバ構成において考えてみたいと思います。

ELBがない時

構成図
f:id:cloudfish:20180308230559p:plain

上記構成の場合、Route53(DNS設定)には以下のようなレコードの登録が必要となります。

Name Type Value
example.com A xxx.xxx.xxx.xxx

1台構成なのでELBが無くてもよいのですが、ELBが無いとどういったメリット・デメリットがあるか考えてみます。

メリット
  • ELBの利用料が不要
デメリット

 SSL証明書の終端をWebサーバで対応する必要があります。WebサーバにSSL証明書を新規に設定もしくは更新する場合、かなり手間になります。またopensslの脆弱性が出た場合なども対応する必要がある。

  • アクセス状況などの各種メトリクス確認

 サイトが高負荷になった場合や、アクセス状況を確認したい場合などに、Webサーバのアクセスログを集計して確認する必要があるため、即座に状況判断ができない。

  • 拡張性

 Webサーバを複数台に拡張する場合、大きな構成変更を伴う。
①ELBを追加するケース
 変更点:ELBの追加、EC2の追加、DNSの変更
DNSラウンドロビン
 変更点:EC2の追加、DNSの変更

ELBがある時

構成図
f:id:cloudfish:20180314092135p:plain

Route53レコード

Name Type Value
example.com A example-com-elb-123456789.ap-northeast-1.elb.amazonaws.com.

上記のような構成において、それぞれどのようなメリットがあるか考えてみましょう
EC2が1台の構成ですが、DNSからは直接EC2ではなくELBを指す形となっています。

メリット

 SSL証明書の終端をELBで対応してくれます。SSL証明書の設定や更新も手軽に実施できます。
ELB側に脆弱性が発見された場合においてもAWS側で対応してくれるので利用者側で意識する必要がありません。
またACMAWS Certificate Manager)といったAWSから提供されている無料の証明書の利用も可能です。

  • アクセス状況などの各種メトリクス確認

 サイトが高負荷になった場合やアクセス状況を確認したい場合において、ELBのメトリクスでリクエストの状況が確認できるため、アクセス状況の把握が可能

  • 拡張性

 Webサーバを複数台に拡張する場合、大きな構成変更が不要。
EC2を複製し、ELB配下にアタッチするだけで拡張が可能。(アプリが冗長構成に対応できている前提)

デメリット
  • ELBの利用料が必要

ケース2

次に以下のようなWeb層とAP層に分かれた構成で考えてみたいと思います。
比較ポイントはWeb層とAP層の間に内部ELBを配置する場合としない場合で比較してみます。

ELBがない時

構成図
f:id:cloudfish:20180617125309p:plain

メリット
  • ELBの利用料が不要
デメリット
  • APサーバへの接続

 WebサーバーはAPサーバのIPを意識する必要があり、IPが変更された場合の柔軟性がない

  • 拡張性

APサーバが拡張、縮小された場合、Webサーバー2台ともにAPサーバの接続先を追加する必要がありメンテナンス性に欠ける

ELBがある時

構成図
f:id:cloudfish:20180617125348p:plain

メリット
  • APサーバへの接続

WebサーバはAPへの接続先としてELBに接続するだけでよい

  • 拡張性

APサーバが拡張、縮小された場合、Webサーバ側の変更は不要

デメリット
  • ELBの利用料が必要

ケース2ではPrivateHostedZoneを利用してWeb層とAP層をさらに疎結合にすることも可能ですが、その反面、構成が分かりにくくなるというデメリットもあるのでそれらを踏まえて検討してください。

まとめ

ELBのある時とない時を比較してみましたがいかがでしょうか?
ELBを利用する際のデメリットとしてはほぼ費用面のみかと思いますが、月額については1000-2000円程度なので運用面でメリットを考えるとかなり安いのではないでしょうか?
個人的には特殊な要件がない限り、シングル構成であってもELBは必ず利用するようにしています。
みなさんも是非利用を検討してみてください。

CodeBuildでDockerイメージのマルチステージビルド

Dockerでマルチステージビルドという機能を知ったので検証がてらCodeBuildで試してみました。
マルチステージビルドとは、例えばjavaアプリケーションにおいて、ビルドについてはjdkが入ったイメージを利用してビルドを行い、ビルドされたバイナリだけをjreが入ったイメージにコピーしてDockerイメージを作成することをDockerイメージのビルド時にできる機能となります。こうすることで簡単に実行するDockerイメージを小さくすることが可能となります。この機能はDokcerの17.05以降で利用可能となっています。

検証内容

Javaアプリをmavenがインストールされているコンテナでビルドして、jreがインストールされているコンテナをベースにイメージを作成しECRにプッシュします。
構成イメージは以下になります。
f:id:cloudfish:20180809094159p:plain

ECRの作成

イメージプッシュ用のリポジトリを作成します。
AWSコンソールの「Elastic Container Service」→「リポジトリ」から「リポジトリの作成」をクリックし、リポジトリ名を入力して リポジトリを作成します。ここでは「multistage-ecr」という名前で作成しました。

CodeCommitの作成

AWSコンソールの「CodeCommit」→「リポジトリの作成」をクリックし、リポジトリ名を入力してリポジトリを作成ます。ここでは「multistage-test」という名前で作成しました。

ビルド用リソースの作成

以下のリソースを作成し、作成したCodeCommitのリポジトリにコミットしてください。

├── ./Dockerfile
├── ./buildspec.yml
├── ./pom.xml
└── ./src
    └── ./src/hoge
        ├── ./src/hoge/Main.class
        └── ./src/hoge/Main.java

GitHub - cloudfish7/multi-stage-build-for-codebuildに一式配置していますのでここからもDLできます。

Dockerfile

# ビルド用コンテナでjavaをコンパイル。build1と名前を付けて後続で利用
FROM maven:3.3.9-jdk-8 AS build1
RUN mkdir -p /opt/java/src
ADD ./pom.xml /opt/java/
ADD ./src /opt/java/src
RUN cd /opt/java && mvn install

# jreがインストールされたイメージにビルド用コンテナから作成したjarファイルをコピー
FROM openjdk:8u131-jre-alpine
RUN mkdir -p /opt/app/
COPY --from=build1 /opt/java/target/ /opt/app/

RUN  java -jar /opt/app/HelloWorld-1.0.jar

buildspec.yml

dockerイメージをビルドしてECRにプッシュします。
以下をセットして
{REPO_NAME}にはECRのリポジトリ
{tag}にはイメージタグをセット(何もなければlatest)
{account_id}にはAWSアカウントID

version: 0.1
 
phases:
  pre_build:
    commands:
      - $(aws ecr get-login --region ap-northeast-1 --no-include-email)
  build:
    commands:
      - docker build -t {REPO_NAME}:{tag} .
      - docker tag {REPO_NAME}:{tag} {account_id}.dkr.ecr.ap-northeast-1.amazonaws.com/{REPO_NAME}:{tag}
      - docker push {account_id}.dkr.ecr.ap-northeast-1.amazonaws.com/{REPO_NAME}:{tag}
  post_build:
    commands:

pom.xml

コンパイルしてjarファイルを作成します。

<project xmlns="http://maven.apache.org/POM/4.0.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>hoge</groupId>
  <artifactId>HelloWorld</artifactId>
  <version>1.0</version>
  <name>Java Sample App</name>

  <build>
        <outputDirectory>target/classes</outputDirectory>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>hoge.Main</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
  </build>
</project>

Main.java

package hoge;

class Main{
  public static void main(String args[]){
    System.out.println("Hello Docker!!");
  }
}

CodeBuildの作成

AWSコンソールの「CodeBuild」→「プロジェクトの作成」をクリックします。

プロジェクト名:multistage-build
ソースプロバイダ:AWS CodeCommit
リポジトリ:multistage-test ← CodeCommitのリポジトリ
環境イメージ:AWS CodeBuildによって管理されたイメージの使用をチェック
オペレーティングシステムUbuntu
ランタイム:Docker
バージョン:aws/codebuild/docker:17.09.0
ビルド仕様:ソースコードのルートディレクトリのbuildspec.ymlを使用をチェック
上記以外はデフォルトのままとしてプロジェクトを作成します。
f:id:cloudfish:20180808155111p:plain

プロジェクト作成後に、AWSコンソールの「IAM」→「ロール」から作成されたCodeBuild用のサービスロールを選択し、以下のポリシーを追加します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "ecr:BatchCheckLayerAvailability",
                "ecr:CompleteLayerUpload",
                "ecr:GetAuthorizationToken",
                "ecr:InitiateLayerUpload",
                "ecr:PutImage",
                "ecr:UploadLayerPart"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

実行確認

準備が整ったのでビルド実行してECRにプッシュされることを確認します。
AWSコンソールの「CodeBuild」→作成したプロジェクトを選択し、「ビルドの開始」をクリックします。
プロジェクト名:作成したプロジェクト名
ブランチ:master
他はデフォルとのままで「ビルドの開始」をクリックします。
f:id:cloudfish:20180808162611p:plain

ビルドが正常終了した場合、ステータスが「Succeeded」となります。
失敗した場合はステータスが「Failed」となりますので、詳細画面からビルドログを確認しエラー内容を確認してください。

正常にビルドが完了しjavaのビルドも正しく完了していると、ビルドログに以下のように出力されていると思います。ビルドイメージ作成時にJavaのアプリが正常にコンパイルされているかチェックするため実行しています。
f:id:cloudfish:20180808162833p:plain

まとめ

今回利用したjreのみのイメージだと約50MBとなり、openjdkのalpineイメージ(約100MB)と比べてもサイズを大幅に削減することが簡単にできました。
また、多段での実行が可能なので、アプリをビルド後、テスト用のイメージでテストを実行し、その後に実行用のイメージを作成するようなこともできそうです。

PySparkでMySQLからのデータ取得&集計方法

MySQLに対してSQLでよくやるようなデータの取得や集計などをPySparkのDataFrameだとどうやるのか調べてみましたので、備忘録として残しておきたいと思います。
検証環境は以前紹介したDockerではじめるPySparkをベースにDockerで環境を構築しいます。
こういった検証にDockerはすごく便利でいいですね

環境

  • PySpark 2.2
  • MySQL5.7

データはMySQLの公式でサンプルとして提供されているworldデータベースを利用します。

環境の構築

利用するDockerイメージ

以下の通りdocker-compose.ymlを作成します。

version: '2'
services:
 pyspark:
    image: cloudfish/pyspark-notebook
    volumes:
       - LOCAL_PATH:/home/jovyan/work
    ports:
      - "8888:8888"
    command: bash -c "start-notebook.sh --NotebookApp.token=''"
    links:
      - dbserver
    environment:
      GRANT_SUDO: "yes"
  dbserver:
    image: kakakakakku/mysql-57-world-database
    environment:
      MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
  phpmyadmi:
    image: phpmyadmin/phpmyadmin
    ports: 
      - "18080:80"
    links:
      - "dbserver"
    environment:
      PMA_HOST: dbserver
      PMA_USER: root 
      PMA_PASSWORD: ""

Docker起動

docker-compose up

Jupyter Notebook画面確認
http://localhost:8888
f:id:cloudfish:20180727134630p:plain
phpmyadmin画面確認
http://localhost:18080
f:id:cloudfish:20180730141054p:plain

PySparkの実行確認

早速サンプルデータベースで実行確認を進めていきます。worldデータベースは以下のようなテーブルが含まれています。
これらのテーブルを使ってデータを取得してみたいと思います。

Tables_in_world
city
country
countrylanguage

画面右端のNewボタンをクリックしPython3を選択し、開いた画面で以下を入力していきます。
f:id:cloudfish:20180727134709p:plain

以下のコードはコードセルごとに入力してください。入力後Shift + Enterでコードが実行されます。

Sparkの初期化処理

from pyspark.sql import SQLContext, Row
from pyspark import SparkContext

sc = SparkContext("local", "First App")

※2回実行するとエラーになります。

JDBC接続処理

JDBCに接続しています。

sqlContext = SQLContext(sc)
jdbc_url="jdbc:mysql://dbserver/mysql"
driver_class="com.mysql.jdbc.Driver"

DB_USER="root"
DB_PASSWORD=""

def load_dataframe(table):
  df=sqlContext.read.format("jdbc").options(
    url =jdbc_url,
    driver=driver_class,
   dbtable=table,
   user=DB_USER,
    password=DB_PASSWORD
  ).load()
  return df

データ取得処理

データをDataFrameに取得します
テーブル指定でデータ取得する場合(countryテーブル、cityテーブルを取得)

df_country = load_dataframe("world.country")
# 実行されるSQL:SELECT * FROM world.country WHERE 1=0
df_city = load_dataframe("world.city")
# 実行されるSQL:SELECT * FROM world.city WHERE 1=0

SQLを指定してデータ取得する場合(cityテーブルの国コードがJPNのものだけを取得)

df_city_japan = load_dataframe("(select * from world.city where CountryCode='JPN') city_japan")
# 実行されるSQL:SELECT * FROM (select * from world.city where CountryCode='JPN') city_japan WHERE 1=0

MySQL側でどのようなSQLが流れるのか見てみたところ、テーブルにセットした内容がFROM句の後にセットされるようです。

カラム指定

Nameカラムを表示

df_country.select("Name").show()

条件検索

国名がJapanのデータを抽出。

df_country.filter(df_country["Name"] == "Japan").show()
df_country.where(df_country["Name"] == "Japan").select("Code","GNP").show()

isNull

独立年がNullのデータを抽出

df_country.where(df_country["IndepYear"].isNull()).show()

like

国名がJで始まるデータを抽出

df_country.where(df_country["Name"].like("J%")).show()

Case When式

人口が100000人より大きい場合は「Big」、小さい場合は「Small」を表示

from pyspark.sql import functions as F
df_country.select(df_country["Name"], F.when(df_country["Population"] > 100000,"Big").otherwise("Small").alias("CountryDiv")).show()

substr

国名を3文字切り出して表示

df_country.select(df_country["Name"].substr(1,3)).show()

limit

5件のみ抽出

df_country.limit(5).show()

Join

countryテーブルとcityテーブルを結合し国名がJapanのものを抽出

df_join = df_country.alias('country').join(df_city.alias('city'),(df_city["countryCode"] == df_country["Code"]) & (df_country["Name"]=="Japan")).show()

OrderBy

GNPが高い国順に表示

df_country.orderBy("GNP" , ascending=False).select("Code","Name","GNP").show()

GroupBy

国ごとにグループ化し、cityの数、人口の平均と合計を集計

from pyspark.sql import functions as F

df_city.groupBy("countryCode") \
 .agg( \
    F.count(df_city["Name"]).alias("total_count"), \
    F.avg(df_city["Population"]).alias("avg_population"), \
    F.sum(df_city["Population"]).alias("sum_population") \
).show()

まとめ

書き方は少し慣れる必要がありますが、かなりSQLに近いイメージでデータ取得が可能なことが分かりました。
今回はDBに対して実行しましたが、ファイルに対しても同様に実行可能です。

CodeBuildのbuildspec.ymlで変数定義ができない

CodeBuildを利用していてbuildspec.ymlを書いていたなかで、環境変数ではなく変数定義して使いたいと思い試していたのですが、
どうもうまくいかずハマりましたので備忘録としてのことしておきたいと思います。

やりたかったこと

buildspec.ymlのビルド手順で処理途中で変数定義を行いたい

簡単なサンプルですが、以下のようなbuildspec.ymlがあったとして、実行するとバージョンが表示されることを期待しました。

  build:
    commands:
      - $version=1.0
      - echo $version

ですが、実行してもこの状態だと何も出力されず、
exportしてみたりと色々試してみましたが思うような結果は得られませんでした。

最終的に行ごとにセッションが新しく張られてるのではないかと考え以下のように変数を引き継ぎたい処理をシェルスクリプト化することで解決しました。

hoge.sh

$version=1.0
echo $version

buildspec.yml

  build:
    commands:
      - sh ./build.sh

見た目そのままシェルスクリプトなので、同じように使えるかと思いましたがそうではないんですね。
シェルスクリプトにすることで一見して処理が見えなくなるのも微妙な感じですが、まとまった処理であればスクリプト化するのもいいかと思います。

AWS BatchでEFSをマウントしディスク共有してみる

もうすぐEFSが東京リージョンに来ますね。最近はAWS Batchを触る機会が多くなってきたので、AWS BatchでEFSを使ってのディスク共有を検証してみました。
まだ東京リージョンにはないので今回はバージニアリージョンを利用して検証してみます。

構成

f:id:cloudfish:20180707232903p:plain

手順概要

1. EFSの作成
2. Batch用イメージの作成
3. AWS Batchの設定
4. 実行確認

1. EFSの作成

まずAWS Batch用とEFS用にセキュリティグループを作成します。

Batch-SG(sg-batchsg)
Type Protocol Port Range Source Description
SSH TCP 22 xxx.xxx.xxx.xxx/32 ssh
EFS-SG(sg-efssg)
Type Protocol Port Range Source Description
TCP TCP 2049 sg-batchsg batch→efs通信用

Batch用インスタンスからEFSに接続可能なように設定しておきます。

AWSコンソールからEFSを選択してEFSを作成します。
今回はデフォルトVPCとして、セキュリティグループは上で作成したEFS用のセキュリティグループを設定します。
ゾーンごとにセキュリティグループを設定するのは何だかなーという印象です。
f:id:cloudfish:20180705001335p:plain

作成後にDNS nameが割り当てられるので控えておいてください。
f:id:cloudfish:20180705102246p:plain

2. Batch用イメージの作成

ECS用のAMIからEC2を新規に作成し、起動時に指定のEFSを自動マウントする設定を行います。
バージニアリージョンの場合は以下のAMIから作成しました。
amzn-ami-2017.09.l-amazon-ecs-optimized (ami-aff65ad2)

以下にECS対応のAMIの一覧が記載されていますので参考にしてください。
ただし、AWS Batchのマネージド型で起動したインスタンスについては、2017.09のもので起動されていましたのでここではそれをベースとしています。
Amazon ECS 対応 AMI - Amazon Elastic Container Service

起動完了後にSSHでログインしrootユーザーで以下の設定を行います。

# マウント用のディレクトリ作成
mkdir /efs
# NFSクライアントのインストール
yum install -y nfs-utils

ここではEFSのFile system IDをfs-1234abcdとしていますが、実際のIDに合わせて変更してください。

# EFSをマウント
mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 fs-1234abcd.efs.us-east-1.amazonaws.com:/ /efs

# 起動時にマウントするようにfstabを更新
echo 'fs-1234abcd.efs.us-east-1.amazonaws.com:/ /efs nfs4 nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 0 0' | sudo tee -a /etc/fstab

設定完了後にEC2を再起動し、EFSがマウントできていることを確認してください。
確認完了すれば、EC2を停止しAMIを作成し、それをAWS Batchの実行イメージとします。
※ 本番で利用する場合は、サポートログが出力されるefs-utilsのマウントヘルパーの利用を推奨

3. AWS Batchの設定

コンピューティング環境を作成

以下の設定で作成します

設定内容 設定値
コンピューティング環境のタイプ マネージド型
最小vCPU 1
必要なvCPU 1

以下の通りユーザー指定のAMIを有効にします。
f:id:cloudfish:20180705103411p:plain
上記以外はデフォルトで作成してください。

起動したインスタンスssh接続

dockerコンテナからマウントした時の確認用として/efs配下に以下のファイル作成しておきます。

mkdir -p /efs/files
$echo "Hello EFS!!" > /efs/files/test.txt
$cat /efs/files/test.txt
Hello EFS!!
ジョブ定義を作成します。

事前に以下を参照しroleを作成しておきます。
Amazon ECS コンテナインスタンスの IAM ロール - Amazon Elastic Container Service

busyboxを使ってホストの/efsディレクトリをコンテナの/homeディレクトリにマウントする設定を行います。
コンテナ起動時にtest.txtが表示することでマウントされていることを確認します。

設定内容 設定値
コンテナイメージ busybox
コマンド cat /home/files/test.txt
vCPU 1
メモリ 128

ボリューム

設定内容 設定値
名前 efs
ソースパス /efs

マウントポイント

設定内容 設定値
コンテナパス /home
ソースパス efs
ジョブキューの作成
設定内容 設定値
優先度 1
コンピューティング環境を選択 上記で作成したコンピューティング環境名

ジョブ実行

左ペインの「ジョブ」から「ジョブ送信」を選択します。

設定内容 設定値
ジョブ名 任意
ジョブ定義 上記で作成したジョブ定義名
ジョブキュー 上記で作成したジョブキュー名

他はデフォルトでOKです

実行結果

実行後しばらくすると正常に終了した場合は、コンソールのジョブのsucceededにジョブが表示されます。
cloudwatch logsに以下ログが出力されていれば成功です。
f:id:cloudfish:20180703152027p:plain

これまでAWS Batchのジョブ間でファイルを共有する場合は、S3が主な選択肢でしたが、今後はEFSも選択肢として使えそうです。
S3では耐えきれないワークロードなどで利用検討してみてください。