現場で、大量のnginxやinfo・Errorログを集計したり検索することがあります。
件数は数万件~数百万件くらいです。
が。
- ログ集計を行う環境にDBやら集計用のツールが入っていない
- 追加もできない
という制限があるため、Linuxに標準で用意されているもので何とかするしかありません。
AWSなのでAthenaが使えればいいのですが、他社管理のサーバーなので、それもできず。。。
仕方なくShellで検索するわけですが、単純に1ファイル内から1レコードを検索×100件やるとすると、100回ファイルを読み込むことになる(しかもファイルサイズがでかいので時間がかかる)ので、本当に非効率です。
DBが使えれば、もっと早く必要な情報が取得できるのに。。。
そんなこんなで数か月、無駄な検索時間をかけて頑張ってきたのですが。
先日pythonだと標準ライブラリにsqlite3が含まれていることを知ったので、ファイルを読み込んでInsert→selectできるようにしてみました。
目次
サンプルファイル
- sqlite.py:sqlite3を扱うための処理
- users.csv:ユーザーデータサンプル
- RentalList.csv.gz:本の貸し出し履歴を書いたつもり(途中で力尽きたので適当なデータ)
sqlite.py
DBの作成
sqlite3は1つのファイルで1DBを管理します。
connectで指定したファイルが存在しなければ新規作成、存在すれば作成済みのデータを読み込みます。
1 2 3 4 5 6 |
import sqlite3 as sq # メイン処理 # dbオブジェクト作成 db = sq.connect('./test.db') c = db.cursor() |
テーブルの作成
テーブルを作成するためのクエリ
対象テーブルが未作成の時だけ作成するよう、「if not exists」を付けます。
1 |
create table if not exists users (userId integer primary key, name text, age integer, gender text) |
これを毎回書いてもいいのですが、書き間違いが減るようにテーブル名とkeyを渡してクエリを自動生成することにしました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# create table文を実行する # テーブルがなければ新規作成する def create_table(database, table_name, key_list): # sqlの作成 key1 = '' for k, v in key_list.items(): key1 = key1 + k + ' ' + v + ', ' else: key1 = '(' + key1.strip(', ') + ')' sql = 'create table if not exists ' + table_name + ' ' + key1 print('------------------------------------------------') print(sql) # sql実行 try: database.execute(sql) print('テーブルを作成しました:' + table_name) except: print('エラーが発生したため、テーブル作成をスキップしました:' + table_name) print('------------------------------------------------') finally: database.commit() print('------------------------------------------------') # usersテーブル作成・読み込み create_keys = { 'userId': 'integer primary key', 'name': 'text', 'age': 'integer', 'gender': 'text' } create_table(db, 'users', create_keys) |
ファイルからデータを読み込む
ファイルからデータをインポートします。
クエリは、?とタプルを渡す方法もありますが、辞書化したほうがよさそうだったので以下のようになりました。
1 |
insert into users (userId, name, age, gender) values (:userId, :name, :age, :gender) |
このクエリも、関数の中で作成するようにします。
対象のファイルは.logと.gzがあるので、gzファイルを未解凍のまま読み込めるように処理を分けます。
非圧縮のサンプルファイルは.csvにしていますが、.logでも.txtでもOKです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# Insert文を実行する # エラーが発生した場合は処理をスキップする def insert_data(database, file_name, table_name, key_list, delimiter_str=','): # sqlの作成 key1 = '' key2 = '' for key in key_list: key1 = key1 + key + ', ' key2 = key2 + ':' + key + ', ' else: key1 = '(' + key1.strip(', ') + ')' key2 = '(' + key2.strip(', ') + ')' sql = 'insert into ' + table_name + ' ' + key1 + ' values ' + key2 sql = sql.replace('TABLE', table_name) # 読み込むファイルの拡張子別に処理を行う extension = file_name.split('.') extension = extension[len(extension) - 1] if extension == 'gz': f = gzip.open(file_name, 'rt', encoding='UTF-8') else: f = open(file_name, 'r', encoding='UTF-8') # delimiter_strでファイルを分割して読み込む reader = csv.reader(f, delimiter=delimiter_str) print('------------------------------------------------') print(sql) # 1行ずつ処理 for row in reader: # 読み込んだデータを辞書化する d = dict(zip(key_list, row)) # sql実行 try: database.execute(sql, d) except: print('エラーが発生したためスキップしました:' + str(row)) else: database.commit() f.close() print('データをInsertしました') print('------------------------------------------------') # CSV読み込み&区切り文字の指定なし insert_keys = ( 'userId', 'name', 'age', 'gender' ) insert_data(db, 'users.csv', 'users', insert_keys) # gzファイル読み込み&タブ区切り insert_keys = ( 'userId', 'returnDate', 'title', 'author' ) insert_data(db, 'RentalList.csv.gz', 'RentalList', insert_keys, '\t') |
RentalListはprimary keyがオートインクリメント以外設定していないので、実行した回数分データが追加されます。
select文で情報を取得する
selectのクエリは関数化すると自由度が下がってしまうので、テキストで直接渡すようにしました。
Table CreateとInsertはDBを渡していましたが、selectの場合はcursorを渡します。
(cursorが何者なのかはよくわかってません。。。)
tableを2つ作成したので、JOINも試してみます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# select文を実行する def select_data(cursor, sql): print('------------------------------------------------') print(sql) for row in cursor.execute(sql): print(row) else: print('------------------------------------------------') select_where_sql = 'select * from users where gender = "male"' select_data(c, select_where_sql) select_where_sql = 'select users.userId, users.name, RentalList.returnDate, RentalList.title from users inner join RentalList on users.userId = RentalList.userId' select_data(c, select_where_sql) |
実行結果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
------------------------------------------------ select * from users where gender = "male" (1, 'Taro', 20, 'male') (2, 'Shota', 54, 'male') (4, 'Masaki', 78, 'male') (6, 'Kenta', 31, 'male') (9, 'Ryoichi', 64, 'male') (10, 'Shota', 10, 'male') ------------------------------------------------ ------------------------------------------------ select users.userId, users.name, RentalList.returnDate, RentalList.title from users inner join RentalList on users.userId = RentalList.userId (1, 'Taro', '2018/01/01', '英単語の語源図鑑') (1, 'Taro', '2018/02/01', 'わけあって絶滅しました。\u3000世界一おもしろい絶滅したいきもの図鑑') (2, 'Shota', '2018/03/11', 'ぼくらの七日間戦争') (5, 'Saki', '2018/04/01', 'syunkonカフェごはんレンジで絶品レシピ\u3000(e-mook)') (5, 'Saki', '2018/05/01', '医者いらずの食材使いこなしレシピ ~トップバリュおすすめ~\u3000(タツミムック)') (5, 'Saki', '2018/06/01', '信長はなぜ葬られたのか\u3000世界史の中の本能寺の変') (3, 'Nana', '2018/07/01', 'おしりたんてい\u3000みはらしそうの\u3000かいじけん') (8, 'Anna', '2018/08/01', '緊急出版!枝野幸男、魂の3時間大演説\u3000安倍政権が不信任に足る7つの理由') (10, 'Shota', '2018/09/01', 'ツバキ文具店の鎌倉案内') (1, 'Taro', '2018/10/01', '人生は、運よりも実力よりも「勘違いさせる力」で決まっている') (10, 'Shota', '2018/11/01', '清原和博 告白') (6, 'Kenta', '2018/12/01', '極上の孤独') (2, 'Shota', '2018/02/10', 'ざんねんないきもの事典\u3000おもしろい!進化のふしぎ') (4, 'Masaki', '2018/03/20', 'ポイズンドーター・ホーリーマザー') ------------------------------------------------ |
最後にDBを閉じる
最後にDBを閉じておしまい。
withを使うとcloseの書き忘れを防げる、とのことでしたが、今回はopenの処理を分けるために使いませんでした。
1 2 3 4 |
db.close() # ファイルOPNE時にwithを使うとcloseを書かなくていい with open('test.txt','r') as f: |
sqliteはどれくらいのデータ数まで耐えられるの?
処理はできたわけですが、今回必要としている「数百万件のデータ管理」に耐えられないと意味がありません。
を見る限り、数億行でも問題なさそうですが、レコードの内容やPCのスペックにもよりますし、試してみないとですね。
今度現場で試験してみようと思います。