目录
- 引言
- 【功能】
- 【代码】
- 【实用函数】
- 1 :管道函数是什么,应用于什么场景
- 2 return next实现
- 具体处理函数:exec_stmt_return_next
- 1 初始化tuple store
- 场景一:return next返回var类型
- 场景二:return next返回record类型
- 场景三:return next返回row类型
- 3 用例
引言
【功能】
- Oracle的return pipelined管道函数可以使一次返回的集合类型,变为 逐条返回pipe row(集合中的一条)给SQL层,大大减少内存的使用。
- PostgreSQL的return setof函数并不能起到降低内存使用的效果:return next只是把单条数据缓存起来,并不会把数据逐条返回给SQL层处理,因此没有降低内存的效果。
【代码】
- exec_stmt_return_next中的tupledesc从执行计划node中取出,返回值需要满足desc要求,缓存值也会按该desc保存。
- return next对rec类型和row类型处理的区别
- rec类型本质上就是tuple,数据和desc都以扩展形式存放在erh中。如果需要转换为tuple,有几个标准函数提供转换功能,且支持类型转换。【转换后调用tuplestore的标准接口缓存tuple】
- row类型本质上是一个虚拟行(由一组datum位置组成),row->varnos[i]指向某一个datum。如果想把row转换为tuple,需要用exec_eval_datum算出varnos指向的datum的值,然后组装成values和nulls数组,用heap_form_tuple构造。注意这种转换过程不会有类型转换,如果需要的desc和算出来的列类型对不上,返回空(上层会报错)。【转换成功后调用tuplestore的标准接口缓存tuple】
- return next对var类型的处理:var看做单列tuple,按执行计划给的desc转换类型后构造tuple。【转换后调用tuplestore的标准接口缓存tuple】
【实用函数】
- 通用
- 类型转换:exec_cast_value(传入的值不能是eoh真实的头,使用前需要转成eoh存的1be头,1be指向真实头)
- 数组拼接minimaltuple:heap_form_minimal_tuple
- 由一个tuple和desc转换为另一个desc的tuple:convert_tuples_by_position、execute_attr_map_tuple
- tuplestore:
- 用values数组存tuple(先用heap_form_minimal_tuple拼好tuple,再交给tuplestore_puttuple_common保存):tuplestore_putvalues
- 用HeapTuple存tuple(直接传tuple):tuplestore_puttuple
- 类型
- 根据类型id和mod找出desc:lookup_rowtype_tupdesc
- erh
- 从erh扩展类型拿到紧凑tuple:expanded_record_get_tuple
1 :管道函数是什么,应用于什么场景
oracle支持pipelined函数,可以在函数定义时指定RETURN 集合类型 PIPELINED 来说明当前函数是管道函数。
管道函数最大的作用就是可以使一次返回的集合类型,变为 逐条返回,大大减少内存的使用。
例如:嵌套表类型outrecset是函数f_trans的返回值,普通函数只能组装好嵌套表outrecset(全部缓存在内存),一次性返回。如果嵌套表内容较多,可能会占用较大的内存空间。
如果使用管道函数,可以通过pipe row(嵌套表中的一行)来代替return语句,函数把嵌套表逐行返回给上层处理,无需缓存,降低内存使用。
ORACLE实例:
-- Package spec: declares the cursor, record and collection types used by the
-- pipelined function f_trans.
CREATE OR REPLACE PACKAGE refcur_pkg AUTHID DEFINER IS
  -- Strongly typed cursor over employees rows; this is the input of f_trans.
  TYPE refcur_t IS REF CURSOR RETURN employees%ROWTYPE;

  -- One output row. The two character fields need distinct names: a RECORD
  -- cannot declare the same field twice.
  TYPE outrec_typ IS RECORD (
    var_num   NUMBER(6),
    var_char1 VARCHAR2(30),
    var_char2 VARCHAR2(30)
  );

  -- Nested table of output rows; the declared return type of f_trans.
  TYPE outrecset IS TABLE OF outrec_typ;

  FUNCTION f_trans (p refcur_t) RETURN outrecset PIPELINED;
END refcur_pkg;
/

