PySparkでMySQLからのデータ取得&集計方法

MySQLに対してSQLでよくやるようなデータの取得や集計などをPySparkのDataFrameだとどうやるのか調べてみましたので、備忘録として残しておきたいと思います。
検証環境は以前紹介したDockerではじめるPySparkをベースにDockerで環境を構築しいます。
こういった検証にDockerはすごく便利でいいですね

環境

  • PySpark 2.2
  • MySQL5.7

データはMySQLの公式でサンプルとして提供されているworldデータベースを利用します。

環境の構築

利用するDockerイメージ

以下の通りdocker-compose.ymlを作成します。

version: '2'
services:
 pyspark:
    image: cloudfish/pyspark-notebook
    volumes:
       - LOCAL_PATH:/home/jovyan/work
    ports:
      - "8888:8888"
    command: bash -c "start-notebook.sh --NotebookApp.token=''"
    links:
      - dbserver
    environment:
      GRANT_SUDO: "yes"
  dbserver:
    image: kakakakakku/mysql-57-world-database
    environment:
      MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
  phpmyadmi:
    image: phpmyadmin/phpmyadmin
    ports: 
      - "18080:80"
    links:
      - "dbserver"
    environment:
      PMA_HOST: dbserver
      PMA_USER: root 
      PMA_PASSWORD: ""

Docker起動

docker-compose up

Jupyter Notebook画面確認
http://localhost:8888
f:id:cloudfish:20180727134630p:plain
phpmyadmin画面確認
http://localhost:18080
f:id:cloudfish:20180730141054p:plain

PySparkの実行確認

早速サンプルデータベースで実行確認を進めていきます。worldデータベースは以下のようなテーブルが含まれています。
これらのテーブルを使ってデータを取得してみたいと思います。

Tables_in_world
city
country
countrylanguage

画面右端のNewボタンをクリックしPython3を選択し、開いた画面で以下を入力していきます。
f:id:cloudfish:20180727134709p:plain

以下のコードはコードセルごとに入力してください。入力後Shift + Enterでコードが実行されます。

Sparkの初期化処理

from pyspark.sql import SQLContext, Row
from pyspark import SparkContext

sc = SparkContext("local", "First App")

※2回実行するとエラーになります。

JDBC接続処理

JDBCに接続しています。

sqlContext = SQLContext(sc)
jdbc_url="jdbc:mysql://dbserver/mysql"
driver_class="com.mysql.jdbc.Driver"

DB_USER="root"
DB_PASSWORD=""

def load_dataframe(table):
  df=sqlContext.read.format("jdbc").options(
    url =jdbc_url,
    driver=driver_class,
   dbtable=table,
   user=DB_USER,
    password=DB_PASSWORD
  ).load()
  return df

データ取得処理

データをDataFrameに取得します
テーブル指定でデータ取得する場合(countryテーブル、cityテーブルを取得)

df_country = load_dataframe("world.country")
# 実行されるSQL:SELECT * FROM world.country WHERE 1=0
df_city = load_dataframe("world.city")
# 実行されるSQL:SELECT * FROM world.city WHERE 1=0

SQLを指定してデータ取得する場合(cityテーブルの国コードがJPNのものだけを取得)

df_city_japan = load_dataframe("(select * from world.city where CountryCode='JPN') city_japan")
# 実行されるSQL:SELECT * FROM (select * from world.city where CountryCode='JPN') city_japan WHERE 1=0

MySQL側でどのようなSQLが流れるのか見てみたところ、テーブルにセットした内容がFROM句の後にセットされるようです。

カラム指定

Nameカラムを表示

df_country.select("Name").show()

条件検索

国名がJapanのデータを抽出。

df_country.filter(df_country["Name"] == "Japan").show()
df_country.where(df_country["Name"] == "Japan").select("Code","GNP").show()

isNull

独立年がNullのデータを抽出

df_country.where(df_country["IndepYear"].isNull()).show()

like

国名がJで始まるデータを抽出

df_country.where(df_country["Name"].like("J%")).show()

Case When式

人口が100000人より大きい場合は「Big」、小さい場合は「Small」を表示

from pyspark.sql import functions as F
df_country.select(df_country["Name"], F.when(df_country["Population"] > 100000,"Big").otherwise("Small").alias("CountryDiv")).show()

substr

国名を3文字切り出して表示

df_country.select(df_country["Name"].substr(1,3)).show()

limit

5件のみ抽出

df_country.limit(5).show()

Join

countryテーブルとcityテーブルを結合し国名がJapanのものを抽出

df_join = df_country.alias('country').join(df_city.alias('city'),(df_city["countryCode"] == df_country["Code"]) & (df_country["Name"]=="Japan")).show()

