GBase 8a 数据库集群从862Build43开始,支持python的UDF自定义函数。本文通过python功能在命令行调用gccli来执行SQL语句,包括某些不允许在存储过程中执行的语句。
目录导航
python自定义函数
-- executePythonSQL(sql_str): run one SQL statement through the gccli command-line
-- client from inside a Python UDF and return everything gccli printed.
--   sql_str : the SQL text to execute; it is substituted verbatim into the
--             gccli "-e" argument of a shell command line.
--   returns : gccli's combined output as a string, or None if the Python code
--             raised (presumably surfaced to SQL as NULL -- confirm).
-- Notes:
--   * The body uses the Python 2 "commands" module, so it needs a Python 2 runtime.
--   * commands.getoutput runs the command through the shell and sql_str is not
--     escaped, so double quotes, backticks or $ inside sql_str are interpreted
--     by the shell. Only pass trusted SQL text.
--   * The gccli user name and password are hard-coded in the command line.
--   * The Python body must stay indented under try:/except:, otherwise it does
--     not compile.
drop function if exists executePythonSQL;
create function executePythonSQL(sql_str varchar(8000))
returns varchar
$$
try:
    import commands
    output = commands.getoutput("gccli -ugbase -pgbase20110531 -N -vvv -e\\\"%s\\\"" %sql_str)
except:
    return None
return str(output)
$$ language plpythonu;
执行效果
gbase> select executePythonSQL('load data infile ''sftp://gbase:gbase1234@10.0.2.101/home/gbase/t.txt'' into table testdb.t1 fields terminated by '',''') a;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| a |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| --------------
load data infile 'sftp://gbase:gbase1234@10.0.2.101/home/gbase/t.txt' into table testdb.t1 fields terminated by ','
--------------
Query OK, 0 rows affected (Elapsed: 00:00:01.29)
Task 6154 finished, Loaded 0 records, Skipped 1 records
Bye |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (Elapsed: 00:00:01.32)
gbase> select executePythonSQL('select now()');
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
| executePythonSQL('select now()') |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
| --------------
select now()
--------------
+---------------------+
| 2020-11-25 10:25:17 |
+---------------------+
1 row in set (Elapsed: 00:00:00.00)
Bye |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (Elapsed: 00:00:00.04)
gbase>
审计日志归档的应用样例
-- executePythonSQL(sql_str): helper used by the audit-log archive procedure to
-- run a statement (here: LOAD DATA) through the gccli command-line client.
--   sql_str : the SQL text to execute; it is substituted verbatim into the
--             gccli "-e" argument of a shell command line.
--   returns : gccli's combined output as a string, or None if the Python code
--             raised (presumably surfaced to SQL as NULL -- confirm).
-- Notes:
--   * A failing statement is reported only inside the returned text; the
--     function itself does not raise a SQL error for it.
--   * commands.getoutput (Python 2) runs the command through the shell and
--     sql_str is not escaped, so only pass trusted SQL text.
--   * The gccli user name and password are hard-coded in the command line.
--   * The Python body must stay indented under try:/except:, otherwise it does
--     not compile.
drop function if exists executePythonSQL;
create function executePythonSQL(sql_str varchar(8000))
returns varchar
$$
try:
    import commands
    output = commands.getoutput("gccli -ugbase -pgbase20110531 -N -vvv -e\\\"%s\\\"" %sql_str)
except:
    return None
return str(output)
$$ language plpythonu;
--
-- audit_log_archive(): export the rows of gbase.audit_log_bak1 to a local text
-- file, then load that file into gclusterdb.audit_log_express over sftp by
-- calling executePythonSQL (LOAD DATA is issued through gccli).
--
-- Prerequisite: every node (coordinator and data nodes) must have a distinct
-- host name, and all of them must be resolvable through DNS so that nodes can
-- be reached by host name; otherwise the load fails with "server or file not
-- found" style errors.
--
-- NOTE(review): the SELECT below exports 16 columns (including uid and
-- operators), while the audit_log_express table created by the
-- import_audit_log event in this file has 18 columns (end_time, user_host, db,
-- algorithms, and no uid/operators). Confirm the column layouts match the
-- target GBase version before relying on this.
--
drop procedure if exists audit_log_archive;
delimiter //
create procedure audit_log_archive()
begin
    -- Step 1: dump the rotated audit rows, tagged with this node's host name.
    select
        @@hostname as hostname,
        thread_id,
        taskid ,
        start_time,
        uid,
        user,
        host_ip,
        query_time,
        rows,
        -- String positions are 1-based; start at 1 to keep the first N characters.
        substr(table_list, 1, 4096),
        substr(sql_text, 1, 8191), -- utf8mb4 charset
        sql_type, sql_command,
        operators,
        status,
        conn_type
    from gbase.audit_log_bak1
    into outfile '/home/gbase/audit_log.txt' -- choose a suitable file path
    fields terminated by '\xFF\xFE\xFD' -- custom column separator: several non-printable bytes
    lines terminated by '\xFF\xFD\xFE' -- custom row separator: several non-printable bytes
    writemode by overwrites; -- overwrite an existing file

    -- Step 2: load the exported file through gccli.
    -- Single quotes inside the SQL text passed to the function are escaped by doubling them.
    -- Backslashes inside the SQL text are written as two backslashes (\\).
    select executePythonSQL(concat('load data infile ''sftp://gbase:gbase1234@',
        @@hostname, -- host name is used here, so node host names must be unique and resolvable via DNS
        '/home/gbase/audit_log.txt'' into table gclusterdb.audit_log_express fields terminated by ''\\xFF\\xFE\\xFD'' lines terminated by ''\\xFF\\xFD\\xFE'' time format ''%H:%i:%s.%f'''));
