contrib/tsm_system_rows

This commit is contained in:
Simon Riggs 2015-05-15 15:31:14 -04:00
parent 149f6f1576
commit 4d40494b11
7 changed files with 390 additions and 0 deletions

4
contrib/tsm_system_rows/.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
# Generated subdirectories
/log/
/results/
/tmp_check/

View file

@ -0,0 +1,21 @@
# src/test/modules/tsm_system_rows/Makefile
MODULE_big = tsm_system_rows
OBJS = tsm_system_rows.o $(WIN32RES)
PGFILEDESC = "tsm_system_rows - SYSTEM TABLESAMPLE method which accepts number of rows as a limit"
EXTENSION = tsm_system_rows
DATA = tsm_system_rows--1.0.sql
REGRESS = tsm_system_rows
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = contrib/tsm_system_rows
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

View file

@ -0,0 +1,31 @@
CREATE EXTENSION tsm_system_rows;
CREATE TABLE test_tablesample (id int, name text) WITH (fillfactor=10); -- force smaller pages so we don't have to load too much data to get multiple pages
INSERT INTO test_tablesample SELECT i, repeat(i::text, 1000) FROM generate_series(0, 30) s(i) ORDER BY i;
ANALYZE test_tablesample;
SELECT count(*) FROM test_tablesample TABLESAMPLE system_rows (1000);
count
-------
31
(1 row)
SELECT id FROM test_tablesample TABLESAMPLE system_rows (8) REPEATABLE (5432);
id
----
7
14
21
28
4
11
18
25
(8 rows)
EXPLAIN SELECT id FROM test_tablesample TABLESAMPLE system_rows (20) REPEATABLE (10);
QUERY PLAN
-----------------------------------------------------------------------------------
Sample Scan (system_rows) on test_tablesample (cost=0.00..80.20 rows=20 width=4)
(1 row)
-- done
DROP TABLE test_tablesample CASCADE;

View file

@ -0,0 +1,14 @@
CREATE EXTENSION tsm_system_rows;
CREATE TABLE test_tablesample (id int, name text) WITH (fillfactor=10); -- force smaller pages so we don't have to load too much data to get multiple pages
INSERT INTO test_tablesample SELECT i, repeat(i::text, 1000) FROM generate_series(0, 30) s(i) ORDER BY i;
ANALYZE test_tablesample;
SELECT count(*) FROM test_tablesample TABLESAMPLE system_rows (1000);
SELECT id FROM test_tablesample TABLESAMPLE system_rows (8) REPEATABLE (5432);
EXPLAIN SELECT id FROM test_tablesample TABLESAMPLE system_rows (20) REPEATABLE (10);
-- done
DROP TABLE test_tablesample CASCADE;

View file

