EY-Office ブログ

GmailのPOPサポート終了対応ツールにメール表示機能を追加したが、認証で手間取った

以前書いたGmailのPOPサポート終了に対応する為のツールをAIに作ってもらったPythonでもJSXが使えるぞ! PyJSXを使ってHTMLメールを作ってみた の続きです。

リンク先を見ない方のために、以前の記事をまとめると以下のようになります。

  • GmailがPOPを使用したサードパーティのアカウントからGmailアカウントへのメールの取得サービスを2026年1月より終了
  • そこでホスティングサービスからGmailにメール転送する事にした。ただし昔は転送されないメールがあったが、ホスティングサービスで対応されたようだ
  • 本当にホスティングサービスから正しくGmailのメールが転送されているか、昨日ホスティングサービスに着いたメール一覧をメールで送ってくれるツールを作成した
  • メール一覧はHTMLにした方が見やすいので、HTMLメールに変更。HTML作成にはPyJSXを使用

Gmail_stop_pop.jpg Nano Banana Proが生成した画像を使っています

やはりメールの内容を見たくなる

毎日、以下の画像(ぼかしてあります)のようなメールをチェックしているのですが、時々メールの内容が気になり、メールを読みたくなります。

PyJSXの記事のコードには、以下のようにメール一覧の時間部分をクリックするとメール表示サービスを起動するコードを書きましたが、その時点ではメール表示サービスは作りませんでした。

  <td style={td_style}>
      <a href={f"{read_mail_url}/{email['uid']}"} style={a_style}>
          {email['date'][11:]}
      </a>
  </td>

そこで、今回はメール表示サービスを作り以下の画像(ぼかしてあります)のようにメール内容が表示サービスをAWS Lambdaを使い作りました。HTMLメールも表示できます。

仕様

AWS Lambdaに定義した関数はAmazon API Gatewayと組み合わせるとAPI Gatewayが生成したURLをアクセスすると設定されたAWS Lambda関数が呼び出せ、関数の結果をJSONやHTMLで受け取れます。

今回の仕様は、

  • GETアクセスするURLのhttps://XYZ12345.execute-api.ap-northeast-1.amazonaws.com/default/12345のように最後にはIMAPメールのUIDを指定する
  • レスポンスはHTML
  • 何らかの認証を付加し、他人がURLを直接アクセスしても簡単には表示出来ないようにする

API Gatewayにはいくつかの認証機構があるのですが、API向けのものが多くブラウザー(<a href=..>)でアクセスする場合に付いては色々と調べたのですが、古風なBasic認証が簡単に作れて良さそうです。Basic認証はセキュリティー的にはやや問題ありですが私個人向のサービスなので良いでしょう。

メール取得Lambda

UIDを指定しIMAPでメールを受信しHTMLを生成するPythonのコードをClaudeに生成してもらい。一部は人手で置き換えました、

  • HTML生成する部分はPyJSXに置き換え
  • メール情報取得とレンダリングが一体化していたので分離(リファクタリング)

mail_read.py

特に説明の必要はないかと思います、Pythonのimaplib, email等を使ったメール取得コードです。HTMLメールやMIMEエンコードされたSubject等にも対応しています。
ちなみに、①はLambdaでURLに含まれるUIDの取得コードです。

import pyjsx.auto_setup
import imaplib
import email
import os
from datetime import timedelta, timezone
from email.header import decode_header
from email.utils import parsedate_to_datetime
from mail_render import make_mail_html

# IMAP接続設定
IMAP_SERVER = os.getenv("IMAP_SERVER")
IMAP_PORT = int(os.getenv("IMAP_PORT"))
EMAIL_ACCOUNT = os.getenv("EMAIL_ACCOUNT")
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD")


def connect_to_imap():
    """IMAPサーバーに接続"""
    mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
    mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
    return mail


def decode_mime_words(s):
    """MIMEエンコードされた文字列をデコード"""
    if s is None:
        return ''
    decoded_fragments = decode_header(s)
    fragments = []
    for fragment, encoding in decoded_fragments:
        if isinstance(fragment, bytes):
            if encoding:
                try:
                    fragment = fragment.decode(encoding, errors='replace')
                except:
                    fragment = fragment.decode('utf-8', errors='replace')
            else:
                for enc in ['iso-2022-jp', 'utf-8', 'shift_jis', 'euc-jp']:
                    try:
                        fragment = fragment.decode(enc)
                        break
                    except:
                        continue
                else:
                    fragment = fragment.decode('utf-8', errors='replace')
        fragments.append(str(fragment))
    return ''.join(fragments)


