ec2-schedulerでRDSを定期的に起動・停止する

前回、ec2-schedulerでEC2の定期起動・停止を試してみましたが、今回はRDSで実行してみたいと思います。RDSについてもほぼEC2と同様の設定になります

設定方法

DynamoDBのConfigTableに以下を設定
テーブル名はスタック名+ConfigTableになっています。
ex)
月曜から金曜までの9:00 - 20:00(日本時間)の間だけRDSを起動する設定

configレコード
type name use_metrics default_timezone regions schedule_lambda_account scheduled_services tagname trace
config scheduler true Asia/Tokyo { "ap-northeast-1" } true { "ec2", "rds" } Schedule false

scheduled_servicesにrdsを追加します

scheduleレコード
type name description periods timezone
schedule rds_stopstart rds stop start {"weekday_9to20"} Asia/Tokyo
periodレコード
type name description timezone weekdays begintime endtime
period weekday_9to20 Office hours Asia/Tokyo {"mon-fri"} 9:00 20:00

RDS側にタグを付与

対象のRDSに以下のタグを付与します
f:id:cloudfish:20180413083308p:plain

動作確認

まず停止状態にしておき、時間が来るまで待ちます
f:id:cloudfish:20180413083333p:plain

設定した時間内になれば自動的に起動されました
f:id:cloudfish:20180413084842p:plain

注意点

  • RDSクラスターの一部もしくはAuroraはec2-schedulerでは自動起動・停止はできません。
  • EC2でも同様ですが、設定時間から起動が開始されているためインスタンスが利用可能となっているわけではないので、利用可能としたい時間にあわせて事前に起動しておく必要があります。
  • メンテナンスウィンドウを考慮したスケジュールも可能となっています。詳しくはドキュメントを参照してください。
  • 本番で利用する場合は、正常に起動しない場合も想定して監視をするなどの考慮してください。

RDSも停止することが可能になったので、定期的な停止をしてコスト削減を検討してみてはいかがでしょうか。

ec2-schedulerでEC2を定期的に停止してコストを節約しよう

AWS Answerで公開されているec2-schedulerを使ってみました。
EC2の自動起動、停止についてはLambdaを使って色々と工夫されているかと思いますが、このec2-schedulerはかなり高機能なソリューションになっていますのでぜひ利用を検討してもらえればと思います。
aws.amazon.com

ec2-schedulerでできること

構成

以下のような構成で提供されています。
https://d1.awsstatic.com/aws-answers/answers-images/instance-scheduler-architecture.727e008ced5a4b1b656b5c22afb4a2dfc32d7c33.png
デフォルトでは5分毎にLambdaが起動されて特定のタグが付与されているEC2、RDSの自動起動、停止を行っています。

デプロイ方法

以下のリンクからCloudFormationを該当リージョンから実行します。
リージョンごとに設定が必要になります。
Launch CloudFormation

設定方法

DynamoDBのConfigTableに以下のレコードを追加します。
テーブル名はスタック名+ConfigTableになっています。
ex)
月曜から金曜までの9:00 - 20:00(日本時間)の間だけEC2を起動する設定

scheduleレコード
type name description periods timezone
schedule ec2_stopstart ec2 stop start {"weekday_9to20"} Asia/Tokyo
periodレコード
type name description timezone weekdays begintime endtime
period weekday_9to20 Office hours Asia/Tokyo {"mon-fri"} 9:00 20:00

ちなみにconfigレコードはデフォルトタイムゾーンの設定やタグ名の設定など実行時の設定内容を保存しています。

また、レコードの登録については、DynamoDBをコンソールから直接変更してもいいですが、scheduler-cliという専用のCLIも用意されています。
docs.aws.amazon.com

EC2側にタグを付与

対象のEC2に以下のタグを付与します
f:id:cloudfish:20180413090128p:plain

動作確認

正しく設定されていれば、上記の時間以降に対象EC2が停止されます。
f:id:cloudfish:20180412145758p:plain

また、上記の時間設定であれば、9:00になれば対象EC2が起動されています。

注意点

設定した時間にEC2が停止もしくは起動されているわけではないので、例えば必ず9:00から使いたいのであれば、起動時間を考慮して早めに起動しておく必要があります。