@ -0,0 +1,45 @@
/* src/test/modules/tablesample/tsm_system_rows--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION tsm_system_rows" to load this file. \quit
CREATE FUNCTION tsm_system_rows_init(internal, int4, int4)
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
CREATE FUNCTION tsm_system_rows_nextblock(internal)
RETURNS int4
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
CREATE FUNCTION tsm_system_rows_nexttuple(internal, int4, int2)
RETURNS int2
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
CREATE FUNCTION tsm_system_rows_examinetuple(internal, int4, internal, bool)
RETURNS bool
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
CREATE FUNCTION tsm_system_rows_end(internal)
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
CREATE FUNCTION tsm_system_rows_reset(internal)
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
CREATE FUNCTION tsm_system_rows_cost(internal, internal, internal, internal, internal, internal, internal)
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
INSERT INTO pg_tablesample_method VALUES('system_rows', false, true,
'tsm_system_rows_init', 'tsm_system_rows_nextblock',
'tsm_system_rows_nexttuple', 'tsm_system_rows_examinetuple',
'tsm_system_rows_end', 'tsm_system_rows_reset', 'tsm_system_rows_cost');

View file

@ -0,0 +1,270 @@
/*-------------------------------------------------------------------------
*
* tsm_system_rows.c
* interface routines for system_rows tablesample method
*
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
*
* IDENTIFICATION
* contrib/tsm_system_rows_rowlimit/tsm_system_rows.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "access/tablesample.h"
#include "access/relscan.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "nodes/relation.h"
#include "optimizer/clauses.h"
#include "storage/bufmgr.h"
#include "utils/sampling.h"
PG_MODULE_MAGIC;
/*
* State
*/
typedef struct
{
SamplerRandomState randstate;
uint32 seed; /* random seed */
BlockNumber nblocks; /* number of block in relation */
int32 ntuples; /* number of tuples to return */
int32 donetuples; /* tuples already returned */
OffsetNumber lt; /* last tuple returned from current block */
BlockNumber step; /* step size */
BlockNumber lb; /* last block visited */
BlockNumber doneblocks; /* number of already returned blocks */
} SystemSamplerData;
PG_FUNCTION_INFO_V1(tsm_system_rows_init);
PG_FUNCTION_INFO_V1(tsm_system_rows_nextblock);
PG_FUNCTION_INFO_V1(tsm_system_rows_nexttuple);
PG_FUNCTION_INFO_V1(tsm_system_rows_examinetuple);
PG_FUNCTION_INFO_V1(tsm_system_rows_end);
PG_FUNCTION_INFO_V1(tsm_system_rows_reset);
PG_FUNCTION_INFO_V1(tsm_system_rows_cost);
static uint32 random_relative_prime(uint32 n, SamplerRandomState randstate);
/*
* Initializes the state.
*/
Datum
tsm_system_rows_init(PG_FUNCTION_ARGS)
{
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
uint32 seed = PG_GETARG_UINT32(1);
int32 ntuples = PG_ARGISNULL(2) ? -1 : PG_GETARG_INT32(2);
HeapScanDesc scan = tsdesc->heapScan;
SystemSamplerData *sampler;
if (ntuples < 1)
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
errmsg("invalid sample size"),
errhint("Sample size must be positive integer value.")));
sampler = palloc0(sizeof(SystemSamplerData));
/* Remember initial values for reinit */
sampler->seed = seed;
sampler->nblocks = scan->rs_nblocks;
sampler->ntuples = ntuples;
sampler->donetuples = 0;
sampler->lt = InvalidOffsetNumber;
sampler->doneblocks = 0;
sampler_random_init_state(sampler->seed, sampler->randstate);
/* Find relative prime as step size for linear probing. */
sampler->step = random_relative_prime(sampler->nblocks, sampler->randstate);
/*
* Randomize start position so that blocks close to step size don't have
* higher probability of being chosen on very short scan.
*/
sampler->lb = sampler_random_fract(sampler->randstate) *
(sampler->nblocks / sampler->step);
tsdesc->tsmdata = (void *) sampler;
PG_RETURN_VOID();
}
/*
* Get next block number or InvalidBlockNumber when we're done.
*
* Uses linear probing algorithm for picking next block.
*/
Datum
tsm_system_rows_nextblock(PG_FUNCTION_ARGS)
{
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
SystemSamplerData *sampler = (SystemSamplerData *) tsdesc->tsmdata;
sampler->lb = (sampler->lb + sampler->step) % sampler->nblocks;
sampler->doneblocks++;
/* All blocks have been read, we're done */
if (sampler->doneblocks > sampler->nblocks ||
sampler->donetuples >= sampler->ntuples)
PG_RETURN_UINT32(InvalidBlockNumber);
PG_RETURN_UINT32(sampler->lb);
}
/*
* Get next tuple offset in current block or InvalidOffsetNumber if we are done
* with this block.
*/
Datum
tsm_system_rows_nexttuple(PG_FUNCTION_ARGS)
{
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
OffsetNumber maxoffset = PG_GETARG_UINT16(2);
SystemSamplerData *sampler = (SystemSamplerData *) tsdesc->tsmdata;
OffsetNumber tupoffset = sampler->lt;
if (tupoffset == InvalidOffsetNumber)
tupoffset = FirstOffsetNumber;
else
tupoffset++;
if (tupoffset > maxoffset ||
sampler->donetuples >= sampler->ntuples)
tupoffset = InvalidOffsetNumber;
sampler->lt = tupoffset;
PG_RETURN_UINT16(tupoffset);
}
/*
* Examine tuple and decide if it should be returned.
*/
Datum
tsm_system_rows_examinetuple(PG_FUNCTION_ARGS)
{
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
bool visible = PG_GETARG_BOOL(3);
SystemSamplerData *sampler = (SystemSamplerData *) tsdesc->tsmdata;
if (!visible)
PG_RETURN_BOOL(false);
sampler->donetuples++;
PG_RETURN_BOOL(true);
}
/*
* Cleanup method.
*/
Datum
tsm_system_rows_end(PG_FUNCTION_ARGS)
{
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
pfree(tsdesc->tsmdata);
PG_RETURN_VOID();
}
/*
* Reset state (called by ReScan).
*/
Datum
tsm_system_rows_reset(PG_FUNCTION_ARGS)
{
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
SystemSamplerData *sampler = (SystemSamplerData *) tsdesc->tsmdata;
sampler->lt = InvalidOffsetNumber;
sampler->donetuples = 0;
sampler->doneblocks = 0;
sampler_random_init_state(sampler->seed, sampler->randstate);
sampler->step = random_relative_prime(sampler->nblocks, sampler->randstate);
sampler->lb = sampler_random_fract(sampler->randstate) * (sampler->nblocks / sampler->step);
PG_RETURN_VOID();
}
/*
* Costing function.
*/
Datum
tsm_system_rows_cost(PG_FUNCTION_ARGS)
{
PlannerInfo *root = (PlannerInfo *) PG_GETARG_POINTER(0);
Path *path = (Path *) PG_GETARG_POINTER(1);
RelOptInfo *baserel = (RelOptInfo *) PG_GETARG_POINTER(2);
List *args = (List *) PG_GETARG_POINTER(3);
BlockNumber *pages = (BlockNumber *) PG_GETARG_POINTER(4);
double *tuples = (double *) PG_GETARG_POINTER(5);
Node *limitnode;
int32 ntuples;
limitnode = linitial(args);
limitnode = estimate_expression_value(root, limitnode);
if (IsA(limitnode, RelabelType))
limitnode = (Node *) ((RelabelType *) limitnode)->arg;
if (IsA(limitnode, Const))
ntuples = DatumGetInt32(((Const *) limitnode)->constvalue);
else
{
/* Default ntuples if the estimation didn't return Const. */
ntuples = 1000;
}
*pages = Min(baserel->pages, ntuples);
*tuples = ntuples;
path->rows = *tuples;
PG_RETURN_VOID();
}
static uint32
gcd (uint32 a, uint32 b)
{
uint32 c;
while (a != 0)
{
c = a;
a = b % a;
b = c;
}
return b;
}
static uint32
random_relative_prime(uint32 n, SamplerRandomState randstate)
{
/* Pick random starting number, with some limits on what it can be. */
uint32 r = (uint32) sampler_random_fract(randstate) * n/2 + n/4,
t;
/*
* This should only take 2 or 3 iterations as the probability of 2 numbers
* being relatively prime is ~61%.
*/
while ((t = gcd(r, n)) > 1)
{
CHECK_FOR_INTERRUPTS();
r /= t;
}
return r;
}

View file

@ -0,0 +1,5 @@
# tsm_system_rows extension
comment = 'SYSTEM TABLESAMPLE method which accepts number rows as a limit'
default_version = '1.0'
module_pathname = '$libdir/tsm_system_rows'
relocatable = true