一個示例輸出外掛可以在 PostgreSQL 原始碼樹的 contrib/test_decoding 子目錄下找到。
輸出外掛透過動態載入一個以輸出外掛名稱作為庫基礎名稱的共享庫來載入。標準的庫搜尋路徑用於定位該庫。為了提供所需的輸出外掛回撥函式並指示該庫實際上是一個輸出外掛,它需要提供一個名為 _PG_output_plugin_init 的函式。該函式會接收一個結構體,需要用各個操作的回撥函式指標來填充。
typedef struct OutputPluginCallbacks
{
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeFilterPrepareCB filter_prepare_cb;
LogicalDecodeBeginPrepareCB begin_prepare_cb;
LogicalDecodePrepareCB prepare_cb;
LogicalDecodeCommitPreparedCB commit_prepared_cb;
LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
begin_cb、change_cb 和 commit_cb 回撥函式是必需的,而 startup_cb、truncate_cb、message_cb、filter_by_origin_cb 和 shutdown_cb 是可選的。如果 truncate_cb 未設定但需要解碼 TRUNCATE 操作,則該操作將被忽略。
輸出外掛還可以定義函式來支援大型、進行中的事務的流式傳輸。 stream_start_cb、stream_stop_cb、stream_abort_cb、stream_commit_cb 和 stream_change_cb 是必需的,而 stream_message_cb 和 stream_truncate_cb 是可選的。如果輸出外掛也支援兩階段提交,則 stream_prepare_cb 也是必需的。
輸出外掛還可以定義函式來支援兩階段提交,這允許在 PREPARE TRANSACTION 時進行解碼。 begin_prepare_cb、prepare_cb、commit_prepared_cb 和 rollback_prepared_cb 回撥函式是必需的,而 filter_prepare_cb 是可選的。如果輸出外掛也支援大型進行中事務的流式傳輸,則 stream_prepare_cb 也是必需的。
為了解碼、格式化和輸出更改,輸出外掛可以使用後端的大部分正常基礎設施,包括呼叫輸出函式。允許只讀訪問關係,只要訪問的關係是 initdb 在 pg_catalog 模式下建立的,或者使用以下方式標記為使用者提供的目錄表:
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
請注意,在輸出外掛中訪問使用者目錄表或常規系統目錄表必須僅透過 systable_* 掃描 API 進行。透過 heap_* 掃描 API 訪問將導致錯誤。此外,任何導致事務 ID 分配的操作都是禁止的。這包括寫入表、執行 DDL 更改以及呼叫 pg_current_xact_id()。
輸出外掛回撥函式可以以幾乎任意格式將資料傳遞給消費者。對於某些用例,例如透過 SQL 檢視更改,以可以包含任意資料的資料型別(例如 bytea)返回資料會很麻煩。如果輸出外掛僅輸出伺服器編碼的文字資料,則可以透過在 startup callback 中將 OutputPluginOptions.output_type 設定為 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 而不是 OUTPUT_PLUGIN_BINARY_OUTPUT 來宣告這一點。在這種情況下,所有資料都必須是伺服器編碼,以便 text 資料型別可以包含它。這將在啟用斷言的版本中進行檢查。
輸出外掛透過它需要提供的各種回撥函式獲得關於正在發生的更改的通知。
併發事務按提交順序解碼,並且僅在 begin 和 commit 回撥函式之間解碼屬於特定事務的更改。顯式或隱式回滾的事務永遠不會被解碼。成功的儲存點按照它們在事務中執行的順序摺疊到包含它們的事務中。使用 PREPARE TRANSACTION 準備提交的兩階段事務也會被解碼,前提是提供瞭解碼它們的輸出外掛回撥函式。當前正在解碼的已準備事務可能因為併發執行 ROLLBACK PREPARED 命令而被中止。在這種情況下,此事務的邏輯解碼也將被中止。一旦檢測到中止並呼叫了 prepare_cb 回撥函式,就會跳過此類事務的所有更改。因此,即使發生併發中止,也會向輸出外掛提供足夠的資訊,以便它能夠正確處理 ROLLBACK PREPARED。
只有已安全重新整理到磁碟的事務才會被解碼。當 synchronous_commit 設定為 off 時,這可能導致在直接呼叫的 pg_logical_slot_get_changes() 中 COMMIT 未立即被解碼。
可選的 startup_cb 回撥函式在建立複製槽或請求流式傳輸更改時被呼叫,而不管有多少更改已準備好輸出。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
OutputPluginOptions *options,
bool is_init);
is_init 引數在建立複製槽時為 true,否則為 false。options 指向一個輸出外掛可以設定的選項結構。
typedef struct OutputPluginOptions
{
OutputPluginOutputType output_type;
bool receive_rewrites;
} OutputPluginOptions;
output_type 必須設定為 OUTPUT_PLUGIN_TEXTUAL_OUTPUT 或 OUTPUT_PLUGIN_BINARY_OUTPUT。另請參閱 Section 47.6.3。如果 receive_rewrites 為 true,則輸出外掛在某些 DDL 操作期間對堆重寫所做的更改也會被呼叫。這些對於處理 DDL 複製的外掛很有用,但需要特殊處理。
startup 回撥函式應驗證 ctx->output_plugin_options 中的選項。如果輸出外掛需要狀態,它可以使用 ctx->output_plugin_private 來儲存它。
可選的 shutdown_cb 回撥函式在曾經活躍的複製槽不再使用時被呼叫,可用於釋放輸出外掛私有的資源。槽不一定會被刪除,只是停止流式傳輸。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
必需的 begin_cb 回撥函式在解碼已提交事務的開始時被呼叫。中止的事務及其內容永遠不會被解碼。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
txn 引數包含關於事務的元資訊,例如它提交的時間戳及其 XID。
必需的 commit_cb 回撥函式在解碼事務提交時被呼叫。如果存在已修改的行,則在該函式呼叫之前,所有已修改行的 change_cb 回撥函式已被呼叫。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
必需的 change_cb 回撥函式針對事務中的每個單獨的行修改被呼叫,無論是 INSERT、UPDATE 還是 DELETE。即使原始命令一次修改了多行,該回調函式也會為每一行單獨呼叫。change_cb 回撥函式可以訪問系統或使用者目錄表,以協助輸出行修改的詳細資訊。在解碼已準備(但尚未提交)的事務或未提交事務時,由於該事務的同步回滾,此更改回調函式也可能出錯。在這種情況下,此中止事務的邏輯解碼將優雅地停止。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
ctx 和 txn 引數的內容與 begin_cb 和 commit_cb 回撥函式相同,但此外,關係描述符 relation 指向該行所屬的關係,並且一個描述行修改的結構 change 被傳入。
只有使用者定義的表中非未記錄(參見 UNLOGGED)且非臨時(參見 TEMPORARY 或 TEMP)的表中的更改才能使用邏輯解碼提取。
可選的 truncate_cb 回撥函式在執行 TRUNCATE 命令時被呼叫。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
引數類似於 change_cb 回撥函式。但是,由於透過外部索引鍵連線的表上的 TRUNCATE 操作需要一起執行,因此此回撥函式接收一個關係陣列而不是單個關係。有關詳細資訊,請參閱 TRUNCATE 語句的描述。
呼叫可選的 filter_by_origin_cb 回撥函式以確定來自 origin_id 的已重放資料是否對輸出外掛感興趣。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
RepOriginId origin_id);
ctx 引數的內容與其他回撥函式相同。只能獲得源資訊。要表示來自傳入節點的更改無關緊要,請返回 true,從而過濾掉它們;否則返回 false。其他回撥函式不會為已被過濾掉的事務和更改呼叫。
這對於實現級聯或多向複製解決方案很有用。按源過濾允許在這些設定中防止相同的更改來回複製。雖然事務和更改也包含源資訊,但透過此回撥函式進行過濾效率明顯更高。
可選的 message_cb 回撥函式在解碼邏輯解碼訊息時被呼叫。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
txn 引數包含關於事務的元資訊,例如它提交的時間戳及其 XID。但請注意,當訊息非事務性且在記錄訊息的事務中尚未分配 XID 時,它可以為 NULL。lsn 是訊息的 WAL 位置。transactional 表示訊息是作為事務性發送還是非事務性發送。與更改回調函式類似,在解碼已準備(但尚未提交)的事務或未提交事務時,由於該事務的同步回滾,此訊息回撥函式也可能出錯。在這種情況下,此中止事務的邏輯解碼將優雅地停止。prefix 是一個任意的 null 終止字首,可用於識別當前外掛感興趣的訊息。最後,message 引數包含 message_size 大小的實際訊息。
應格外小心,確保輸出外掛認為有趣的prefix是唯一的。使用副檔名或輸出外掛本身的名稱通常是一個不錯的選擇。
可選的 filter_prepare_cb 回撥函式用於確定當前兩階段提交事務中的資料是在準備階段解碼,還是在 COMMIT PREPARED 時作為常規單階段事務解碼。要指示跳過解碼,請返回 true;否則返回 false。當未定義回撥函式時,假定為 false(即不進行過濾,所有使用兩階段提交的事務也以兩階段進行解碼)。
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
ctx 引數的內容與其他回撥函式相同。xid 和 gid 引數提供了兩種不同的事務標識方式。稍後的 COMMIT PREPARED 或 ROLLBACK PREPARED 會攜帶這兩個識別符號,讓輸出外掛可以選擇使用哪個。
對於每個事務,該回調函式可能會被呼叫多次以進行解碼,並且每次呼叫時都必須為給定的 xid 和 gid 對提供相同的靜態答案。
必需的 begin_prepare_cb 回撥函式在解碼已準備事務的開始時被呼叫。gid 欄位包含在 txn 引數中,可以在此回撥函式中使用,以檢查外掛是否已收到此 PREPARE,在這種情況下,它可以出錯或跳過該事務的剩餘更改。
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
必需的 prepare_cb 回撥函式在解碼已準備提交的兩階段事務時被呼叫。如果存在已修改的行,則在呼叫此函式之前,所有已修改行的 change_cb 回撥函式已被呼叫。gid 欄位包含在 txn 引數中,可以在此回撥函式中使用。
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
必需的 commit_prepared_cb 回撥函式在解碼事務 COMMIT PREPARED 時被呼叫。gid 欄位包含在 txn 引數中,可以在此回撥函式中使用。
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
必需的 rollback_prepared_cb 回撥函式在解碼事務 ROLLBACK PREPARED 時被呼叫。gid 欄位包含在 txn 引數中,可以在此回撥函式中使用。prepare_end_lsn 和 prepare_time 引數可用於檢查外掛是否收到了此 PREPARE TRANSACTION,在這種情況下,它可以應用回滾,否則,它可以跳過回滾操作。gid 本身不足以區分,因為下游節點可能有一個具有相同識別符號的已準備事務。
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time);
必需的 stream_start_cb 回撥函式在從進行中的事務中開啟一個流式更改塊時被呼叫。
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
必需的 stream_stop_cb 回撥函式在關閉從進行中的事務中流式傳輸的更改塊時被呼叫。
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
必需的 stream_abort_cb 回撥函式用於中止先前已流式傳輸的事務。
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
stream_prepare_cb 回撥函式在準備先前已流式傳輸的事務作為兩階段提交的一部分時被呼叫。當輸出外掛同時支援大型進行中事務的流式傳輸和兩階段提交時,此回撥函式是必需的。
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
必需的 stream_commit_cb 回撥函式在提交先前已流式傳輸的事務時被呼叫。
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
必需的 stream_change_cb 回撥函式在流式更改塊(由 stream_start_cb 和 stream_stop_cb 呼叫分隔)中傳送更改時被呼叫。實際更改不會顯示,因為事務稍後可能會中止,而我們不會解碼已中止事務的更改。
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
可選的 stream_message_cb 回撥函式在流式更改塊(由 stream_start_cb 和 stream_stop_cb 呼叫分隔)中傳送通用訊息時被呼叫。事務性訊息的內容不會顯示,因為事務稍後可能會中止,而我們不會解碼已中止事務的更改。
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
bool transactional,
const char *prefix,
Size message_size,
const char *message);
可選的 stream_truncate_cb 回撥函式在流式更改塊(由 stream_start_cb 和 stream_stop_cb 呼叫分隔)中的 TRUNCATE 命令時被呼叫。
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
引數類似於 stream_change_cb 回撥函式。但是,由於透過外部索引鍵連線的表上的 TRUNCATE 操作需要一起執行,因此此回撥函式接收一個關係陣列而不是單個關係。有關詳細資訊,請參閱 TRUNCATE 語句的描述。
要實際生成輸出,輸出外掛可以在 begin_cb、commit_cb 或 change_cb 回撥函式中使用 ctx->out 中的 StringInfo 輸出緩衝區寫入資料。在寫入輸出緩衝區之前,必須呼叫 OutputPluginPrepareWrite(ctx, last_write),並在完成寫入緩衝區後,必須呼叫 OutputPluginWrite(ctx, last_write) 來執行寫入。last_write 指示特定的寫入是否是回撥的最後一次寫入。
以下示例展示瞭如何向輸出外掛的消費者輸出資料
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);
如果您在文件中發現任何不正確、與您的實際體驗不符或需要進一步澄清的內容,請使用 此表格 報告文件問題。