def convert_to_jst(date_str):
    """メールの日付を日本時間(JST)に変換"""
    try:
        dt = parsedate_to_datetime(date_str)
        jst = timezone(timedelta(hours=9))
        dt_jst = dt.astimezone(jst)
        return dt_jst.strftime('%Y-%m-%d %H:%M:%S')
    except:
        return date_str


def get_email_body(msg):
    """メールの本文を取得(text/plain と text/html の両方)"""
    text_body = ""
    html_body = ""

    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            content_disposition = str(part.get("Content-Disposition"))

            if "attachment" not in content_disposition:
                payload = part.get_payload(decode=True)
                if payload:
                    charset = part.get_content_charset()
                    decoded_text = decode_payload(payload, charset)

                    if content_type == "text/plain":
                        text_body = decoded_text
                    elif content_type == "text/html":
                        html_body = decoded_text
    else:
        payload = msg.get_payload(decode=True)
        if payload:
            charset = msg.get_content_charset()
            decoded_text = decode_payload(payload, charset)

            if msg.get_content_type() == "text/html":
                html_body = decoded_text
            else:
                text_body = decoded_text

    return text_body, html_body


def decode_payload(payload, charset):
    """ペイロードをデコード"""
    if charset:
        try:
            return payload.decode(charset, errors='replace')
        except:
            return payload.decode('utf-8', errors='replace')
    else:
        for enc in ['iso-2022-jp', 'utf-8', 'shift_jis', 'euc-jp']:
            try:
                return payload.decode(enc)
            except:
                continue
        return payload.decode('utf-8', errors='replace')


def fetch_email_by_uid(uid, mailbox='INBOX'):
    """
    UIDで指定されたメールを取得して表示

    Args:
        uid: メールのUID(文字列または数値)
        mailbox: メールボックス名
    """
    try:
        mail = connect_to_imap()
        mail.select(mailbox)

        # UIDでメールを取得
        uid_str = str(uid) if isinstance(uid, int) else uid
        status, msg_data = mail.uid('fetch', uid_str, '(RFC822)')

        if status != 'OK':
            print(f"メールの取得に失敗しました: UID={uid}")
            return None

        for response_part in msg_data:
            if isinstance(response_part, tuple):
                msg = email.message_from_bytes(response_part[1])

                # メール情報を取得
                subject = decode_mime_words(msg['Subject']) if msg['Subject'] else 'No Subject'
                from_ = decode_mime_words(msg['From']) if msg['From'] else 'Unknown'
                date = convert_to_jst(msg['Date']) if msg['Date'] else 'Unknown'
                text_body, html_body = get_email_body(msg)
                return {
                    'subject': subject,
                    'from': from_,
                    'date': date,
                    'text_body': text_body,
                    'html_body': html_body
                }

        return None

    except imaplib.IMAP4.error as e:
        print(f"IMAP Error: {e}")
        return None
    except Exception as e:
        print(f"Error: {e}")
        return None
    finally:
        try:
            mail.close()
            mail.logout()
        except:
            pass

def lambda_handler(event, context):
    # URLからパスパラメーターuidを取得
    uid = int(event.get('pathParameters', {}).get('uid'))   # ← ①

    mail = fetch_email_by_uid(uid)
    html = make_mail_html(mail)

    return {
        'statusCode': 200,
        'headers': { 'Content-Type': 'text/html' },
        'body': html
    }

mail_render.py

HTML作成は大好きなPythonでJSXが使えるPyJSXを使っています。PyJSXに付いてはPyJSXに付いてのブログを参照して下さい。
ちなみにJSXにHTML文字列をHTMLタグとして取り込むにはHTMLDontEscape()関数を使います。

# coding: jsx
from pyjsx import jsx, JSX, HTMLDontEscape

