神戸のデータ活用塾!KDL Data Blog

KDLが誇るデータ活用のプロフェッショナル達が書き連ねるブログです。

【PySpark】Pythonで分散処理を体験してみよう

株式会社神戸デジタル・ラボ DataIntelligenceチームの高木です。

今回は、分散処理を用いて大容量のデータを扱うためのフレームワークであるSparkをPythonで扱うPySparkについて取り上げます。(SparkはPython以外にも、JavaやScalaなどの様々な言語に対応しています)

分散処理とは?

実行される処理やデータなどを分割し、複数のコンピュータでそれぞれを独立して実行する処理のことを分散処理と言います。大量データを短時間で処理することができる分散処理は、データが爆発的に増大している現代においてとても有効な手段です。

今回の記事ではApache SparkのPython用APIであるPySparkを実際のデータに対して利用して、基本的な扱い方を身につけていきましょう!

PySparkの特徴は、以下の通りです。

  • 複雑な処理をコーディングの際に意識する必要がない

  • 大規模なデータ処理を高速に実行することができる

利用データ

今回はデータとして、UCLのMachine Learning RepositoryからBank Marketing Data Setを利用します。以下のリンクからbank.zipをダウンロードし、解凍してください。

https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip

archive.ics.uci.edu

解凍したフォルダの中にはいくつかファイルが入っていますが、今回利用するデータはbank-full.csvになります。軽くCSVデータの内容を確認しましょう!

このCSVには、ポルトガルの銀行が実施したキャンペーンで蓄積されたデータが格納されています。キャンペーンの目的は顧客に電話をかけて、定期預金を申し込んでもらうことです。CSVでは申し込んでもらえたかどうかの結果とともに、各顧客のさまざまな情報(年齢、職業、既婚等)をデータとして格納しています。実際のデータは、下図のとおりです。

定期預金を申し込んだかどうかは y というカラムにyesかnoかで記録されています。

シナリオ

今回はダウンロードしたCSVデータをSparkのデータフレームとして扱って型変換や分割を行い、データベースに保存する一連の流れを扱います。

詳細は、以下の通りです。

(1) SparkSessionクラスを定義
(2) CSVデータを読み込む
(3) スキーマを確認する
(4) 型変換を行う
(5) データフレームを分割する
(6) PostgreSQLに保存する
(7) 保存したテーブルを確認する

環境構築

今回の作業は仮想環境上で行います。ここでは、環境についての詳細と実際の起動方法について説明します。

環境概要

Docker Composeで次の2つのコンテナを用意します。
(1) 【jupyter】:Sparkを搭載したコンテナ
(2) 【db】:処理結果を格納するDBのコンテナ

マシンと各コンテナの関係性は以下の通りです。

Sparkを搭載したjupyterコンテナはローカルにファイルマウントされており、処理結果はdbコンテナ内のPostgreSQLに保存されます。

これらの関係は、docker-compose.ymlというファイルに記述されることで実現できます。

フォルダ構成

実際のフォルダ構成は以下の通りです。

docker-compose.yml

docker-compose.ymlでは、初回のコンテナ起動時にscriptフォルダの中の01_initialize.sql02_create_table.sqlの二つのファイルが自動的に起動するように設定されています。(ローカルのscriptフォルダをコンテナ内のdocker-entrypoint-initdb.dへマウントすることで実現します)

version: '3'

services:
  jupyter:
    container_name: jupyter
    volumes:
      - $PWD:/home/jovyan/work
    image: jupyter/pyspark-notebook
    ports:
      - 8888:8888
  db:
    image: postgres:14
    container_name: postgres
    ports:
      - 5432:5432
    volumes:
      - db-store:/var/lib/postgresql/data
      - ./script:/docker-entrypoint-initdb.d
    environment:
      POSTGRES_USER: root
      POSTGRES_PASSWORD: root
      POSTGRES_INITDB_ARGS: "--encoding=UTF-8"
    hostname: postgres
    user: root
volumes:
  db-store:
01_initialize.sql

01_initialize.sqlでは、データベースシステムの利用において不可欠なデータベース・スキーマ・ロールを定義しています。

データベース データの集合している場所
スキーマ データベースの構造を表現する設計図
ロール データベースに対する権限の定義
-- DB作成
CREATE DATABASE bankmarketing; 

