神戸のデータ活用塾!KDL Data Blog

KDLが誇るデータ活用のプロフェッショナル達が書き連ねるブログです。

【やってみた】AutoEncoder+MVTecデータセットで異常検知!

株式会社神戸デジタル・ラボ DataIntelligenceチームの原口です。

今回はAutoEncoder(オートエンコーダ)と呼ばれるAIモデルとMVTec社が配布している異常検知データセットを用いて、異常検知に取り組みます!

異常検知とAIの関係性

新聞やニュースなどでよく目にする異常検知。様々な分野ですでに実用化され、他の大多数データと異なるデータを検出する技術を指します。

その例として、「センサーデータを用いて、工場内で稼働している機器の不調を検出」「製品に傷・汚れなどがないかAIが検出」するものが挙げられます。

なぜ異常検知にAIが多く利用されているのでしょうか?

AIで行うことの利点の一つは、人件費の削減です。

これまでは、作業員の方が製品に対して傷・汚れ・欠け・割れなどの異常を検査していました。

この検査を各レーンに対して数十人で行っている場合、非常に人件費がかかります

人間が検査を行う場合

このような場合、AIによる異常検知システムを導入することで大幅な人件費削減を実現することができます。

AIが検査を行う場合

また、精度にばらつきが出にくいのもAIの良い点です。

一般的に人が長時間検査を行っていると、作業者の疲れに伴って未検出・誤検出が増えてきます

一方、AIが検査をする場合、こういった問題はすべて解消され、常に高精度の状態を保つことができます

こういった観点から、近年異常検知を導入する企業が増えています。

今回は異常検知の中でも画像を用いる手法の、AutoEncoderと呼ばれるAIモデルを用いて試します!

AutoEncoderとは

今回異常検知に利用するAutoEncoderですが、どういった特徴があるのでしょうか?

まずはAutoEncoderの仕組みから説明します。

AutoEncoderの概略図

AutoEncoderは上図で示すように、以下で構成されます。

  • Encoder(エンコーダー)
  • 潜在変数
  • Decoder(デコーダー)

AutoEncdoerは通常、入力画像を再現できるように学習します

内部ではどのような処理が行われているのでしょうか?

画像がAutoEncoderに入力されると、Encoderが最初の処理を行います。

Encoderは画像から特徴を抽出し、その結果を潜在変数に格納します。

特徴抽出中

ここで1つ注意点があります。それは、「潜在変数が格納できる情報量には限りがある」という点です。

例えば、大まかな情報のみを格納した場合を見てみましょう。

潜在変数に特徴を格納

このEncoderは中央のリンゴの特徴を伝えず、全体の色味を伝えることにしました。

この情報を受け取ったDecoderは、入力画像を再現しようとします。

潜在変数から入力画像を再現

結果を確認

どうでしょうか。Decoderによって再現された画像は入力画像と全く違うものになってしまいました。

このように、間違った情報・必要のない情報を潜在変数に格納した場合は、Decoderは元の画像を再現することが困難になります。

そのため、Encoderは画像を再現するうえで必要最低限な情報を潜在変数に格納することが求められ、それが出来るように学習をします。

一方、Decoderは、Encoderが抽出した特徴をヒントに、入力された画像をきれいに再現できるよう学習をします。

異常検知にAutoEncdoerを適用するには・・・

ここまでの話だと、「AutoEncoderを利用して入力画像を再現しただけで、どうやって異常検知が出来るんだ???」となります。

異常検知におけるAutoEncdoerはこの入力画像を再現する能力に着目して、異常検知を行います。

AutoEncdoerは入力画像を再現すると言いましたが、厳密にはAutoEncoderが学習したもののみ再現することができます。

「AIなんだから当たり前じゃないか!学習してない画像が再現できるわけがない!・・・・はっ!」

そうです、AutoEncdoerに正常画像のみを学習させた場合、異常画像はうまく再現できないのです!!

ではAutoEncdoerを用いた異常検知の原理について説明します。

AutoEncoderを用いた異常検知

先ほど説明した通り、正常画像のみを学習したAutoEncoderは、正常画像は復元できますが異常画像はうまく復元できません。この特性を利用して異常検知を行います。

AutoEncoderによる異常検知

上図に概要図を示します。異常検知を行うには、入力画像と再現画像の差分を取ります。

正常画像の場合は、きれいに再現できるため入力画像と再現画像の差が小さくなります。

一方、異常画像の場合は、異常個所を再現できないため再現画像での異常個所の差が大きくなります。

この差を利用して異常検知を行います。

実装