このLambda自体のコストとしては、月額約5$程度となります。
起動・停止スケジュールを複数作ることもできますし、クロスアカウントでの実行や実行結果をCloudWatchのカスタムメトリクスに書き込むといったこともできるようです。かなり使いやすいと思うので導入を検討してみてはいかがでしょうか?

SecurityGroup設定のポイント

AWSのインフラ設計にあたって、セキュリティグループの設計は結構重要だと思うのですが、あまり意識して設計されていないケースも見受けられます。
メンテナンス性が考慮されていないと、後々かなり変更しづらくなりますし、誤って設定してしまい障害を発生させてしまうことも起こりえます。
そこで今回は一般的なWebシステム構成をベースにSecurityGroupの設定のポイントを紹介したいと思います。

環境

以下のようなWebシステムを前提に考えてみます。
f:id:cloudfish:20180221210403p:plain
※赤枠がアタッチするセキュリティグループになります。

基本的な設定ポイント

サービスのRole(役割)ごとにSecurityGroupを作成する
内部通信の許可はSourceにSecurityGroupを定義する(IPで定義しない。VPNやDXなどで外部と接続するようなケースはこの限りではないです)
管理用通信(ssh、監視)などは扱うグループ(開発者、インフラなど)ごとにSecurityGroupを分けて管理する。ただし、デフォルトで付与できるSGは5つまでなので分けすぎにも気をつけてください

具体的な設定内容は以下に記載します。
以下は全てInBound通信の設定となります。

サービスの役割ごとに応じたセキュリティグループ

ELB、Webサーバ、RDSがあるためそれぞれにセキュリティグループを作成します

ELB-SG(sg-elbsg)
Type Protocol Port Range Source Description
HTTP TCP 80 0.0.0.0/0 webアクセス用
HTTPS TCP 443 0.0.0.0/0 webアクセス用
WEB-SG(sg-websg)
Type Protocol Port Range Source Description
HTTP TCP 80 sg-elbsg elb→web通信用
HTTPS TCP 443 sg-elbsg elb→web通信用

※sourceにELBのセキュリティグループをセット

DB-SG(sg-dbsg)
Type Protocol Port Range Source Description
MYSQL/Aurora TCP 3306 sg-websg web→db通信用

※DBの種類に応じてポートは変更してください

管理用通信(ssh、監視)のセキュリティグループ

開発者とインフラ担当という想定で2つのセキュリティグループを作成しました。
ここは関係者に応じて作ってください。

DEV-SG(sg-devsg)
Type Protocol Port Range Source Description
SSH TCP 22 xxx.xxx.xxx.xxx/32
SSH TCP 22 xxx.xxx.xxx.xxx/32
INFRA-SG(sg-infrasg)
Type Protocol Port Range Source Description
SSH TCP 22 xxx.xxx.xxx.xxx/32
Custom TCP Rule TCP 1234 xxx.xxx.xxx.xxx/32 監視用

セキュリティグループについては送信元をIPではなくSGを指定することができます。
そうすることで通信経路が分かりやすくなるので、メンテナンス性も向上します。
また、例えばDBへの通信をWebサーバのIPで許可していたりすると、何かしらでIPが変更されたもしくはオートスケールでサーバが増えた際に、接続できなくなるトラブルも発生します。
こうしたことからもできるだけIPやCIDRで設定しない方向で設計してもらえればと思います。

また、OutBoundの通信の制限については、所属している組織のセキュリティポリシーにもよるかと思いますが、セキュアになる反面、トラブル時に原因が分かりにくくなる場合もあるのでその点を注意して使用するかどうかを判断してもらえればと思います。

AWS GlueのJob Bookmarkの使い方

実際にETLで処理するケースとしては、1日1回定期的に処理するなどのケースが多いと思います。
この場合、追加分のみを抽出してETL処理をする必要があります。
Glueには、前回どこまで処理したかを管理するJob Bookmarksという機能があります。
今回はこのJob Bookmarksを使ってみたいと思います。

確認用のETL処理

S3に配置したapacheアクセスログをparquet形式に変換します。
f:id:cloudfish:20180212212311p:plain

手順概要

