AWS Glueの開発環境をDockerで構築する(その2)

前回、Glueの開発環境をDockerで作りましたが、zeppelinコンテナを起動してホストからsshフォワードを実行して接続するという構成でホストを意識する必要がありましたが、せっかくzeppelinをdockerで作っているのでsshフォワードを行う部分もdockerで実現できないかと思い考えてみました。

今回作った構成

f:id:cloudfish:20180202231325p:plain
データを永続化するためにdataコンテナとsshフォワード用にssh forwardコンテナを追加することにしました。
そのためzeppelinコンテナからはssh forwardコンテナに接続しにいくことになります。
こうすることでssh forwardコンテナには名前でアクセスできるので、毎回設定変更する必要がなくなります。

手順

  • ssh forwardコンテナ作成
  • docker-compose.yml作成
  • 起動確認

ssh forwardコンテナ作成

以下のとおりDockerfileを作成します。

FROM ubuntu:latest
RUN apt-get -y update && apt-get -y upgrade
RUN apt-get -y install openssh-client net-tools 

ADD ssh_forward.sh /home/ubuntu/ssh_forward.sh
RUN chmod 644 /home/ubuntu/ssh_forward.sh

CMD ["/bin/sh","/home/ubuntu/ssh_forward.sh"]

コンテナ起動時に実行するssh_forward.shを作成します。
起動時に環境変数秘密鍵と接続先のIP or DNSを受取り鍵ファイルを作成して開発エンドポイントへ接続します。

#! /bin/sh

echo $SSH_RSA > dev_endpoint.pem
chmod 600 dev_endpoint.pem

ssh -g -i dev_endpoint.pem -NTL 9007:169.254.76.1:9007 glue@$HOST_ADDRESS -o StrictHostKeyChecking=no

ビルド実行

docker build -t ssh_forward .

docker-compose.ymlを作成

version: '2'
services:
  data:
    image: busybox
    container_name: app
    volumes:
      - /zeppelin/notebook

  zeppelin:
    image: apache/zeppelin:0.7.3
    ports: 
       - "8080:8080"
    volumes_from:
      - data
    restart: always
    links:
      - "ssh_server"

  ssh_server:
    image: ssh_forward
    environment:
       SSH_RSA: "[DevEndpoint接続用ssh秘密鍵]"
       HOST_ADDRESS: [GlueのDevEndpointのDNS or IP]

※DevEndpoint接続用ssh秘密鍵は改行コードを「\\n」に置換して1行にしてからセットしてください。

起動確認

busyboxapache/zeppellinについては予めdocker hubから取得しておいてください。
また、GlueのDevEndpointの設定も事前に必要になります。

docker-compose up

以下アドレスにアクセスして画面が表示されていれば成功です。
http://localhost:8080

f:id:cloudfish:20180202230306p:plain

これで全てdockerコンテナ上で完結できましたね。

AWS GlueでVPCフローログ用のclassifiersを作ってみた

GlueでVPCフローログをparquet形式に変換させる定期ジョブを作ろうと思いクロール処理を追加したところ、ビルトインのClassifiersにはなかったため自動でテーブル構造を認識してくれませんでした。
認識させるためにはカスタムClassifiersを作る必要があることが分かりました。
今回はVPCフローログ用にカスタムClassifiersの作り方を調べた備忘録です

GlueのビルトインのClassifiersについては、以下のページで確認できます。
ここになければ作る必要があります。
Adding Classifiers to a Crawler - AWS Glue

手順

  • Grokフィルタ作成
  • classifiersの作成
  • 実行確認

Grokフィルタ作成

まずログをマッチングして分類するためのフィルタを作る必要がりあります。
Grokフィルタの詳細についてはここを確認してください。
Grok filter plugin | Logstash Reference [6.1] | Elastic
また、Glueのビルトインパターンについては以下を参照してください。
Writing Custom Classifiers - AWS Glue