実際に実装していきましょう!

Step1:実装の準備

まずはMVTec社が配布しているデータセットを準備しましょう! データのダウンロードはこちらから行えます!

ダウンロードができましたら、データを展開しましょう!

xz -dv mvtec_anomaly_detection.tar.xz
tar xfv mvtec_anomaly_detection.tar

データセットの中には様々な製品が存在しますが、今回は「bottle」を利用して実験します。

Step2:データの確認

続いてbottleデータの確認をします。

import os
import glob
#学習用正常データの読み出し
good_list = glob.glob(os.path.join("/bottle/train/good/" , '*'))

#評価用正常データの読み出し
good_test_list = glob.glob(os.path.join("/bottle/test/good/" , '*'))

#評価用異常データの読み出し
bad_test_list = glob.glob(os.path.join("/bottle/test/broken_large" , '*')) + glob.glob(os.path.join("/bottle/test/broken_small" , '*')) + glob.glob(os.path.join("/bottle/test/contamination" , '*'))
#正常・異常データの数を確認
print(f"good {len(good_list)} good_test {len(good_test_list)} bad {len(bad_test_list)}")

#出力
#good 209 good_test 20 bad 63

正常画像と比較して、異常画像は1/3程度しかありません。

また、評価用の正常画像は20枚しかありません。

学習用画像の一部を評価用画像に移すことでこの差を埋めたいと思います。

good_test_list += good_list[:43]
good_list = good_list[43:]
print(f"good {len(good_list)} good_test {len(good_test_list)} bad {len(bad_test_list)}")
#出力
#good 166 good_test 63 bad 63

学習用画像は減少しましたが、評価用画像のバランスをとることができました。

では、正常画像と異常画像がどのような画像であるか出力して確認しましょう!

%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from PIL import Image
import math

def view_img(img_list,title = "",size = (15,15)):
    fig = plt.figure(figsize=size)
    fig.suptitle(title)
    for i,d in enumerate(img_list):
        ax = fig.add_subplot(math.ceil(len(img_list)/2),2,i+1)
        ax.imshow(Image.open(d))
        ax.set_title(d)
        ax.axis("off")
    
#正常画像
view_img(good_list[:4],title = "good")
#異常画像
view_img(bad_test_list[:4],title = "bad")

正常画像と異常画像

正常画像は全体を通して似た形をしていますが、細かい部分で差があります(オレンジ円の欠け具合・白い点の位置など)。

一方、異常画像は、左下の画像を見ると上半分がつぶれているます。他の異常画像でも同様に大きな欠損を確認できます。

Step3:データセットの準備

学習にはデータセットを準備する必要があります。今回は学習用と評価用のデータを8:2で分割します。

学習用データ・評価用データについて詳しく知りたい方はこちらからご確認ください!

import torchvision.transforms as T
from torch.utils.data import DataLoader,Dataset
from PIL import Image

# データセット関数の定義
class Custom_Dataset(Dataset):
  def __init__(self,img_list):
    self.img_list = img_list
    self.prepocess = T.Compose([T.Resize((128,128)),
                                T.ToTensor(),
                                ])
  def __getitem__(self,idx):
    img = Image.open(self.img_list[idx])
    img = self.prepocess(img)
    return img
  def __len__(self):
    return len(self.img_list)

#データを学習用・評価用に8:2へ分割
train_list = good_list[:int(len(good_list)*0.8)]
val_list = good_list[int(len(good_list)*0.8):]

train_dataset = Custom_Dataset(train_list)
val_dataset = Custom_Dataset(val_list)
train_loader = DataLoader(train_dataset,batch_size = 32)
val_loader = DataLoader(val_dataset,batch_size = 32)

これでデータセットの準備はばっちりです!

続いてAutoEncoderモデルの構築に移りましょう!

Step4:AutoEncoderの構築

AutoEncoderは、以下で構成されています。

  • Encoder
  • 潜在変数
  • Decoder

上記の内容をコードにすると次のようになります。

import torch.nn as nn
import torchvision