end //
delimiter ;
-- import_audit_log: scheduled every 10 minutes. Rotates gbase.audit_log into
-- gbase.audit_log_bak1, archives the rotated rows with audit_log_archive(),
-- then drops the rotated table.
-- NOTE(review): ENABLE LOCAL and the SELF keyword presumably make the event and
-- the table operations act on the local node -- confirm against the GBase manual.
-- NOTE(review): audit_log_express and import_audit_log_errors are created
-- without a schema qualifier, while audit_log_archive loads into
-- gclusterdb.audit_log_express; presumably the event must be created in
-- gclusterdb -- confirm.
-- NOTE(review): audit_log_express is created here with 18 columns, while
-- audit_log_archive exports 16 columns (uid/operators instead of
-- end_time/user_host/db/algorithms) -- confirm the layouts match.
-- NOTE(review): executePythonSQL returns gccli's output as text instead of
-- raising, so a failed load presumably does not reach the handler below and
-- audit_log_bak1 is still dropped -- confirm this is acceptable.
drop event if exists import_audit_log;
delimiter //
CREATE EVENT "import_audit_log" ON SCHEDULE EVERY 10 MINUTE ENABLE LOCAL DO
begin
    DECLARE errno INT;
    DECLARE msg TEXT;
    -- On any SQL error: record time, host, error number and message, then leave the block.
    DECLARE EXIT handler FOR sqlexception BEGIN
        get diagnostics condition 1 errno = gbase_errno,msg = message_text;
        CREATE TABLE IF NOT EXISTS import_audit_log_errors(err_time DATETIME, hostname VARCHAR(64), err_no INT, msg_txt VARCHAR(1024)) CHARSET = utf8;
        -- Explicit column list; substr start position is 1-based.
        INSERT INTO import_audit_log_errors (err_time, hostname, err_no, msg_txt) VALUES (now(),@@hostname,errno,substr(msg, 1, 1024));
    END;
    -- Remove leftovers from a previous (possibly failed) run.
    drop self table if exists gbase.audit_log_bak1;
    drop self table if exists gbase.audit_log_bak2;
    -- Make sure the archive target table exists.
    CREATE TABLE if not exists audit_log_express (hostname varchar(512) DEFAULT NULL,thread_id int(11) DEFAULT NULL,taskid bigint(20) DEFAULT NULL,start_time datetime DEFAULT NULL,end_time datetime DEFAULT NULL,user_host text,user text,host_ip text,query_time time DEFAULT NULL,rows bigint(20) DEFAULT NULL,db varchar(512) DEFAULT NULL,table_list text,sql_text text,sql_type text,sql_command text,algorithms text,status text,conn_type text) CHARSET = utf8;
    -- Create an empty copy of audit_log; sql_mode is cleared for the CREATE ... LIKE and restored afterwards.
    SET SELF sql_mode = '';
    CREATE SELF TABLE gbase.audit_log_bak2 LIKE gbase.audit_log;
    SET SELF sql_mode = DEFAULT;
    -- Swap in one statement: current log becomes bak1, the empty copy becomes the new audit_log.
    rename SELF TABLE gbase.audit_log TO gbase.audit_log_bak1,gbase.audit_log_bak2 TO gbase.audit_log;
    -- Export and load the rotated rows, then discard them.
    call audit_log_archive();
    DROP SELF TABLE gbase.audit_log_bak1;
end//
delimiter ;