-- 作成したDBへ切り替え
\c bankmarketing

-- スキーマ作成
CREATE SCHEMA bankschema;

-- ロール作成
CREATE ROLE kdl WITH LOGIN PASSWORD 'passw0rd';

-- 権限追加
GRANT ALL PRIVILEGES ON SCHEMA bankschema TO kdl;
02_create_table.sql

02_create_table.sqlでは先ほど作成したデータベース上に二つのテーブルを作成します。

-- DB切り替え
\c bankmarketing

-- テーブル作成
CREATE TABLE  bankschema.yes(
  age INT,
  job VARCHAR(20),
  marital VARCHAR(20),
  education VARCHAR(20),
  "default" VARCHAR(20),
  balance NUMERIC(10, 0),
  housing VARCHAR(20),
  loan VARCHAR(20),
  contact VARCHAR(20),
  "day" INT,
  "month" VARCHAR(20),
  duration INT,
  campaign INT,
  pdays INT,
  previous INT,
  poutcome VARCHAR(20),
  y VARCHAR(20)
);

CREATE TABLE  bankschema.no(
  age INT,
  job VARCHAR(20),
  marital VARCHAR(20),
  education VARCHAR(20),
  "default" VARCHAR(20),
  balance NUMERIC(10, 0),
  housing VARCHAR(20),
  loan VARCHAR(20),
  contact VARCHAR(20),
  "day" INT,
  "month" VARCHAR(20),
  duration INT,
  campaign INT,
  pdays INT,
  previous INT,
  poutcome VARCHAR(20),
  y VARCHAR(20)
);

-- 権限追加
GRANT ALL PRIVILEGES ON bankschema.yes TO kdl;
GRANT ALL PRIVILEGES ON bankschema.no TO kdl;
main.ipynb

分散処理コードを記述するファイルです。構築時には、作成されていなくても構いません。

実際に構築

ここでは、実際に構築していきます。

(1) まずは、docker-compose.ymlと同じ階層に移動してください。lsコマンドを実行して、以下のようになっていればOKです!

(2) dockerを実際に起動します。docker compose up コマンドを入力してください!下記のように、ターミナルにずらっと文字列が表示されるます。

(3) ターミナルの出力が止まるとその一番下に赤枠で囲ったようなURLがありますので、表示されているどちらかのURLをお使いのブラウザに入力してください。(どちらを利用しても構いません)

(4) 下記のような画面が開かれます。

(5) 分散処理コードを記述するmain.ipynbを作成します。(デフォルトのファイル名はUntitled.ipynbですので、左サイドバーの該当ファイルで右クリックをしてファイル名の変更を行なってください)

こちらで準備が整いました。次の章から実際に分散処理のコードを書いていきましょう。

分散処理

(1) SparkSessionクラスを定義

Sparkを利用するためには、すべての機能へのアクセスするためのエントリポイントを作成しなければなりません。こちらはSparkSessionクラスを定義することで実現できます。

from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("Hello Word") \
  .getOrCreate()

SparkSession以下に利用されている各メソッドの簡単な説明は以下の通りです。

builder 基本的な SparkSessionを生成する
appName SparkWebUIに表示されるアプリケーションの名前を設定。アプリケーション名が設定されていない場合は、ランダムに生成された名前が使用される。
getOrCreate 最初に有効なグローバルデフォルトSparkSessionがあるかどうかをチェックしてあれば返却。ない場合は新しいSparkSessionを作成して割り当てる。

(2) CSVデータを読み込む

今回用意したCSVデータをSparkでのデータ形式であるデータフレームとして読み込みます。読み取り時にはフォーマットを指定したり、自動でスキーマを推測してくれるようなオプションを加えてあります。(実務では誤った推定を防ぐため、各スキーマに対して型をあらかじめ定義することが多いです)

注)SparkのデータフレームはPandasのデータフレームと似ていますが、異なるものです。しかし、親和性は高く相互変換可能です。

df = spark.read.format('csv') \
        .option('inferSchema', 'True') \
        .option('header', 'True') \
        .option('sep', ';') \
        .load('./data/bank-full.csv')
df.show(10)

(3) スキーマを確認する

printSchemaメソッドを使って、各カラムのスキーマ設定を確認します。nullableはnullを許容するかどうかを示しますが、出力を見ると今回は全てのカラムでnullが許容されるようですね!

