A method for loading audit log files into a 南大通用 GBase 8a database

This article describes a method for loading text-format audit logs into a GBase 8a database by substituting custom field and line separators.

Reference

GBase 8a cluster audit log (audit_log) guide and usage

Audit log output format

The default is log_output='FILE'.
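To confirm the current setting, a standard MySQL-style query should work (GBase 8a follows MySQL syntax here; verify against your version):

show variables like 'log_output';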

Log file

gcluster/log/gcluster/gclusterd-audit.log

Log sample

# Threadid=5;
# Taskid=80;
# Time: 211025 13:28:58
# End_time: 211025 13:29:00
# User@Host: root[root] @ localhost []
# UID: 1
# Query_time: 2.002046 Rows: 1
# Tables: WRITE: ; READ: ; OTHER: ; ;
# SET timestamp=1635139738;
# Sql_text: select sleep(2);
# Sql_type: DQL;
# Sql_command: SELECT;
# Status: SUCCESS;
# Connect Type: CAPI;

Shell-based approach

Processing steps

4.1. Replace every leading '# field-name' prefix with the field separator; in this example the separator is ^GBASE^.
4.2. Append the line separator after the last field (# Connect Type:); in this example it is ^GBASE_GBASE^.
4.3. sed code
cat gclusterd-audit.log |sed 's/# Threadid=\([0-9]\+\);/\1/g' |sed 's/# Taskid=\([0-9]\+\);/^GBASE^\1/g' |sed 's/# Time: \(.\+\)/^GBASE^\1/g' |sed 's/# End_time: \(.\+\)/^GBASE^\1/g' |sed 's/# User@Host: \(.*\)/^GBASE^\1/g' |sed 's/# UID: \([0-9]\+\)/^GBASE^\1/g' |sed 's/# Query_time: \(.*\) Rows: \([0-9]\+\)/^GBASE^\1\n^GBASE^\2/g' |sed 's/# Tables: \(.*\);/^GBASE^\1/g' |sed 's/# SET timestamp=\([0-9]*\);//g' |sed 's/# # administrator command: \([0-9]*\);//g' |sed 's/# Sql_text:\(.*\)/^GBASE^\1/g' |sed 's/# Sql_type:\(.*\);/^GBASE^\1/g' |sed 's/# Sql_command: \(.*\)/^GBASE^\1/g' |sed 's/# Status: \(.*\)/^GBASE^\1/g' |sed 's/# Connect Type: \(.*\);/^GBASE^\1^GBASE_GBASE^/g' > al.log
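For reference, the sample record shown earlier comes out of this pipeline roughly as follows (each field still sits on its own physical line, prefixed by the field separator; exact whitespace and trailing semicolons depend on the log content, and the line produced from '# SET timestamp=' is emptied and omitted here):

5
^GBASE^80
^GBASE^211025 13:28:58
^GBASE^211025 13:29:00
^GBASE^root[root] @ localhost []
^GBASE^1
^GBASE^2.002046
^GBASE^1
^GBASE^WRITE: ; READ: ; OTHER: ;
^GBASE^ select sleep(2);
^GBASE^ DQL
^GBASE^SELECT;
^GBASE^SUCCESS;
^GBASE^CAPI^GBASE_GBASE^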

Field names can be changed as needed; note that Query_time is a decimal type.

The table name can be anything you like; just make sure it matches the table name used in the load statement later.

create table al(
Threadid bigint,
Taskid bigint,
Time datetime,
End_time datetime,
User_Host varchar(1000),
UID bigint,
Query_time decimal(16,6),
Rows bigint,
Tables_list varchar(8000),
Sql_text varchar(8000),
Sql_command varchar(100),
Sql_type varchar(100),
Status varchar(100),
conn_type varchar(100)
);

Load SQL

load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/al.log' into table al fields terminated by '^GBASE^' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '^GBASE_GBASE^' DATETIME FORMAT '%y%m%d %H:%i:%s';
Query OK, 7 rows affected (Elapsed: 00:00:01.72)
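A quick sanity check of the loaded data (an illustrative query using the column names from the create table above):

select Threadid, Time, Query_time, Sql_text from al order by Time;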

Complete script

Note: the infile data source (IP address, OS user name and password, path, etc.) must be adjusted in the script to match your environment.

[gbase@rh6-1 audit_log]$ cat auditlog_load.sh
logfile=$1
username=$2
password=$3
dbname=$4
tablename=$5

cat $logfile |sed 's/# Threadid=\([0-9]\+\);/\1/g' |sed 's/# Taskid=\([0-9]\+\);/^GBASE^\1/g' |sed 's/# Time: \(.\+\)/^GBASE^\1/g' |sed 's/# End_time: \(.\+\)/^GBASE^\1/g' |sed 's/# User@Host: \(.*\)/^GBASE^\1/g' |sed 's/# UID: \([0-9]\+\)/^GBASE^\1/g' |sed 's/# Query_time: \(.*\) Rows: \([0-9]\+\)/^GBASE^\1\n^GBASE^\2/g' |sed 's/# Tables: \(.*\);/^GBASE^\1/g' |sed 's/# SET timestamp=\([0-9]*\);//g' |sed 's/# # administrator command: \([0-9]*\);//g' |sed 's/# Sql_text:\(.*\)/^GBASE^\1/g' |sed 's/# Sql_type:\(.*\);/^GBASE^\1/g' |sed 's/# Sql_command: \(.*\)/^GBASE^\1/g' |sed 's/# Status: \(.*\)/^GBASE^\1/g' |sed 's/# Connect Type: \(.*\);/^GBASE^\1^GBASE_GBASE^/g'  > tmp_auditlog_load.log

gccli -vvv  -u${username} -p${password} -e"load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/tmp_auditlog_load.log' into table ${dbname}.${tablename} fields terminated by '^GBASE^' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '^GBASE_GBASE^' DATETIME FORMAT '%y%m%d %H:%i:%s'"

Run sample

[gbase@rh6-1 audit_log]$ sh auditlog_load.sh gclusterd-audit.log gbase gbase20110531 al al
--------------
load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/tmp_auditlog_load.log' into table al.al fields terminated by '^GBASE^' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '^GBASE_GBASE^' DATETIME FORMAT '%y%m%d %H:%i:%s'
--------------

Query OK, 7 rows affected (Elapsed: 00:00:01.73)
Task 73 finished, Loaded 7 records, Skipped 28 records

Bye

Notes and limitations

All SQL sent from clients can be loaded. SQL that the database nodes send to each other internally, however, has no Time field in the raw data, only End_time.
For example:

# Threadid=52;
# Taskid=0;
# End_time: 211025 13:00:01
# User@Host: root[root] @  [10.0.2.201]
# UID: 1
# Query_time: 0.000228 Rows: 0
# SET timestamp=1635138001;
# administrator command: Quit;
# Sql_type: OTHERS;
# Sql_command: Quit;
# Status: SUCCESS;
# Connect Type: CAPI;

If you also want to load this kind of internal SQL, the options are:

1. Give up the Time field of the other SQL and replace it with an empty value during the sed step.
2. Load in two passes: the first pass keeps the Time field (the internal records become error data and are skipped), the second pass drops the Time field.
3. Whatever still fails to load, just let it go.

gbase> load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/al.log' into table al fields terminated by '$GBASE$' TABLE_FIELDS 'Threadid,Taskid,Time,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '$GBASE_GBASE$' DATETIME FORMAT '%y%m%d %H:%i:%s';
Query OK, 7 rows affected (Elapsed: 00:00:01.62)
Task 74 finished, Loaded 7 records, Skipped 28 records

gbase> load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/zxq/audit_log/al.log' into table al fields terminated by '$GBASE$' TABLE_FIELDS 'Threadid,Taskid,End_time,User_Host,UID,Query_time,Rows,Tables_list,Sql_text,Sql_command,sql_type,Status,conn_type' lines terminated by '$GBASE_GBASE$' DATETIME FORMAT '%y%m%d %H:%i:%s';
Query OK, 18 rows affected (Elapsed: 00:00:01.39)
Task 75 finished, Loaded 18 records, Skipped 17 records

Python-based approach (recommended)

Script

The log file name is hard-coded in the script; change it as needed. The idea is to split the file into records on \n# Threadid=, then split each record again and scan the fields one by one, matching them by prefix. The custom field and line separators are still used for the output.

Note the differences from the shell script version:

  • Date fields are already formatted.
  • The start time is taken from either the Time field or the timestamp field, whichever is present.
  • Some records turned out to contain \x00 near the separators, so the script does a replace() internally.
[root@rh6-1 gbase]# cat auditlog.py
from datetime import datetime

fo = open('gclusterd-audit.log')
str = fo.read()
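# split the log into one chunk per record, using '\n# Threadid=' as the record boundary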
x=str.split('\n# Threadid=')
num=0
for part in x:
  num = num + 1
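  # split this record into '# '-prefixed fields; some records carry a stray \x00 before '#', normalize it first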
  ps=part.replace('\n\x00#','\n#').split('\n# ')
  threadid=ps[0].strip(';')
  taskid=0
  starttime=''
  timestamp=''
  endtime=''
  user=''
  host=''
  uid=0
  querytime=0
  rows=0
  sqltext=''
  status=''
  sqltype=''
  sqlcommand=''
  contype=''
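  # scan the remaining fields and pick each attribute out by its prefix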
  for index in range(1,len(ps)):
    if ps[index].startswith('Taskid='):
      taskid=ps[index].split('=')[1].strip(';')
    elif ps[index].startswith('Time: '):
      starttime=datetime.strptime(ps[index].split(':',1)[1].strip(),'%y%m%d %H:%M:%S')
    elif ps[index].startswith('SET timestamp='):
      timestamp=datetime.fromtimestamp(int(ps[index].split('=')[1].strip(';')))
    elif ps[index].startswith('End_time:'):
      endtime=datetime.strptime(ps[index].split(':',1)[1].strip(),'%y%m%d %H:%M:%S')
    elif ps[index].startswith('User@Host'):
      userhost=ps[index].split(': ',1)[1].strip().split('@')
      user=userhost[0].strip()
      host=userhost[1].strip()
    elif ps[index].startswith('UID:'):
      uid=ps[index].split(': ',1)[1].strip()
    elif ps[index].startswith('Query_time'):
      qt=ps[index].split(': ',1)[1].split('Rows:')
      querytime=qt[0].strip()
      rows=qt[1].strip()
    elif ps[index].startswith('Sql_text:'):
      sqltext=ps[index].split(': ',1)[1].strip(';')
    elif ps[index].startswith('Status:'):
      status=ps[index].split(': ',1)[1].strip(';')
    elif ps[index].startswith('Sql_type:'):
      sqltype=ps[index].split(': ',1)[1].strip(';')
    elif ps[index].startswith('Sql_command:'):
      sqlcommand=ps[index].split(': ',1)[1].strip(';')
    elif ps[index].startswith('Connect Type:'):
      contype=ps[index].split(': ',1)[1].strip(';\n')
#  print("\n------------------------%s------\nthreadid=%s,taskid=%s,starttime=%s,timestamp=%s,userhost=%s,uid=%s,querytime=%s,rows=%s,sqltext=%s,status=%s,sqltype=%s,sqlcommand=%s,contype=%s#" %(num,threadid,taskid,starttime,timestamp,userhost,uid,querytime,rows,sqltext,status,sqltype,sqlcommand,contype))
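  # internal SQL has no 'Time:' field; fall back to the 'SET timestamp=' value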
  if starttime=='':
    starttime=timestamp
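  # emit one record per output line, joining fields with ^GBASE^ and ending with ^GBASE_GBASE^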
  print('%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE^%s^GBASE_GBASE^' %(threadid,taskid,starttime,endtime,user,host,uid,querytime,rows,sqltext,status,sqltype,sqlcommand,contype))

fo.close()
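Assuming the script is saved as auditlog.py as in the listing above, its output can be redirected into a file and then fed to the load statement below (the file name audit_toload.txt matches the sftp path used later):

[root@rh6-1 gbase]# python auditlog.py > audit_toload.txt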

Create table statement

create table auditlog_load(
thread_id bigint,
taskid bigint,
start_time datetime,
end_time datetime,
user varchar(200),
host varchar(200),
uid bigint,
querytime decimal(16,6),
rows bigint,
sql_text text,
status varchar(20) ,
sql_type varchar(20),
sql_command text,
conn_type varchar(100)
);

Load statement

load data infile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/audit_toload.txt' into table auditlog_load fields terminated by '^GBASE^' lines terminated by '^GBASE_GBASE^\n' DATETIME FORMAT '%Y-%m-%d %H:%i:%s';
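Once loaded, the table can be queried like any other; for example, an illustrative breakdown by statement type (column names taken from the create table above):

select sql_type, count(*) as cnt, max(querytime) as max_query_time
from auditlog_load
group by sql_type
order by cnt desc;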