diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 9f5f8a9b76..21e9d56f73 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -92,6 +92,36 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'in COMMIT (3 rows) +INSERT INTO replication_example(somedata, text) VALUES (1, 4); +INSERT INTO replication_example(somedata, text) VALUES (1, 5); +SELECT pg_current_wal_lsn() AS wal_lsn \gset +INSERT INTO replication_example(somedata, text) VALUES (1, 6); +SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset +SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn()); + slot_name +------------------ + regression_slot2 +(1 row) + +SELECT :'wal_lsn' = :'end_lsn'; + ?column? +---------- + t +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------------------------------------------- + BEGIN + table public.replication_example: INSERT: id[integer]:6 somedata[integer]:1 text[character varying]:'6' + COMMIT +(3 rows) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + DROP TABLE replication_example; -- error SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true); diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index fa9561f54e..706340c1d8 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -45,6 +45,21 @@ INSERT INTO replication_example(somedata, text) VALUES (1, 3); SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +INSERT INTO replication_example(somedata, text) VALUES (1, 4); +INSERT INTO replication_example(somedata, text) VALUES (1, 5); + +SELECT pg_current_wal_lsn() AS wal_lsn \gset + +INSERT INTO replication_example(somedata, text) VALUES (1, 6); + +SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset +SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn()); + +SELECT :'wal_lsn' = :'end_lsn'; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + DROP TABLE replication_example; -- error diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 2428434030..487c7ff750 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -19155,6 +19155,25 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); + + + + pg_replication_slot_advance + + pg_replication_slot_advance(slot_name name, upto_lsn pg_lsn) + + + (slot_name name, end_lsn pg_lsn) + bool + + + Advances the current confirmed position of a replication slot named + slot_name. The slot will not be moved backwards, + and it will not be moved beyond the current insert location. Returns + name of the slot and real position to which it was advanced to. + + + diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 537eba7875..6eb0d5527e 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -88,6 +88,9 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); * call ReorderBufferProcessXid for each record type by default, because * e.g. empty xacts can be handled more efficiently if there's no previous * state for them. + * + * We also support the ability to fast forward thru records, skipping some + * record types completely - see individual record types for details. */ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) @@ -332,8 +335,10 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_invalidations *invalidations = (xl_invalidations *) XLogRecGetData(r); - ReorderBufferImmediateInvalidation( - ctx->reorder, invalidations->nmsgs, invalidations->msgs); + if (!ctx->fast_forward) + ReorderBufferImmediateInvalidation(ctx->reorder, + invalidations->nmsgs, + invalidations->msgs); } break; default: @@ -353,14 +358,19 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); - /* no point in doing anything yet */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding changes. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) return; switch (info) { case XLOG_HEAP2_MULTI_INSERT: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (!ctx->fast_forward && + SnapBuildProcessChange(builder, xid, buf->origptr)) DecodeMultiInsert(ctx, buf); break; case XLOG_HEAP2_NEW_CID: @@ -408,8 +418,12 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); - /* no point in doing anything yet */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding data changes. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) return; switch (info) @@ -501,8 +515,12 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); - /* No point in doing anything yet. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding messages. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) return; message = (xl_logical_message *) XLogRecGetData(r); @@ -554,8 +572,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, */ if (parsed->nmsgs > 0) { - ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, - parsed->nmsgs, parsed->msgs); + if (!ctx->fast_forward) + ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, + parsed->nmsgs, parsed->msgs); ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); } @@ -574,6 +593,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * are restarting or if we haven't assembled a consistent snapshot yet. * 2) The transaction happened in another database. * 3) The output plugin is not interested in the origin. + * 4) We are doing fast-forwarding * * We can't just use ReorderBufferAbort() here, because we need to execute * the transaction's invalidations. This currently won't be needed if @@ -589,7 +609,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, */ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || - FilterByOrigin(ctx, origin_id)) + ctx->fast_forward || FilterByOrigin(ctx, origin_id)) { for (i = 0; i < parsed->nsubxacts; i++) { diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2fc9d7d70f..7637efc32e 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -115,6 +115,7 @@ StartupDecodingContext(List *output_plugin_options, XLogRecPtr start_lsn, TransactionId xmin_horizon, bool need_full_snapshot, + bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -140,7 +141,8 @@ StartupDecodingContext(List *output_plugin_options, * (re-)load output plugins, so we detect a bad (removed) output plugin * now. */ - LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin)); + if (!fast_forward) + LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin)); /* * Now that the slot's xmin has been set, we can announce ourselves as a @@ -191,6 +193,8 @@ StartupDecodingContext(List *output_plugin_options, ctx->output_plugin_options = output_plugin_options; + ctx->fast_forward = fast_forward; + MemoryContextSwitchTo(old_context); return ctx; @@ -303,8 +307,9 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, - need_full_snapshot, read_page, prepare_write, - do_write, update_progress); + need_full_snapshot, true, + read_page, prepare_write, do_write, + update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -342,6 +347,7 @@ CreateInitDecodingContext(char *plugin, LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, + bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -395,8 +401,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - read_page, prepare_write, do_write, - update_progress); + fast_forward, read_page, prepare_write, + do_write, update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -573,6 +579,8 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "startup"; @@ -598,6 +606,8 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx) LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "shutdown"; @@ -629,6 +639,8 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "begin"; @@ -658,6 +670,8 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "commit"; @@ -687,6 +701,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "change"; @@ -721,6 +737,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) ErrorContextCallback errcallback; bool ret; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "filter_by_origin"; @@ -751,6 +769,8 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + if (ctx->callbacks.message_cb == NULL) return; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 9aab6e71b2..54c25f1f5b 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -251,6 +251,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* restart at slot's confirmed_flush */ ctx = CreateDecodingContext(InvalidXLogRecPtr, options, + false, logical_read_local_xlog_page, LogicalOutputPrepareWrite, LogicalOutputWrite, NULL); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index b02df593e9..93d2e20f76 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -17,11 +17,14 @@ #include "miscadmin.h" #include "access/htup_details.h" +#include "replication/decode.h" #include "replication/slot.h" #include "replication/logical.h" #include "replication/logicalfuncs.h" #include "utils/builtins.h" +#include "utils/inval.h" #include "utils/pg_lsn.h" +#include "utils/resowner.h" static void check_permissions(void) @@ -312,3 +315,200 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) return (Datum) 0; } + +/* + * Helper function for advancing physical replication slot forward. + */ +static XLogRecPtr +pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) +{ + XLogRecPtr retlsn = InvalidXLogRecPtr; + + SpinLockAcquire(&MyReplicationSlot->mutex); + if (MyReplicationSlot->data.restart_lsn < moveto) + { + MyReplicationSlot->data.restart_lsn = moveto; + retlsn = moveto; + } + SpinLockRelease(&MyReplicationSlot->mutex); + + return retlsn; +} + +/* + * Helper function for advancing logical replication slot forward. + */ +static XLogRecPtr +pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) +{ + LogicalDecodingContext *ctx; + ResourceOwner old_resowner = CurrentResourceOwner; + XLogRecPtr retlsn = InvalidXLogRecPtr; + + PG_TRY(); + { + /* restart at slot's confirmed_flush */ + ctx = CreateDecodingContext(InvalidXLogRecPtr, + NIL, + true, + logical_read_local_xlog_page, + NULL, NULL, NULL); + + CurrentResourceOwner = ResourceOwnerCreate(CurrentResourceOwner, + "logical decoding"); + + /* invalidate non-timetravel entries */ + InvalidateSystemCaches(); + + /* Decode until we run out of records */ + while ((startlsn != InvalidXLogRecPtr && startlsn < moveto) || + (ctx->reader->EndRecPtr != InvalidXLogRecPtr && ctx->reader->EndRecPtr < moveto)) + { + XLogRecord *record; + char *errm = NULL; + + record = XLogReadRecord(ctx->reader, startlsn, &errm); + if (errm) + elog(ERROR, "%s", errm); + + /* + * Now that we've set up the xlog reader state, subsequent calls + * pass InvalidXLogRecPtr to say "continue from last record" + */ + startlsn = InvalidXLogRecPtr; + + /* + * The {begin_txn,change,commit_txn}_wrapper callbacks above will + * store the description into our tuplestore. + */ + if (record != NULL) + LogicalDecodingProcessRecord(ctx, ctx->reader); + + /* check limits */ + if (moveto <= ctx->reader->EndRecPtr) + break; + + CHECK_FOR_INTERRUPTS(); + } + + CurrentResourceOwner = old_resowner; + + if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) + { + LogicalConfirmReceivedLocation(moveto); + + /* + * If only the confirmed_flush_lsn has changed the slot won't get + * marked as dirty by the above. Callers on the walsender + * interface are expected to keep track of their own progress and + * don't need it written out. But SQL-interface users cannot + * specify their own start positions and it's harder for them to + * keep track of their progress, so we should make more of an + * effort to save it for them. + * + * Dirty the slot so it's written out at the next checkpoint. + * We'll still lose its position on crash, as documented, but it's + * better than always losing the position even on clean restart. + */ + ReplicationSlotMarkDirty(); + } + + retlsn = MyReplicationSlot->data.confirmed_flush; + + /* free context, call shutdown callback */ + FreeDecodingContext(ctx); + + InvalidateSystemCaches(); + } + PG_CATCH(); + { + /* clear all timetravel entries */ + InvalidateSystemCaches(); + + PG_RE_THROW(); + } + PG_END_TRY(); + + return retlsn; +} + +/* + * SQL function for moving the position in a replication slot. + */ +Datum +pg_replication_slot_advance(PG_FUNCTION_ARGS) +{ + Name slotname = PG_GETARG_NAME(0); + XLogRecPtr moveto = PG_GETARG_LSN(1); + XLogRecPtr endlsn; + XLogRecPtr startlsn; + TupleDesc tupdesc; + Datum values[2]; + bool nulls[2]; + HeapTuple tuple; + Datum result; + + Assert(!MyReplicationSlot); + + check_permissions(); + + if (XLogRecPtrIsInvalid(moveto)) + ereport(ERROR, + (errmsg("invalid target wal lsn"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* + * We can't move slot past what's been flushed/replayed so clamp the + * target possition accordingly. + */ + if (!RecoveryInProgress()) + moveto = Min(moveto, GetFlushRecPtr()); + else + moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID)); + + /* Acquire the slot so we "own" it */ + ReplicationSlotAcquire(NameStr(*slotname), true); + + startlsn = MyReplicationSlot->data.confirmed_flush; + if (moveto < startlsn) + { + ReplicationSlotRelease(); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot move slot to %X/%X, minimum is %X/%X", + (uint32) (moveto >> 32), (uint32) moveto, + (uint32) (MyReplicationSlot->data.confirmed_flush >> 32), + (uint32) (MyReplicationSlot->data.confirmed_flush)))); + } + + if (OidIsValid(MyReplicationSlot->data.database)) + endlsn = pg_logical_replication_slot_advance(startlsn, moveto); + else + endlsn = pg_physical_replication_slot_advance(startlsn, moveto); + + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + nulls[0] = false; + + /* Update the on disk state when lsn was updated. */ + if (XLogRecPtrIsInvalid(endlsn)) + { + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + ReplicationSlotSave(); + } + + ReplicationSlotRelease(); + + /* Return the reached position. */ + values[1] = LSNGetDatum(endlsn); + nulls[1] = false; + + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + PG_RETURN_DATUM(result); +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 8bef3fbdaf..130ecd5559 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1075,6 +1075,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) * to be shipped from that position. */ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, + false, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData, diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 298e0ae2f0..f01648c961 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5357,6 +5357,8 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 DESCR("peek at changes from replication slot"); DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,lsn,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ )); DESCR("peek at binary changes from replication slot"); +DATA(insert OID = 3878 ( pg_replication_slot_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 3220" "{19,3220,19,3220}" "{i,i,o,o}" "{slot_name,upto_lsn,slot_name,end_lsn}" _null_ _null_ pg_replication_slot_advance _null_ _null_ _null_ )); +DESCR("advance logical replication slot"); DATA(insert OID = 3577 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ )); DESCR("emit a textual logical decoding message"); DATA(insert OID = 3578 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ )); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index d9059e1cca..619c5f4d73 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -45,6 +45,13 @@ typedef struct LogicalDecodingContext struct ReorderBuffer *reorder; struct SnapBuild *snapshot_builder; + /* + * Marks the logical decoding context as fast forward decoding one. + * Such a context does not have plugin loaded so most of the the following + * properties are unused. + */ + bool fast_forward; + OutputPluginCallbacks callbacks; OutputPluginOptions options; @@ -97,6 +104,7 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, extern LogicalDecodingContext *CreateDecodingContext( XLogRecPtr start_lsn, List *output_plugin_options, + bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write,