作ったGrokフィルタをcrawlerを使って逐一実行確認していはかなりの時間がかかるため、
以下のサービスを使ってGrokフィルタが想定どおりに動作しているかを事前に確認します。
Grok Debugger

f:id:cloudfish:20180128001400p:plain

カスタムパターンを作る時の正規表現の確認は、以下のサービスが便利でした
Online regex tester and debugger: PHP, PCRE, Python, Golang and JavaScript

classifiersの作成

左ペインから「classifiers」を選択し、「Add classifier」ボタンをクリックします。
今回は以下のように入力してCreateします。
classifier name:任意
classifier type : Grok
classification:任意
Grok pattern:

%{INT:version:int} %{NOTSPACE:account_id} %{ENI:eni_id} %{IP:srcaddr} %{IP:dstaddr} %{INT:srcport:int} %{INT:dstport:int} %{INT:protocol:int} %{INT:packets:int} %{NUMBER:bytes} %{NUMBER:start} %{NUMBER:end} %{NOTSPACE:action} %{NOTSPACE:log_status}

Custome pattern:

ENI [eni-]+[a-z0-9]{8,8}

上記のgrokパターンは適切ではないと思いますが、とりあえずVPCフローログがマッチするはずです
f:id:cloudfish:20180128002514p:plain

実行確認

上記設定ができれば実際にクロールしてみましょう。
Add crawlerを実行すると以下のような画面になります。
左下のclassifierリストから今回作ったclassifierをAddします。
vpcフローログのあるS3パスを指定して、それ以外は画面に沿って入力しRun crawlerを実行してください。
f:id:cloudfish:20180128003119p:plain

正しく分類できていれば、テーブルの詳細を確認すると以下のようになっていると思います。
うまくいかなかった場合は、ClassificationにUnknownと表示されますのでその場合は修正して再度確認してください。
ただし、classifiersを更新した際は、crawlerを新規に作成しないと反映されないようなので注意してください。
f:id:cloudfish:20180128003850p:plain

grokフィルタについては、少しクセがあり慣れるのに少し時間がかかりそうですが、Debuggerもあるのでうまく活用して慣れていきたいと思います。

AWS Glueで新しくScalaがサポートされました

AWS GlueのETLスクリプトを作成する言語として、新たにScalaが追加されました。

画面を確認すると以下のようにPythonに加えてScalaも選択できるようになっています。
f:id:cloudfish:20180115231204p:plain

以下はScalaで自動生成されたETLスクリプトになります。