①S3にapacheアクセスログを配置
②Crawler設定、実行
③Job作成、実行
④変換データ確認
⑤S3に追加のログをアップロード
⑥Job再実行
⑦変換データ確認

準備

以下のようなフォーマットのapacheログをそれぞれ10行程度ずつ2ファイル用意します。

200.69.224.58 - - [16/Jan/2018:18:34:42 +0900] "GET /item/computers/2216 HTTP/1.1" 200 65 "/item/games/643" "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)"
40.66.156.206 - - [16/Jan/2018:18:34:42 +0900] "GET /item/games/4918 HTTP/1.1" 200 73 "/item/games/2145" "Mozilla/5.0 (iPad; CPU OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"
48.201.76.35 - - [16/Jan/2018:18:34:42 +0900] "GET /category/toys HTTP/1.1" 200 85 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; YTB730; GTB7.2; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET4.0C; .NET4.0E; Media Center PC 6.0)"

①S3にapacheアクセスログを配置

S3にbucketを作成し、準備で作成したファイルの一つ目をアップロードします。

②Crawler設定、実行

①で作成したS3用のクローラーを設定して実行します。

③Job作成、実行

左ペインのJobsを選択し「Add Job」をクリックしてJobを追加します。
この時、Job BookmarkをEnableに変更します。それ以外については環境に応じて設定してください。
f:id:cloudfish:20180212220239p:plain

Job bookmarkについては、以下の設定値があります。
デフォルトはDisableになります。

Job bookmark 説明
Enable 前回実行した以降のデータを処理
Disable 常に全データを処理
Pause 前回実行した以降のデータを処理するが、状態を更新しない

※Pauseはどういう時に使えばいいかが分かっていません。

Jobの設定が完了したら、実行してください。

④変換データ確認

上記でJobが正常に実行されたら、変換後のデータを確認します。
まずは、変換先のS3用のクローラーを設定します。
以下のようにクロール対象から_common_metadataと_metadataを除外するようにセットします。
f:id:cloudfish:20180212221916p:plain

クローラーを実行後、AthenaからSQLを実行してデータが変換されていることを確認します。
今回は1つ目のファイルにはアクセスログを10行用意しました。
f:id:cloudfish:20180212222220p:plain

⑤S3に追加のログをアップロード

準備で作成したapacheログファイルの二つ目をソースとなるS3にアップロードします。

⑥Job再実行

再度、同一Jobを実行します。

⑦変換データ確認

上記Jobが正常終了したら、Athenaから④で確認したの同様のSQLを実行して増分のみ追加されていることを確認してください。Job BookmarkがDisableの場合は、再実行時に全件処理されてしまいます。
もし想定通りの結果とならないようであればJob Bookmarkの設定値を再確認してください。
2つ目のファイルにはアクセスログを11行用意しているので、以下のとおり全部で21行となり増分データのみ追加されました
f:id:cloudfish:20180212223724p:plain

現状、このJob Bookmarkについては、ソースがS3のときのみ利用可能となっています。
RDSなどのDBの場合については、処理のなかでフィルタする必要があります。

実際にBookmarkを本番で使うには、ジョブがエラーとなった場合などどのようにリトライするかをしっかり想定しておく必要がありそうですね。

AWS Glueの開発環境をDockerで構築する(その2)

前回、Glueの開発環境をDockerで作りましたが、zeppelinコンテナを起動してホストからsshフォワードを実行して接続するという構成でホストを意識する必要がありましたが、せっかくzeppelinをdockerで作っているのでsshフォワードを行う部分もdockerで実現できないかと思い考えてみました。

今回作った構成

f:id:cloudfish:20180202231325p:plain
データを永続化するためにdataコンテナとsshフォワード用にssh forwardコンテナを追加することにしました。
そのためzeppelinコンテナからはssh forwardコンテナに接続しにいくことになります。
こうすることでssh forwardコンテナには名前でアクセスできるので、毎回設定変更する必要がなくなります。

手順

  • ssh forwardコンテナ作成
  • docker-compose.yml作成
  • 起動確認

ssh forwardコンテナ作成

以下のとおりDockerfileを作成します。

FROM ubuntu:latest
RUN apt-get -y update && apt-get -y upgrade
RUN apt-get -y install openssh-client net-tools 