OrderBy

GNPが高い国順に表示

df_country.orderBy("GNP" , ascending=False).select("Code","Name","GNP").show()

GroupBy

国ごとにグループ化し、cityの数、人口の平均と合計を集計

from pyspark.sql import functions as F

df_city.groupBy("countryCode") \
 .agg( \
    F.count(df_city["Name"]).alias("total_count"), \
    F.avg(df_city["Population"]).alias("avg_population"), \
    F.sum(df_city["Population"]).alias("sum_population") \
).show()

まとめ

書き方は少し慣れる必要がありますが、かなりSQLに近いイメージでデータ取得が可能なことが分かりました。
今回はDBに対して実行しましたが、ファイルに対しても同様に実行可能です。

CodeBuildのbuildspec.ymlで変数定義ができない

CodeBuildを利用していてbuildspec.ymlを書いていたなかで、環境変数ではなく変数定義して使いたいと思い試していたのですが、
どうもうまくいかずハマりましたので備忘録としてのことしておきたいと思います。

やりたかったこと

buildspec.ymlのビルド手順で処理途中で変数定義を行いたい

簡単なサンプルですが、以下のようなbuildspec.ymlがあったとして、実行するとバージョンが表示されることを期待しました。

  build:
    commands:
      - $version=1.0
      - echo $version

ですが、実行してもこの状態だと何も出力されず、
exportしてみたりと色々試してみましたが思うような結果は得られませんでした。

最終的に行ごとにセッションが新しく張られてるのではないかと考え以下のように変数を引き継ぎたい処理をシェルスクリプト化することで解決しました。

hoge.sh

$version=1.0
echo $version

buildspec.yml

  build:
    commands:
      - sh ./build.sh

見た目そのままシェルスクリプトなので、同じように使えるかと思いましたがそうではないんですね。
シェルスクリプトにすることで一見して処理が見えなくなるのも微妙な感じですが、まとまった処理であればスクリプト化するのもいいかと思います。

AWS BatchでEFSをマウントしディスク共有してみる

もうすぐEFSが東京リージョンに来ますね。最近はAWS Batchを触る機会が多くなってきたので、AWS BatchでEFSを使ってのディスク共有を検証してみました。
まだ東京リージョンにはないので今回はバージニアリージョンを利用して検証してみます。

構成

f:id:cloudfish:20180707232903p:plain

手順概要

1. EFSの作成
2. Batch用イメージの作成
3. AWS Batchの設定
4. 実行確認

1. EFSの作成

まずAWS Batch用とEFS用にセキュリティグループを作成します。

Batch-SG(sg-batchsg)
Type Protocol Port Range Source Description
SSH TCP 22 xxx.xxx.xxx.xxx/32 ssh
EFS-SG(sg-efssg)
Type Protocol Port Range Source Description
TCP TCP 2049 sg-batchsg batch→efs通信用

Batch用インスタンスからEFSに接続可能なように設定しておきます。

AWSコンソールからEFSを選択してEFSを作成します。
今回はデフォルトVPCとして、セキュリティグループは上で作成したEFS用のセキュリティグループを設定します。
ゾーンごとにセキュリティグループを設定するのは何だかなーという印象です。
f:id:cloudfish:20180705001335p:plain

作成後にDNS nameが割り当てられるので控えておいてください。
f:id:cloudfish:20180705102246p:plain

2. Batch用イメージの作成

ECS用のAMIからEC2を新規に作成し、起動時に指定のEFSを自動マウントする設定を行います。
バージニアリージョンの場合は以下のAMIから作成しました。
amzn-ami-2017.09.l-amazon-ecs-optimized (ami-aff65ad2)

以下にECS対応のAMIの一覧が記載されていますので参考にしてください。
ただし、AWS Batchのマネージド型で起動したインスタンスについては、2017.09のもので起動されていましたのでここではそれをベースとしています。
Amazon ECS 対応 AMI - Amazon Elastic Container Service

起動完了後にSSHでログインしrootユーザーで以下の設定を行います。

# マウント用のディレクトリ作成
mkdir /efs
# NFSクライアントのインストール
yum install -y nfs-utils

ここではEFSのFile system IDをfs-1234abcdとしていますが、実際のIDに合わせて変更してください。

# EFSをマウント
mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 fs-1234abcd.efs.us-east-1.amazonaws.com:/ /efs

# 起動時にマウントするようにfstabを更新
echo 'fs-1234abcd.efs.us-east-1.amazonaws.com:/ /efs nfs4 nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 0 0' | sudo tee -a /etc/fstab

設定完了後にEC2を再起動し、EFSがマウントできていることを確認してください。
確認完了すれば、EC2を停止しAMIを作成し、それをAWS Batchの実行イメージとします。
※ 本番で利用する場合は、サポートログが出力されるefs-utilsのマウントヘルパーの利用を推奨