import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    // @type: DataSource
    // @args: [database = "sampledb", table_name = "vpc_flow_logs", transformation_ctx = "datasource0"]
    // @return: datasource0
    // @inputs: []
    val datasource0 = glueContext.getCatalogSource(database = "sampledb", tableName = "vpc_flow_logs", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
    // @type: ApplyMapping
    // @args: [mapping = [("version", "int", "version", "int"), ("account", "string", "account", "string"), ("interfaceid", "string", "interfaceid", "string"), ("sourceaddress", "string", "sourceaddress", "string"), ("destinationaddress", "string", "destinationaddress", "string"), ("sourceport", "int", "sourceport", "int"), ("destinationport", "int", "destinationport", "int"), ("protocol", "int", "protocol", "int"), ("packets", "int", "packets", "int"), ("bytes", "int", "bytes", "int"), ("starttime", "int", "starttime", "int"), ("endtime", "int", "endtime", "int"), ("action", "string", "action", "string"), ("logstatus", "string", "logstatus", "string")], transformation_ctx = "applymapping1"]
    // @return: applymapping1
    // @inputs: [frame = datasource0]
    val applymapping1 = datasource0.applyMapping(mappings = Seq(("version", "int", "version", "int"), ("account", "string", "account", "string"), ("interfaceid", "string", "interfaceid", "string"), ("sourceaddress", "string", "sourceaddress", "string"), ("destinationaddress", "string", "destinationaddress", "string"), ("sourceport", "int", "sourceport", "int"), ("destinationport", "int", "destinationport", "int"), ("protocol", "int", "protocol", "int"), ("packets", "int", "packets", "int"), ("bytes", "int", "bytes", "int"), ("starttime", "int", "starttime", "int"), ("endtime", "int", "endtime", "int"), ("action", "string", "action", "string"), ("logstatus", "string", "logstatus", "string")), caseSensitive = false, transformationContext = "applymapping1")
    // @type: ResolveChoice
    // @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
    // @return: resolvechoice2
    // @inputs: [frame = applymapping1]
    val resolvechoice2 = applymapping1.resolveChoice(choiceOption = Some(ChoiceOption("make_struct")), transformationContext = "resolvechoice2")
    // @type: DropNullFields
    // @args: [transformation_ctx = "dropnullfields3"]
    // @return: dropnullfields3
    // @inputs: [frame = resolvechoice2]
    val dropnullfields3 = resolvechoice2.dropNulls(transformationContext = "dropnullfields3")
    // @type: DataSink
    // @args: [connection_type = "s3", connection_options = {"path": "s3://glue-test-out-bucket"}, format = "parquet", transformation_ctx = "datasink4"]
    // @return: datasink4
    // @inputs: [frame = dropnullfields3]
    val datasink4 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions("""{"path": "s3://glue-test-out-bucket"}"""), transformationContext = "datasink4", format = "parquet").writeDynamicFrame(dropnullfields3)
    Job.commit()
  }
}

Sparkの利用者はPythonScalaのどちらの言語の利用が多いんでしょうか?
いずれにせよ言語の選択肢が増えるのはすごくいいですね。

AWS Glueの開発環境(Zeppelin)をDockerで構築する

AWS Glueで自動生成されたETL処理のPySparkの開発について、AWSコンソール上で修正して実行確認は可能ですがかなり手間になります。
そこで開発エンドポイントを使って開発する方法が提供されており、Apache Zeppelinなどを使ってインタラクティブに開発することができます。公式ドキュメントによると、エンドポイントを利用して開発する方法としては大きく以下の4つの方法が提供されています。

① EC2上にApache Zeppelinを構築して開発エンドポイントへ接続
② ローカルマシンにApache Zeppelinを構築して開発エンドポイントへ接続
③ 開発エンドポイントへ直接sshしてREPL Shellを利用する
④ PyCharmのProfessional editionで開発エンドポイントへ接続

①の方法がコンソールからワンクリックで開発環境を構築することができるので簡単かつ便利に準備することができますが、EC2(m4.xlarge)が起動されるためかなりの費用がかかります。
今回は②の方法でかつローカルマシン上のDockerでApacheZeppelinを構築して開発環境を準備してみたいと思います。

手順

1. 開発エンドポイントを作成
2. コンテナイメージを取得
3. 開発エンドポイントにsshフォワードで接続
4. Apache/Zeppelinを起動&設定
5. 動作確認

前提

リージョン:東京リージョン
ローカルマシン:Mac(Docker for Macインストール済み)

1. 開発エンドポイントを作成

事前に以下が対応されている前提とします

  • 開発エンドポイントを作成するVPCにS3エンドポイントの作成
  • Glue用サービスロールの作成
  • 開発エンドポイント用のセキュリティグループの作成
  • 開発エンドポイント用のssh鍵の作成

詳細は以下参照
docs.aws.amazon.com

開発エンドポイントについては、費用1DPUあたり0.44ドル/時で最低2DPUからとなりますので
できるだけこまめに消すことを想定してCloudFormationで作成します。

AWSTemplateFormatVersion: '2010-09-09'
Description: "Glue DevEndpoint"
Resources:
 DevEndpoint:
   Type: "AWS::Glue::DevEndpoint"
   Properties:
     EndpointName: "TestDevEndpoint"
     NumberOfNodes: 2
     PublicKey: "[ssh public key]"
     RoleArn: "[Glue Service RoleのARN]"
     SecurityGroupIds: 
       - [開発エンドポイント用のセキュリティグループ]
     SubnetId:  [開発エンドポイント用のサブネット]

