現在利用しているWordPressサーバのMySQLへ定期的にデータを投入しているのですが、
どうにも使い勝手が悪いのでBigQueryへデータを投入してMySQLへ同期することにしました。
本番実装はCroudRunを使うのですが、
今回はPythonスクリプト一本で同期する方法を解説します。
Croud Runについてはこちら
実施スキーム
BigQuery上に同期先と同名のテーブルを作成し、データを投入しておきます。
これをPythonで抽出し、MySQLへ接続してデータを投入します。
今回接続するMySQLサーバはエックスサーバーで借りているWordPressサーバにあります。
ここにリモートアクセスするためには、
まずエックスサーバーへSSH接続した上で、SSHトンネル経由でアクセスする必要があります。
その他要件
その他、今回のスクリプトは以下の要件にしています。
- 同期方式は差分更新と洗い替え更新の両方を選べるようにする
- 全テーブルを同期するのではなく、プレフィクスが「m_」のテーブルのみを同期する
- 物理削除(DELETE)には対応しない(物理削除を含む場合には洗い替え更新する)
事前準備
差分更新をするために、前回の同期時刻を保持するテーブルをBigQuery上に作成します。
MySQLへ同期するテーブル群と同じデータセット内に、sync_datetime
というテーブルを作成します。
CREATE OR REPLACE TABLE `project-name.sample.sync_datetime` (
environment STRING OPTIONS (description = '環境')
sync_datetime DATETIME OPTIONS (description = '同期日時')
);
INSERT INTO `project-name.sample.sync_datetime`
(environment,sync_datetime)
VALUES
('development','2023-05-13 22:11:30')
, ('production','2023-05-13 22:13:40')
;
同期対象のテーブルは、いずれも update_datetime
という、
レコードの最終更新日時を保持するカラムを持っているものとします。
また、今回はPythonのgoogle.cloud
ライブラリを使ってBigQueryへアクセスするため、
事前に以下の方法でクライアントライブラリをインストールし、認証を済ませておきます。
https://cloud.google.com/bigquery/docs/reference/libraries?hl=ja#client-libraries-install-python
完成品
from datetime import datetime
from google.cloud import bigquery
import pymysql
from sshtunnel import SSHTunnelForwarder
# Parameters
bq_project_name = "project-name" # BigQueryのプロジェクトID
bq_dataset_name = "sample" # BigQueryのデータセットID
sync_mode = "diff" # 更新方式
environment = "production" # 環境識別子
wp_host = "xxxxxx.xserver.jp" # SSHサーバのホスト名
wp_username = "xxxxxx" # SSHサーバのユーザー名
wp_key_path = "~/.ssh/rsa_key_wp" # SSHサーバ秘密鍵のファイルパス
mysql_db_name = "xxxxxxx" # MySQLのデータベース名
mysql_username = "xxxxxxx" # MySQLのユーザー名
mysql_password = "xxxxxxx" # MySQLのパスワード
# BigQuery setup
bigquery_client = bigquery.Client()
# SSH and MySQL setup
ssh_config = {
"ssh_address_or_host": (wp_username, 10022), # WordPressサーバは22でなく10022
"ssh_username": wp_username,
"ssh_private_key": wp_key_path,
"remote_bind_address": ("127.0.0.1", 3306),
}
mysql_config = {
"user": mysql_username,
"password": mysql_password,
"host": "127.0.0.1",
"database": mysql_db_name,
"connect_timeout": 10,
"cursorclass": pymysql.cursors.DictCursor
}
def get_last_sync_datetime(environment):
# Replace with your BigQuery SQL query
query = f"""
SELECT sync_datetime
FROM `{bq_project_name}.{bq_dataset_name}.sync_datetime`
WHERE environment = '{environment}'
"""
# Execute the BigQuery query
query_job = bigquery_client.query(query)
result = query_job.result()
# Get sync_datetime value from the query result
sync_datetime = next(iter(result)).sync_datetime
return sync_datetime
def set_last_sync_datetime(environment,sync_datetime):
# Replace with your BigQuery SQL query
query = f"""
UPDATE `{bq_project_name}.{bq_dataset_name}.sync_datetime`
SET sync_datetime = '{sync_datetime}' WHERE environment = '{environment}'
"""
# Execute the BigQuery query
query_job = bigquery_client.query(query)
query_job.result()
def sync_data():
current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Get Last Sync DateTime From BigQuery
print("\n===== Start Get Last Sync Date =====")
last_sync_datetime = get_last_sync_datetime(environment)
print(last_sync_datetime)
print("===== End Get Last Sync Date =====")
# List tables in the dataset with table names starting with "m_"
query_get_sync_target=f"""
SELECT
table_name
FROM `{bq_project_name}.{bq_dataset_name}.INFORMATION_SCHEMA.PARTITIONS`
WHERE
1=1
AND table_catalog = '{bq_project_name}'
AND table_schema = '{bq_dataset_name}'
AND table_name LIKE 'm_%'
"""
if sync_mode == "diff":
query_get_sync_target += f" AND DATETIME(last_modified_time, 'Asia/Tokyo') >= DATETIME('{last_sync_datetime}')"
# Execute the BigQuery query
query_job = bigquery_client.query(query_get_sync_target)
tables_to_sync = query_job.result()
# Connect to MySQL via SSH tunnel
tunnel = SSHTunnelForwarder(**ssh_config)
tunnel.start()
mysql_config["port"] = tunnel.local_bind_port
conn = pymysql.connect(**mysql_config)
print("\n===== Start Sync =====")
try:
cursor = conn.cursor()
# Loop through each table
for table in tables_to_sync:
table_name = f"{bq_project_name}.{bq_dataset_name}.{table.table_name}"
print(table_name)
# Replace with your BigQuery SQL query
query = f"SELECT * FROM `{table_name}`"
if sync_mode == "diff":
query += f" WHERE update_datetime > DATETIME '{last_sync_datetime}';"
# print(query)
# Execute the BigQuery query
query_job = bigquery_client.query(query)
rows = query_job.result()
schema = rows.schema
# Delete All Records When Renew Mode
if sync_mode == "renew":
print(f"\tDELETE FROM {table.table_name}.")
cursor.execute(f"DELETE FROM {table.table_name};")
print("\t", cursor.rowcount, "records are affected.")
if rows.total_rows == 0:
print("\tFound ", rows.total_rows, "records...Skip")
continue
print("\tSync ", rows.total_rows, "records...")
columns = ', '.join([field.name for field in schema])
update_columns = ', '.join([f"{field.name} = VALUES({field.name})" for field in schema])
# UPSERT query for MySQL (assuming the first column is the primary key)
upsert_query = f"INSERT INTO {table.table_name} ({columns}) VALUES "
values_list = []
for row in rows:
# Prepare column names and values for the UPSERT query
value_array = []
for i, value in enumerate(row):
if value is None:
value_array.append("NULL")
elif schema[i].field_type in ["STRING","DATE","TIME","DATETIME"]:
value = str(value).replace("'", "\\'")
value_array.append(f"'{value}'")
else:
value_array.append(str(value))
values_list.append("(" + ', '.join(value_array) + ")")
upsert_query += ', '.join(values_list)
upsert_query += f"ON DUPLICATE KEY UPDATE {update_columns};"
cursor.execute(upsert_query)
print("\t", cursor.rowcount, "records are affected.")
conn.commit()
except Exception as e:
print("Unexpected Error Occurred, Execute Rollback:")
print(e)
conn.rollback()
print("===== End Sync =====")
conn.close()
tunnel.stop()
print("\n===== Start Set Sync Date =====")
print(current_datetime)
set_last_sync_datetime(environment,current_datetime)
print("===== End Set Sync Date =====")
if __name__ == '__main__':
sync_data()
実行結果
ログ
$ python3 app.py
===== Start Get Last Sync Date =====
2023-05-13 22:13:40
===== End Get Last Sync Date =====
===== Start Sync =====
project-name.sample.m_sample
Sync 5 records...
5 records are affected.
===== End Sync =====
===== Start Set Sync Date =====
2023-05-14 11:52:10
===== End Set Sync Date =====
MySQLのデータ
無事同期ができました。
スクリプトの解説
ここからは各関数について解説します。
get_last_sync_datetime(environment)
BigQueryから、前回の同期日時を取得する関数です。
引数で渡されたenvironment
をキーとして、その環境へ最後に同期した日時を返却します。
set_last_sync_datetime(environment, sync_datetime)
今回の同期日時をBigQueryへ保存する関数です。
引数で渡されたenvironment
をキーとして、
同期日時を保持するテーブルをsync_datetime
で更新します。
sync_data()
今回のメインの処理を担う関数です。
ざっくり以下のブロックに別れます。
1. 同期対象のテーブル一覧を取得する
BigQueryのINFORMATION_SCHEMA.PARTITIONS
を使って、
同期対象となるテーブル一覧をSQLで取得します。
差分同期の場合には、ここでlast_modified_time
で絞り込んでおくことで、
前回の同期以降に更新されたテーブルのみを抽出できます。
2. MySQLへ接続する
sshtunnel
とpymysql
を使って、SSHトンネル経由でMySQLへアクセスします。
これらはいずれも事前にpip intall
しておく必要があります。
pip3 install sshtunnel
pip3 install PyMySQL
なお元々はmysqlclient
を使っていたのですが、
なぜかうまく接続できず、pymysql
を使うとうまくいきました。
3. テーブルごとにデータを同期する
同期対象のテーブルごとにループして、以下を行います。
- 同期対象のデータをBigQueryから取得する
(差分同期モードの場合は、update_datetime
で絞り込む) - 洗い替えモードの場合、MySQLの同名テーブルに対して
DELETE
を行う - BigQueryから取得したデータをカラムのデータ型に応じて整形する
- MySQLの同名テーブルへUPSERTを行う
4. BigQueryの同期日時を更新する
今回同期した環境について、
set_last_sync_datetime()
を使って同期日時を更新します。
これと以前に解説したスプレッドシートとの連携によって、
簡単にデータメンテができるようになりました。