diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 780120d731..838483ee5c 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -603,7 +603,7 @@ SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn ------------+--------+-----------+--------+----------+--------+------+--------------+------------- + slot_name | plugin | slot_type | datoid | database | active | active_in | xmin | catalog_xmin | restart_lsn +-----------+--------+-----------+--------+----------+--------+-----------+------+--------------+------------- (0 rows) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index d0b78f2782..98847354bd 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5400,6 +5400,16 @@ True if this slot is currently actively being used + + active_in + integer + + The process ID of the session using this slot if the slot + is currently actively being used. NULL if + inactive. + + + xmin xid diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 3650567852..0810a2d1f9 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -62,10 +62,10 @@ postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', ' regression_slot | 0/16B1970 (1 row) -postgres=# SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn ------------------+---------------+-----------+--------+----------+--------+--------+--------------+------------- - regression_slot | test_decoding | logical | 12052 | postgres | f | | 684 | 0/16A4408 +postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots; + slot_name | plugin | slot_type | database | active | restart_lsn +-----------------+---------------+-----------+----------+--------+------------- + regression_slot | test_decoding | logical | postgres | f | 0/16A4408 (1 row) postgres=# -- There are no changes to see yet diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index a4fd88fa98..75ff231de5 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -665,6 +665,7 @@ CREATE VIEW pg_replication_slots AS L.datoid, D.datname AS database, L.active, + L.active_in, L.xmin, L.catalog_xmin, L.restart_lsn diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 16ea80bc3a..fa1f07b3f3 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -262,7 +262,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * be doing that. So it's safe to initialize the slot. */ Assert(!slot->in_use); - Assert(!slot->active); + Assert(slot->active_pid == 0); slot->data.persistency = persistency; slot->data.xmin = InvalidTransactionId; slot->effective_xmin = InvalidTransactionId; @@ -291,8 +291,8 @@ ReplicationSlotCreate(const char *name, bool db_specific, volatile ReplicationSlot *vslot = slot; SpinLockAcquire(&slot->mutex); - Assert(!vslot->active); - vslot->active = true; + Assert(vslot->active_pid == 0); + vslot->active_pid = MyProcPid; SpinLockRelease(&slot->mutex); MyReplicationSlot = slot; } @@ -314,7 +314,7 @@ ReplicationSlotAcquire(const char *name) { ReplicationSlot *slot = NULL; int i; - bool active = false; + int active_pid = 0; Assert(MyReplicationSlot == NULL); @@ -331,8 +331,9 @@ ReplicationSlotAcquire(const char *name) volatile ReplicationSlot *vslot = s; SpinLockAcquire(&s->mutex); - active = vslot->active; - vslot->active = true; + active_pid = vslot->active_pid; + if (active_pid == 0) + vslot->active_pid = MyProcPid; SpinLockRelease(&s->mutex); slot = s; break; @@ -345,10 +346,11 @@ ReplicationSlotAcquire(const char *name) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name))); - if (active) + if (active_pid != 0) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("replication slot \"%s\" is already active", name))); + errmsg("replication slot \"%s\" is already active for pid %d", + name, active_pid))); /* We made this slot active, so it's ours now. */ MyReplicationSlot = slot; @@ -363,7 +365,7 @@ ReplicationSlotRelease(void) { ReplicationSlot *slot = MyReplicationSlot; - Assert(slot != NULL && slot->active); + Assert(slot != NULL && slot->active_pid != 0); if (slot->data.persistency == RS_EPHEMERAL) { @@ -380,7 +382,7 @@ ReplicationSlotRelease(void) volatile ReplicationSlot *vslot = slot; SpinLockAcquire(&slot->mutex); - vslot->active = false; + vslot->active_pid = 0; SpinLockRelease(&slot->mutex); } @@ -460,7 +462,7 @@ ReplicationSlotDropAcquired(void) bool fail_softly = slot->data.persistency == RS_EPHEMERAL; SpinLockAcquire(&slot->mutex); - vslot->active = false; + vslot->active_pid = 0; SpinLockRelease(&slot->mutex); ereport(fail_softly ? WARNING : ERROR, @@ -477,7 +479,7 @@ ReplicationSlotDropAcquired(void) * scanning the array. */ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); - slot->active = false; + slot->active_pid = 0; slot->in_use = false; LWLockRelease(ReplicationSlotControlLock); @@ -749,7 +751,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) /* count slots with spinlock held */ SpinLockAcquire(&s->mutex); (*nslots)++; - if (s->active) + if (s->active_pid != 0) (*nactive)++; SpinLockRelease(&s->mutex); } @@ -1227,7 +1229,7 @@ RestoreSlotFromDisk(const char *name) slot->candidate_restart_valid = InvalidXLogRecPtr; slot->in_use = true; - slot->active = false; + slot->active_pid = 0; restored = true; break; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index f31925d862..3d9aadbd83 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 8 +#define PG_GET_REPLICATION_SLOTS_COLS 9 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -206,7 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) TransactionId xmin; TransactionId catalog_xmin; XLogRecPtr restart_lsn; - bool active; + pid_t active_pid; Oid database; NameData slot_name; NameData plugin; @@ -227,7 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) namecpy(&slot_name, &slot->data.name); namecpy(&plugin, &slot->data.plugin); - active = slot->active; + active_pid = slot->active_pid; } SpinLockRelease(&slot->mutex); @@ -251,7 +251,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else values[i++] = database; - values[i++] = BoolGetDatum(active); + values[i++] = BoolGetDatum(active_pid != 0); + + if (active_pid != 0) + values[i++] = Int32GetDatum(active_pid); + else + nulls[i++] = true; if (xmin != InvalidTransactionId) values[i++] = TransactionIdGetDatum(xmin); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 619d9969ca..de18a5ad3c 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5106,7 +5106,7 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); -DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,28,28,3220}" "{o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ )); +DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_in,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); DESCR("set up a logical replication slot"); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a4001360c4..78cff07abf 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -84,8 +84,8 @@ typedef struct ReplicationSlot /* is this slot defined */ bool in_use; - /* is somebody streaming out changes for this slot */ - bool active; + /* Who is streaming out changes for this slot? 0 in unused slots. */ + pid_t active_pid; /* any outstanding modifications? */ bool just_dirtied; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 71fa44a6be..1577ff0e29 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1396,10 +1396,11 @@ pg_replication_slots| SELECT l.slot_name, l.datoid, d.datname AS database, l.active, + l.active_in, l.xmin, l.catalog_xmin, l.restart_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, xmin, catalog_xmin, restart_lsn) + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, active_in, xmin, catalog_xmin, restart_lsn) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper,