現在利用しているWordPressサーバのMySQLへ定期的にデータを投入しているのですが、
どうにも使い勝手が悪いのでBigQueryへデータを投入してMySQLへ同期することにしました。

本番実装はCroudRunを使うのですが、
今回はPythonスクリプト一本で同期する方法を解説します。

Croud Runについてはこちら

【GCP】Cloud RunとワークフローでWPテーマリリースを構築する【サーバレス】


実施スキーム

今回実現するスキームはこちら。

BigQuery上に同期先と同名のテーブルを作成し、データを投入しておきます。
これをPythonで抽出し、MySQLへ接続してデータを投入します。

今回接続するMySQLサーバはエックスサーバーで借りているWordPressサーバにあります。


ここにリモートアクセスするためには、
まずエックスサーバーへSSH接続した上で、SSHトンネル経由でアクセスする必要があります。

その他要件

その他、今回のスクリプトは以下の要件にしています。

  • 同期方式は差分更新と洗い替え更新の両方を選べるようにする
  • 全テーブルを同期するのではなく、プレフィクスが「m_」のテーブルのみを同期する
  • 物理削除(DELETE)には対応しない(物理削除を含む場合には洗い替え更新する)

事前準備

差分更新をするために、前回の同期時刻を保持するテーブルをBigQuery上に作成します。
MySQLへ同期するテーブル群と同じデータセット内に、sync_datetimeというテーブルを作成します。

CREATE OR REPLACE TABLE `project-name.sample.sync_datetime` (
    environment     STRING      OPTIONS (description = '環境')
    sync_datetime   DATETIME    OPTIONS (description = '同期日時')
);

INSERT INTO `project-name.sample.sync_datetime`
    (environment,sync_datetime)
VALUES
   ('development','2023-05-13 22:11:30')
,    ('production','2023-05-13 22:13:40')
;

同期対象のテーブルは、いずれも update_datetimeという、
レコードの最終更新日時を保持するカラムを持っているものとします。

また、今回はPythonのgoogle.cloudライブラリを使ってBigQueryへアクセスするため、
事前に以下の方法でクライアントライブラリをインストールし、認証を済ませておきます。
https://cloud.google.com/bigquery/docs/reference/libraries?hl=ja#client-libraries-install-python

完成品

from datetime import datetime
from google.cloud import bigquery
import pymysql
from sshtunnel import SSHTunnelForwarder

# Parameters
bq_project_name = "project-name"    # BigQueryのプロジェクトID
bq_dataset_name = "sample"          # BigQueryのデータセットID
sync_mode = "diff"                  # 更新方式
environment = "production"          # 環境識別子
wp_host = "xxxxxx.xserver.jp"       # SSHサーバのホスト名
wp_username = "xxxxxx"              # SSHサーバのユーザー名
wp_key_path = "~/.ssh/rsa_key_wp"   # SSHサーバ秘密鍵のファイルパス
mysql_db_name = "xxxxxxx"           # MySQLのデータベース名
mysql_username = "xxxxxxx"          # MySQLのユーザー名
mysql_password = "xxxxxxx"          # MySQLのパスワード

# BigQuery setup
bigquery_client = bigquery.Client()

# SSH and MySQL setup
ssh_config = {
    "ssh_address_or_host": (wp_username, 10022), # WordPressサーバは22でなく10022
    "ssh_username": wp_username,
    "ssh_private_key": wp_key_path,
    "remote_bind_address": ("127.0.0.1", 3306),
}

mysql_config = {
    "user": mysql_username,
    "password": mysql_password,
    "host": "127.0.0.1",
    "database": mysql_db_name,
    "connect_timeout": 10,
    "cursorclass": pymysql.cursors.DictCursor
}

def get_last_sync_datetime(environment):
    # Replace with your BigQuery SQL query
    query = f"""
    SELECT sync_datetime
    FROM `{bq_project_name}.{bq_dataset_name}.sync_datetime`
    WHERE environment = '{environment}'
    """
    # Execute the BigQuery query
    query_job = bigquery_client.query(query)
    result = query_job.result()
    # Get sync_datetime value from the query result
    sync_datetime = next(iter(result)).sync_datetime
    return sync_datetime

def set_last_sync_datetime(environment,sync_datetime):
    # Replace with your BigQuery SQL query
    query = f"""
    UPDATE `{bq_project_name}.{bq_dataset_name}.sync_datetime`
    SET sync_datetime = '{sync_datetime}' WHERE environment = '{environment}'
    """
    # Execute the BigQuery query
    query_job = bigquery_client.query(query)
    query_job.result()

