神戸のデータ活用塾!KDL Data Blog

KDLが誇るデータ活用のプロフェッショナル達が書き連ねるブログです。

AWS Glueを使ってみた!~環境構築・実行編~

こんにちは!Dataintelligenceチームの垣内です。

本連載ではAWSの分散処理サービスである「AWS Glue」を使って、分散処理を実行してみます!第一回目の「サービス調査編」を見られたい方は、こちらからご覧ください! kdl-di.hatenablog.com

今回は、ローカルでの開発方法をご紹介します。

アーキテクチャのおさらい

前回の環境構築編でちらっとお見せしたアーキテクチャをおさらいしましょう。

今回作成したいアーキテクチャ

今回は次の3つのコンテナを作ります。

  • S3コンテナ
    • LocalStackを使ってAmazon S3を構築
    • データ(bank-full.csv)ファイルがある
  • Glueコンテナ
    • AWS GlueのDockerイメージを使ってGlue環境を再現
    • main.pyでJobを実行
  • PostgreSQLコンテナ
    • AWS Glueで変換したデータが格納されていく

このアーキテクチャを再現するため、compose.yamlファイル(旧:docker-compose.yaml)を使って3つのコンテナの設定などを記述します。ベースとなるデータやディレクトリ構造はこちらのSpark記事のものを再利用います。

お手元のディレクトリ構成が図の左側のようになっていたらOKです!これから不足しているファイルやディレクトリを作ったりファイルの中身を変えていき、最終的には右ような構成にしていきたいと思います。

ディレクトリ構成

compose.yamlの修正

①ファイル名の変更

これまでdocker composeを立ち上げる際に元となっていた「docker-compose」のYamlファイルですが、「compose.yaml」というファイル名の利用が推奨され始めました。今後はこのファイル名がベーシックとなっていきそうであるので、今のうちにcompose.yamlというファイル名に変更しておきたいとおもいます。

②S3コンテナ

まず1つ目のS3コンテナを立てていきます。
このコンテナでは、LocalStackのエミュレート機能を使ってAmazon S3を立てていきます。Amazon S3上には bank-full.csv ファイルを配置するのですが、コンテナを立ち上げる度に毎度手動アップロードするとなると手間がかかります。立ち上げる度に必ずアップロード処理をするということがわかっているので、初期化スクリプトを作成して自動化しておくとよいでしょう。

init.shの作成

ファイルをAmazon S3にアップロードするためには、バケットを作成してからファイルのアップロードコマンドを実行する必用があります。各コマンドを見ていきましょう。

1. バケット作成

aws s3 mb s3://dummy/ --endpoint-url=http://localhost:4566

2. ファイルをアップロード

 aws s3 mb s3://dummybucket/ --endpoint-url=http://localhost:4566

endpointはリクエストの送信先を指定するパラメータです。
このパラメータは必須ではないものの、AWS CLIが、選択したサービスとAWS リージョンに基づいてURLが自動的に決定されるそうです。*1
今回は、意図しないエンドポイントへの操作を防ぐ、そして自分が今どこのエンドポイントにリクエストを送っているかを意識するために指定しました。

この2つのコマンドを自動で実行されるためには、init.shというファイルにまとめておくとよいでしょう。画像のようにまとめられていればOKです!

init.sh

この init.shlocalstackというディレクトリを作って、格納しておきます。先ほど示したディレクトリ画像の、右側の一番下のような構造になっていることを確認してください。


yamlの修正 ~LocalstackでAmazon S3を立ち上げる~

次にLocalstackのDockerイメージから、Amazon S3立ち上げられるようにcompose.yamlを修正していきます。基本的にLocalstackのドキュメントと一緒ですが、ここでは公式ドキュメントには書いてない事などをご紹介します。compose.yamlに以下を追記してください。

version: '3.8'
services:
  emulation.localstack:
    container_name: localstack
    image: localstack/localstack:0.12.8
    ports:
      - "127.0.0.1:4566:4566" 
      - "127.0.0.1:4510-4559:4510-4559"
    environment:
      - SERVICES=s3
      - AWS_DEFAULT_REGION=ap-northeast-1
      - AWS_DEFAULT_OUTPUT=json
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
    volumes:
     - ./data:/opt/code/localstack/data   #データをdockre側にマウント
     - ./localstack:/docker-entrypoint-initaws.d
    networks:
      - emulation.network


Localstackを使うときは、4566ポートを指定してエミュレートしたいAWSサービス名をSERVICESという環境変数に、カンマ区切りでサービス名を列挙します。