3. AWS Batchの設定

コンピューティング環境を作成

以下の設定で作成します

設定内容 設定値
コンピューティング環境のタイプ マネージド型
最小vCPU 1
必要なvCPU 1

以下の通りユーザー指定のAMIを有効にします。
f:id:cloudfish:20180705103411p:plain
上記以外はデフォルトで作成してください。

起動したインスタンスssh接続

dockerコンテナからマウントした時の確認用として/efs配下に以下のファイル作成しておきます。

mkdir -p /efs/files
$echo "Hello EFS!!" > /efs/files/test.txt
$cat /efs/files/test.txt
Hello EFS!!
ジョブ定義を作成します。

事前に以下を参照しroleを作成しておきます。
Amazon ECS コンテナインスタンスの IAM ロール - Amazon Elastic Container Service

busyboxを使ってホストの/efsディレクトリをコンテナの/homeディレクトリにマウントする設定を行います。
コンテナ起動時にtest.txtが表示することでマウントされていることを確認します。

設定内容 設定値
コンテナイメージ busybox
コマンド cat /home/files/test.txt
vCPU 1
メモリ 128

ボリューム

設定内容 設定値
名前 efs
ソースパス /efs

マウントポイント

設定内容 設定値
コンテナパス /home
ソースパス efs
ジョブキューの作成
設定内容 設定値
優先度 1
コンピューティング環境を選択 上記で作成したコンピューティング環境名

ジョブ実行

左ペインの「ジョブ」から「ジョブ送信」を選択します。

設定内容 設定値
ジョブ名 任意
ジョブ定義 上記で作成したジョブ定義名
ジョブキュー 上記で作成したジョブキュー名

他はデフォルトでOKです

実行結果

実行後しばらくすると正常に終了した場合は、コンソールのジョブのsucceededにジョブが表示されます。
cloudwatch logsに以下ログが出力されていれば成功です。
f:id:cloudfish:20180703152027p:plain

これまでAWS Batchのジョブ間でファイルを共有する場合は、S3が主な選択肢でしたが、今後はEFSも選択肢として使えそうです。
S3では耐えきれないワークロードなどで利用検討してみてください。

AWS Certified Big Data – Specialty 認定試験を受けました

AWSの認定資格でビッグデータの専門知識を問われる試験のAWS Certified Big Data – Specialtyを受験しました。何とか合格できたので、この試験に向けて勉強した内容などを残しておきたいと思います。
 

試験概要

試験の概要は以下参照
AWS 認定ビッグデータ – 専門知識

受験資格

以下のいずれかの試験に合格している必要があります。
AWS 認定クラウドプラクティショナー
AWS 認定ソリューションアーキテクト – アソシエイト
AWS 認定デベロッパー – アソシエイト
AWS 認定 SysOps アドミニストレーター – アソシエイト
 

試験対策

出題範囲の確認

関連するAWSのサービスについては、以下のブログが参考になります。
AWS Certified Big Data – Specialty – Blue Clouds

サンプル問題

サンプル問題は必ず解きましょう。現時点で模擬試験は提供されてないようです。
サンプル問題

参考資料

勉強にあたっては、BlackbeltをはじめにAWSのホワイトペーパーのビッグデータ関連の資料などを読みました。
実際に各サービスを動かして確認していればなおいいと思います。

英語

現時点で日本語の試験は提供されておらず英語での試験しか提供されていません。
これについては、英語のAWSドキュメントを読むことで勉強しました。
また、他の試験の英語のサンプル問題を読むことで問題文に慣れました。

所感

難易度はアソシエイトとプロフェッショナルの間くらいに感じましたが、英語が得意ではないのが一番の難関でした。170分という試験時間はかなり長いのですが、結局見直す時間もないままぎりぎりまでかかってしまいました。
次はAdvanced Networking - Specialtyに挑戦してみたいと思います。

PHPのファイルアップロード機能をサイト別に切り替える

PHPのファイルアップロード機能をサイトごとにhtaccessで切り替えたいというケースがあり、実現方法について調査したので備忘録として残しておきたいと思います。

アップロードするファイルサイズの切り替えはケースとしてはよくあるのでネット上に情報があがってましたが、ファイルのアップロード機能自体を切り替える例は見つからりませんでした。htaccessで同じようにできるか試してみたところうまくいかずしばらくハマりました。
結論としてはhtaccessではできずにhttpd.confに設定することでサイトごとの設定を変更することができました。

設定方法

以下example.comというvirtualhostに設定する場合の記載例です。

