请注意:2.0版本与1.x版本的代理服务和配置,并不兼容!!!
一、部署说明
具体请以附件安装包的说明文档为准!
二、功能特性
2.1 特性:
- 支持SqlServer2008使用offset情况下的分页
2.0 新增特性如下:
- 数据可传输类似单引号等特殊字符而无需特殊处理
- 不使用offset查询,支持多主键,且可走索引
- 不使用offset查询,支持ns,ms,s等不同精度的查询,且可走索引
- 支持类似如每日轮询一次、每次按日或全量查询
- 支持自定义函数安全使用DB中间件查询数据
- 支持作废
- 支持查询作废数据
三、配置说明
为了支持上述的特性,新版配置修改较大,前端配置的交互需要待后续改造。
API配置
配置入口:连接器-连接对象-API配置-SQL配置
配置内容:对每个对象(主+从),统一以
YAML
格式,放到批量查询框内,且需要以#YAML
开头表示(前面不能有空格)。所有的配置都放到这一个框内。sql执行逻辑说明:通过
${}
在sql中标识变量,对不同接口,集成平台会根据需要传输不同的变量SQL配置内容说明
querysql:
单项。查询的sql,
${__where}
会根据场景替换成不同的where条件。(仅主对象会替换,从对象,需要使用关联主对象的字段查询)batchWhere:
多项。批量查询时的条件,一般需要增加时间范围条件,参数包含时间、上次id等。
idWhere:
单项。根据id查询时的sql,参数包含id。
insertSql:
单项。新增时执行的sql,参数包含数据的所有字段。
updateSql:
单项。更新时执行的sql,参数包含数据的所有字段。
queryIdSql:
单项。新增或更新后,查询id字段的sql。当存在自增主键时,该配置可以为空。
invalidSql:
单项。单条作废时执行的sql,参数包含数据id。
invalidRelationSqls:
多项。作废时,关联作废的sql,用于主对象作废时同时作废明细数据。
完整示例如下:
主表:
#YAML
# 当时间精度为ms
querySql: select * from test2 where ${__where}
batchWhere:
- date = ${startTime__d} and id > ${lastMaxId} order by date,id # 当lastMaxId为空时,不会执行
- date > ${startTime__d} and date <= ${endTime__d} order by date,id
idWhere:
- id = ${dataId}::INTEGER
insertSql: |
INSERT INTO test2 ("name", "date") VALUES(${name}, ${date__d});
updateSql: |
UPDATE test2 SET name=${name}, date=${date__d} WHERE id=${id}::INTEGER;
invalidSql: |
UPDATE test2 SET is_deleted=true WHERE id=${id}::INTEGER
invalidRelationSqls:|
UPDATE test2-1 SET is_deleted=true WHERE pid=${id}::INTEGER
子表:(假设子表关联主表的字段为parent_id)
#YAML
querySql: select * from test2-d1 where parent_id = ${id}
idWhere: |
id = ${dataId}
分接口配置说明
变量类型说明:
日期类型:需要在字段key后增加__d,代理服务识别到日期时间类型后,会转换为日期时间类型传参到jdbc
id字段类型:包括批量查询的lastMaxId和通过id查询的dataId字段等,会在增加分隔符分隔的数组结果。
__dot
为.分隔;__dashed
为-
分隔
;__underline
为_分隔 批量查询修改:
参数
key | 说明 | 示例值 |
startTime | 开始时间,long类型,ms时间戳,加__d为date类型 | |
endTime | 结束时间,long类型,ms时间戳,加__d为date类型 | |
plus1StartTime | 开始时间+1ms,long类型,ms时间戳,加__d为date类型 | |
plus1EndTime | 结束时间+1ms,long类型,ms时间戳,加__d为date类型 | |
limit | 限制值,不需要在sql填,中间件会自动增加 | |
offset | 无offset轮询始终为0,不需要在sql填,中间件会自动增加 | |
objAPIName | 对象apiName | |
lastMaxId | 上次最大id | |
主表查询结果 | 子表查询中,${__where}不会替换为批量条件,但是可对每条主表的查询结果的字段值作为变量,执行一次查询(无法分页) |
通过id查询:
key | 说明 | 示例值 |
dataId | 数据id | |
dataId__dot | id以.分割列表 | |
dataId__underline | id以_分割列表 | |
dataId__dashed | id以-分割列表 |
新增或修改:
_id | id字段 | |
其他 | 对象的数据 |
分场景的sql配置示例
索引:以下语句,索引根据order by的顺序建符合索引即可!
不使用offset,范围查询 id+日期时间(精度高于ms,如ns)
#YAML
# 当时间精度<ms
querySql: select * from test1 where ${__where}
batchWhere:
# 当精度高于ms时,查询上次查询结果的,最后1ms的数据,
- date >= ${startTime__d} and date < ${plus1StartTime__d} and id > ${lastMaxId} order by date,id
# 主要的查询,大于startTime 小于等于endTime
- date >= ${plus1StartTime__d} and date < ${plus1EndTime__d} order by date,id
不使用offset,范围查询 id+日期时间(毫秒)
#YAML
# 当时间精度为ms
querySql: select * from test2 where ${__where}
batchWhere:
- date = ${startTime__d} and id > ${lastMaxId} order by date,id # 当lastMaxId为空时,不会执行
- date > ${startTime__d} and date <= ${endTime__d} order by date,id
idWhere:
- id = ${dataId}::INTEGER
不使用offset,范围查询 id+日期时间(精度低于ms,如 s)
#YAML
# 时间精度>ms
querySql: select * from test3 where ${__where}
batchWhere:
- date = ${startTime__d} and id > ${lastMaxId} order by date,id # 当lastMaxId为空时,
- date > ${startTime__d} and date <= ${endTime__d} order by date,id
idWhere:
- id = ${dataId}::INTEGER
范围查询,日期字段为日期类型字段,不带具体时间
- 使用offset查询
- 业务上需要保证查询时,查询时间段内的数据不再变化,否则有漏数据风险。
- 由于每次都是查当天数据,请合理降低查询频率,比如每2小时一次。
#YAML
# 仅日期
querySql: select * from test5 where ${__where}
batchWhere:
# 范围查询用于支持历史任务查询,但是又在跨天的时候,不要查询前一天的数据
- date1 > ${startTime__d0} and date1<=${endTime__d0} order by date1,id
# =用于查询当天数据
- date1 = ${endTime__d0} order by date1,id
idWhere:
- id = ${dataId}::INTEGER
范围查询,SqlServer2008, 日期字段为字符串类型, 格式yyyyMMdd
- 使用offset查询
- 业务上需要保证查询时,查询时间段内的数据不再变化,否则有漏数据风险。
- 代理服务的db配置,需要设置isSqlServer2008=true
- 由于每次都是查当天数据,请合理降低查询频率,比如每2小时一次。
#YAML
# 仅日期
querySql: select * from test5 where ${__where}
batchWhere:
# 范围查询用于支持历史任务查询,但是又在跨天的时候,不要查询前一天的数据
- date1 > CONVERT(VARCHAR,${startTime__d0},112) and date1<= CONVERT(VARCHAR,${endTime__d0},112) order by date1,id
# =用于查询当天数据
- date1 = CONVERT(VARCHAR,${endTime__d0},112) order by date1,id
idWhere:
- id = ${dataId}::INTEGER
不使用offset,范围多主键+日期时间
#YAML
# 多主键,这里的主键,是以-分割
querySql: select name1||'-'||name2 as name,* from test4 where ${__where}
batchWhere:
- date = ${startTime__d} and name1 = ${lastMaxId__dashed[0]} and name2 > ${lastMaxId__dashed[1]} order by date,name1,name2
- date = ${startTime__d} and name1 > ${lastMaxId__dashed[0]} order by date,name1,name2
- date > ${startTime__d} and date <= ${endTime__d} order by date,name1,name2
idWhere:
- name1 = ${dataId__dashed[0]} and name2 = ${lastMaxId__dashed[1]}
新增和更新
#YAML
insertSql: |
INSERT INTO test2 ("name", "date") VALUES(${name}, ${date__d})
updateSql: |
UPDATE test2 SET name=${name}, date=${date__d} WHERE id=${id}::INTEGER
更新时检查插入或者更新,以sqlserver为例
主对象配置:
#YAML
# 插入和更新 sqlserver
insertSql: |
insert into t7 (id, name, "date")
values (${id}, ${name}, ${date__d});
updateSql: |
MERGE INTO t7 AS target
USING (VALUES
(${id}, ${name}, ${date__d})
) AS source (id, name, date)
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET
target.name = source.name,
target.date = source.date
WHEN NOT MATCHED THEN
INSERT (id, name, date)
VALUES (source.id, source.name, source.date);
queryIdSql: |
select ${id}
从对象配置:
#YAML
# 插入和更新 sqlserver
insertSql: |
insert into t7_1 (id, pid, name, "date")
values (${id}, ${pid}, ${name}, ${date__d});
updateSql: |
MERGE INTO t7_1 AS target
USING (VALUES
(${id}, ${pid}, ${name}, ${date__d})
) AS source (id, pid, name, date)
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET
target.pid = source.pid,
target.name = source.name,
target.date = source.date
WHEN NOT MATCHED THEN
INSERT (id, pid, name, date)
VALUES (source.id, source.pid, source.name, source.date);
queryIdSql: |
select ${id}
自定义函数调用:
示例如下:
/**
* @author 管理员
* @codeName sql
* @description sql
* @createTime 2024-05-23
*/
def type = "dbProxyExecutor"
def dcId = "66193e6e47b33f000110e9ae"
// 查询返回单个值
def params = ["dcId":dcId,
"executeType":"sql_query_string",
"body":[
"sql":"select count(*) from customers"
]
]
def arg = ["type":type, "params":Fx.json.toJson(params)]
def (Boolean error, HttpResult result, String errorMessage) = Fx.proxy.callAPI("erp.syncData.executeCustomFunction", [:], arg)
log.info(result)
log.info("result1="+result.content['data']['body'])
// 查询返回数据列表
params = ["dcId":dcId,
"executeType":"sql_query",
"body":[
"sql":"select * from customers limit 2"
]
]
arg = ["type":type, "params":Fx.json.toJson(params)]
(error,result,errorMessage) = Fx.proxy.callAPI("erp.syncData.executeCustomFunction", [:], arg)
log.info(result)
log.info("result2="+result.content['data']['body'])
// 执行更新
params = ["dcId":dcId,
"executeType":"sql_execute",
"body":[
"sql":'''UPDATE public.customers
SET "name"='程秀222'
WHERE id=68;'''
]
]
arg = ["type":type, "params":Fx.json.toJson(params)]
(error,result,errorMessage) = Fx.proxy.callAPI("erp.syncData.executeCustomFunction", [:], arg)
log.info(result)
log.info("result2="+result.content['data']['body'])