・キーワード(都道府県名)で元データから抽出し、キーワード毎に集計を行い、その結果を出力テーブルにINSERTする ・キーワード(東京、埼玉、千葉)毎に上記を繰り返し実行する |
◯IMAロール 【GlueのIAMロールを生成】 ・AWS > GlueのIAMロールを生成 と同様 ・ロール名:AWSGlueServiceRoleDefaultCM ◯S3 【テストデータの用意】 1. S3 > バケットを作成 の順にクリック。以下を設定して [バケットを作成] ・バケット名:demo-sagemaker-testdatas ・AWS リージョン:ap-northeast-1 ・他は全てデフォルト 2. バケット「demo-sagemaker-testdatas」にテストデータを格納すえる ・s3://demo-sagemaker-testdatas/profile_data/ ・s3://demo-sagemaker-testdatas/pref_tokyo_capital/ ・s3://demo-sagemaker-testdatas/pref_chiba_local/ ・s3://demo-sagemaker-testdatas/pref_saitama_local/ |
◯Glue 【Glueクローラの追加】 1. Glue > 左側メニューのチュートリアル下のクローラの追加 >クローラの追加 の順にクリック 2. 以下を設定して [次へ] ・クローラの名前:demo-sagemaker 3. 以下を設定して [次へ] ・Crawler source type:Data stores(デフォルト) ・Repeat crawls of S3 data stores:Crawl all folders(デフォルト) 4. 以下を設定して [次へ] ・データストアの選択:S3 ・接続:未選択 ・クロールするデータの場所:自分のアカウントで指定されたパス(デフォルト) ・インクルードパス:s3://demo-sagemaker-testdatas/profile_data/ ※配下のファイル全てにするため末尾に/を付与 5. 以下を設定して [次へ] ・別のデータストアの追加:”はい”を選択すると上記「4. 」へ戻るので、上記【データの用意】>「2. 」で用意した全てを設定する 6. 以下を設定して [次へ] ・IAM ロール:既存の IAM ロールを選択して「AWSGlueServiceRoleDefaultCM」を設定 7. 以下を設定して [次へ] ・頻度:オンデマンドで実行 8. [データベースの追加]をクリックして以下を設定して [次へ] ・データベース名:demo_sagemaker_db 9. [完了] 10. クローラ一覧でdemo-sagemakerを選択して[クローラの実行]をクリック 11. クローラ完了後、 左側メニューのチュートリアル下のテーブルの確認をクリックして、一覧に全テーブルが作成されている事を確認 12. AthenaでDatabaseに「demo_sagemaker_db」があること、Tablesに全テーブルがある事を確認
Glueクローラの追加でハマった事 テストデータには次の仕様が必要らしい ・1行目にカラム名を入れる ・1カラムだけではエラーになる |
◯Athena 【OUTPUTテーブルのCREATE TABLE】 例 rescount_tokyoの場合 CREATE EXTERNAL TABLE IF NOT EXISTS demo_sagemaker_db.rescount_tokyo (
pref string,
cnt_people bigint,
cnt_days bigint
)
PARTITIONED BY (yyyymmdd string)
|
◯SageMaker 【ノートブックインスタンスの作成】 ・AWS > ノートブックインスタンスの作成と同様 ・ノートブックインスタンス名:demo-sagemaker ・新しいIAMロール:AmazonSageMaker-ExecutionRole-20220212T143930 |
◯セキュリティ認証情報 【アクセスキーの作成】 1. 右上のアカウント > マイセキュリティ資格情報 > アクセスキー > 新しいアクセスキーの作成 の順にクリック 2. アクセスキー IDとシークレットアクセスキーをエディターにコピペ ◯SageMaker 【JupyterでPython3の起動】 1. 一覧から作成したインスタンスを選択 2. アクション > Jupyterを開く > ブラウザの別ウィンドウでノートブックのインターフェイスが開く 3. 右上の[NEW] から [Terminal] を選択 4. aws認証情報の更新を行う $ aws configure
AWS Access Key ID [None]: *************************
AWS Secret Access Key [None]: **********************************
Default region name [ap-northeast-1]: ap-northeast-1
Default output format [None]: json
5. 右上の[NEW] からAnacondaのPython3系環境 [conda_python3]を選択 6. 左上のJupyterの右側の「Untitled」をクリックして以下を設定して[Rename] ・Rename Notebook:demo_insert_table |
【Python3を実行】 ・後述のソースを入力して[Run]、または [Kernel ] > [Restart and Run ALL]
Python3の実行でハマった事 ・queries = queries_str.split(';')[0:-1] 最初は[-1]としておりリスト取得ができなかった。[0:-1]に変更したらできた ・def exec_queries(queries_str, conn, print_query=True): INSERTを実行すると権限エラーが発生。IAMロールにAmazonAthenaFullAccessとAmazonSageMakerFullAccessを追加しても変わらず。 Terminalでaws configureを実施しら解決した。追加したポリシーは2件とも関係なかった [ error msg ]
【あとかたづけ(アクセスキーの削除)※重要】 1. Terminalで削除 $ rm -rf ~/.aws
2. 右上のアカウント > マイセキュリティ資格情報 > アクセスキー > アクションの削除 の順にクリック |
import pandas as pd
import numpy as np
import pickle
import csv
import re
import os
import boto3
import datetime
import pytz
import s3fs
from datetime import datetime as dt
from datetime import timedelta
from dateutil.relativedelta import relativedelta
!pip install --upgrade pip
!pip install openpyxl
!pip install PyAthena
import pyathena
import itertools
print("===process end===")
#--------------------------------------------------------------------
#全体の実行時間を測る
grand_start_time = dt.now()
print(grand_start_time)
#--------------------------------------------------------------------
#ロールの取得
from sagemaker import get_execution_role
role = get_execution_role()
print(role)
#regionの取得
region = boto3.Session().region_name
print(region)
#athenaに接続
from pyathena import connect
conn = connect(s3_staging_dir='s3://demo-sagemaker-testdatas/athena/result/ ',
region_name=region)
print(conn)
#--------------------------------------------------------------------
#パラメータ設定
#Athena
db_name = 'demo_sagemaker_db'
#S3
bucket_name="demo-sagemaker-testdatas"
#集計対象期間
start_day = '2022-01-01'
end_day = '2022-01-07'
#イベント名(一つづつ指定する)
pref_names = ['tokyo','chiba','saitama']
print("count for event:{}".format(len(pref_names)))
#--------------------------------------------------------------------
#日付のリスト作成
date_list_2 = [date for date in pd.date_range(start_day, end_day, freq='D')]
#月曜と日曜の日付取得
date_list_s = [date for date in date_list_2 if date.isoweekday()==1]
date_list_e = [date + timedelta(days=6) for date in date_list_2 if date.isoweekday()==1]
#代入用リスト作成
start_yyyymm = [date.strftime('%Y%m%d') for date in date_list_s]
end_yyyymm = [date.strftime('%Y%m%d') for date in date_list_e]
print('\nstart_yyyymm:\n',start_yyyymm)
print('----------')
print('end_yyyymm:\n',end_yyyymm)
yyyymmdd = dt.now(pytz.timezone('Asia/Tokyo')).strftime("%Y%m%d")
#--------------------------------------------------------------------
#マッチング条件の制御準備
dic_is_perfect = {}
for pref_name in pref_names:
if pref_name == 'tokyo':
dic_is_perfect[pref_name] = 1
else:
dic_is_perfect[pref_name] = 0
print(dic_is_perfect)
#--------------------------------------------------------------------
#完全一致の検索数集計
def q_into_search_count_perfect(prefecture_table_name, insert_table_name, start_date_number, end_date_number):
q_into_search_count = '''
INSERT INTO {insert_table}
WITH table1 as (
SELECT
pref
FROM
{kw_table}
),
table2 as ( --データ抽出
SELECT
pref,
yyyymmdd
FROM
demo_sagemaker_db.profile_data a
WHERE
yyyymmdd between {start_date} and {end_date} -- 任意の指定期間
AND
pref in (select pref from table1)
)
--対象データ抽出
SELECT
pref,
count(pref) as cnt_people,
count(distinct yyyymmdd) as cnt_days,
'{start_date}' as yyyymmdd
FROM
table2
GROUP BY
pref
;
SELECT
count(*) AS count
FROM
{insert_table}
WHERE
yyyymmdd = '{start_date}'
;
'''.format(
insert_table=insert_table_name,
kw_table=prefecture_table_name,
start_date=start_date_number,
end_date=end_date_number
)
df_result=exec_queries(q_into_search_count, conn)
return df_result[1]
#--------------------------------------------------------------------
#リスト取得
def q_select_profile_data_list(select_table_name, start_date_number, end_date_number):
q_select_profile_data = '''
SELECT
pref,
ROUND( AVG(age), 1 ) AS age
FROM
{select_table}
WHERE
yyyymmdd between {start_date} and {end_date} -- 任意の指定期間
GROUP BY
pref
;
'''.format(
select_table=select_table_name,
start_date=start_date_number,
end_date=end_date_number
)
df_result=exec_queries(q_select_profile_data, conn)
return df_result[0]
#--------------------------------------------------------------------
#複数クエリ実行関数
def exec_queries(queries_str, conn, print_query=True):
queries = queries_str.split(';')[0:-1]
dfs = []
for query in queries:
if print_query:
print('-----\nExec query\n-----\n{}'.format(query))
print('...', end='')
try:
df = pd.read_sql(query, conn)
except Exception as e:
print(e)
else:
dfs.append(df)
print('ok')
return dfs
#--------------------------------------------------------------------
#ログ文字列追加関数
logs = ''
def add_log(new, title='Other'):
global logs
logs += '----------\n-- {}\n----------\n'.format(title)
logs += new
logs += '\n\n\n'
#S3のパス生成(fnを指定しない場合はフォルダーパス)
def get_s3_path(bucket, paths, fn=''):
return 's3://' + bucket + '/' + '/'.join(paths) + '/' +fn
#追加の期間
"""
start_yyyymm += ['20210118']
end_yyyymm += ['20210124']
print("count for event:{}, {}".format(len(pref_names), pref_names))
print("count for start_yyyymm:{}".format(len(start_yyyymm)))
calc_number1=list(range(0,6,1))
"""
#対象イベントの制御
"""
pref_names=['hoge']
print("{}".format(pref_names))
"""
calc_number=list(range(0,1,1))
print(calc_number)
#時間別に全イベントを回す計算
count_list=calc_number
#ループ(テーマ毎)
for pref_name in pref_names:
print("\n==={}: start===".format(pref_name))
logs = ''
#開始時間を測る
strat_time = dt.now(pytz.timezone('Asia/Tokyo'))
print(strat_time)
#パラメータに修正
prefecture_table_name= db_name+'.pref_'+pref_name+'_'+['local' if dic_is_perfect[pref_name]==0 else 'capital'][0]
insert_table_name=db_name+'.rescount_'+pref_name
print("prefecture_table_name: {}".format(prefecture_table_name))
print("insert_table_name : {}".format(inser_table_name))
start_date_number=start_yyyymm[calc_number]
end_date_number=end_yyyymm[calc_number]
print("start_date_number-end_date_number:{}-{}".format(start_date_number, end_date_number))
#関数の実行
sql_row_count = q_into_search_count_perfect(prefecture_table_name, insert_table_name, start_date_number, end_date_number)
#終了時間を測る
end_time = dt.now(pytz.timezone('Asia/Tokyo'))
print(end_time)
print("processing_time:{}".format(end_time-strat_time))
print("count:{}".format(sql_row_count))
#ログの用意
log_str=" start_time={}\n end_time:{}\n processing_time:{}\n row_count:{}".format(strat_time, end_time, end_time-strat_time, sql_row_count)
add_log(log_str, 'Calc for {}:{}'.format(pref_name, start_yyyymm[0]))
#カレントディレクトリー確認
current_path = !pwd
print("current_path:{}".format(current_path))
#ログファイルをjupyter上に書き出す
f = open('./logss/calc_log_{}_{}.txt'.format(pref_name, yyyymmdd), 'a') # w:上書き a:追加
f.write(logs)
f.close()
print("==={}:calc endding===\n".format(pref_name))
# ファイルアウトプットの確認
!ls -l ./logs/
#ログファイルをS3に書き出し
log_path = get_s3_path(bucket_name, ['logs', pref_name], 'logs_{}_{}.txt'.format(pref_name, yyyymmdd))
fs = s3fs.S3FileSystem()
with fs.open(log_path, 'wb') as f:
f.write(logs.encode())
print(logs)
for pref_name in pref_names:
#リスト取得
select_table_name=db_name+'.profile_data'
ql_row_datas = q_select_profile_data_list(select_table_name, start_date_number, end_date_number)
ql_row_datas
#データーフレームをCSV出力
out_path = 'result/'+pref_name
out_name = pref_name+'.csv'
print("{}/{}".format(out_path, out_name))
os.makedirs(out_path, exist_ok=True)
ql_row_datas.to_csv('{}/{}'.format(out_path, out_name))
本日日付をyyyymmddで取得
|
文字列の連結
|
SageMakerのカレントディレクトリーの確認
Teminalからの確認
|
ログファイルをjupyter上に書き出す
Teminalからの確認
|
ログファイルをS3に書き出し
Teminalからの確認
|
S3上のファイルをデーターフレームに格納
|
データーフレームをCSV出力
Teminalからの確認
|
データーフレームをS3へ出力
Teminalからの確認
|
データフレームの生成
|
日単位でループ
20220101 20220102 |
複数日単位でループ
start_date - end_date : 20220101 - 20220103 start_date - end_date : 20220104 - 20220106 start_date - end_date : 20220107 - 20220107 |
月単位でループ
start_date - end_date : 20201101 - 20201130 start_date - end_date : 20201201 - 20201231 start_date - end_date : 20210101 - 20210131 start_date - end_date : 20210201 - 20210228 |