def make_mail_html(mail):
    """メールをHTML形式で表示"""

    body_style = {
        "font-family": "'Segoe UI', Tahoma, Geneva, Verdana, sans-serif",
        "background-color": "#f5f5f5",
        "margin": "0",
        "padding": "20px"
    }
    container_style = {
        "max-width": "900px",
        "margin": "0 auto",
        "background-color": "white",
        "border-radius": "8px",
        "box-shadow": "0 2px 10px rgba(0,0,0,0.1)",
        "overflow": "hidden"
    }
    header_style = {
        "background-color": "#4285f4",
        "color": "white",
        "padding": "20px 30px"
    }
    header_h1_style = {
        "margin": "0",
        "font-size": "24px"
    }
    info_section_style = {
        "padding": "20px 30px",
        "border-bottom": "1px solid #e0e0e0"
    }
    info_row_style = {
        "display": "flex",
        "margin-bottom": "12px"
    }
    info_label_style = {
        "font-weight": "bold",
        "color": "#555",
        "min-width": "80px",
        "margin-right": "10px"
    }
    info_value_style = {
        "color": "#333",
        "word-break": "break-all"
    }
    body_section_style = {
        "padding": "30px"
    }
    body_content_style = {
        "line-height": 1.6,
        "color": "#333"
    }
    text_body_style = {
        "white-space": "pre-wrap",
        "font-family": "'Courier New', monospace",
        "background-color": "#f9f9f9",
        "padding": "15px",
        "border-radius": "4px",
        "border-left": "4px solid #4285f4"
      }
    info_label_style = {
          "font-weight": "bold",
          "color": "#555",
          "min-width": "80px",
          "margin-right": "10px"
    }
    info_value_style = {
          "color": "#333",
          "word-break": "break-all"
      }
    body_section_style = {
          "padding": "30px"
      }
    body_content_style = {
          "line-height": 1.6,
          "color": "#333"
      }
    text_body_style = {
          "white-space": "pre-wrap",
          "font-family": "'Courier New', monospace",
          "background-color": "#f9f9f9",
          "padding": "15px",
          "border-radius": "4px",
          "border-left": "4px solid #4285f4"
      }
    html_body_style = {
          "border": "1px solid #e0e0e0",
          "padding": "15px",
          "border-radius": "4px",
          "background-color": "white"
      }
    content_type_style = {
            "font-size": "12px",
            "color": "#888",
            "margin-bottom": "10px",
            "font-weight": "bold"
        }

    if mail['html_body']:
        body_content = (
            <>
                <div style={content_type_style}>HTML形式</div>
                <div style={html_body_style}>{HTMLDontEscape(mail["html_body"])}</div>
            </>
        )
    elif mail['text_body']:
        body_content = (
            <>
                <div style={content_type_style}>テキスト形式</div>
                <div style={text_body_style}>{HTMLDontEscape(mail["text_body"])}</div>
            </>
        )
    else:
        body_content = (
            <div style={text_body_style}>本文なし</div>
        )

    html = (
      <html lang="ja">
        <head>
            <meta charset="UTF-8" />
            <meta name="viewport" content="width=device-width, initial-scale=1.0" />
            <title>メール表示</title>
        </head>
        <body style={body_style}>
          <div style={container_style}>
              <div style={header_style}>
                  <h1 style={header_h1_style}>📧 メール詳細</h1>
              </div>
              <div style={info_section_style}>
                  <div style={info_row_style}>
                      <div style={info_label_style}>送信者:</div>
                      <div style={info_value_style}>{mail['from']}</div>
                  </div>
                  <div style={info_row_style}>
                      <div style={info_label_style}>日時:</div>
                      <div style={info_value_style}>{mail['date']}</div>
                  </div>
                  <div style={info_row_style}>
                      <div style={info_label_style}>件名:</div>
                      <div style={info_value_style}>{mail['subject']}</div>
                  </div>
              </div>
              <div style={body_section_style}>
                  <div style={body_content_style}>
                      {body_content}
                  </div>
              </div>
          </div>
        </body>
    </html>
    )

    return str(html)

Basic認証の付加

API GatewayはAWS IAM認証やAPIキー認証をサポートしていますが、これらはAPI向けの認証でブラウザーからのアクセスではクライアント側にもコードが必要になります。API GatewayにはLambda オーソライザーという独自の認証機構を組み込める機能を持っています。
今回はBasic認証を採用しました。そこでBasic認証の機構をLambdaオーソライザーを使って作る必要がありますがネット上にたくさんの情報があります、今回はDevelopersIOの記事を参考にしました。ありがとうございます🙇‍♀️

実はLambda オーソライザーはLambda関数として定義し、API Gatewayに定義する事で認証機能を実現します。

basicAuth.py

Basic認証の仕組みはWikipediaのBasic認証にも書かれていますがシンプルなものです。このコードもAI(Claude)が作ってくれました。

  • ① ヘッダーのAuthorization情報を取得
    • いろいろな記事を見るとヘッダー名がAuthorizationauthorizationの2つがあり、このコードではどちらでもよいコードになっていいます
  • ② Authorization情報が無い、またはBasic認証でなければDeny(拒否)
  • ③ Authorization情報からユーザー、パスワード変更を取得
  • ④ 環境変数に定義されたユーザー、パスワードと比較し合っていればAllow(許可)、合ってなければDeny(拒否)
