GBase 8a通过python udf规避ftp,sftp导出文件未完成前就被使用的情况

GBase 8a的导出功能支持ftp,sftp等协议。如果导出内容较多,比如几十GB,则目标文件需要一段时间才能全部生成。而在导出期间如果该文件被使用,则其内容处于不完整状态,会造成数据不一致,格式报错等情况。本文通过python实现ftp和sftp的改名功能,在一定规则下规避这个情况。

实现原理

用户使用文件,一般是根据某个规则扫描匹配的文件名,比如load目录下的csv后缀结尾的文件。所以让尚未完成的文件不符合匹配规则的方法有两种:第一是文件名不符合,第二是文件路径不符合。当然两个都不符合也是可以的。

先导出为一个临时文件

该文件可以根据使用规则

  • 导出不同文件名,特别是后缀名:比如目标是.csv, 则临时文件为.csv.tmp;
  • 导出到一个临时目录里。

导出完成后rename临时名字为正式名字

通过协议的rename功能,将临时文件改变后缀,或者移动目录。

python udf实现 FTP改名代码

如下代码放在gclusterdb下,可以根据具体情况修改。

传入参数为主机,用户名,密码,源名字和目标名字。

本样例未实现端口的修改,如不是默认的ftp端口,可以自行完善代码。

use gclusterdb;
drop function if exists ftpRename;
create function ftpRename(server varchar(100),username varchar(100),password varchar(100),sourceFilename varchar(1000),targetFilename varchar(1000))
returns varchar
$$
from ftplib import FTP
ftp = FTP(server)
ftp.login(username, password)
ftp.rename(sourceFilename,targetFilename)
ftp.close()
return 'OK'

$$ language plpythonu;

调用方法


gbase> select ftprename('10.0.2.201','gbase','gbase1234','/home/gbase/t1.csv.tmp','/home/gbase/t1.csv');
+------------------------------------------------------------------------------------------------+
| ftprename('10.0.2.201','gbase','gbase1234','/home/gbase/t1..scv.tmp','/home/gbase/t1.csv') |
+------------------------------------------------------------------------------------------------+
| OK                                                                                             |
+------------------------------------------------------------------------------------------------+
1 row in set (Elapsed: 00:00:00.08)

python udf实现 SFTP改名代码

默认的python库不支持sftp, 本例用了pysftp库。 可以通过pip install pysftp安装。

其它介绍与FTP一样。

use gclusterdb;
drop function if exists sftpRename;
create function sftpRename(server varchar(100),v_username varchar(100),v_password varchar(100),sourceFilename varchar(1000),targetFilename varchar(1000))
returns varchar
$$
import pysftp
with pysftp.Connection(server, username=v_username, password=v_password) as sftp:
    sftp.rename(sourceFilename,targetFilename)
    sftp.close()
return 'OK'

$$ language plpythonu;

使用样例

gbase> select sftprename('10.0.2.201','gbase','gbase1234','/home/gbase/t1.csv.tmp','/home/gbase/t1.csv');
+------------------------------------------------------------------------------------------------+
| sftprename('10.0.2.201','gbase','gbase1234','/home/gbase/t1..scv.tmp','/home/gbase/t1.csv') |
+------------------------------------------------------------------------------------------------+
| OK                                                                                             |
+------------------------------------------------------------------------------------------------+
1 row in set (Elapsed: 00:00:00.08)

总结

为了避免导出文件过程中被意外使用,可以用如下2个步骤的方案来做

gbase> select * from t1 into outfile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/t1.csv.tmp' fields terminated by ',';
Query OK, 3 rows affected (Elapsed: 00:00:00.32)

gbase> select gclusterdb.sftprename('10.0.2.201','gbase','gbase1234','/home/gbase/t1.csv.tmp','/home/gbase/tmp/t1.csv');
+-----------------------------------------------------------------------------------------------------------+
| gclusterdb.sftprename('10.0.2.201','gbase','gbase1234','/home/gbase/t1.csv.tmp','/home/gbase/tmp/t1.csv') |
+-----------------------------------------------------------------------------------------------------------+
| OK                                                                                                        |
+-----------------------------------------------------------------------------------------------------------+
1 row in set (Elapsed: 00:00:00.43)

其中的临时文件,可以跨目录。前提是有写入权限。 如下例子使用了临时目录tmp 并且名字也带了tmp后缀。

gbase> select * from t1 into outfile 'sftp://gbase:gbase1234@10.0.2.201/home/gbase/tmp/t1_2.csv.tmp' fields terminated by ',';
Query OK, 3 rows affected (Elapsed: 00:00:00.44)

gbase> select gclusterdb.sftprename('10.0.2.201','gbase','gbase1234','/home/gbase/tmp/t1_2.csv.tmp','/home/gbase/t1_2.csv');
+---------------------------------------------------------------------------------------------------------------+
| gclusterdb.sftprename('10.0.2.201','gbase','gbase1234','/home/gbase/tmp/t1_2.csv.tmp','/home/gbase/t1_2.csv') |
+---------------------------------------------------------------------------------------------------------------+
| OK                                                                                                            |
+---------------------------------------------------------------------------------------------------------------+
1 row in set (Elapsed: 00:00:00.44)