df.printSchema()

(4) 型変換を行う

テーブルデータにおいて、型を正確に定義することはデータを管理する上で大切なことです。もし異なるデータ型で保存してしまうと、小数点以下の情報がなくなったりといった予期せぬハプニングが起こる可能性があります。

balance(残高)カラムをintegerからdecimalに変更してみます。カラム全ての変更を行うためには、withColumnメソッドを利用します。

df2 = df.withColumn('balance', df['balance'].cast('decimal'))
df2.printSchema()

(5) データフレームを分割する

キャンペーンの申し込みについてデータを分けて管理してみましょう。申し込みをした人(yカラムが'yes')と申し込みをしなかった人(yカラムが'no')に分割します。こちらは、filterメソッドで条件を加えることで実現します。

df_no = df2.filter(df['y'] == 'no')
df_yes = df2.filter(df['y'] == 'yes')

分割したそれぞれのデータに対して、それぞれの残高に違いがあるかを確認してみましょう。

df_no.select('balance').groupby().mean().show()
df_yes.select('balance').groupby().mean().show()

申し込みをした人の方が残高が多い結果となりました。キャンペーンに申し込みをする人は残高に余裕があるという傾向がうかがえますね!!

(6) PostgreSQLに保存する

作成したデータフレームをテーブルとしてPostgreSQLに保存します。

ここのオプションで指定しているDB名、ユーザー名、テーブル名、パスワード名は初回のコンテナ起動時に01_initialize.sql02_create_table.sqlで設定した値になります。

df_yes.write\
    .format("jdbc")\
    .mode("append")\
    .option("url", "jdbc:postgresql://postgres:5432/bankmarketing")\
    .option("user", "kdl")\
    .option("dbtable", "bankschema.yes")\
    .option("password", "passw0rd")\
    .option("driver", "org.postgresql.Driver") \
    .save()

df_no.write\
    .format("jdbc")\
    .mode("append")\
    .option("url", "jdbc:postgresql://postgres:5432/bankmarketing")\
    .option("user", "kdl")\
    .option("dbtable", "bankschema.no")\
    .option("password", "passw0rd")\
    .option("driver", "org.postgresql.Driver") \
    .save()

(7) 保存したテーブルを確認する

PostgreSQLに保存したテーブルが正しく保存されているかを確認しましょう!

まずは、申し込みがあった方のテーブルを確認します。

yes = spark.read.format("jdbc")\
    .option("url", "jdbc:postgresql://postgres:5432/bankmarketing")\
    .option("user", "kdl")\
    .option("dbtable", "bankschema.yes")\
    .option("password", "passw0rd")\
    .option("driver", "org.postgresql.Driver") \
    .load()

# 表示
yes.show()

最終列のyカラムが全て'yes'となっていることから、申込みのあった方のみのテーブルとなっていることが確認できます。

続いて、もう片方の残念ながら申し込みがなかった方のテーブルを確認します。

no = spark.read.format("jdbc")\
    .option("url", "jdbc:postgresql://postgres:5432/bankmarketing")\
    .option("user", "kdl")\
    .option("dbtable", "bankschema.no")\
    .option("password", "passw0rd")\
    .option("driver", "org.postgresql.Driver") \
    .load()

# 表示
no.show()

最終列のyカラムが全て'no'となっていることから、こちらのテーブルも正しいと確認することができました。

これで、今回のシナリオは完了です。少し文法などに特徴はありますが、分散処理を意識することなくコーディングできることが実感いただけていれば嬉しいです!!

まとめ

本記事では、分散処理フレームワークの一つであるApache SparkのPython用APIであるPysparkを利用して処理を行い、処理結果をPostgreSQLデータベースに格納しました。実務でも、Pysparkを利用したこのようなシナリオは十分に考えられます。

しかし、実務と本記事では少し異なる点が、、、それは環境です。

今回はコンテナを立ててSparkを利用していましたが、実務ではDataBricksやAWSのGlueなどの環境が利用されることが多いです

本ブログでは今後それらのツールを使用した分散処理の記事も発信予定ですので、是非【読者になる】ボタンをクリックして、最新記事をお待ちください!

高木裕仁

データサイエンティスト
非構造化・構造化データを偏りなく扱います。