def sync_data():
    current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Get Last Sync DateTime From BigQuery
    print("\n===== Start Get Last Sync Date =====")
    last_sync_datetime = get_last_sync_datetime(environment)
    print(last_sync_datetime)
    print("===== End Get Last Sync Date =====")
    # List tables in the dataset with table names starting with "m_"
    query_get_sync_target=f"""
        SELECT
            table_name
        FROM `{bq_project_name}.{bq_dataset_name}.INFORMATION_SCHEMA.PARTITIONS`
        WHERE
            1=1
        AND table_catalog = '{bq_project_name}'
        AND table_schema  = '{bq_dataset_name}'
        AND table_name  LIKE  'm_%'
    """
    if sync_mode == "diff":
        query_get_sync_target += f" AND DATETIME(last_modified_time, 'Asia/Tokyo') >= DATETIME('{last_sync_datetime}')"
    # Execute the BigQuery query
    query_job = bigquery_client.query(query_get_sync_target)
    tables_to_sync = query_job.result()
    # Connect to MySQL via SSH tunnel
    tunnel = SSHTunnelForwarder(**ssh_config)
    tunnel.start()
    mysql_config["port"] = tunnel.local_bind_port
    conn = pymysql.connect(**mysql_config)
    print("\n===== Start Sync =====")
    try:
        cursor = conn.cursor()
        # Loop through each table
        for table in tables_to_sync:
            table_name = f"{bq_project_name}.{bq_dataset_name}.{table.table_name}"
            print(table_name)
            # Replace with your BigQuery SQL query
            query = f"SELECT * FROM `{table_name}`"
            if sync_mode == "diff":
                query += f" WHERE update_datetime > DATETIME '{last_sync_datetime}';"
            # print(query)
            # Execute the BigQuery query
            query_job = bigquery_client.query(query)
            rows = query_job.result()
            schema = rows.schema
            # Delete All Records When Renew Mode
            if sync_mode == "renew":
                print(f"\tDELETE FROM {table.table_name}.")
                cursor.execute(f"DELETE FROM {table.table_name};")
                print("\t", cursor.rowcount, "records are affected.")
            if rows.total_rows == 0:
                print("\tFound ", rows.total_rows, "records...Skip")
                continue
            print("\tSync ", rows.total_rows, "records...")
            columns = ', '.join([field.name for field in schema])
            update_columns = ', '.join([f"{field.name} = VALUES({field.name})" for field in schema])
            # UPSERT query for MySQL (assuming the first column is the primary key)
            upsert_query = f"INSERT INTO {table.table_name} ({columns}) VALUES "
            values_list = []
            for row in rows:
                # Prepare column names and values for the UPSERT query
                value_array = []
                for i, value in enumerate(row):
                    if value is None:
                        value_array.append("NULL")
                    elif schema[i].field_type in ["STRING","DATE","TIME","DATETIME"]:
                        value = str(value).replace("'", "\\'")
                        value_array.append(f"'{value}'")
                    else:
                        value_array.append(str(value))
                values_list.append("(" + ', '.join(value_array) + ")")
            upsert_query += ', '.join(values_list)
            upsert_query += f"ON DUPLICATE KEY UPDATE {update_columns};"
            cursor.execute(upsert_query)
            print("\t", cursor.rowcount, "records are affected.")
        conn.commit()
    except Exception as e:
        print("Unexpected Error Occurred, Execute Rollback:")
        print(e)
        conn.rollback()
    print("===== End Sync =====")
    conn.close()
    tunnel.stop()
    print("\n===== Start Set Sync Date =====")
    print(current_datetime)
    set_last_sync_datetime(environment,current_datetime)
    print("===== End Set Sync Date =====")

if __name__ == '__main__':
    sync_data()

実行結果

ログ

$ python3 app.py

===== Start Get Last Sync Date =====
2023-05-13 22:13:40
===== End Get Last Sync Date =====

===== Start Sync =====
project-name.sample.m_sample
        Sync  5 records...
         5 records are affected.
===== End Sync =====

===== Start Set Sync Date =====
2023-05-14 11:52:10
===== End Set Sync Date =====

MySQLのデータ

無事同期ができました。

スクリプトの解説

ここからは各関数について解説します。

get_last_sync_datetime(environment)

BigQueryから、前回の同期日時を取得する関数です。
引数で渡されたenvironmentをキーとして、その環境へ最後に同期した日時を返却します。

set_last_sync_datetime(environment, sync_datetime)

今回の同期日時をBigQueryへ保存する関数です。
引数で渡されたenvironmentをキーとして、
同期日時を保持するテーブルをsync_datetimeで更新します。

sync_data()

今回のメインの処理を担う関数です。
ざっくり以下のブロックに別れます。

1. 同期対象のテーブル一覧を取得する

BigQueryのINFORMATION_SCHEMA.PARTITIONSを使って、
同期対象となるテーブル一覧をSQLで取得します。

差分同期の場合には、ここでlast_modified_timeで絞り込んでおくことで、
前回の同期以降に更新されたテーブルのみを抽出できます。

2. MySQLへ接続する

sshtunnelpymysqlを使って、SSHトンネル経由でMySQLへアクセスします。
これらはいずれも事前にpip intallしておく必要があります。

pip3 install sshtunnel
pip3 install PyMySQL

なお元々はmysqlclientを使っていたのですが、
なぜかうまく接続できず、pymysqlを使うとうまくいきました。

3. テーブルごとにデータを同期する

同期対象のテーブルごとにループして、以下を行います。

  • 同期対象のデータをBigQueryから取得する
    (差分同期モードの場合は、update_datetimeで絞り込む)
  • 洗い替えモードの場合、MySQLの同名テーブルに対してDELETEを行う
  • BigQueryから取得したデータをカラムのデータ型に応じて整形する
  • MySQLの同名テーブルへUPSERTを行う
4. BigQueryの同期日時を更新する

今回同期した環境について、
set_last_sync_datetime()を使って同期日時を更新します。


これと以前に解説したスプレッドシートとの連携によって、
簡単にデータメンテができるようになりました。

【GAS】スプレッドシートからBigQueryのテーブルを作成する【Google完結】