Python で S3 に保存されている SQL ファイルを読み込み、実行結果を S3 に出力する

AWS

はじめに

前回の記事では、複数の外部 SQL ファイルを読み込んで、結果を CSV で出力する方法をまとめました。
本記事はその続きで、実行結果を S3 に保存する方法をまとめる方法と、実行する SQL を S3 から読み込む方法をまとめます。

対象者

この記事は下記のような人を対象にしています。

  • SQL の実行結果を CSV 出力し、S3 に保存したい人
  • S3 に格納された SQL ファイルを Python で 読み込んで実行したい人

環境

以下が今回の環境です。DB は MySQL を使用します。

$ python -V
Python 3.7.10
$ pip list | grep PyMySQL
PyMySQL               1.0.2

$ mysql --version
mysql  Ver 8.0.29 for Linux on x86_64 (MySQL Community Server - GPL)

サンプルコード

データベース接続情報は.python-dotenvモジュールを利用して読み込みます。
.envファイルを作成し、必要な情報を記載します。

user=root
password=mysql_password
host=localhost
port=33060
database=pets
bucket_name={S3バケット名}

ローカルの SQL ファイルを実行する

まずローカルに存在する複数の SQL ファイルを読み込んで結果を CSV で出力するコードです。

  • アップロードするファイルのプレフィックスに”results/year/month/day/”を設定する
  • アップロードするファイル名の末尾に日時情報を付与し、複数回実行した場合にファイルが上書きされることを回避する
import datetime
import glob
import os

import boto3 
from dotenv import load_dotenv
import pymysql

load_dotenv()

# get the current date and time
t_delta = datetime.timedelta(hours=9)
JST = datetime.timezone(t_delta, 'JST')
now = datetime.datetime.now(JST)
date = now.strftime('%Y/%m/%d')
time = now.strftime('%Y%m%d%M%S')

# S3 
bucket_name = os.environ.get('bucket_name')
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
s3_prefix = f'results/{date}/' # プレフィックスに"results/year/month/day/"を設定する

def get_connection():
    connection = pymysql.connect(
        user = os.environ.get('user'),
        password = os.environ.get('password'),
        host = os.environ.get('host'),
        port = int(os.environ.get('port')),
        database = os.environ.get('database'),
        charset = 'utf8mb4',
        cursorclass = pymysql.cursors.DictCursor,
        )
    return connection

def get_query(query_file_path):
    with open(query_file_path, 'r', encoding='utf-8') as f:
        query = f.read()
    return query


with get_connection() as connection:
    with connection.cursor() as cursor:
        query_file_paths = glob.glob('../query/sql/*.sql')
        for query_file_path in query_file_paths:
            query = get_query(query_file_path)
            cursor.execute(query)
            rows = cursor.fetchall()
            file_name = f'{os.path.splitext(os.path.basename(query_file_path))[0]}'

            with open(f'./results/{file_name}.csv', 'w') as f:
                for row in rows:
                    csv = ''
                    for value in row.values():
                        csv = csv + str(value) + ','
                    f.write(csv[:-1] + '\n')

            bucket.upload_file(
                f'./results/{file_name}.csv',
                f"{s3_prefix}{file_name}-{time}.csv"
                )

S3 に保存されている SQL ファイルを読み込んで実行する

次に S3 に保存されている複数の SQL ファイルを読み込んで結果を CSV で出力するコードです。

  • プレフィックスが”sql/”であるオブジェクトを取得する
  • 取得したオブジェクトから、拡張子が”.sql”であるファイルを読み込み、実行する
import datetime
import os

import boto3 
from dotenv import load_dotenv
import pymysql

load_dotenv()

# get the current date and time
t_delta = datetime.timedelta(hours=9)
JST = datetime.timezone(t_delta, 'JST')
now = datetime.datetime.now(JST)
date = now.strftime('%Y/%m/%d')
time = now.strftime('%Y%m%d%M%S')

# S3 
bucket_name = os.environ.get('bucket_name')
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
s3_prefix = f'results/{date}/' # プレフィックスに"results/year/month/day/"を設定する

def get_connection():
    connection = pymysql.connect(
        user = os.environ.get('user'),
        password = os.environ.get('password'),
        host = os.environ.get('host'),
        port = int(os.environ.get('port')),
        database = os.environ.get('database'),
        charset = 'utf8mb4',
        cursorclass = pymysql.cursors.DictCursor,
        )
    return connection

with get_connection() as connection:
    with connection.cursor() as cursor:
        objects = bucket.objects.filter(Prefix='sql')
        for obj in objects:
            if os.path.splitext(obj.key)[-1] == '.sql':
                query = obj.get()['Body'].read().decode()
                cursor.execute(query)
                rows = cursor.fetchall()
                file_name = f'{os.path.splitext(os.path.basename(obj.key))[0]}'

                with open(f'./results/{file_name}.csv', 'w') as f:
                    for row in rows:
                        csv = ''
                        for value in row.values():
                            csv = csv + str(value) + ','
                        f.write(csv[:-1] + '\n')

                bucket.upload_file(
                    f'./results/{file_name}.csv',
                    f"{s3_prefix}{file_name}-{time}.csv"
                    )
            else:
                continue

おわりに

本記事では、実行結果を S3 に保存する方法をまとめる方法と、実行する SQL を S3 から読み込む方法をまとめます。
この記事がどなたかの参考になれば幸いです。

参考

コメント