PySparkでMySQLからのデータ取得&集計方法
MySQLに対してSQLでよくやるようなデータの取得や集計などをPySparkのDataFrameだとどうやるのか調べてみましたので、備忘録として残しておきたいと思います。
検証環境は以前紹介したDockerではじめるPySparkをベースにDockerで環境を構築しいます。
こういった検証にDockerはすごく便利でいいですね
環境の構築
利用するDockerイメージ
- cloudfish/pyspark-notebook(公式のpysapark-notebookにJDBCを同梱したイメージ)
- kakakakakku/mysql-57-world-database(mysqlはworldデータベースを取り込んだイメージ。Redash を詳しく学べる「Redash ハンズオン資料」を作ったのイメージを利用させていただいています。)
- phpmyadmin/phpmyadmin(データ確認用として公式のphpmyadminのイメージ)
以下の通りdocker-compose.ymlを作成します。
version: '2' services: pyspark: image: cloudfish/pyspark-notebook volumes: - LOCAL_PATH:/home/jovyan/work ports: - "8888:8888" command: bash -c "start-notebook.sh --NotebookApp.token=''" links: - dbserver environment: GRANT_SUDO: "yes" dbserver: image: kakakakakku/mysql-57-world-database environment: MYSQL_ALLOW_EMPTY_PASSWORD: "yes" phpmyadmi: image: phpmyadmin/phpmyadmin ports: - "18080:80" links: - "dbserver" environment: PMA_HOST: dbserver PMA_USER: root PMA_PASSWORD: ""
Docker起動
docker-compose up
Jupyter Notebook画面確認
http://localhost:8888
phpmyadmin画面確認
http://localhost:18080
PySparkの実行確認
早速サンプルデータベースで実行確認を進めていきます。worldデータベースは以下のようなテーブルが含まれています。
これらのテーブルを使ってデータを取得してみたいと思います。
Tables_in_world |
---|
city |
country |
countrylanguage |
画面右端のNewボタンをクリックしPython3を選択し、開いた画面で以下を入力していきます。
以下のコードはコードセルごとに入力してください。入力後Shift + Enterでコードが実行されます。
Sparkの初期化処理
from pyspark.sql import SQLContext, Row from pyspark import SparkContext sc = SparkContext("local", "First App")
※2回実行するとエラーになります。
JDBC接続処理
JDBCに接続しています。
sqlContext = SQLContext(sc) jdbc_url="jdbc:mysql://dbserver/mysql" driver_class="com.mysql.jdbc.Driver" DB_USER="root" DB_PASSWORD="" def load_dataframe(table): df=sqlContext.read.format("jdbc").options( url =jdbc_url, driver=driver_class, dbtable=table, user=DB_USER, password=DB_PASSWORD ).load() return df
データ取得処理
データをDataFrameに取得します
テーブル指定でデータ取得する場合(countryテーブル、cityテーブルを取得)
df_country = load_dataframe("world.country") # 実行されるSQL:SELECT * FROM world.country WHERE 1=0 df_city = load_dataframe("world.city") # 実行されるSQL:SELECT * FROM world.city WHERE 1=0
SQLを指定してデータ取得する場合(cityテーブルの国コードがJPNのものだけを取得)
df_city_japan = load_dataframe("(select * from world.city where CountryCode='JPN') city_japan") # 実行されるSQL:SELECT * FROM (select * from world.city where CountryCode='JPN') city_japan WHERE 1=0
MySQL側でどのようなSQLが流れるのか見てみたところ、テーブルにセットした内容がFROM句の後にセットされるようです。
カラム指定
Nameカラムを表示
df_country.select("Name").show()
条件検索
国名がJapanのデータを抽出。
df_country.filter(df_country["Name"] == "Japan").show() df_country.where(df_country["Name"] == "Japan").select("Code","GNP").show()
isNull
独立年がNullのデータを抽出
df_country.where(df_country["IndepYear"].isNull()).show()
like
国名がJで始まるデータを抽出
df_country.where(df_country["Name"].like("J%")).show()
Case When式
人口が100000人より大きい場合は「Big」、小さい場合は「Small」を表示
from pyspark.sql import functions as F df_country.select(df_country["Name"], F.when(df_country["Population"] > 100000,"Big").otherwise("Small").alias("CountryDiv")).show()
substr
国名を3文字切り出して表示
df_country.select(df_country["Name"].substr(1,3)).show()
limit
5件のみ抽出
df_country.limit(5).show()
Join
countryテーブルとcityテーブルを結合し国名がJapanのものを抽出
df_join = df_country.alias('country').join(df_city.alias('city'),(df_city["countryCode"] == df_country["Code"]) & (df_country["Name"]=="Japan")).show()
OrderBy
GNPが高い国順に表示
df_country.orderBy("GNP" , ascending=False).select("Code","Name","GNP").show()
GroupBy
国ごとにグループ化し、cityの数、人口の平均と合計を集計
from pyspark.sql import functions as F df_city.groupBy("countryCode") \ .agg( \ F.count(df_city["Name"]).alias("total_count"), \ F.avg(df_city["Population"]).alias("avg_population"), \ F.sum(df_city["Population"]).alias("sum_population") \ ).show()
まとめ
書き方は少し慣れる必要がありますが、かなりSQLに近いイメージでデータ取得が可能なことが分かりました。
今回はDBに対して実行しましたが、ファイルに対しても同様に実行可能です。
CodeBuildのbuildspec.ymlで変数定義ができない
CodeBuildを利用していてbuildspec.ymlを書いていたなかで、環境変数ではなく変数定義して使いたいと思い試していたのですが、
どうもうまくいかずハマりましたので備忘録としてのことしておきたいと思います。
やりたかったこと
buildspec.ymlのビルド手順で処理途中で変数定義を行いたい
簡単なサンプルですが、以下のようなbuildspec.ymlがあったとして、実行するとバージョンが表示されることを期待しました。
build: commands: - $version=1.0 - echo $version
ですが、実行してもこの状態だと何も出力されず、
exportしてみたりと色々試してみましたが思うような結果は得られませんでした。
最終的に行ごとにセッションが新しく張られてるのではないかと考え以下のように変数を引き継ぎたい処理をシェルスクリプト化することで解決しました。
hoge.sh
$version=1.0 echo $version
buildspec.yml
build: commands: - sh ./build.sh
見た目そのままシェルスクリプトなので、同じように使えるかと思いましたがそうではないんですね。
シェルスクリプトにすることで一見して処理が見えなくなるのも微妙な感じですが、まとまった処理であればスクリプト化するのもいいかと思います。
AWS BatchでEFSをマウントしディスク共有してみる
もうすぐEFSが東京リージョンに来ますね。最近はAWS Batchを触る機会が多くなってきたので、AWS BatchでEFSを使ってのディスク共有を検証してみました。
まだ東京リージョンにはないので今回はバージニアリージョンを利用して検証してみます。
構成
手順概要
1. EFSの作成
2. Batch用イメージの作成
3. AWS Batchの設定
4. 実行確認
1. EFSの作成
まずAWS Batch用とEFS用にセキュリティグループを作成します。
2. Batch用イメージの作成
ECS用のAMIからEC2を新規に作成し、起動時に指定のEFSを自動マウントする設定を行います。
バージニアリージョンの場合は以下のAMIから作成しました。
amzn-ami-2017.09.l-amazon-ecs-optimized (ami-aff65ad2)
以下にECS対応のAMIの一覧が記載されていますので参考にしてください。
ただし、AWS Batchのマネージド型で起動したインスタンスについては、2017.09のもので起動されていましたのでここではそれをベースとしています。
Amazon ECS 対応 AMI - Amazon Elastic Container Service
起動完了後にSSHでログインしrootユーザーで以下の設定を行います。
# マウント用のディレクトリ作成 mkdir /efs # NFSクライアントのインストール yum install -y nfs-utils
ここではEFSのFile system IDをfs-1234abcdとしていますが、実際のIDに合わせて変更してください。
# EFSをマウント mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 fs-1234abcd.efs.us-east-1.amazonaws.com:/ /efs # 起動時にマウントするようにfstabを更新 echo 'fs-1234abcd.efs.us-east-1.amazonaws.com:/ /efs nfs4 nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 0 0' | sudo tee -a /etc/fstab
設定完了後にEC2を再起動し、EFSがマウントできていることを確認してください。
確認完了すれば、EC2を停止しAMIを作成し、それをAWS Batchの実行イメージとします。
※ 本番で利用する場合は、サポートログが出力されるefs-utilsのマウントヘルパーの利用を推奨
3. AWS Batchの設定
コンピューティング環境を作成
以下の設定で作成します
設定内容 | 設定値 |
---|---|
コンピューティング環境のタイプ | マネージド型 |
最小vCPU | 1 |
必要なvCPU | 1 |
以下の通りユーザー指定のAMIを有効にします。
上記以外はデフォルトで作成してください。
起動したインスタンスにssh接続
dockerコンテナからマウントした時の確認用として/efs配下に以下のファイル作成しておきます。
mkdir -p /efs/files $echo "Hello EFS!!" > /efs/files/test.txt $cat /efs/files/test.txt Hello EFS!!
ジョブ定義を作成します。
事前に以下を参照しroleを作成しておきます。
Amazon ECS コンテナインスタンスの IAM ロール - Amazon Elastic Container Service
busyboxを使ってホストの/efsディレクトリをコンテナの/homeディレクトリにマウントする設定を行います。
コンテナ起動時にtest.txtが表示することでマウントされていることを確認します。
設定内容 | 設定値 |
---|---|
コンテナイメージ | busybox |
コマンド | cat /home/files/test.txt |
vCPU | 1 |
メモリ | 128 |
ボリューム
設定内容 | 設定値 |
---|---|
名前 | efs |
ソースパス | /efs |
マウントポイント
設定内容 | 設定値 |
---|---|
コンテナパス | /home |
ソースパス | efs |
ジョブキューの作成
設定内容 | 設定値 |
---|---|
優先度 | 1 |
コンピューティング環境を選択 | 上記で作成したコンピューティング環境名 |
ジョブ実行
左ペインの「ジョブ」から「ジョブ送信」を選択します。
設定内容 | 設定値 |
---|---|
ジョブ名 | 任意 |
ジョブ定義 | 上記で作成したジョブ定義名 |
ジョブキュー | 上記で作成したジョブキュー名 |
他はデフォルトでOKです
実行結果
実行後しばらくすると正常に終了した場合は、コンソールのジョブのsucceededにジョブが表示されます。
cloudwatch logsに以下ログが出力されていれば成功です。
これまでAWS Batchのジョブ間でファイルを共有する場合は、S3が主な選択肢でしたが、今後はEFSも選択肢として使えそうです。
S3では耐えきれないワークロードなどで利用検討してみてください。
AWS Certified Big Data – Specialty 認定試験を受けました
AWSの認定資格でビッグデータの専門知識を問われる試験のAWS Certified Big Data – Specialtyを受験しました。何とか合格できたので、この試験に向けて勉強した内容などを残しておきたいと思います。
試験概要
試験の概要は以下参照
AWS 認定ビッグデータ – 専門知識
受験資格
以下のいずれかの試験に合格している必要があります。
AWS 認定クラウドプラクティショナー
AWS 認定ソリューションアーキテクト – アソシエイト
AWS 認定デベロッパー – アソシエイト
AWS 認定 SysOps アドミニストレーター – アソシエイト
試験対策
出題範囲の確認
関連するAWSのサービスについては、以下のブログが参考になります。
AWS Certified Big Data – Specialty – Blue Clouds
サンプル問題
サンプル問題は必ず解きましょう。現時点で模擬試験は提供されてないようです。
サンプル問題
参考資料
勉強にあたっては、BlackbeltをはじめにAWSのホワイトペーパーのビッグデータ関連の資料などを読みました。
実際に各サービスを動かして確認していればなおいいと思います。
blackbelt
- AWS Black Belt Online Seminar Amazon Redshift
- AWS Black Belt Online Seminar 2017 Amazon DynamoDB
- AWS Black Belt Online Seminar 2017 Amazon Kinesis
- AWS Black Belt Online Seminar 2017 Amazon EMR
- AWS Black Belt Online Seminar 2016 AWS IoT
- 20180228 AWS Black Belt Online Seminar QuickSight
- 20180424 AWS Black Belt Online Seminar AWSで構築するデータレイク基盤のアーキテクチャ
英語
現時点で日本語の試験は提供されておらず英語での試験しか提供されていません。
これについては、英語のAWSドキュメントを読むことで勉強しました。
また、他の試験の英語のサンプル問題を読むことで問題文に慣れました。
所感
難易度はアソシエイトとプロフェッショナルの間くらいに感じましたが、英語が得意ではないのが一番の難関でした。170分という試験時間はかなり長いのですが、結局見直す時間もないままぎりぎりまでかかってしまいました。
次はAdvanced Networking - Specialtyに挑戦してみたいと思います。
PHPのファイルアップロード機能をサイト別に切り替える
PHPのファイルアップロード機能をサイトごとにhtaccessで切り替えたいというケースがあり、実現方法について調査したので備忘録として残しておきたいと思います。
アップロードするファイルサイズの切り替えはケースとしてはよくあるのでネット上に情報があがってましたが、ファイルのアップロード機能自体を切り替える例は見つからりませんでした。htaccessで同じようにできるか試してみたところうまくいかずしばらくハマりました。
結論としてはhtaccessではできずにhttpd.confに設定することでサイトごとの設定を変更することができました。
設定方法
以下example.comというvirtualhostに設定する場合の記載例です。
<VirtualHost *:80> ServerName example.com DocumentRoot /var/www/example.com/ php_admin_flag file_uploads on ← 追加 </VirtualHost>
htaccessでできなかった理由は、各設定内容がそれぞれモードをもっており、そのモードに応じて設定できる場所が決まっていました。
file_uploadsについてはPHP_INI_SYSTEMというモードになっており、設定可能な箇所はphp.iniかhttpd.confで設定可能となっています。
※PHPマニュアル抜粋
fish shellでaws-cliのコマンド補完
fish shellでaws cliのコマンド補完を設定してみたので設定方法を紹介したいと思います。
設定方法
bashやzshについては以下の公式で設定方法が紹介されているのですがfishはありませんでした。
docs.aws.amazon.com
色々調べたところ以下をfishのconfig(~/.config/fish/config.fish)に設定することでコマンド補完ができます。
complete -c aws -f -a '(begin; set -lx COMP_SHELL fish; set -lx COMP_LINE (commandline); /usr/local/bin/aws_completer; end)'
設定後にテストを行ったところ、以下のように補完候補の末尾に何故かバックスラッシュ(\)がセットされてしまいました。
⋊> ~ aws acm\
acm\ ecs\ mobile\
alexaforbusiness\ efs\ mq\
apigateway\ elasticache\ mturk\
application-autoscaling\ elasticbeanstalk\ opsworks\
appstream\ elastictranscoder\ opsworks-cm
appsync\ elb\ organizations\
athena\ elbv2\ pinpoint\
autoscaling\ emr\ polly\
そのため、補完はされるのですが、このまま実行してもエラーになるので、いちいちバックスラッシュを消す必要があります。
これではせっかく補完されてもあまり嬉しくないので、回避策がないかさらに調べてみました。
以下のとおり補完プログラムを修正することで対応できました。
■修正ファイル
/usr/local/lib/python2.7/site-packages/awscli/completer.py(パスは環境にあわせてください)
■修正箇所
def complete(cmdline, point): choices = Completer().complete(cmdline, point) #print(' \n'.join(choices)) ← コメントアウト print('\n'.join(choices)) ←追加
なぜかスペースをバックスラッシュとして解釈してしまうようなのでスペースを削除します。
再度テストするとバックスラッシュが除去されています。
⋊> ~ aws acm
acm ecs mobile
alexaforbusiness efs mq
apigateway elasticache mturk
application-autoscaling elasticbeanstalk opsworks
appstream elastictranscoder opsworks-cm
appsync elb organizations
athena elbv2 pinpoint
autoscaling emr polly
これで快適にコマンド補完できました。
ただし、残念なことにawscliのアップデートの度に修正が必要になりそうです。
ec2-scheduleでEC2のインスタンスタイプの自動変更
ec2-scheduleではインスタンスタイプの変更も可能です。
今回はインスタンスタイプを自動で変更する方法を紹介したいと思います。
設定方法
DynamoDBのConfigTableに以下を設定
テーブル名はスタック名+ConfigTableになっています。
ex)
月曜から金曜までの9:00 - 20:00(日本時間)t2.micro
月曜から金曜までの20:00 - 9:00(日本時間)t2.nano
scheduleレコード
type | name | description | periods | timezone |
---|---|---|---|---|
schedule | ec2_type_change | ec2 type change | { "down-period@t2.nano", "up-period@t2.micro" } | Asia/Tokyo |
EC2の設定
以下のとおりタグを付与します
動作確認
インスタンスタイプがt2.nanoの状態です。
設定時間がすぎるとt2.microに変更されて起動されました。
変更の動きとしては、9:00に動くLambdaで一旦EC2が停止されます。
その後、9:05のLambdaでインスタンスタイプが変更されて起動されます。
停止に時間がかかる場合は、変更されるタイミングももう少し遅くなるかと思います。
インスタンスタイプを自動で変更してくれるのはすごく便利な機能ですね。
平日は大きめのインスタンスタイプとし、週末はタイプを落とすなどしたいユースケースなどで使えそうです。
ただし、稀に変更するインスタンスタイプが枯渇しているため起動できないなどのケースも想定されるので、本番での利用についてはそのあたりの考慮が必要になりますがぜひ利用を検討してみてください