<VirtualHost *:80>
  ServerName example.com
  DocumentRoot /var/www/example.com/
  php_admin_flag file_uploads on  ← 追加
</VirtualHost>

htaccessでできなかった理由は、各設定内容がそれぞれモードをもっており、そのモードに応じて設定できる場所が決まっていました。
file_uploadsについてはPHP_INI_SYSTEMというモードになっており、設定可能な箇所はphp.iniかhttpd.confで設定可能となっています。

f:id:cloudfish:20180601135736p:plain
PHPマニュアル抜粋

fish shellでaws-cliのコマンド補完

fish shellでaws cliのコマンド補完を設定してみたので設定方法を紹介したいと思います。

環境

Mac Sierra
fish 2.2.0

設定方法

bashzshについては以下の公式で設定方法が紹介されているのですがfishはありませんでした。
docs.aws.amazon.com

色々調べたところ以下をfishのconfig(~/.config/fish/config.fish)に設定することでコマンド補完ができます。

 complete -c aws -f -a '(begin; set -lx COMP_SHELL fish; set -lx COMP_LINE (commandline); /usr/local/bin/aws_completer; end)'

設定後にテストを行ったところ、以下のように補完候補の末尾に何故かバックスラッシュ(\)がセットされてしまいました。

> ~ aws  acm\                                                         
acm\                       ecs\                            mobile\
alexaforbusiness\          efs\                            mq\
apigateway\                elasticache\                    mturk\
application-autoscaling\   elasticbeanstalk\               opsworks\
appstream\                 elastictranscoder\              opsworks-cm
appsync\                   elb\                            organizations\
athena\                    elbv2\                          pinpoint\
autoscaling\               emr\                            polly\

そのため、補完はされるのですが、このまま実行してもエラーになるので、いちいちバックスラッシュを消す必要があります。
これではせっかく補完されてもあまり嬉しくないので、回避策がないかさらに調べてみました。

以下のとおり補完プログラムを修正することで対応できました。
■修正ファイル
/usr/local/lib/python2.7/site-packages/awscli/completer.py(パスは環境にあわせてください)
■修正箇所

def complete(cmdline, point):
    choices = Completer().complete(cmdline, point)
    #print(' \n'.join(choices)) ← コメントアウト
    print('\n'.join(choices)) ←追加

なぜかスペースをバックスラッシュとして解釈してしまうようなのでスペースを削除します。

再度テストするとバックスラッシュが除去されています。

> ~ aws  acm
acm                       ecs                            mobile
alexaforbusiness          efs                            mq
apigateway                elasticache                    mturk
application-autoscaling   elasticbeanstalk               opsworks
appstream                 elastictranscoder              opsworks-cm
appsync                   elb                            organizations
athena                    elbv2                          pinpoint
autoscaling               emr                            polly

これで快適にコマンド補完できました。
ただし、残念なことにawscliのアップデートの度に修正が必要になりそうです。

ec2-scheduleでEC2のインスタンスタイプの自動変更

ec2-scheduleではインスタンスタイプの変更も可能です。
今回はインスタンスタイプを自動で変更する方法を紹介したいと思います。

設定方法

DynamoDBのConfigTableに以下を設定

テーブル名はスタック名+ConfigTableになっています。
ex)
月曜から金曜までの9:00 - 20:00(日本時間)t2.micro
月曜から金曜までの20:00 - 9:00(日本時間)t2.nano

scheduleレコード
type name description periods timezone
schedule ec2_type_change ec2 type change { "down-period@t2.nano", "up-period@t2.micro" } Asia/Tokyo
periodレコード
type name description timezone weekdays begintime endtime
period up-period Office hours Asia/Tokyo {"mon-fri"} 9:00 19:59
period down-period Office hours Asia/Tokyo {"mon-fri"} 20:00 08:59

EC2の設定

以下のとおりタグを付与します
f:id:cloudfish:20180413103933p:plain

動作確認

インスタンスタイプがt2.nanoの状態です。
f:id:cloudfish:20180413103747p:plain

設定時間がすぎるとt2.microに変更されて起動されました。
f:id:cloudfish:20180413105224p:plain

変更の動きとしては、9:00に動くLambdaで一旦EC2が停止されます。
その後、9:05のLambdaでインスタンスタイプが変更されて起動されます。
停止に時間がかかる場合は、変更されるタイミングももう少し遅くなるかと思います。

インスタンスタイプを自動で変更してくれるのはすごく便利な機能ですね。
平日は大きめのインスタンスタイプとし、週末はタイプを落とすなどしたいユースケースなどで使えそうです。
ただし、稀に変更するインスタンスタイプが枯渇しているため起動できないなどのケースも想定されるので、本番での利用についてはそのあたりの考慮が必要になりますがぜひ利用を検討してみてください