-- Package body: f_trans reads the input cursor row by row and pipes two
-- output rows for every input row instead of building the whole collection.
CREATE OR REPLACE PACKAGE BODY refcur_pkg IS
  FUNCTION f_trans (p refcur_t) RETURN outrecset PIPELINED IS
    out_rec outrec_typ;
    in_rec  p%ROWTYPE;
  BEGIN
    LOOP
      FETCH p INTO in_rec;  -- input row
      EXIT WHEN p%NOTFOUND;

      out_rec.var_num   := in_rec.employee_id;
      out_rec.var_char1 := in_rec.first_name;
      out_rec.var_char2 := in_rec.last_name;
      PIPE ROW(out_rec);  -- first transformed output row

      -- var_num is left unchanged, so both piped rows carry the same employee_id.
      out_rec.var_char1 := in_rec.email;
      out_rec.var_char2 := in_rec.phone_number;
      PIPE ROW(out_rec);  -- second transformed output row
    END LOOP;
    CLOSE p;
    RETURN;  -- a pipelined function returns no value here; rows were already piped
  END f_trans;
END refcur_pkg;
/

-- Consume the pipelined function as a row source.
SELECT * FROM TABLE (
  refcur_pkg.f_trans (
    CURSOR (SELECT * FROM employees WHERE department_id = 60)
  )
);
在PG中,普通的return语句也是需要一次性返回数据,但PG应该是参考ORACLE实现了return next的功能,也希望逐条返回数据(PG没有集合类型,以普通类型为例):
-- Set-returning function with a single OUT parameter: each bare RETURN NEXT
-- emits the current value of j as one result row.
-- IF EXISTS keeps the script re-runnable when f(int) has not been created yet.
drop function if exists f(int);
create or replace function f(in i int, out j int) returns setof int as $$
begin
  j := i + 1;
  return next;
  j := i + 2;
  return next;
  return;
end$$ language plpgsql;

