Skip to main content

PostgreSQL Source Connector

Create a connector: Type is source, Database is PostgreSQL

Pre-condition: CDC service status is Healthy.

PostgreSQL Configuration

1. pgoutput requires changing the wal_level configuration of the Postgres cluster to logical. CDC must also be performed on the primary, rather than hot or warm replicas.

  • To check the configuration:
SHOW wal_level;
  • To apply the configuration change, run the following command on Postgres and restart the service after the change:
ALTER SYSTEM SET wal_level = 'logical';

2. PostgreSQL source connector requires at minimum the REPLICATION role.

  • If using a SuperUser account, proceed to step 5.
  • To check whether a user is a SuperUser:
SELECT rolsuper FROM pg_roles WHERE rolname = '<USER_NAME>';
  • Otherwise, create a user with the REPLICATION role:
CREATE USER <USER_NAME> WITH REPLICATION LOGIN PASSWORD '<PASSWORD>';
  1. Create a Publication:

    • Note: Perform the following operations with superuser privileges. For <PUBLICATION_NAME>, FPTCloud only accepts strings containing lowercase letters.
    • Create a Publication for all tables:
CREATE PUBLICATION <PUBLICATION_NAME> FOR ALL TABLES;
 * Check existing Publications:
SELECT * FROM pg_publication;
 * Create a Publication for specific tables:
CREATE PUBLICATION <PUBLICATION_NAME> FOR TABLE <SCHEMA1>.<TABLE1>, <SCHEMA2>.<TABLE2>, ...;
 * Add tables to the publication:
ALTER PUBLICATION <PUBLICATION_NAME> ADD TABLE <SCHEMA1>.<TABLE1>, <SCHEMA2>.<TABLE2>, ...;
 * Remove tables from the publication:
ALTER PUBLICATION <PUBLICATION_NAME> DROP TABLE <SCHEMA1>.<TABLE1>, <SCHEMA2>.<TABLE2>, ...;
 * Drop a Publication:
DROP PUBLICATION <PUBLICATION_NAME>;
  1. Grant SELECT permissions on tables for the user being used:

    • Grant SELECT on a single table:
GRANT SELECT ON TABLE '<SCHEMA_NAME>.<TABLE_NAME>' TO <USER_NAME>;
 * Or grant permissions for all tables in a schema:
DO $$
DECLARE
table_record RECORD;
BEGIN
FOR table_record IN
SELECT table_name
FROM information_schema.tables
WHERE table_schema = '<SCHEMA_NAME>' AND table_type = 'BASE TABLE'
LOOP
EXECUTE 'GRANT SELECT ON TABLE <SCHEMA_NAME>."' || table_record.table_name || '" TO <USER_NAME>;';
END LOOP;
END $$;
  1. Change the REPLICA IDENTITY level for tables that require Capture Data Change.

    • This configuration change ensures that data change events contain complete information both before and after the change:
ALTER TABLE your_schema_name.your_table_name REPLICA IDENTITY FULL;
 * Or apply the change to all tables in a schema:
DO $$
DECLARE
table_record RECORD;
BEGIN
FOR table_record IN
SELECT table_name
FROM information_schema.tables
WHERE table_schema = '<SCHEMA_NAME>' AND table_type = 'BASE TABLE'
LOOP
EXECUTE 'ALTER TABLE <SCHEMA_NAME>."' || table_record.table_name || '" REPLICA IDENTITY FULL;';
END LOOP;
END $$;
  1. The Connector will automatically create or reuse an existing replication_slot with the slot.name value entered from the UI , to listen to changes from the wal_log (write-ahead log).

    • Check the maximum number of replication_slots:
show max_replication_slots;
 * Check current replication_slots:
SELECT slot_name, plugin, slot_type, database, active FROM pg_replication_slots;
 * To remove an inactive replication_slot:
SELECT pg_drop_replication_slot('<REPLICATION_SLOT_NAME>');
  1. When deleting a connector, remove its replication_slot and publication:

    • Drop the replication_slot:
SELECT pg_drop_replication_slot('<REPLICATION_SLOT_NAME>');
 * Drop the publication:
