AWS GlueでS3から差分データを取得する

日次処理などの定期処理でS3からデータを取得しparquet形式に変換やDBにLoadするといったケースについては、ETLする際によくあるのではないでしょうか?
上記のようなケースでは、ETL処理の際にS3から前日からの差分を取得する必要があります。Glueにおいて自動生成されるコードでは、対象のデータソース全てを読み込んでしまうため読み込み後にフィルタする必要があります。すべて読み込んでフィルタするのはデータ量が多いとあまり効率はよくないですよね。
今回はこうしたケースにおいてどのように取得できるか考えてみたいと思います。

差分データを取得する方法

DynamicFrameを利用して取得する方法としては以下3つがあるかと思います
①from_catalogue関数でPushdown Predicatesオプションを利用
Pushdown Predicatesを利用すると読み込み時に条件を指定してフィルタすることが可能です。
ただし、データソースがパーティショニングされている必要があるため、データソースによっては簡単に使えない場合があります。設定方法については以下で紹介されていますので参照してください。
AWS Glue の Pushdown Predicates を用いてすべてのファイルを読み込むことなく、パーティションをプレフィルタリングする | Developers.IO

②JobBookmarkを利用
JobBookmarkを利用すると前回読み込んだところまで記録されているため、差分のみ取得することが可能となります。
バックエンドで色々とやってくれるので便利でいいのですが、どのような仕組みで実現されているのか見えないため、処理に問題があった際にリカバリが難しくなることが考えられます。また、ETL処理の前に毎回クローラーを実行する必要があります。
AWS GlueのJob Bookmarkの使い方 - cloudfishのブログ

③from_options関数を利用
from_options関数を利用することでS3のパスを直接指定することが可能です。この方法の場合、データソースがパーティショニングされている必要はなくパスを指定することで読み込みが可能です。

今回は③の方法について試してみたいと思います。

準備

まずは、適当なCSVファイルをS3に配置しておいてください。

ジョブの作成

ジョブを作成し以下のコードを入力して実行してください。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#from_options関数で読み込み
datasource0 = glueContext.create_dynamic_frame.from_options( connection_type = "s3",connection_options = {"paths": [ "s3://glue-testdata-xxxxx/input/"]},format="csv", format_options={ "withHeader": "true","skipFirst": "true"})

datasink2 = glueContext.write_dynamic_frame.from_options(frame = datasource0, connection_type = "s3", connection_options = {"path": "s3://glue-testdata-xxxxx/output"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

ポイントはfrom_options関数を使って読み込むところになります。connection_optionsにS3のパスを指定します。ここはファイル指定や複数入力可能です。
また、format_optionsで1行目をヘッダーとして読み込むことや読み飛ばすこともオプションで可能となっていますが、現時点ではバグのため指定がきかないようなので、読み飛ばしやカラム名の指定については独自で実装してやる必要があります。
また、型変換、カラムの削除なども自分で実装する必要があります。

まとめ

定期実行してデータを取り込むような処理はよくあると思いますのでそういった場合に使えそうですね。
ただし、まだバグがあるようなので利用する際はしっかりと検証したほうがよさそうです。