select * from f(42);
j | |
---- |
但在内核实现中,并不是逐条返回的,return next其实只起到了缓存数据的功能,总的数据集也是一次性返回SQL层的,和直接return没有区别(只有语法上的区别)。
所以PG的return setof函数并不能起到降低内存使用的效果。下面来分析具体过程。
2 return next实现
return next目前支持三类数据的返回:var、rec、row。return next也可以不加参数,此时返回值按out参数列表拼接。
具体处理函数:exec_stmt_return_next
static int | |
exec_stmt_return_next(PLpgSQL_execstate *estate, | |
PLpgSQL_stmt_return_next *stmt) | |
{ | |
TupleDesc tupdesc; | |
int natts; | |
HeapTuple tuple; | |
MemoryContext oldcontext; |
1 初始化tuple store
初始化总结:
1 初始化的过程就是在构造Tuplestorestate,主要动作:
- 给Tuplestorestate新的内存上下文ExecutorState
- 记录不能随机访问:eflags = EXEC_FLAG_REWIND
- 记录三个操作函数:copytup_heap、writetup_heap、readtup_heap
2 给estate->tuple_store_desc添加desc,desc来源:
- 从执行计划节点node(FunctionNext的入参,即函数扫描节点的状态)中拿到expectedDesc后,传入ExecMakeTableFunctionResult
- ExecMakeTableFunctionResult组装ReturnSetInfo挂到fcinfo->resultinfo上
- plpgsql_exec_function时从fcinfo中拿出ReturnSetInfo取到desc
- plpgsql_estate_setup将带有desc的rsi存入estate:estate->rsi = rsi
plpgsql_estate_setup (estate=0x7ffd81e2f850, func=0x2419028, rsi=0x7ffd81e2fb20, simple_eval_estate=0x0, simple_eval_resowner=0x0) at pl_exec.c:3972 | |
in plpgsql_exec_function (func=0x2419028, fcinfo=0x24da5a8, simple_eval_estate=0x0, simple_eval_resowner=0x0, procedure_resowner=0x0, atomic=true) at pl_exec.c:485 | 0x00007fe0a3992064|
in plpgsql_call_handler (fcinfo=0x24da5a8) at pl_handler.c:277 | 0x00007fe0a39ac8f9|
in ExecMakeTableFunctionResult (setexpr=0x24e0b40, econtext=0x24e0a10, argContext=0x24da490, expectedDesc=0x24e1110, randomAccess=false) at execSRF.c:235 | 0x0000000000738829|
in FunctionNext (node=0x24e0800) at nodeFunctionscan.c:95 | 0x0000000000753eed|
in ExecScanFetch (node=0x24e0800, accessMtd=0x753e3b <FunctionNext>, recheckMtd=0x754242 <FunctionRecheck>) at execScan.c:133 | 0x000000000073a081|
in ExecScan (node=0x24e0800, accessMtd=0x753e3b <FunctionNext>, recheckMtd=0x754242 <FunctionRecheck>) at execScan.c:182 | 0x000000000073a0f6|
in ExecFunctionScan (pstate=0x24e0800) at nodeFunctionscan.c:270 | 0x000000000075428c|
in ExecProcNodeFirst (node=0x24e0800) at execProcnode.c:464 | 0x000000000073614e|
in ExecProcNode (node=0x24e0800) at ../../../src/include/executor/executor.h:262 | 0x000000000072a08a|
in ExecutePlan (estate=0x24e05d8, planstate=0x24e0800, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x24d5910, execute_once=true) at execMain.c:1632 | 0x000000000072cb80|
in standard_ExecutorRun (queryDesc=0x23f1248, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:364 | 0x000000000072a6d1|
in ExecutorRun (queryDesc=0x23f1248, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:308 | 0x000000000072a50b|
in PortalRunSelect (portal=0x2474a28, forward=true, count=0, dest=0x24d5910) at pquery.c:924 | 0x0000000000997ba9|
in PortalRun (portal=0x2474a28, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x24d5910, altdest=0x24d5910, qc=0x7ffd81e300b0) at pquery.c:768 | 0x0000000000997867|
in exec_simple_query (query_string=0x23c9518 "select * from f1(42);") at postgres.c:1238 | 0x0000000000991408|
in PostgresMain (dbname=0x2400998 "postgres", username=0x23c5178 "mingjie") at postgres.c:4563 | 0x0000000000995a3e|
in BackendRun (port=0x23f7220) at postmaster.c:4396 | 0x00000000008d3cfe|
in BackendStartup (port=0x23f7220) at postmaster.c:4124 | 0x00000000008d3697|
in ServerLoop () at postmaster.c:1791 | 0x00000000008d00b8|
in PostmasterMain (argc=1, argv=0x23c3120) at postmaster.c:1463 | 0x00000000008cf98a|
in main (argc=1, argv=0x23c3120) at main.c:200 | 0x00000000007ada4b
分析:
if (estate->tuple_store == NULL) | |
exec_init_tuple_store(estate); | |
tupdesc = estate->tuple_store_desc; | |
natts = tupdesc->natts; | |
if (stmt->retvarno >= 0) | |
{ | |
PLpgSQL_datum *retvar = estate->datums[stmt->retvarno]; | |
switch (retvar->dtype) | |
{ |
初始化函数exec_init_tuple_store
static void | |
exec_init_tuple_store(PLpgSQL_execstate *estate) | |
{ | |
ReturnSetInfo *rsi = estate->rsi; | |
MemoryContext oldcxt; | |
ResourceOwner oldowner; | |
// 从"SPI Proc"切换到"ExecutorState" | |
oldcxt = MemoryContextSwitchTo(estate->tuple_store_cxt); | |
// 从“Portal”切换到"Portal" | |
oldowner = CurrentResourceOwner; | |
CurrentResourceOwner = estate->tuple_store_owner; | |
// 进入tuplestore_begin_heap函数 | |
estate->tuple_store = | |
tuplestore_begin_heap(rsi->allowedModes & SFRM_Materialize_Random, false, work_mem); | |
CurrentResourceOwner = oldowner; | |
MemoryContextSwitchTo(oldcxt); | |
// 给estate添加DESC,rsi->expectedDesc的来源? | |
estate->tuple_store_desc = rsi->expectedDesc; | |
} |
进入tuplestore_begin_heap
Tuplestorestate * | |
tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes) | |
{ | |
// 输入false不允许随机访问、false、 | |
Tuplestorestate *state; | |
int eflags; | |
// eflags = EXEC_FLAG_REWIND | |
eflags = randomAccess ? | |
(EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) : | |
(EXEC_FLAG_REWIND); | |
// 进入tuple store模块开始初始化返回Tuplestorestate,注意他会直接拿当前的memcontext | |
state = tuplestore_begin_common(eflags, interXact, maxKBytes); | |
// 返回的Tuplestorestate状态: | |
// state = {status = TSS_INMEM, eflags =, backward = false, interXact = false, | |
// truncated = false, availMem =, allowedMem = 8388608, tuples = 0, | |
// myfile =x0, context = "ExecutorState", resowner = "Portal", copytup = 0x0, | |
// writetup =x0, readtup = 0x0, memtuples = 0x24f0d88, memtupdeleted = 0, | |
// memtupcount =, memtupsize = 2048, growmemtuples = true, readptrs = 0x24e7a70, | |
// activeptr =, readptrcount = 1, readptrsize = 8, writepos_file = 0,writepos_offset = 0} | |
state->copytup = copytup_heap; | |
state->writetup = writetup_heap; | |
state->readtup = readtup_heap; | |
return state; | |
} |
后面根据返回值的不同,进入几个分支。
在进入前,desc已经获取到了: tupdesc = estate->tuple_store_desc; natts = tupdesc->natts;
场景一:return next返回var类型
case PLPGSQL_DTYPE_VAR: | |
{ | |
PLpgSQL_var *var = (PLpgSQL_var *) retvar; | |
Datum retval = var->value; | |
bool isNull = var->isnull; | |
Form_pg_attribute attr = TupleDescAttr(tupdesc, 0); | |
if (natts != 1) | |
ereport(ERROR, | |
(errcode(ERRCODE_DATATYPE_MISMATCH), | |
errmsg("wrong result type supplied in RETURN NEXT"))); | |
// retval是一个eoh的头,后续处理需要一个be的头(1be的data部分指向eoh) | |
retval = MakeExpandedObjectReadOnly(retval, isNull, var->datatype->typlen); | |
// 转成需要的类型 | |
retval = exec_cast_value(estate, | |
retval, | |
&isNull, | |
var->datatype->typoid, | |
var->datatype->atttypmod, | |
attr->atttypid, | |
attr->atttypmod); | |
tuplestore_putvalues(estate->tuple_store, tupdesc, &retval, &isNull); | |
} | |
break; |
执行tuplestore_putvalues保存元组
void | |
tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, | |
Datum *values, bool *isnull) | |
{ | |
MinimalTuple tuple; | |
MemoryContext oldcxt = MemoryContextSwitchTo(state->context); | |
tuple = heap_form_minimal_tuple(tdesc, values, isnull); | |
// 记录使用了多少空间,修改state->availMem | |
USEMEM(state, GetMemoryChunkSpace(tuple)); | |
tuplestore_puttuple_common(state, (void *) tuple); | |
MemoryContextSwitchTo(oldcxt); | |
} | |
static void | |
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple) | |
{ | |
TSReadPointer *readptr; | |
int i; | |
ResourceOwner oldowner; | |
state->tuples++; | |
switch (state->status) | |
{ |
内存态直接用数组缓存tuple,tuple使用的内存是在外层函数切换上下文申请的。
case TSS_INMEM: | |
readptr = state->readptrs; | |
for (i = 0; i < state->readptrcount; readptr++, i++) | |
{ | |
if (readptr->eof_reached && i != state->activeptr) | |
{ | |
readptr->eof_reached = false; | |
readptr->current = state->memtupcount; | |
} | |
} | |
if (state->memtupcount >= state->memtupsize - 1) | |
{ | |
(void) grow_memtuples(state); | |
} | |
state->memtuples[state->memtupcount++] = tuple; | |
if (state->memtupcount < state->memtupsize && !LACKMEM(state)) | |
return; | |
PrepareTempTablespaces(); | |
oldowner = CurrentResourceOwner; | |
CurrentResourceOwner = state->resowner; | |
state->myfile = BufFileCreateTemp(state->interXact); | |
CurrentResourceOwner = oldowner; | |
state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0; | |
state->status = TSS_WRITEFILE; | |
dumptuples(state); | |
break; | |
... |
场景二:return next返回record类型
case PLPGSQL_DTYPE_REC: | |
{ | |
PLpgSQL_rec *rec = (PLpgSQL_rec *) retvar; | |
TupleDesc rec_tupdesc; | |
TupleConversionMap *tupmap; |
拿到record:
{dtype = PLPGSQL_DTYPE_REC, dno = 1, refname = 0x24db608 "r", lineno = 3, isconst = false, notnull = false, default_val = 0x0, datatype = {typname='foo'}, rectypeid = 17117, firstfield = -1, erh = 0x2509708}
- 数据和desc都在erh中,列名在firstfield指向的位置。
- 数据类型在datatype中:foo
- 数据类型oid在rectypeid中:17117->foo
if (rec->erh == NULL) | |
instantiate_empty_record_variable(estate, rec); | |
if (ExpandedRecordIsEmpty(rec->erh)) | |
deconstruct_expanded_record(rec->erh); | |
// "SPI Proc"切到"ExprContext" | |
oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate)); | |
// return erh->er_tupdesc; | |
rec_tupdesc = expanded_record_get_tupdesc(rec->erh); | |
// 从保存的desc:rec_tupdesc转换到输出的desc:tupdesc,第一步:生成转换map | |
tupmap = convert_tuples_by_position(rec_tupdesc, | |
tupdesc, | |
gettext_noop("wrong record type supplied in RETURN NEXT")); | |
tuple = expanded_record_get_tuple(rec->erh); | |
if (tupmap) | |
// 从保存的desc:rec_tupdesc转换到输出的desc:tupdesc,第二步:用map生成转换后的元组 | |
tuple = execute_attr_map_tuple(tuple, tupmap); | |
// 缓存元组 | |
tuplestore_puttuple(estate->tuple_store, tuple); | |
MemoryContextSwitchTo(oldcontext); | |
} | |
break; |
场景三:return next返回row类型
必须是两列以上的out参数,直接return next空,才会使用这段逻辑。
case PLPGSQL_DTYPE_ROW: | |
{ | |
PLpgSQL_row *row = (PLpgSQL_row *) retvar; | |
oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate)); | |
// 必须严格匹配tupdesc的类型,对不上则转换失败 | |
tuple = make_tuple_from_row(estate, row, tupdesc); | |
if (tuple == NULL) | |
ereport(ERROR,...) | |
tuplestore_puttuple(estate->tuple_store, tuple); | |
MemoryContextSwitchTo(oldcontext); | |
} | |
break; | |
default: | |
elog(ERROR, "unrecognized dtype: %d", retvar->dtype); | |
break; | |
} | |
} |
3 用例
-- Example 1: single OUT parameter; each bare RETURN NEXT emits the current
-- value of j as one result row.
-- IF EXISTS keeps the script re-runnable when f(int) has not been created yet.
drop function if exists f(int);
create or replace function f(in i int, out j int) returns setof int as $$
begin
  j := i + 1;
  return next;
  j := i + 2;
  return next;
  return;