余談ですが、以前は各サービスごとにポートが用意されていました。「S3だとこのポート、Lambdaだとこのポート」といったような感じです。現在の仕様へと変わってから日が浅いため、サービスごとのポートを設定する方法のドキュメントがまだまだ残っているのでご注意ください。

そして、AWSサービスを使う方はお馴染みのプロファイル情報の記述も忘れないようにしましょう。プロファイルを設定し忘れて待ったのですが、S3バケットが作れませんでした。。。

- AWS_DEFAULT_REGION=ap-northeast-1
- AWS_DEFAULT_OUTPUT=json
- AWS_ACCESS_KEY_ID=test
- AWS_SECRET_ACCESS_KEY=test

Localstackの起動初期スクリプトをマウントする

先ほど作成した「init.sh」のような初期化スクリプトを実行するためには、「/docker-entrypoint-initaws.d」ディレクトリへのマウントが必要です。ここに置かれたファイルは自動でスクリプトを実行してくれるので、忘れないよう注意しましょう。

volumes:
    - ./data:/opt/code/localstack/data
    - ./localstack:/docker-entrypoint-initaws.d

③Glueコンテナ

次にGlueコンテナを改修します。こちらも注意点含めてご紹介いたします。

yamlの修正 ~Glueコンテナを作る~

  emulation.glue:
    container_name: glue
    image: amazon/aws-glue-libs:glue_libs_3.0.0_image_01
    volumes:
      - ./:/home/glue_user/workspace/jupyter_workspace
    environment:
      - DISABLE_SSL=true
      - AWS_REGION=ap-northeast-1
      - AWS_OUTPUT=json
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
      - ENDPOINT_URL=http://emulation.localstack:4566
    ports:
      - 4040:4040 
      - 8888:8888
    networks:
      - emulation.network
    command: /home/glue_user/jupyter/jupyter_start.sh

こちらでもプロファイル情報の記入が不可欠です。Glueコンテナでは、LocalstackでエミュレートしたS3へアクセスします。そのため、ENDPOINT_URLという設定を入れています。compose.yamlの環境変数に設定しているものはローカルでのみ利用する値を設定しています。本番のAWS Glueでも使うような値の書き方などは後ほどご紹介いたします。

JupyterNotebookの起動スクリプトを忘れない

command: /home/glue_user/jupyter/jupyter_start.sh 

command欄でjupyterの起動スクリプトの実行を忘れてはいけません!これがなければコンテナが終了してしまいます。

#!/bin/bash
# source /home/glue_user/.bashrc
if [[ ! "$?" -eq 1 ]]; then
    livy-server start
    if [[ -z ${DISABLE_SSL} ]]; then
        echo "Starting Jupyter with SSL"
        jupyter lab --no-browser --ip=0.0.0.0 --allow-root --ServerApp.root_dir=/home/glue_user/workspace/jupyter_workspace/ --ServerApp.token='' --ServerApp.password='' --certfile=/home/glue_user/.certs/my_key_store.pem --keyfile /home/glue_user/.certs/my_key_store_key.key
    else
        echo "SSL Disabled"
        jupyter lab --no-browser --ip=0.0.0.0 --allow-root --ServerApp.root_dir=/home/glue_user/workspace/jupyter_workspace/ --ServerApp.token='' --ServerApp.password=''
    fi
fi

おまじないで書かれがちなコマンドですが、中身はこんな感じです。SSLの有効フラグを見て起動方法を変えていることがわかりますね!

④PostgreSQLコンテナ

yamlの修正 ~PostgreSQLコンテナを作る~

  emulation.postgres: 
    container_name: postgres
    image: postgres:14
    volumes:
      - ./postgresql/init:/docker-entrypoint-initdb.d
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=root 
      - POSTGRES_PASSWORD=password
      - POSTGRES_INITDB_ARGS=--encoding=UTF-8
    networks:
      - emulation.network
networks:
  emulation.network:
    name: emulation.network

init.sqlをdocker-entrypoint-initdb.d に置く

データベースやテーブルの作成など初期化用のスクリプトを手動で展開するのは手間がかかります。そこで、初期化スクリプトを「docker-entrypoint-initdb.d 」に配置します。するとマウント時に初期化スクリプトを実行してくれます。ローカルでデータベースコンテナを構築する際は便利なので、ぜひ活用しましょう!

main.pyの修正

Notebook形式の書き方から、Pythonスクリプトの書き方に変更します。

基本的には書き写しと処理の共通化をしています。但し、スクリプトの前半箇所(ファイルの読み込み等)を修正しました。この部分は、ファイルの読み込み時にAmazon S3からファイルを取得するようにしています。

