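This article describes a method for loading a text-format audit log into a GBase 8a database by replacing the field markers with a custom field delimiter and appending a custom row delimiter.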
Reference
Guide to the GBase 8a cluster audit log (audit_log) and how to use it
Audit log output format
The default is log_output='FILE'.
Log file
gcluster/log/gcluster/gclusterd-audit.log
Sample log
# Threadid=5;
# Taskid=80;
# Time: 211025 13:28:58
# End_time: 211025 13:29:00
# User@Host: root[root] @ localhost []
# UID: 1
# Query_time: 2.002046 Rows: 1
# Tables: WRITE: ; READ: ; OTHER: ; ;
# SET timestamp=1635139738;
# Sql_text: select sleep(2);
# Sql_type: DQL;
# Sql_command: SELECT;
# Status: SUCCESS;
# Connect Type: CAPI;
Shell approach
Procedure
4.1 Replace the "# name" prefix at the start of every field with the field delimiter. The example uses ^GBASE^.
4.2 Append the row delimiter after the last field, # Connect Type:. The example uses ^GBASE_GBASE^.
4.3 sed code
cat gclusterd-audit.log |sed 's/# Threadid=\([0-9]\+\);/\1/g' |sed 's/# Taskid=\([0-9]\+\);/^GBASE^\1/g' |sed 's/# Time: \(.\+\)/^GBASE^\1/g' |sed 's/# End_time: \(.\+\)/^GBASE^\1/g' |sed 's/# User@Host: \(.*\)/^GBASE^\1/g' |sed 's/# UID: \([0-9]\+\)/^GBASE^\1/g' |sed 's/# Query_time: \(.*\) Rows: \([0-9]\+\)/^GBASE^\1\n^GBASE^\2/g' |sed 's/# Tables: \(.*\);/^GBASE^\1/g' |sed 's/# SET timestamp=\([0-9]*\);//g' |sed 's/# # administrator command: \([0-9]*\);//g' |sed 's/# Sql_text:\(.*\)/^GBASE^\1/g' |sed 's/# Sql_type:\(.*\);/^GBASE^\1/g' |sed 's/# Sql_command: \(.*\)/^GBASE^\1/g' |sed 's/# Status: \(.*\)/^GBASE^\1/g' |sed 's/# Connect Type: \(.*\);/^GBASE^\1^GBASE_GBASE^/g' > al.log
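To see what these substitutions produce without touching a file, the same rules can be tried on the sample record. The following is a minimal Python sketch, not part of the original procedure: `RULES` and `convert` are hypothetical names, and the patterns are the sed expressions from the complete script rewritten in Python `re` syntax.

```python
import re

FIELD = '^GBASE^'        # field delimiter used in this article
ROW = '^GBASE_GBASE^'    # row delimiter used in this article

# (pattern, replacement) pairs mirroring the sed pipeline, in the same order.
RULES = [
    (r'# Threadid=([0-9]+);', r'\1'),
    (r'# Taskid=([0-9]+);', FIELD + r'\1'),
    (r'# Time: (.+)', FIELD + r'\1'),
    (r'# End_time: (.+)', FIELD + r'\1'),
    (r'# User@Host: (.*)', FIELD + r'\1'),
    (r'# UID: ([0-9]+)', FIELD + r'\1'),
    (r'# Query_time: (.*) Rows: ([0-9]+)', FIELD + r'\1' + '\n' + FIELD + r'\2'),
    (r'# Tables: (.*);', FIELD + r'\1'),
    (r'# SET timestamp=([0-9]*);', ''),
    (r'# # administrator command: ([0-9]*);', ''),
    (r'# Sql_text:(.*)', FIELD + r'\1'),
    (r'# Sql_type:(.*);', FIELD + r'\1'),
    (r'# Sql_command: (.*)', FIELD + r'\1'),
    (r'# Status: (.*)', FIELD + r'\1'),
    (r'# Connect Type: (.*);', FIELD + r'\1' + ROW),
]


def convert(text):
    """Apply every rule to the whole text, like the chained sed commands."""
    for pattern, repl in RULES:
        text = re.sub(pattern, repl, text)
    return text


SAMPLE = """# Threadid=5;
# Taskid=80;
# Time: 211025 13:28:58
# End_time: 211025 13:29:00
# User@Host: root[root] @ localhost []
# UID: 1
# Query_time: 2.002046 Rows: 1
# Tables: WRITE: ; READ: ; OTHER: ; ;
# SET timestamp=1635139738;
# Sql_text: select sleep(2);
# Sql_type: DQL;
# Sql_command: SELECT;
# Status: SUCCESS;
# Connect Type: CAPI;
"""

converted = convert(SAMPLE)
record = converted.split(ROW)[0]
# Whitespace is stripped here only to make the printed fields easier to read.
fields = [f.strip() for f in record.split(FIELD)]
print(len(fields))
for f in fields:
    print(repr(f))
```

For the sample record this yields 14 fields, which matches the 14 columns of the table. The newlines of the original log stay inside the fields, and the rules for Sql_command and Status capture to the end of the line, so their trailing semicolons are kept.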
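Table
Column names can be changed as needed. Note that Query_time is a decimal type.
The table name can also be changed freely, as long as the table name in the load statement matches it.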
create table al(
Threadid bigint,
Taskid bigint,
Time datetime,
End_time datetime,
User_Host varchar(1000),
UID bigint,
Query_time decimal(16,6),
Rows bigint,
Tables_list varchar(8000),
Sql_text varchar(8000),
Sql_command varchar(100),
Sql_type varchar(100),
Status varchar(100),
conn_type varchar(100)
);
Load SQL
load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/al.log' into table al fields terminated by '^GBASE^' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '^GBASE_GBASE^' DATETIME FORMAT '%y%m%d %H:%i:%s';
Query OK, 7 rows affected (Elapsed: 00:00:01.72)
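Complete script
Note that the infile data source in the script (the IP address, the operating system user name and password, and the path) must be changed to match your environment.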
[gbase@rh6-1 audit_log]$ cat auditlog_load.sh
logfile=$1
username=$2
password=$3
dbname=$4
tablename=$5
cat $logfile |sed 's/# Threadid=\([0-9]\+\);/\1/g' |sed 's/# Taskid=\([0-9]\+\);/^GBASE^\1/g' |sed 's/# Time: \(.\+\)/^GBASE^\1/g' |sed 's/# End_time: \(.\+\)/^GBASE^\1/g' |sed 's/# User@Host: \(.*\)/^GBASE^\1/g' |sed 's/# UID: \([0-9]\+\)/^GBASE^\1/g' |sed 's/# Query_time: \(.*\) Rows: \([0-9]\+\)/^GBASE^\1\n^GBASE^\2/g' |sed 's/# Tables: \(.*\);/^GBASE^\1/g' |sed 's/# SET timestamp=\([0-9]*\);//g' |sed 's/# # administrator command: \([0-9]*\);//g' |sed 's/# Sql_text:\(.*\)/^GBASE^\1/g' |sed 's/# Sql_type:\(.*\);/^GBASE^\1/g' |sed 's/# Sql_command: \(.*\)/^GBASE^\1/g' |sed 's/# Status: \(.*\)/^GBASE^\1/g' |sed 's/# Connect Type: \(.*\);/^GBASE^\1^GBASE_GBASE^/g' > tmp_auditlog_load.log
gccli -vvv -u${username} -p${password} -e"load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/tmp_auditlog_load.log' into table ${dbname}.${tablename} fields terminated by '^GBASE^' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '^GBASE_GBASE^' DATETIME FORMAT '%y%m%d %H:%i:%s'"
Sample run
[gbase@rh6-1 audit_log]$ sh auditlog_load.sh gclusterd-audit.log gbase gbase20110531 al al
--------------
load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/tmp_auditlog_load.log' into table al.al fields terminated by '^GBASE^' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '^GBASE_GBASE^' DATETIME FORMAT '%y%m%d %H:%i:%s'
--------------
Query OK, 7 rows affected (Elapsed: 00:00:01.73)
Task 73 finished, Loaded 7 records, Skipped 28 records
Bye
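Because the sftp source is hard-coded in the script, one way to avoid editing the statement by hand is to assemble it from parameters. The sketch below is only an illustration: `build_load_sql` and its parameter names are hypothetical, and it only formats the same statement text that appears in auditlog_load.sh. It does not escape anything, so credentials or paths containing quotes, `@`, or `:` would need extra care.

```python
def build_load_sql(sftp_user, sftp_password, host, path, table,
                   field_sep='^GBASE^', row_sep='^GBASE_GBASE^'):
    """Return a load statement shaped like the one in auditlog_load.sh."""
    columns = ('Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,'
               'Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type')
    url = 'sftp://%s:%s@%s%s' % (sftp_user, sftp_password, host, path)
    # %% yields a literal % so the DATETIME FORMAT string survives formatting.
    return ("load data infile '%s' into table %s "
            "fields terminated by '%s' TABLE_FIELDS '%s' "
            "lines terminated by '%s' DATETIME FORMAT '%%y%%m%%d %%H:%%i:%%s'"
            % (url, table, field_sep, columns, row_sep))


# Values taken from the sample run in this article.
sql = build_load_sql('gbase', 'gbase1234', '10.0.2.201',
                     '/home/gbase/zxq/audit_log/tmp_auditlog_load.log', 'al.al')
print(sql)
```

The printed statement can then be passed to gccli with -e, as the script does.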
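Notes and limitations
All SQL sent from clients can be loaded. SQL that the database sends internally between its own nodes, judging from the raw data, has no Time field, only End_time.
For example: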
# Threadid=52;
# Taskid=0;
# End_time: 211025 13:00:01
# User@Host: root[root] @ [10.0.2.201]
# UID: 1
# Query_time: 0.000228 Rows: 0
# SET timestamp=1635138001;
# administrator command: Quit;
# Sql_type: OTHERS;
# Sql_command: Quit;
# Status: SUCCESS;
# Connect Type: CAPI;
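To load this internal SQL as well, the options are:
1. Give up the Time field of the other SQL by replacing it with an empty string in sed.
2. Load twice: the first pass with the Time field, where the records without it are rejected as bad data, and the second pass without the Time field.
3. Accept that whatever still fails to load is lost.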
gbase> load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/al.log' into table al fields terminated by '$GBASE$' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '$GBASE_GBASE$' DATETIME FORMAT '%y%m%d %H:%i:%s';
Query OK, 7 rows affected (Elapsed: 00:00:01.62)
Task 74 finished, Loaded 7 records, Skipped 28 records
gbase> load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/al.log' into table al fields terminated by '$GBASE$' TABLE_FIELDS 'Threadid,Taskid,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '$GBASE_GBASE$' DATETIME FORMAT '%y%m%d %H:%i:%s';
Query OK, 18 rows affected (Elapsed: 00:00:01.39)
Task 75 finished, Loaded 18 records, Skipped 17 records
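Before choosing between these options, it can help to see how many records have each number of fields, since the records skipped in each pass are presumably the ones whose field count does not match TABLE_FIELDS. The following is a minimal sketch under that assumption. `field_count_histogram` is a hypothetical helper, and the data it runs on here is synthetic, not real audit output.

```python
from collections import Counter

FIELD = '^GBASE^'
ROW = '^GBASE_GBASE^'


def field_count_histogram(text, field_sep=FIELD, row_sep=ROW):
    """Count how many records have each number of fields."""
    counts = Counter()
    for record in text.split(row_sep):
        if not record.strip():
            continue  # skip the empty tail after the last row delimiter
        counts[len(record.split(field_sep))] += 1
    return counts


# Synthetic data: one 14-field record and one 13-field record.
full = FIELD.join(str(i) for i in range(14)) + ROW
short = FIELD.join(str(i) for i in range(13)) + ROW
histogram = field_count_histogram(full + '\n' + short + '\n')
print(dict(histogram))
```

On a real converted file, pass the file contents and the delimiters actually used; each key of the result is a candidate TABLE_FIELDS length for one load pass.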
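Python approach (recommended)
Script
The log file name is hard-coded, so change it as needed. The script splits the log into records on \n# Threadid=, splits each record into fields, and then scans and matches the fields one by one. It writes the result to standard output, still using the custom field and row delimiters.
Compared with the shell version, note that:
- Date fields are already formatted.
- The start time comes from the Time field, or from the SET timestamp field when Time is missing.
- Some records were found to contain \x00 in front of the field marker, so the script removes it with replace().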
[root@rh6-1 gbase]# cat auditlog.py
from datetime import datetime

# The log file name is hard-coded; change it as needed.
with open('gclusterd-audit.log') as fo:
    content = fo.read()

# Each record begins with a "# Threadid=" line.
parts = content.split('\n# Threadid=')
num = 0
for part in parts:
    if not part.strip():
        continue
    num = num + 1
    # Drop the stray \x00 that precedes '#' in some records, then split into fields.
    ps = part.replace('\n\x00#', '\n#').split('\n# ')
    # The first record keeps its "# Threadid=" prefix because no newline precedes it.
    threadid = ps[0].replace('# Threadid=', '').strip().strip(';')
    taskid = 0
    starttime = ''
    timestamp = ''
    endtime = ''
    user = ''
    host = ''
    uid = 0
    querytime = 0
    rows = 0
    sqltext = ''
    status = ''
    sqltype = ''
    sqlcommand = ''
    contype = ''
    for index in range(1, len(ps)):
        if ps[index].startswith('Taskid='):
            taskid = ps[index].split('=')[1].strip(';')
        elif ps[index].startswith('Time: '):
            starttime = datetime.strptime(ps[index].split(':', 1)[1].strip(), '%y%m%d %H:%M:%S')
        elif ps[index].startswith('SET timestamp='):
            timestamp = datetime.fromtimestamp(int(ps[index].split('=')[1].strip(';')))
        elif ps[index].startswith('End_time:'):
            endtime = datetime.strptime(ps[index].split(':', 1)[1].strip(), '%y%m%d %H:%M:%S')
        elif ps[index].startswith('User@Host'):
            userhost = ps[index].split(': ', 1)[1].strip().split('@')
            user = userhost[0].strip()
            host = userhost[1].strip()
        elif ps[index].startswith('UID:'):
            uid = ps[index].split(': ', 1)[1].strip()
        elif ps[index].startswith('Query_time'):
            qt = ps[index].split(': ', 1)[1].split('Rows:')
            querytime = qt[0].strip()
            rows = qt[1].strip()
        elif ps[index].startswith('Sql_text:'):
            sqltext = ps[index].split(': ', 1)[1].strip(';')
        elif ps[index].startswith('Status:'):
            status = ps[index].split(': ', 1)[1].strip(';')
        elif ps[index].startswith('Sql_type:'):
            sqltype = ps[index].split(': ', 1)[1].strip(';')
        elif ps[index].startswith('Sql_command:'):
            sqlcommand = ps[index].split(': ', 1)[1].strip(';')
        elif ps[index].startswith('Connect Type:'):
            contype = ps[index].split(': ', 1)[1].strip(';\n')
    # Debug output, disabled by default:
    # print("\n------------------------%s------\nthreadid=%s,taskid=%s,starttime=%s,timestamp=%s,userhost=%s,uid=%s,querytime=%s,rows=%s,sqltext=%s,status=%s,sqltype=%s,sqlcommand=%s,contype=%s#" %(num,threadid,taskid,starttime,timestamp,userhost,uid,querytime,rows,sqltext,status,sqltype,sqlcommand,contype))
    # Records without a Time field fall back to SET timestamp.
    if starttime == '':
        starttime = timestamp
    print('%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE_GBASE^' %(threadid,taskid,starttime,endtime,user,host,uid,querytime,rows,sqltext,status,sqltype,sqlcommand,contype))
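The fallback from the Time field to SET timestamp can be checked against the two sample records in this article. The sketch below is an illustration only: `start_time` is a hypothetical helper, and it pins the time zone to UTC+8 as an assumption so that the result is reproducible, whereas the script itself relies on the local time zone of the machine it runs on.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the server clock is UTC+8; the article does not state the time zone.
SERVER_TZ = timezone(timedelta(hours=8))


def start_time(time_field, set_timestamp, tz=SERVER_TZ):
    """Return the Time field if present, otherwise the SET timestamp value."""
    if time_field:
        return datetime.strptime(time_field, '%y%m%d %H:%M:%S')
    # Convert the epoch seconds in the assumed zone, then drop the zone info
    # so the value has the same shape as a parsed Time field.
    return datetime.fromtimestamp(int(set_timestamp), tz).replace(tzinfo=None)


# Client record: Time is present, and SET timestamp names the same instant.
print(start_time('211025 13:28:58', '1635139738'))
print(start_time('', '1635139738'))
# Internal record: no Time field, only SET timestamp=1635138001.
print(start_time('', '1635138001'))
```

Under the UTC+8 assumption the first two calls both give 2021-10-25 13:28:58, and the internal record gets 2021-10-25 13:00:01, which equals its End_time.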
CREATE TABLE statement
create table auditlog_load(
thread_id bigint,
taskid bigint,
start_time datetime,
end_time datetime,
user varchar(200),
host varchar(200),
uid bigint,
querytime decimal(16,6),
rows bigint,
sql_text text,
status varchar(20) ,
sql_type varchar(20),
sql_command text,
conn_type varchar(100)
);
Loading
load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/audit_toload.txt' into table auditlog_load fields terminated by '^GBASE^' lines terminated by '^GBASE_GBASE^\n' DATETIME FORMAT '%Y-%m-%d %H:%i:%s';
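This statement differs from the shell version in two places, and both follow from how the Python script prints its rows. The sketch below illustrates why: `format_row` is a hypothetical helper that joins values the same way as the script's print call, shown here with only three fields.

```python
from datetime import datetime

FIELD = '^GBASE^'
ROW = '^GBASE_GBASE^'


def format_row(values):
    """Join values the way the script's print call does, without the newline."""
    return FIELD.join(str(v) for v in values) + ROW


start = datetime.strptime('211025 13:28:58', '%y%m%d %H:%M:%S')
line = format_row([5, 80, start]) + '\n'   # print() appends this newline
print(repr(str(start)))
print(repr(line))
```

Converting a datetime to a string gives the form 2021-10-25 13:28:58, which is why DATETIME FORMAT becomes '%Y-%m-%d %H:%i:%s'. Each printed row ends with the row delimiter followed by a newline, which is why the statement uses lines terminated by '^GBASE_GBASE^\n'.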