AWS Glueで新しくScalaがサポートされました
AWS GlueのETLスクリプトを作成する言語として、新たにScalaが追加されました。
画面を確認すると以下のようにPythonに加えてScalaも選択できるようになっています。
以下はScalaで自動生成されたETLスクリプトになります。
import com.amazonaws.services.glue.ChoiceOption import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.ResolveSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // @type: DataSource // @args: [database = "sampledb", table_name = "vpc_flow_logs", transformation_ctx = "datasource0"] // @return: datasource0 // @inputs: [] val datasource0 = glueContext.getCatalogSource(database = "sampledb", tableName = "vpc_flow_logs", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame() // @type: ApplyMapping // @args: [mapping = [("version", "int", "version", "int"), ("account", "string", "account", "string"), ("interfaceid", "string", "interfaceid", "string"), ("sourceaddress", "string", "sourceaddress", "string"), ("destinationaddress", "string", "destinationaddress", "string"), ("sourceport", "int", "sourceport", "int"), ("destinationport", "int", "destinationport", "int"), ("protocol", "int", "protocol", "int"), ("packets", "int", "packets", "int"), ("bytes", "int", "bytes", "int"), ("starttime", "int", "starttime", "int"), ("endtime", "int", "endtime", "int"), ("action", "string", "action", "string"), ("logstatus", "string", "logstatus", "string")], transformation_ctx = "applymapping1"] // @return: applymapping1 // @inputs: [frame = datasource0] val applymapping1 = datasource0.applyMapping(mappings = Seq(("version", "int", "version", "int"), ("account", "string", "account", "string"), ("interfaceid", "string", "interfaceid", "string"), ("sourceaddress", "string", "sourceaddress", "string"), ("destinationaddress", "string", "destinationaddress", "string"), ("sourceport", "int", "sourceport", "int"), ("destinationport", "int", "destinationport", "int"), ("protocol", "int", "protocol", "int"), ("packets", "int", "packets", "int"), ("bytes", "int", "bytes", "int"), ("starttime", "int", "starttime", "int"), ("endtime", "int", "endtime", "int"), ("action", "string", "action", "string"), ("logstatus", "string", "logstatus", "string")), caseSensitive = false, transformationContext = "applymapping1") // @type: ResolveChoice // @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"] // @return: resolvechoice2 // @inputs: [frame = applymapping1] val resolvechoice2 = applymapping1.resolveChoice(choiceOption = Some(ChoiceOption("make_struct")), transformationContext = "resolvechoice2") // @type: DropNullFields // @args: [transformation_ctx = "dropnullfields3"] // @return: dropnullfields3 // @inputs: [frame = resolvechoice2] val dropnullfields3 = resolvechoice2.dropNulls(transformationContext = "dropnullfields3") // @type: DataSink // @args: [connection_type = "s3", connection_options = {"path": "s3://glue-test-out-bucket"}, format = "parquet", transformation_ctx = "datasink4"] // @return: datasink4 // @inputs: [frame = dropnullfields3] val datasink4 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions("""{"path": "s3://glue-test-out-bucket"}"""), transformationContext = "datasink4", format = "parquet").writeDynamicFrame(dropnullfields3) Job.commit() } }
Sparkの利用者はPythonかScalaのどちらの言語の利用が多いんでしょうか?
いずれにせよ言語の選択肢が増えるのはすごくいいですね。
AWS Glueの開発環境(Zeppelin)をDockerで構築する
AWS Glueで自動生成されたETL処理のPySparkの開発について、AWSコンソール上で修正して実行確認は可能ですがかなり手間になります。
そこで開発エンドポイントを使って開発する方法が提供されており、Apache Zeppelinなどを使ってインタラクティブに開発することができます。公式ドキュメントによると、エンドポイントを利用して開発する方法としては大きく以下の4つの方法が提供されています。
① EC2上にApache Zeppelinを構築して開発エンドポイントへ接続
② ローカルマシンにApache Zeppelinを構築して開発エンドポイントへ接続
③ 開発エンドポイントへ直接sshしてREPL Shellを利用する
④ PyCharmのProfessional editionで開発エンドポイントへ接続
①の方法がコンソールからワンクリックで開発環境を構築することができるので簡単かつ便利に準備することができますが、EC2(m4.xlarge)が起動されるためかなりの費用がかかります。
今回は②の方法でかつローカルマシン上のDockerでApacheZeppelinを構築して開発環境を準備してみたいと思います。
1. 開発エンドポイントを作成
事前に以下が対応されている前提とします
詳細は以下参照
docs.aws.amazon.com
開発エンドポイントについては、費用1DPUあたり0.44ドル/時で最低2DPUからとなりますので
できるだけこまめに消すことを想定してCloudFormationで作成します。
AWSTemplateFormatVersion: '2010-09-09' Description: "Glue DevEndpoint" Resources: DevEndpoint: Type: "AWS::Glue::DevEndpoint" Properties: EndpointName: "TestDevEndpoint" NumberOfNodes: 2 PublicKey: "[ssh public key]" RoleArn: "[Glue Service RoleのARN]" SecurityGroupIds: - [開発エンドポイント用のセキュリティグループ] SubnetId: [開発エンドポイント用のサブネット]
上記を作成して保存後、コンソールもしくはcliから実行してください。
aws cloudformation create-stack --stack-name stack-dev-endpoint --region ap-northeast-1 --template-body file://dev_endpoint.yml
と思ったら、エラーが発生してうまく実行できませんでいた。
バージニアリージョンでは問題なく実行できましたので、おそらく現時点で東京リージョンではGlueのCloudFormationがまだ使えないようです。
仕方が無いのでcliで作成することにします。
以下のコマンドを実行して開発エンドポイントを作成します。
aws glue create-dev-endpoint \ --endpoint-name TestDevEndpoint \ --role-arn [Glue Service RoleのARN] \ --security-group-ids sg-866cdeff \ --subnet-id [開発エンドポイント用のセキュリティグループ] \ --public-key "[ssh public key]" \ --number-of-nodes 2
実行後しばらくしてstatusがREADYになれば準備完了です。
3. 開発エンドポイントにsshフォワードで接続
コンソールから開発エンドポイントの詳細を開き「SSH tunnel to remote interpreter」に記載のある接続コマンドをコピーします。
コピーしたコマンドに-gオプションを付与してMacからssh接続を行います
ssh -g -i <private-key.pem> -NTL 9007:169.254.76.1:9007 glue@ec2-xxx-xxx-xxx-xxx.ap-northeast-1.compute.amazonaws.com
※コンテナからトンネルを利用できるようにするためにgオプションを付与します
4. Apache/Zeppelinを起動&設定
docker run -p 8080:8080 apache/zeppelin:0.7.3
※タグは取得したバージョンをセットしてください
起動後以下のURLにアクセスします
http://localhost:8080.
画面右側からinterpreterを選択します
spark項目で以下のように設定します。
- Connect to existing processをチェック
- HostにローカルマシンのIP
- masterにyarn-clinet
- psark.executor.memoryを削除
設定後にrestartすれば完了です。
5. 動作確認
以下のサンプルコードを実行してスキーマ情報が出力されることを確認してください
DataBase、TableNameは実際に設定しているものを使用してください
%pyspark import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * # Create a Glue context glueContext = GlueContext(SparkContext.getOrCreate()) # Create a DynamicFrame using the 'persons_json' table persons_DyF = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="persons_json") # Print out information about this data print "Count: ", persons_DyF.count() persons_DyF.printSchema()
これで気軽に試せる環境が整いました。
実際に使うにはコードのデータや設定データの永続化も考える必要がありますが、まずはこれで色々と試してみてください。
DockerではじめるPySpark
最近Sparkの勉強を始めました。
手軽に試せる環境としてPySparkをJupyter Notebookで実行できる環境を作ればよさそうです。
環境構築に手間取りたくなかったので、Dockerで構築できないか調べてみるとDocker Hubでイメージが提供されていましたので、それを利用することにしました。
今回は導入からサンプル実行までやってみたいと思います。
導入手順
Dockerイメージの取得
以下のコマンドを実行してイメージを取得してください。
イメージのサイズが約5GBあるのでディスク容量には気をつけてください
docker pull jupyter/pyspark-notebook
Dockerの起動
notebookのデータを保存しておくために、ローカルに適当なディレクトリを作成してDockerのマウント先を作ります。
以下コマンドを実行してDockerを起動します。
docker run -p 8888:8888 -v ローカルボリュームのフルパス:/home/jovyan/work jupyter/pyspark-notebook start-notebook.sh --NotebookApp.token=''
Jupyterの起動
Dockerの起動完了後、以下のアドレスにアクセスします
http://localhost:8888/
正常に起動していれば以下のような画面が表示されます
右端のNewボタンからPython3を選択します
新規に開いた画面に以下のコードを入力します
from pyspark.context import SparkContext sc = SparkContext() data = sc.parallelize([1, 2, 3, 4, 5]) print(data.count()) sc.stop()
Runボタンをクリックし実行結果が表示されれば成功です
後は色々とサンプルを動かしてみてください。
Dockerで手軽に試せるのはいいですね。
S3マウントツールのObjectiveFSを試してみた
S3のマウントにみなさん何を使っていますか?
s3fs?goofys?使い勝手はいかがでしょう?
今回は、商用ですがObjectiveFS(https://objectivefs.com/)というS3のマウントツールの紹介をしたいと思います。
環境
OS:AmazonLinux
手順
ライセンス取得
14日間のトライアル版がありますので、以下のページの「Try It Free」からサインアップ(クレジットカード不要)してライセンスを取得してください。サインアップ後にメールが届きますのでそれに従ってログインしてください。
https://objectivefs.com/price?l=pricing
初期設定
IAM Userを作成しアクセスキー、シークレットキーを準備しておいてください。
IAM Userの権限にはS3FullAccessを付与しておいてください。
#sudo mount.objectivefs config
Enter ObjectiveFS license: ←ライセンスを入力
Enter Access Key Id: ←アクセスキーを入力
Enter Secret Access Key: ←シークレットキーを入力
Enter Default Region (optional): ←リージョンを入力
ファイルシステムを作成
以下のコマンドを実行するろS3にバケットが作成されますので注意して実行してください。
「s3-objectivefs」というバケットを作成するものとします
# sudo mount.objectivefs create s3-objectivefs Passphrase (for s3://<filesystem>): ←任意のパスワード Verify passphrase (for s3://<filesystem>): ←パスワード再入力
マウントする
# sudo mkdir /mnt/s3-objectivefs # sudo mount.objectivefs s3-objectivefs /mnt/s3-objectivefs Passphrase (for s3://<filesystem>): ←上記で設定したパスワードを入力
これでエラーが出なければ完了です
lsしてみたりファイルを作るなりして遊んでみてください。
既存バケットをマウントする方法はまたご紹介したいと思います。
AWS SDKでAuthFailure【cloudpack 大阪 BLOG】
発生した障害
boto(python)で定期的にS3にファイルをアップロードしているスクリプトが突然エラーとなり実行できなくなる問題に遭遇しました。
エラーログを確認すると「403 Forbidden」が発生していました。
調査したこと
何らかの原因で使用しているアクセスキーが有効でなくなった可能性をがあると想定しましたが問題なし。
ポリシーも確認しましたが権限は問題なし。
ざっと見たところ原因分からず。
問題の切り分け
同じ権限を持ったユーザーを再作成して試したところ上記と同様のエラーとなる
ここでS3にエラーが発生してないか気になりヘルスチェックを確認しましたがこちらも問題なし
一応、再作成したユーザーが本当に正しい権限が付与されているか確認するためにaws cliを手動で実行したところ下記エラーが発生しました。
An error occurred (AuthFailure) when calling the DescribeInstances operation: AWS was not able to validate the provided access credentials
認証失敗とありますが、あまり見慣れていないエラーが発生していたため即ググったところ、OSの時間がずれていることが原因らしいことが判明。OSの時刻を確認すると確かに20分ほどずれており、ntpで正常に同期できていませんでした。手動で時刻同期して再度aws cliを実行すると正常に実行できました。
その後定期スクリプトを再度実行したところ正常に終了しました。
参考ページ
awscliはOS内の時間が狂ってるとAuthFailureが出る | hacknote
これをやると http://qiita.com/sonots/private/4d5dbec2b12621ef27e9 こうなる · GitHub
上記のページを確認するとOSの時刻がずれているとAPIの認証が通らないようです。
AWSのドキュメントにも以下の記載がありました。
AWS CLI、または AWS SDK を使用してインスタンスからリクエストを行う場合、これらのツールによって 自動的にリクエストに署名されます。 インスタンスの日時が正しく設定されていない場合、署名の日付がリクエストの日付と一致しないことがあり、 その場合は AWS によってリクエストが却下されます。
AutoScalingでEC2のDetach/StandByができない場合の対処法【cloudpack 大阪 BLOG】
はじめに
AutoScalingを設定していて手動で設定を変更していると、たまに台数がずれるなどして最低台数の制限に引っかかりAutoScalingGroupからEC2をDetachやStandByにできない場合があります。スケーリングポリシーに基づいてインスタンスの増減が実施されていればほぼ発生しないと思いますが、メンテナンスやインスタンスの入れ替えなどを手動で実施すると発生する場合があります。今回はこうしたケースの対処法を紹介しようと思います。
Detachできない状態
AutoScalingGroupには3台紐付いていますが、DesiredとMinがそれぞれ2台となっています。
1台をDetachしてAutoScalingGroupを2台にしたいのですが、この状態でDetachを実施すると、
Desiredが2から1に変わりMinを下回ることからエラーとなります。
対応方法
Desiredを2から3に変更した後にDetachできればいいのですが、普通に変更するとインスタンスが新規に追加されてしまいます。
対応としては以下のように「Suspended Processes」でLaunchを設定します。こうすることでAutoScalignGroupの設定を変更しても新規にインスタンスの追加が実行されません。この状態にしてからDesiredを3に変更します。
正しく設定が完了すれば以下のようにAutoScalignGroupのインスタンス数が変わらずDesiredが3になります。
この状態で再度、対象インスタンスをDetachすると問題なく実行できるはずです。
Detach完了後です。正常にDetachが完了しました。
AutoScalingは便利な機能ですが、手動でインスタンスを制御しようとすると思わぬ動作をしてインスタンスの削除や追加が実行されます。追加はともかく削除は困ることが多いと思いますので手動で何かをする際は事前にしっかりと検証したほうがいいと思います。