ADD ssh_forward.sh /home/ubuntu/ssh_forward.sh
RUN chmod 644 /home/ubuntu/ssh_forward.sh

CMD ["/bin/sh","/home/ubuntu/ssh_forward.sh"]

コンテナ起動時に実行するssh_forward.shを作成します。
起動時に環境変数秘密鍵と接続先のIP or DNSを受取り鍵ファイルを作成して開発エンドポイントへ接続します。

#! /bin/sh

echo $SSH_RSA > dev_endpoint.pem
chmod 600 dev_endpoint.pem

ssh -g -i dev_endpoint.pem -NTL 9007:169.254.76.1:9007 glue@$HOST_ADDRESS -o StrictHostKeyChecking=no

ビルド実行

docker build -t ssh_forward .

docker-compose.ymlを作成

version: '2'
services:
  data:
    image: busybox
    container_name: app
    volumes:
      - /zeppelin/notebook

  zeppelin:
    image: apache/zeppelin:0.7.3
    ports: 
       - "8080:8080"
    volumes_from:
      - data
    restart: always
    links:
      - "ssh_server"

  ssh_server:
    image: ssh_forward
    environment:
       SSH_RSA: "[DevEndpoint接続用ssh秘密鍵]"
       HOST_ADDRESS: [GlueのDevEndpointのDNS or IP]

※DevEndpoint接続用ssh秘密鍵は改行コードを「\\n」に置換して1行にしてからセットしてください。

起動確認

busyboxapache/zeppellinについては予めdocker hubから取得しておいてください。
また、GlueのDevEndpointの設定も事前に必要になります。

docker-compose up

以下アドレスにアクセスして画面が表示されていれば成功です。
http://localhost:8080

f:id:cloudfish:20180202230306p:plain

これで全てdockerコンテナ上で完結できましたね。

AWS GlueでVPCフローログ用のclassifiersを作ってみた

GlueでVPCフローログをparquet形式に変換させる定期ジョブを作ろうと思いクロール処理を追加したところ、ビルトインのClassifiersにはなかったため自動でテーブル構造を認識してくれませんでした。
認識させるためにはカスタムClassifiersを作る必要があることが分かりました。
今回はVPCフローログ用にカスタムClassifiersの作り方を調べた備忘録です

GlueのビルトインのClassifiersについては、以下のページで確認できます。
ここになければ作る必要があります。
Adding Classifiers to a Crawler - AWS Glue

手順

  • Grokフィルタ作成
  • classifiersの作成
  • 実行確認

Grokフィルタ作成

まずログをマッチングして分類するためのフィルタを作る必要がりあります。
Grokフィルタの詳細についてはここを確認してください。
Grok filter plugin | Logstash Reference [6.1] | Elastic
また、Glueのビルトインパターンについては以下を参照してください。
Writing Custom Classifiers - AWS Glue

作ったGrokフィルタをcrawlerを使って逐一実行確認していはかなりの時間がかかるため、
以下のサービスを使ってGrokフィルタが想定どおりに動作しているかを事前に確認します。
Grok Debugger

f:id:cloudfish:20180128001400p:plain

カスタムパターンを作る時の正規表現の確認は、以下のサービスが便利でした
Online regex tester and debugger: PHP, PCRE, Python, Golang and JavaScript

classifiersの作成

左ペインから「classifiers」を選択し、「Add classifier」ボタンをクリックします。
今回は以下のように入力してCreateします。
classifier name:任意
classifier type : Grok
classification:任意
Grok pattern:

%{INT:version:int} %{NOTSPACE:account_id} %{ENI:eni_id} %{IP:srcaddr} %{IP:dstaddr} %{INT:srcport:int} %{INT:dstport:int} %{INT:protocol:int} %{INT:packets:int} %{NUMBER:bytes} %{NUMBER:start} %{NUMBER:end} %{NOTSPACE:action} %{NOTSPACE:log_status}

Custome pattern:

ENI [eni-]+[a-z0-9]{8,8}

上記のgrokパターンは適切ではないと思いますが、とりあえずVPCフローログがマッチするはずです
f:id:cloudfish:20180128002514p:plain

実行確認