上記を作成して保存後、コンソールもしくはcliから実行してください。

aws cloudformation create-stack --stack-name stack-dev-endpoint --region ap-northeast-1 --template-body file://dev_endpoint.yml

と思ったら、エラーが発生してうまく実行できませんでいた。
バージニアリージョンでは問題なく実行できましたので、おそらく現時点で東京リージョンではGlueのCloudFormationがまだ使えないようです。

仕方が無いのでcliで作成することにします。
以下のコマンドを実行して開発エンドポイントを作成します。

aws glue create-dev-endpoint \
 --endpoint-name TestDevEndpoint \
 --role-arn [Glue Service RoleのARN]  \
 --security-group-ids sg-866cdeff \
 --subnet-id  [開発エンドポイント用のセキュリティグループ] \
 --public-key "[ssh public key]" \
 --number-of-nodes 2 

実行後しばらくしてstatusがREADYになれば準備完了です。
f:id:cloudfish:20180112204151p:plain

2. コンテナイメージの取得

以下のコマンドを実行してDockerHubからapache zeppelinのコンテナイメージ取得する

docker pull apache/zeppelin

3. 開発エンドポイントにsshフォワードで接続

コンソールから開発エンドポイントの詳細を開き「SSH tunnel to remote interpreter」に記載のある接続コマンドをコピーします。
f:id:cloudfish:20180112211427p:plain

コピーしたコマンドに-gオプションを付与してMacからssh接続を行います

ssh -g -i <private-key.pem> -NTL 9007:169.254.76.1:9007 glue@ec2-xxx-xxx-xxx-xxx.ap-northeast-1.compute.amazonaws.com

※コンテナからトンネルを利用できるようにするためにgオプションを付与します

4. Apache/Zeppelinを起動&設定

docker run -p 8080:8080  apache/zeppelin:0.7.3

※タグは取得したバージョンをセットしてください

起動後以下のURLにアクセスします
http://localhost:8080.

画面右側からinterpreterを選択します
f:id:cloudfish:20180112212626p:plain

spark項目で以下のように設定します。

  • Connect to existing processをチェック
  • HostにローカルマシンのIP
  • masterにyarn-clinet
  • psark.executor.memoryを削除

f:id:cloudfish:20180113215341p:plain

設定後にrestartすれば完了です。

5. 動作確認

以下のサンプルコードを実行してスキーマ情報が出力されることを確認してください
DataBase、TableNameは実際に設定しているものを使用してください

%pyspark
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *

# Create a Glue context
glueContext = GlueContext(SparkContext.getOrCreate())

# Create a DynamicFrame using the 'persons_json' table
persons_DyF = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="persons_json")

# Print out information about this data
print "Count:  ", persons_DyF.count()
persons_DyF.printSchema()

これで気軽に試せる環境が整いました。
実際に使うにはコードのデータや設定データの永続化も考える必要がありますが、まずはこれで色々と試してみてください。

DockerではじめるPySpark

最近Sparkの勉強を始めました。
手軽に試せる環境としてPySparkをJupyter Notebookで実行できる環境を作ればよさそうです。
環境構築に手間取りたくなかったので、Dockerで構築できないか調べてみるとDocker Hubでイメージが提供されていましたので、それを利用することにしました。
今回は導入からサンプル実行までやってみたいと思います。

環境

Mac Sierra
Docker for Mac

導入手順

Dockerイメージの取得

以下のコマンドを実行してイメージを取得してください。
イメージのサイズが約5GBあるのでディスク容量には気をつけてください

docker pull jupyter/pyspark-notebook
Dockerの起動