import json
import base64
import os

def lambda_handler(event, context):
    """
    API Gateway Lambda Authorizer with Basic Authentication
    """
    
    # 環境変数から認証情報を取得(Lambda環境変数に設定)
    VALID_USERNAME = os.environ.get('BASIC_AUTH_USERNAME')
    VALID_PASSWORD = os.environ.get('BASIC_AUTH_PASSWORD')
    
    # Authorizationヘッダーを取得
    auth_header = event.get('headers', {}).get('Authorization') or \
                  event.get('headers', {}).get('authorization')        # ← ①

    if not auth_header:
        return generate_policy('Deny', event['methodArn'])             # ← ②
    
    # "Basic "プレフィックスを確認
    if not auth_header.startswith('Basic '):
        return generate_policy('Deny', event['methodArn'])             # ← ②
    
    # Base64エンコードされた認証情報をデコード
    try:
        encoded_credentials = auth_header.split(' ')[1]
        decoded_credentials = base64.b64decode(encoded_credentials).decode('utf-8')
        username, password = decoded_credentials.split(':', 1)         # ← ③
    except Exception as e:
        print(f"認証情報のデコードエラー: {e}")
        return generate_policy('Deny', event['methodArn'])
    
    # 認証チェック
    if username == VALID_USERNAME and password == VALID_PASSWORD:      # ← ④
        return generate_policy('Allow', event['methodArn'], username)
    else:
        return generate_policy('Deny', event['methodArn'])


def generate_policy(effect, resource, principal_id='user'):
    """
    IAMポリシーを生成
    """
    policy = {
        'principalId': principal_id,
        'policyDocument': {
            'Version': '2012-10-17',
            'Statement': [
                {
                    'Action': 'execute-api:Invoke',
                    'Effect': effect,
                    'Resource': resource
                }
            ]
        }
    }
    
    # コンテキストを追加(オプション:後続のLambdaで使用可能)
    if effect == 'Allow':
        policy['context'] = {
            'username': principal_id
        }
    
    return policy

API Gatewayの設定

AWSの設定はCloudFormationのような定義ファイルを使う方法やコードで定義する方法もありますが、私はAWS弱者なのでAWSコンソールから定義しました。そしてハマりました❗ ハマった箇所には😅を付けました。

API Gateway リーソース
  • ① APIは GET /{uid} になります
メソッドリクエストの定義
  • ② BASIC認証用のオーソライザーを指定
  • ③ リクエストパスの定義{uid}の部分がパラメーターになります

結合リクエストの定義
  • ① API GatewayはLambda以外の使い方があります、ここではLambdaです
  • ② Lambda プロキシ統合をONししないとHTMLをレスポンとして戻せません。OFFだとJSONが戻ります😅
  • ③ 対応するLambda関数の定義

API Gateway オーソライザー

オーソライザー(認証)の定義です。

  • Lambda 関数:認証用Lambda関数を指定
  • Lambda イベントペイロード:リクエストを設定
    • AI(Claude)はトークンだと言っていましたが間違いのようです😅
  • ID ソース: ヘッダーのAuthorization を指定
  • 認可のキャッシュ:これをOFFにしないと、UIDを変えてリクエストすると認証エラーになってしまいます😅

このメニューの編集にあるオーソライザーをテストで認証用Lambda関数を単体テストできます。

API Gateway ゲートウェイのレスポンス

未認証時(HTTPステータス401)のレスポンスにWWW-Authenticate: Basicを設定しています。このレスポンスを受け取るとブラウザーはBasic認証用のダイアログを表示してくれます。また値は'Basic'と書く必要があります。

まとめ

AIにコードや設定を聞きながら今回の機能を作りましたがAIの回答が間違っていたり、またネット上の記事が最新のAWSコンソール画面ではなかったりで苦労しましたが、何とか動きました。
AWSにはたくさんの機能があり、本当に毎回勉強になりますね。😅

今回の開発で手こずった最大の原因は、APIをデプロイボタンの押し忘れでした。設定を変更しいざテスト❗と実行すると動かず悩むというのも何度も体験したので、途中からはテスト前にAPIをデプロイボタンを押す癖が付き、開発が順調になりました。😅

- about -

EY-Office代表取締役
・プログラマー
吉田裕美の
開発者向けブログ