はじめに
前回の記事では、複数の外部 SQL ファイルを読み込んで、結果を CSV で出力する方法をまとめました。
本記事はその続きで、実行結果を S3 に保存する方法をまとめる方法と、実行する SQL を S3 から読み込む方法をまとめます。
対象者
この記事は下記のような人を対象にしています。
- SQL の実行結果を CSV 出力し、S3 に保存したい人
- S3 に格納された SQL ファイルを Python で 読み込んで実行したい人
環境
以下が今回の環境です。DB は MySQL を使用します。
$ python -V
Python 3.7.10
$ pip list | grep PyMySQL
PyMySQL 1.0.2
$ mysql --version
mysql Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)
サンプルコード
データベース接続情報は.python-dotenv
モジュールを利用して読み込みます。
.envファイルを作成し、必要な情報を記載します。
user=root
password=mysql_password
host=localhost
port=33060
database=pets
bucket_name={S3バケット名}
ローカルの SQL ファイルを実行する
まずローカルに存在する複数の SQL ファイルを読み込んで結果を CSV で出力するコードです。
- アップロードするファイルのプレフィックスに”results/year/month/day/”を設定する
- アップロードするファイル名の末尾に日時情報を付与し、複数回実行した場合にファイルが上書きされることを回避する
import datetime
import glob
import os
import boto3
from dotenv import load_dotenv
import pymysql
load_dotenv()
# get the current date and time
t_delta = datetime.timedelta(hours=9)
JST = datetime.timezone(t_delta, 'JST')
now = datetime.datetime.now(JST)
date = now.strftime('%Y/%m/%d')
time = now.strftime('%Y%m%d%M%S')
# S3
bucket_name = os.environ.get('bucket_name')
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
s3_prefix = f'results/{date}/' # プレフィックスに"results/year/month/day/"を設定する
def get_connection():
connection = pymysql.connect(
user = os.environ.get('user'),
password = os.environ.get('password'),
host = os.environ.get('host'),
port = int(os.environ.get('port')),
database = os.environ.get('database'),
charset = 'utf8mb4',
cursorclass = pymysql.cursors.DictCursor,
)
return connection
def get_query(query_file_path):
with open(query_file_path, 'r', encoding='utf-8') as f:
query = f.read()
return query
with get_connection() as connection:
with connection.cursor() as cursor:
query_file_paths = glob.glob('../query/sql/*.sql')
for query_file_path in query_file_paths:
query = get_query(query_file_path)
cursor.execute(query)
rows = cursor.fetchall()
file_name = f'{os.path.splitext(os.path.basename(query_file_path))[0]}'
with open(f'./results/{file_name}.csv', 'w') as f:
for row in rows:
csv = ''
for value in row.values():
csv = csv + str(value) + ','
f.write(csv[:-1] + '\n')
bucket.upload_file(
f'./results/{file_name}.csv',
f"{s3_prefix}{file_name}-{time}.csv"
)
S3 に保存されている SQL ファイルを読み込んで実行する
次に S3 に保存されている複数の SQL ファイルを読み込んで結果を CSV で出力するコードです。
- プレフィックスが”sql/”であるオブジェクトを取得する
- 取得したオブジェクトから、拡張子が”.sql”であるファイルを読み込み、実行する
import datetime
import os
import boto3
from dotenv import load_dotenv
import pymysql
load_dotenv()
# get the current date and time
t_delta = datetime.timedelta(hours=9)
JST = datetime.timezone(t_delta, 'JST')
now = datetime.datetime.now(JST)
date = now.strftime('%Y/%m/%d')
time = now.strftime('%Y%m%d%M%S')
# S3
bucket_name = os.environ.get('bucket_name')
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
s3_prefix = f'results/{date}/' # プレフィックスに"results/year/month/day/"を設定する
def get_connection():
connection = pymysql.connect(
user = os.environ.get('user'),
password = os.environ.get('password'),
host = os.environ.get('host'),
port = int(os.environ.get('port')),
database = os.environ.get('database'),
charset = 'utf8mb4',
cursorclass = pymysql.cursors.DictCursor,
)
return connection
with get_connection() as connection:
with connection.cursor() as cursor:
objects = bucket.objects.filter(Prefix='sql')
for obj in objects:
if os.path.splitext(obj.key)[-1] == '.sql':
query = obj.get()['Body'].read().decode()
cursor.execute(query)
rows = cursor.fetchall()
file_name = f'{os.path.splitext(os.path.basename(obj.key))[0]}'
with open(f'./results/{file_name}.csv', 'w') as f:
for row in rows:
csv = ''
for value in row.values():
csv = csv + str(value) + ','
f.write(csv[:-1] + '\n')
bucket.upload_file(
f'./results/{file_name}.csv',
f"{s3_prefix}{file_name}-{time}.csv"
)
else:
continue
おわりに
本記事では、実行結果を S3 に保存する方法をまとめる方法と、実行する SQL を S3 から読み込む方法をまとめます。
この記事がどなたかの参考になれば幸いです。
コメント