Sparkでは様々なファイルシステムにアクセスする際にHadoopFileSystem APIを利用しているのですが、これに関係してAmazon S3とSpark間の認証回りが少々複雑になっています。*2こちらについては後日詳細を書きたいと思います。

from awsglue.utils import getResolvedOptions
from pyspark.sql import SparkSession
import sys, os, boto3

def init_spark(mode):
    spark = SparkSession.builder.appName("Hello Word").getOrCreate()
    if mode == "local":
        spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
        spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
        spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.environ["ENDPOINT_URL"])
        spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    return spark

def connect_s3(mode, region):
    if mode == "local":
        s3 = boto3.client(service_name="s3",
                        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
                        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
                        region_name=region,
                        endpoint_url=os.environ["ENDPOINT_URL"])
    else:
        s3 = boto3.client(service_name="s3", region_name=region)

    return s3

def get_file_df(spark, mode, region):
    s3_client = connect_s3(mode, region)
    response = s3_client.get_object(Bucket='dummy', Key='bank-full.csv')
    return spark.read.csv('s3a://dummy/bank-full.csv', sep=';', inferSchema=True, header=True)

def read_data(spark):
    return spark.read.format('csv')\
                .option('header', 'True')\
                .option('sep', ';')\
                .load('df')


def df_read(spark, table):
    return spark.read.format("jdbc")\
                .option("url", "jdbc:postgresql://postgres:5432/bankmarketing")\
                .option("dbtable", table)\
                .option("user", "kdl")\
                .option("password", "passw0rd")\
                .option("driver", "org.postgresql.Driver")\
                .load()

def df_write(df, table):
    df.write.format("jdbc")\
            .mode("append")\
            .option("url", "jdbc:postgresql://postgres:5432/bankmarketing")\
            .option("dbtable", table)\
            .option("user", "kdl")\
            .option("password", "passw0rd")\
            .option("driver", "org.postgresql.Driver")\
            .save()

def main():
    # Jobパラメータの取得
    env_list = ["REGION", "MODE"]
    args = getResolvedOptions(sys.argv, env_list)
    MODE =args["MODE"]
    REGION = args["REGION"]

    spark = init_spark(MODE)

    # S3からファイルを取得し、絞り込み・分割
    df = get_file_df(spark, MODE, REGION)

    df2 = df.withColumn('balance', df['balance'].cast('decimal'))
    df2.printSchema()
    df_no = df2.filter(df['y'] == 'no')
    df_yes = df2.filter(df['y'] == 'yes')

    # 書き込み
    df_write(df_yes, "bankschema.yes")
    df_write(df_no, "bankschema.no")

    # データ読み込み
    yes_df = df_read(spark, "bankschema.yes")
    no_df = df_read(spark, "bankschema.no")
    print("yes_df")
    yes_df.show()
    print("no_df")
    no_df.show()

if __name__ == "__main__":
    main()

AWS GlueではJobの実行時に設定する引数を「Jobパラメータ」といいます。Jobパラメータは、getResolvedOptions メソッドで値を取得できます。

動かしてみる

Jobを動かすためには、Glueコンテナに入り、glue-spark-submitというコマンドを実行する必用があります。

①イメージをビルドし、コンテナを立ち上げる

docker compose up --build

②コンテナの中へ入る

docker exec -it glue bash

③main.pyを指定し、Jobを実行
glue-spark-submitでジョブスクリプトが実行できるのですが、スクリプト実行時に必須となる値はジョブの引数として受け渡しが可能です。これをJobパラメータといいます。デフォルトで設定できるものもありますが、自分たちが独自に設定することも可能です。

glue-spark-submit ./jupyter_workspace/main.py --REGION=ap-northeast-1  --MODE=local

独自で設定する場合は「--<パラメータ名>」という形で設定ができます。詳細はこちらのページをご覧ください。 AWS Glue ジョブのパラメータ - AWS Glue

さて、解説を読んで言ううちに実行ができたと思いますので確認していきましょう。「yes_df/no_df」いずれも出力されていて、データもこちらの記事の通り分割できていればOKです!

まとめ

今回はコンテナのyamlファイルの変更と、main.pyの修正等に取り組んできました。公式ドキュメントではあまり触れられていない所や、おまじない的に書かれがちなところもありましたが、お判りいただけたでしょうか?Localstackの使い方やJobの動かし方など習得することが多かったと思いますので、落ち着いたタイミングでもう一度復習にチャレンジしてみてください。