class CustomModel(nn.Module):
    def __init__(self):
        super(CustomModel,self).__init__() 
        # Encoderの構築。
        # nn.Sequential内にはEncoder内で行う一連の処理を記載する。
        # create_convblockは複数回行う畳み込み処理をまとめた関数。
        # 畳み込み→畳み込み→プーリング→畳み込み・・・・のような動作
        self.Encoder = nn.Sequential(self.create_convblock(3,16),     #256
                                     nn.MaxPool2d((2,2)),
                                     self.create_convblock(16,32),    #128
                                     nn.MaxPool2d((2,2)),
                                     self.create_convblock(32,64),    #64
                                     nn.MaxPool2d((2,2)),
                                     self.create_convblock(64,128),   #32
                                     nn.MaxPool2d((2,2)),
                                     self.create_convblock(128,256),  #16
                                     nn.MaxPool2d((2,2)),
                                     self.create_convblock(256,512),  #8
                                    )
        # Decoderの構築。
        # nn.Sequential内にはDecoder内で行う一連の処理を記載する。
        # create_convblockは複数回行う畳み込み処理をまとめた関数。
        # deconvblockは逆畳み込みの一連の処理をまとめた関数
        # 逆畳み込み→畳み込み→畳み込み→逆畳み込み→畳み込み・・・・のような動作
        self.Decoder = nn.Sequential(self.create_deconvblock(512,256), #16
                                     self.create_convblock(256,256),
                                     self.create_deconvblock(256,128), #32
                                     self.create_convblock(128,128),
                                     self.create_deconvblock(128,64),  #64
                                     self.create_convblock(64,64),
                                     self.create_deconvblock(64,32),   #128
                                     self.create_convblock(32,32),
                                     self.create_deconvblock(32,16),   #256
                                     self.create_convblock(16,16),
                                    )
        # 出力前の調整用
        self.last_layer = nn.Conv2d(16,3,1,1)
                                        
    # 畳み込みブロックの中身                            
    def create_convblock(self,i_fn,o_fn):
        conv_block = nn.Sequential(nn.Conv2d(i_fn,o_fn,3,1,1),
                                   nn.BatchNorm2d(o_fn),
                                   nn.ReLU(),
                                   nn.Conv2d(o_fn,o_fn,3,1,1),
                                   nn.BatchNorm2d(o_fn),
                                   nn.ReLU()
                                  )
        return conv_block
    # 逆畳み込みブロックの中身
    def create_deconvblock(self,i_fn , o_fn):
        deconv_block = nn.Sequential(nn.ConvTranspose2d(i_fn, o_fn, kernel_size=2, stride=2),
                                      nn.BatchNorm2d(o_fn),
                                      nn.ReLU(),
                                     )
        return deconv_block

    # データの流れを定義     
    def forward(self,x):
        x = self.Encoder(x)
        x = self.Decoder(x)
        x = self.last_layer(x)           
        return x

Encoderはself.encoder、Decoderはself.decoderで構成しています。

潜在変数は、Encoderの出力に当たるため、

x = self.Encoder(x)

が潜在変数となります。

Step5:学習

お待ちかねの学習をしてみましょう!! 以下のコードを記述後、実行してください!

「AIの学習ってどうなってるんだ?」と疑問をお持ちの方はこちらをご覧ください!

from tqdm import tqdm
import torch.optim as optim
import torch.nn.functional as F


epoch_num = 1000

device = 'cuda'

best_loss = None
model = CustomModel().to(device)
limit_epoch = 100


optimizer = optim.Adam(model.parameters())
criterion = nn.MSELoss()
#criterion = nn.BCEWithLogitsLoss()
loss_list = {"train":[],"val":[]}

counter = 0
for e in range(epoch_num):
    total_loss = 0
    model.train()
    with tqdm(train_loader) as pbar:
        for itr , data in enumerate(pbar):
            optimizer.zero_grad()
            data = data.to(device)
            output = model(data)
            loss = criterion(output , data)
            total_loss += loss.detach().item()
            pbar.set_description(f"[train] Epoch {e+1:03}/{epoch_num:03} Itr {itr+1:02}/{len(pbar):02} Loss {total_loss/(itr+1):.3f}")
            loss.backward()
            optimizer.step()
    
    loss_list["train"].append(total_loss)
    total_loss = 0
    model.eval()
    with tqdm(val_loader) as pbar:
        for itr , data in enumerate(pbar):
            data = data.to(device)
            with torch.no_grad():
                output = model(data)
            loss = criterion(output , data)
            total_loss += loss.detach().item()
            pbar.set_description(f"[ val ] Epoch {e+1:03}/{epoch_num:03} Itr {itr+1:02}/{len(pbar):02} Loss {total_loss/(itr+1):.3f}")
    
    if best_loss is None or best_loss > total_loss/(itr+1):
        if best_loss is not None:
            print(f"update best_loss {best_loss:.6f} to {total_loss/(itr+1):.6f}")
        best_loss = total_loss/(itr+1)
        model_path = 'model.pth'
        torch.save(model.state_dict(), model_path)
        counter = 0
    else:
        counter += 1
        if limit_epoch <= counter:
            break
    loss_list["val"].append(total_loss)