DROP PUBLICATION <PUBLICATION_NAME>;
  1. To change the max_replication_slots configuration , modify this setting in the postgres.conf file.

Steps to create a connector:

To create a connector, follow these steps:

Step 1: From the menu bar, select Data Platform > Workspace Management > Workspace name.

Step 2: Under My services, select CDC service

Step 3. On the CDC service detail screen, select the Connectors tab and click Create a connector.

create-connector

Step 4 Enter the information on the Connector Information screen:

  • Name (required): Connector name. Note: The connector name may contain lowercase letters a-z or digits 0-9. Spaces are not allowed; use "-" instead of a space.

  • Type (required): Select source.

  • Database (required): Select PostgreSQL.

connector-info

Step 5: Click Next to proceed to the Properties screen and enter the following information:

  • When selecting From FPT Database Engine: - fill in the following fields:

    • Database (required): Select Database.

    • Host Name (required): Hostname or IP of the Postgres server.

    • Port (required): Postgres server port, default is 5432.

    • Database name (required): The database the Connector will listen to for data changes.

    • Username (required): Postgres user used by the Connector.

    • Password (required): Password.

    • Topic prefix (required): When data changes, change events will be produced to Kafka topics. Topic names follow the format [topic.prefix].[schema_name].[table_name]

Example: topic prefix: syncdata, schema: inventory, tables: customer, order, item. The Connector will record data changes to Kafka topics: syncdata.inventory.customer, syncdata.inventory.order, syncdata.inventory.item)

* **Slot (required):** Replication slot used by the connector; value must contain only lowercase letters.

* **Publication (required):** Publication used by the connector; value must contain only lowercase letters.
  • When selecting Manual configuration - fill in the following fields:

    • Host Name (required): Hostname or IP of the Postgres server

    • Port (required): Postgres server port, default is 5432

    • Database name (required): The database the Connector will listen to for data changes

    • Username (required): Postgres user used by the Connector

    • Password (required): Password

    • Topic prefix (required): When data changes, change events will be produced to Kafka topics. Topic names follow the format [topic.prefix].[schema_name].[table_name]. Example: topic prefix: syncdata, schema: inventory, tables: customer, order, item. The Connector will record data changes to Kafka topics: syncdata.inventory.customer, syncdata.inventory.order, syncdata.inventory.item)

    • Slot (required): Replication slot used by the connector; value must contain only lowercase letters

    • Publication (required): Publication used by the connector; value must contain only lowercase letters

manual-config

* **Enable incremental snapshot** (optional): Checkbox to enable the incremental snapshot feature for the Connector

* Only displayed for source connectors: **MySQL, MariaDB, PostgreSQL**
* When this checkbox is checked and "Test connection" is clicked, the system will verify:
* Whether the database has sufficient permissions to perform a snapshot (INSERT, CREATE TABLE permissions are required for PostgreSQL/MySQL)
* If the database lacks permissions, a detailed error message will be displayed
* If the database has sufficient permissions, "Test connection successfully" will be displayed
* After the Connector is created successfully with this checkbox checked:
* The Connector will have the incremental snapshot management feature
* The "Snapshot Status" column will be displayed in the List Connector screen
* The following operations can be performed: Execute, Pause, Resume, Stop snapshot via the Actions menu

Click Test connection to verify the connection from the Workspace to the entered Database

Step 6: Click Next to proceed to the Additional Properties screen and enter the following information:

  • Mode (required): Connector behavior. Select from the following modes:

    • Initial (default): The Connector will snapshot all existing data in the tables, then continue capturing data changes on these tables

    • Initial_only: The Connector will only snapshot all existing data in the tables, then stop listening to data change events on the tables

    • No_data: The Connector will not snapshot existing data in the tables; it will only listen to data change events on the tables

additional-properties

Click '+' to retrieve schema and table information

schema

warning

Maximum selection limit is 100 tables

Step 7: Click Next to proceed to the Review screen and verify the information.

connector-details

Step 8: Review the information and click Create to complete the connector creation.