上記設定ができれば実際にクロールしてみましょう。
Add crawlerを実行すると以下のような画面になります。
左下のclassifierリストから今回作ったclassifierをAddします。
vpcフローログのあるS3パスを指定して、それ以外は画面に沿って入力しRun crawlerを実行してください。
f:id:cloudfish:20180128003119p:plain

正しく分類できていれば、テーブルの詳細を確認すると以下のようになっていると思います。
うまくいかなかった場合は、ClassificationにUnknownと表示されますのでその場合は修正して再度確認してください。
ただし、classifiersを更新した際は、crawlerを新規に作成しないと反映されないようなので注意してください。
f:id:cloudfish:20180128003850p:plain

grokフィルタについては、少しクセがあり慣れるのに少し時間がかかりそうですが、Debuggerもあるのでうまく活用して慣れていきたいと思います。

AWS Glueで新しくScalaがサポートされました

AWS GlueのETLスクリプトを作成する言語として、新たにScalaが追加されました。

画面を確認すると以下のようにPythonに加えてScalaも選択できるようになっています。
f:id:cloudfish:20180115231204p:plain

以下はScalaで自動生成されたETLスクリプトになります。

import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    // @type: DataSource
    // @args: [database = "sampledb", table_name = "vpc_flow_logs", transformation_ctx = "datasource0"]
    // @return: datasource0
    // @inputs: []
    val datasource0 = glueContext.getCatalogSource(database = "sampledb", tableName = "vpc_flow_logs", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
    // @type: ApplyMapping
    // @args: [mapping = [("version", "int", "version", "int"), ("account", "string", "account", "string"), ("interfaceid", "string", "interfaceid", "string"), ("sourceaddress", "string", "sourceaddress", "string"), ("destinationaddress", "string", "destinationaddress", "string"), ("sourceport", "int", "sourceport", "int"), ("destinationport", "int", "destinationport", "int"), ("protocol", "int", "protocol", "int"), ("packets", "int", "packets", "int"), ("bytes", "int", "bytes", "int"), ("starttime", "int", "starttime", "int"), ("endtime", "int", "endtime", "int"), ("action", "string", "action", "string"), ("logstatus", "string", "logstatus", "string")], transformation_ctx = "applymapping1"]
    // @return: applymapping1
    // @inputs: [frame = datasource0]
    val applymapping1 = datasource0.applyMapping(mappings = Seq(("version", "int", "version", "int"), ("account", "string", "account", "string"), ("interfaceid", "string", "interfaceid", "string"), ("sourceaddress", "string", "sourceaddress", "string"), ("destinationaddress", "string", "destinationaddress", "string"), ("sourceport", "int", "sourceport", "int"), ("destinationport", "int", "destinationport", "int"), ("protocol", "int", "protocol", "int"), ("packets", "int", "packets", "int"), ("bytes", "int", "bytes", "int"), ("starttime", "int", "starttime", "int"), ("endtime", "int", "endtime", "int"), ("action", "string", "action", "string"), ("logstatus", "string", "logstatus", "string")), caseSensitive = false, transformationContext = "applymapping1")
    // @type: ResolveChoice
    // @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
    // @return: resolvechoice2
    // @inputs: [frame = applymapping1]
    val resolvechoice2 = applymapping1.resolveChoice(choiceOption = Some(ChoiceOption("make_struct")), transformationContext = "resolvechoice2")
    // @type: DropNullFields
    // @args: [transformation_ctx = "dropnullfields3"]
    // @return: dropnullfields3
    // @inputs: [frame = resolvechoice2]
    val dropnullfields3 = resolvechoice2.dropNulls(transformationContext = "dropnullfields3")
    // @type: DataSink
    // @args: [connection_type = "s3", connection_options = {"path": "s3://glue-test-out-bucket"}, format = "parquet", transformation_ctx = "datasink4"]
    // @return: datasink4
    // @inputs: [frame = dropnullfields3]
    val datasink4 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions("""{"path": "s3://glue-test-out-bucket"}"""), transformationContext = "datasink4", format = "parquet").writeDynamicFrame(dropnullfields3)
    Job.commit()
  }
}

Sparkの利用者はPythonScalaのどちらの言語の利用が多いんでしょうか?
いずれにせよ言語の選択肢が増えるのはすごくいいですね。