[train] Epoch 001/1000 Itr 06/06 Loss 0.448: 100%|██████████| 6/6 [00:05<00:00,  1.07it/s]
[ val ] Epoch 001/1000 Itr 02/02 Loss 0.454: 100%|██████████| 2/2 [00:01<00:00,  1.48it/s]
[train] Epoch 002/1000 Itr 06/06 Loss 0.291: 100%|██████████| 6/6 [00:05<00:00,  1.08it/s]
[ val ] Epoch 002/1000 Itr 02/02 Loss 0.391: 100%|██████████| 2/2 [00:01<00:00,  1.47it/s]
update best_loss 0.454319 to 0.391192
[train] Epoch 003/1000 Itr 06/06 Loss 0.190: 100%|██████████| 6/6 [00:05<00:00,  1.07it/s]

のように出力されればOKです!学習が終わるまで気長に待ちましょう~~~。(学習は1時間程度かかります)

Step6:可視化

学習が終われば評価の時間です!

まずはAutoEncoderの出力を可視化しましょう!

mkdir create_img
import cv2
import numpy as np
from PIL import Image


model = CustomModel().cuda()

model_path = 'model.pth'
model.load_state_dict(torch.load(model_path))

margin_w = 10
prepocess = T.Compose([T.Resize((128,128)),
                                T.ToTensor(),
                                ])
model.eval()
loss_list = []
labels = [0]*len(good_test_list) + [1]*len(bad_test_list)
for idx , path in enumerate(tqdm(good_test_list + bad_test_list)):

    img = Image.open(path)
    img = prepocess(img).unsqueeze(0).cuda()
    with torch.no_grad():
        output = model(img)[0]
    output = output.cpu().numpy().transpose(1,2,0)
    output = np.uint8(np.maximum(np.minimum(output*255 ,255),0))
    origin = np.uint8(img[0].cpu().numpy().transpose(1,2,0)*255)
    
    
    diff = np.uint8(np.abs(output.astype(np.float32) - origin.astype(np.float32)))
    loss_list.append(np.sum(diff))
    heatmap = cv2.applyColorMap(diff , cv2.COLORMAP_JET)
    margin = np.ones((diff.shape[0],margin_w,3))*255
    
    result = np.concatenate([origin[:,:,::-1],margin,output[:,:,::-1],margin,heatmap],axis = 1)
    label = 'good' if idx < len(good_test_list) else 'bad'
    cv2.imwrite(f"./create_img/{idx}_{label}.jpg",result)

結果は次のようになりました。

評価を行う際に必要な差分画像と呼ばれるものを作成しました。

差分画像は入力画像と再現画像の差を取ったもので、青に近ければ差が小さく赤に近ければ差が大きいことを表しています。

正常画像・異常画像の再現結果

正常画像は、差分結果からきれいに再現できることを確認しました。

一方、異常画像は、異常個所が復元できず差が大きくなっていることが分かります。

では、差分結果をもとに、異常検知が出来るか評価してみましょう!

Step7:ROC曲線による評価

今回は差分画像の数値の総和をもとに分類を行います。

ここで今回利用する評価指標であるROC曲線について説明します。

ROC(Receiver Operating Characteristic)曲線は横軸に偽陽性率(False Positive Rate。FPR)、縦軸に真陽性率(True Positive Rate。TPR)を取り、閾値を変化させその結果をプロットしたものです。

例えば次のような表があるとします。

点数と合格の関係性

この表の左側の「点数」は入試テストの点数を表し、「合格」は入試に合格したかを表しています。TはTrueの略で「合格」、FはFalseの略で「不合格」を表しています。

右側はある点数で閾値を設け、閾値以上を合格、閾値未満を不合格とした場合、実際の結果と比較した結果を表しています。

ここで、

  • TP:真陽性。正しく合格していると分類できた場合TPとなる。
  • FP:偽陽性。間違って合格していると分類した場合FPとなる。
  • TN:真陰性。正しく不合格であると分類できた場合TNとなる。
  • FN:偽陰性。間違って不合格であると分類した場合FNとなる。 です。

上の表を見ると、閾値が変わることで判定が変わっていることが分かります。

続いてTPR、FPRについて説明します。各閾値に対するTPR、FPRの計算方法は次の通りです。

TPR・FPR計算式

TPR(真陽性率)は、0 ~ 1の値を取ります。1に近いほど合格の取りこぼしが少なく、0に近ければ取りこぼしが多く発生します。値が大きいほど網羅性が高いと言えます。