notebookのデータを保存しておくために、ローカルに適当なディレクトリを作成してDockerのマウント先を作ります。
以下コマンドを実行してDockerを起動します。

docker run -p 8888:8888 -v ローカルボリュームのフルパス:/home/jovyan/work jupyter/pyspark-notebook start-notebook.sh --NotebookApp.token=''

Jupyterの起動

Dockerの起動完了後、以下のアドレスにアクセスします
http://localhost:8888/

正常に起動していれば以下のような画面が表示されます
f:id:cloudfish:20171222233336p:plain

右端のNewボタンからPython3を選択します
f:id:cloudfish:20171222234941p:plain

新規に開いた画面に以下のコードを入力します

from pyspark.context import SparkContext
sc = SparkContext()

data = sc.parallelize([1, 2, 3, 4, 5])
print(data.count())
sc.stop()

Runボタンをクリックし実行結果が表示されれば成功です
f:id:cloudfish:20171222235225p:plain

後は色々とサンプルを動かしてみてください。
Dockerで手軽に試せるのはいいですね。

AWSで時刻同期サービスがリリースされました

これまでAWS上では時刻同期について、外部のサーバを利用する必要がありましたが、
AWS内で利用できるサービスがリリースされました。
これで外部へアクセスする必要がなくなりました。またうるう秒も自動で調整してくれるため是非利用するべきだと思います

設定方法(Amazon Linux

以下のいずれかの方法で設定

方法1

NTPのアクセス先を以下に変更する

server 169.254.169.123 prefer iburst
方法2

ntpを削除してchronyに入れ替える

sudo yum erase ntp*
sudo yum -y install chrony
sudo service chronyd start

これからAWSを利用する場合の時刻同期は全てこれで対応していいと思います。
Windowsでも設定可能なので、詳細は以下を参照してください。

S3マウントツールのObjectiveFSを試してみた

S3のマウントにみなさん何を使っていますか?
s3fs?goofys?使い勝手はいかがでしょう?
今回は、商用ですがObjectiveFS(https://objectivefs.com/)というS3のマウントツールの紹介をしたいと思います。

環境

OS:AmazonLinux

手順

ライセンス取得

14日間のトライアル版がありますので、以下のページの「Try It Free」からサインアップ(クレジットカード不要)してライセンスを取得してください。サインアップ後にメールが届きますのでそれに従ってログインしてください。
https://objectivefs.com/price?l=pricing
f:id:cloudfish:20170626144019p:plain

rpmダウンロード

ログイン後に以下のページからrpmをダウンロードします。
https://objectivefs.com/user/downloads

f:id:cloudfish:20170626144012p:plain

インストー

ダウンロードしたrpmのバージョンにあわせてください

# yum install objectivefs-5.1-1.x86_64.rpm

初期設定

IAM Userを作成しアクセスキー、シークレットキーを準備しておいてください。
IAM Userの権限にはS3FullAccessを付与しておいてください。

#sudo mount.objectivefs config
 Enter ObjectiveFS license: ←ライセンスを入力
 Enter Access Key Id: ←アクセスキーを入力
 Enter Secret Access Key: ←シークレットキーを入力
 Enter Default Region (optional): ←リージョンを入力

ファイルシステムを作成

以下のコマンドを実行するろS3にバケットが作成されますので注意して実行してください。
「s3-objectivefs」というバケットを作成するものとします

# sudo mount.objectivefs create s3-objectivefs
  Passphrase (for s3://<filesystem>): ←任意のパスワード
  Verify passphrase (for s3://<filesystem>): ←パスワード再入力

マウントする

# sudo mkdir /mnt/s3-objectivefs
# sudo mount.objectivefs s3-objectivefs /mnt/s3-objectivefs
Passphrase (for s3://<filesystem>): ←上記で設定したパスワードを入力

これでエラーが出なければ完了です
lsしてみたりファイルを作るなりして遊んでみてください。
既存バケットをマウントする方法はまたご紹介したいと思います。