end$$ language plpgsql;

select * from f(42);
---- | |
-- Example 2: RETURN NEXT with a row-typed variable declared as foo%rowtype.
CREATE TABLE foo (fooid INT, foosubid INT, fooname TEXT);
INSERT INTO foo (fooid, foosubid, fooname) VALUES (1, 2, 'three');
INSERT INTO foo (fooid, foosubid, fooname) VALUES (4, 5, 'six');

CREATE OR REPLACE FUNCTION get_all_foo() RETURNS SETOF foo AS
$BODY$
DECLARE
    r foo%rowtype;
BEGIN
    FOR r IN
        SELECT * FROM foo WHERE fooid > 0
    LOOP
        -- can do some processing here
        RETURN NEXT r; -- return current row of SELECT
    END LOOP;
    RETURN;
END;
$BODY$
LANGUAGE plpgsql;

SELECT * FROM get_all_foo();
-------- | |
-- Example 3: two OUT parameters; a bare RETURN NEXT emits the pair (j, k)
-- as one result row.
-- The earlier f(int) has a different result type, so it is dropped first;
-- IF EXISTS keeps the script re-runnable.
drop function if exists f(int);
create function f(in i int, out j int, out k text) returns setof record as $$
begin
  j := i + 1;
  k := 'foo';
  return next;
  j := j + 1;
  k := 'foot';
  return next;
  return;
end$$ language plpgsql;

select * from f(42);