FPR(偽陽性率)は、0 ~ 1の値を取ります。不合格を合格と誤って判定することが0に近いほど少なく、1に近ければ多く発生します。値が小さいほど誤検知率が低いと言えます。

TPR・FPRの計算の例として閾値10を考えます。

これは10点未満は不合格とするルールを表しています。

このルールに従った場合、10点は1つのみでTPと表されており、その他はTN・FNで表されています。

まずはTPR・FPRの計算をしてみます。TPRの計算ではTPとFNを利用します。表からTPは1つ、TNは6つ、FNは4つ、FPは6つ存在します。

これよりTPR・FPRは次のように計算されます。

閾値10の際のTPR・FPR計算結果

TPRは0.2、FPRは0.0が得られました。TPRが0.2であることから、閾値10点で合格・不合格を予測すると合格の取りこぼしが多く発生することが分かります。

一方、FPRが0.0であることから、誤った判定が行われないというのもポイントです。

上記で説明したTPR・FPRの計算をすべての閾値で起算した結果が表の下部分です。

この結果をグラフにプロットしたものがROC曲線と言われます。

上の表では次のような結果になりました。

合格・不合格分類のROC曲線

ROC曲線では、グラフが左上に近ければ近いほど良いとされます。

ROC曲線を評価する際、AUC(Area Under the Curve)と呼ばれる手法を用います。

AUCは0 ~ 1の値を取り、1に近いほど精度が良い傾向にあります。

上のROC曲線のAUCは0.82と1に近いため、良い分類精度であったと言えます。

ではこの指標を利用して、今回作成したモデルを用いた異常検知精度を評価しましょう!

from sklearn.metrics import roc_curve,roc_auc_score,precision_recall_curve,auc
import matplotlib.pyplot as plt

fpr, tpr, thresholds = roc_curve(labels,loss_list)

plt.plot(fpr, tpr, marker='o')

plt.xlabel('FPR: False positive rate')
plt.ylabel('TPR: True positive rate')
plt.grid()
plt.savefig('./sklearn_roc_curve.png')
print(roc_auc_score(labels,loss_list))

#出力
#0.915

正常・異常分類のROC曲線

結果は上図のようになりました!かなり左上に寄った結果になっていることが確認できます!

AUCの値も0.915と、1に非常に近く高い精度で分類できることが確認できます!

実際に運用する際は、ROC曲線を見ながら正常・異常の閾値を決定することになります。

ここではAIの使い方に合わせた閾値の決定方法についてご紹介します。

AIを補助的に使いたい

AIを補助的に使う

製品に異常が多く発生し、AIにすべてを任せるのは少し不安・・・という場合は、AIが異常と判断する閾値を高く設定します。

閾値を高く設定することで、取りこぼしが発生するようになりますが、AIの判断の確実性が向上します。

この場合AIの運用は、まずAIで明らかな傷の有無の検査をします。異常が発見された製品はこの段階で弾きます。

その後の工程では、AIが発見できなかった細かな異常が存在しないか、作業員の方が検査します。

結果的に作業員の方は、簡単な異常検知の業務を行わずに済むため、より集中して細かな異常検知に取り組むことが可能となります。

このように、AIがすべての異常を見つけるのではなく、前段の振り分けのように利用する場合は、閾値を高めに設定します。

「いつもと違う」を検出

「いつもと違う」を検出

製品にほとんど異常が発生せず、稀に発生する異常を見つけたい場合は、閾値を低く設定します。

閾値を低く設定することで、異常を取りこぼさなくなりますが、誤検知が多く発生するようになります。

この場合AIの運用は、まずAIで異常の有無を検査します。閾値が低く設定されているため、少しでも疑わしいところがあれば作業員による再検査を要求します。

一方で、AIが反応しなかった製品は確実に正常品であると言えるため、それ以上の検査を必要としません。

結果的に作業員の方は、確実に正常品である製品の検品を行わずに済むため、効率的な異常検知が可能となります。

このように、少しでも疑わしい部分がある製品のみを再検査するように利用する場合は、閾値を低めに設定します。

まとめ

今回はAutoEncoderとMVTec社の異常検知データセットを用いて、bottleの異常検知に取り組みました!

AutoEncoderによる異常検知は、正常画像のみを用いて取り組むことができるため、その他の方法による異常検知と比較して導入がしやすいです!

正常画像はたくさんあるけど、どうやってAIで使えばいいのか分からない・・・という方は、ぜひこの機会に一度お試しください!