Open
Labels: bug (Something isn't working)
Description
Summary
If the source DB has a table that contains a -infinity value in a timestamp column, pgstream reports that the snapshot was created successfully, but no data is written to the destination DB and the following error appears in the PostgreSQL logs:
ERROR: COPY from stdin failed: unable to encode -1 into binary format for timestamp (OID 1114): cannot find encode plan"
Relevant Information
pgstream version: 0.7.6
Steps to reproduce
- Create a database.
- Create any schema and any table within it. The table should contain a timestamp column, and at least one row should have the value '-infinity' in that column.
In my case, I tried to snapshot and replicate this table:
simple_inventory_db=> select * from inventory.categories;
 category_id |  category_name  |              description              |         created_at         | active | last_updated_at
-------------+-----------------+---------------------------------------+----------------------------+--------+-----------------
           3 | Furniture       | Category for Furniture products       | 2025-07-21 10:51:58.166312 | t      |
           4 | Kitchen         | Category for Kitchen products         | 2025-07-21 10:51:58.166312 | t      |
           5 | Tools           | Category for Tools products           | 2025-07-21 10:51:58.166312 | f      |
           6 | Outdoor         | Category for Outdoor products         | 2025-07-21 10:51:58.166312 | t      |
           7 | Storage         | Category for Storage products         | 2025-07-21 10:51:58.166312 | t      |
           8 | Cleaning        | Category for Cleaning products        | 2025-07-21 10:51:58.166312 | t      |
           9 | Lighting        | Category for Lighting products        | 2025-07-21 10:51:58.166312 | t      |
           2 | Office Supplies | Category for Office Supplies products | 2025-07-21 10:51:58.166312 | f      |
          10 | Decor           | Category for Decor products           | 2025-07-21 10:51:58.166312 | t      | -infinity
           1 | Electronics     | Category for Electronics products     | 2025-07-21 10:51:58.166312 | t      | -infinity
(10 rows)
- Create a configuration file (config.yaml). Here is an example using custom database, schema, and table names:
source:
  postgres:
    url: "postgres://<masked>:<masked>@<masked>:5432/simple_inventory_db?sslmode=require"
    mode: snapshot_and_replication # options are replication, snapshot or snapshot_and_replication
    replication:
      replication_slot: pgstream_slot
    snapshot: # when mode is snapshot or snapshot_and_replication
      mode: full # options are data_and, schema or data
      tables:
        - "inventory.categories"
      recorder:
        repeatable_snapshots: true # whether to repeat snapshots that have already been taken
        postgres_url: "postgres://<masked>:<masked>@<masked>:5432/simple_inventory_db?sslmode=require" # URL of the database where the snapshot status is recorded
      snapshot_workers: 4 # number of schemas to be snapshotted in parallel
      data: # when mode is full or data
        schema_workers: 4 # number of schema tables to be snapshotted in parallel
        table_workers: 4 # number of workers to snapshot a table in parallel
        batch_bytes: 83886080 # bytes to read per batch (defaults to 80MiB)
      schema: # when mode is full or schema
        mode: pgdump_pgrestore # options are pgdump_pgrestore or schemalog
        pgdump_pgrestore:
          clean_target_db: true # whether to clean the target database before restoring
target:
  postgres:
    url: "postgres://<masked>:<masked>@<masked>:5432/simple_inventory_db?sslmode=require"
    batch:
      timeout: 5000 # batch timeout in milliseconds
      size: 25 # number of messages in a batch
    disable_triggers: true # whether to disable triggers on the target database
    on_conflict_action: "nothing" # options are update, nothing or error
modifiers:
  filter: # one of include_tables or exclude_tables
    include_tables: # list of tables for which events should be allowed. Tables should be schema qualified. If no schema is provided, the public schema will be assumed. Wildcards "*" are supported.
      - "inventory.categories"
- Execute pgstream:
pgstream init -c config.yaml && \
pgstream run -c config.yaml --log-level trace
- Wait until the snapshot is created and replication has started:
using config file: config.yaml
2025-07-22T13:33:31.917539681Z INF logger.go:37 > postgres processor configured
2025-07-22T13:33:31.917951218Z INF logger.go:37 > adding filtering to processor...
2025-07-22T13:33:31.918014174Z INF logger.go:37 > postgres listener configured
2025-07-22T13:33:31.918064839Z INF logger.go:37 > initial snapshot enabled
2025-07-22T13:33:31.918104492Z INF logger.go:37 > postgres processor configured
2025-07-22T13:33:31.91816418Z INF logger.go:37 > postgres bulk ingest writer enabled
2025-07-22T13:33:31.91834049Z INF logger.go:37 > adding filtering to processor...
2025-07-22T13:33:31.96745377Z INF logger.go:37 > starting listener...
2025-07-22T13:33:31.989910739Z INF logger.go:37 > creating schema snapshot module=postgres_schema_snapshot_generator schemaTables={"inventory":["categories"]}
2025-07-22T13:33:32.005934376Z DBG logger.go:33 > dumping schema module=postgres_schema_snapshot_generator pg_dump_options=["postgres://<masked>:<masked>@<masked>:5432/simple_inventory_db?sslmode=require","-Fp","--schema-only","--schema=inventory","--exclude-table=\"inventory\".\"products\"","--exclude-table=\"inventory\".\"transactions\"","--clean","--if-exists"] schema_tables={"inventory":["categories"]}
2025-07-22T13:33:32.158629986Z DBG logger.go:33 > dumping sequence data module=postgres_schema_snapshot_generator pg_dump_options=["postgres://<masked>:<masked>@<masked>:5432/simple_inventory_db?sslmode=require","-Fp","--data-only","--table=\"inventory\".\"categories_category_id_seq\"","--table=\"inventory\".\"products_product_id_seq\"","--table=\"inventory\".\"transactions_transaction_id_seq\""] sequences=["\"inventory\".\"categories_category_id_seq\"","\"inventory\".\"products_product_id_seq\"","\"inventory\".\"transactions_transaction_id_seq\""]
2025-07-22T13:33:32.489197561Z INF logger.go:37 > creating data snapshot module=postgres_data_snapshot_generator schema=inventory tables=["categories"]
2025-07-22T13:33:32.503173191Z DBG logger.go:33 > querying total bytes for schema module=postgres_data_snapshot_generator query="SELECT SUM(pg_table_size(c.oid)) FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = 'inventory' AND c.relname IN ($1) AND c.relkind = 'r';" schema=inventory snapshotID=000000CA-00003482-1 tables=["categories"]
[inventory] Snapshotting data... 0% [ ] ( 0 B/16 kB) [0s:0s]2025-07-22T13:33:32.528182372Z DBG logger.go:33 > snapshotting table module=postgres_data_snapshot_generator schema=inventory snapshotID=000000CA-00003482-1 table=categories
2025-07-22T13:33:32.539461385Z DBG logger.go:33 > table page count: 0, missed pages: 0, batch page size: 1 module=postgres_data_snapshot_generator schema=inventory snapshotID=000000CA-00003482-1 table=categories
2025-07-22T13:33:32.540988811Z DBG logger.go:33 > querying table page range 0-1 module=postgres_data_snapshot_generator schema=inventory snapshotID=000000CA-00003482-1 table=categories
2025-07-22T13:33:32.563242872Z TRC logger.go:29 > batching query args=[3,"Furniture","Category for Furniture products","2025-07-21T10:51:58.166312Z",true,null] commit_position=0/0 module=postgres_bulk_ingest_writer schema=inventory sql="INSERT INTO \"inventory\".\"categories\"(\"category_id\", \"category_name\", \"description\", \"created_at\", \"active\", \"last_updated_at\") OVERRIDING SYSTEM VALUE VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING" table=categories
2025-07-22T13:33:32.563376422Z DBG logger.go:33 > creating new batch sender module=postgres_bulk_ingest_writer schema=inventory table=categories
2025-07-22T13:33:32.563526462Z TRC logger.go:29 > batching query args=[4,"Kitchen","Category for Kitchen products","2025-07-21T10:51:58.166312Z",true,null] commit_position=0/0 module=postgres_bulk_ingest_writer schema=inventory sql="INSERT INTO \"inventory\".\"categories\"(\"category_id\", \"category_name\", \"description\", \"created_at\", \"active\", \"last_updated_at\") OVERRIDING SYSTEM VALUE VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING" table=categories
2025-07-22T13:33:32.563641619Z TRC logger.go:29 > batching query args=[5,"Tools","Category for Tools products","2025-07-21T10:51:58.166312Z",false,null] commit_position=0/0 module=postgres_bulk_ingest_writer schema=inventory sql="INSERT INTO \"inventory\".\"categories\"(\"category_id\", \"category_name\", \"description\", \"created_at\", \"active\", \"last_updated_at\") OVERRIDING SYSTEM VALUE VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING" table=categories
2025-07-22T13:33:32.563753635Z TRC logger.go:29 > batching query args=[6,"Outdoor","Category for Outdoor products","2025-07-21T10:51:58.166312Z",true,null] commit_position=0/0 module=postgres_bulk_ingest_writer schema=inventory sql="INSERT INTO \"inventory\".\"categories\"(\"category_id\", \"category_name\", \"description\", \"created_at\", \"active\", \"last_updated_at\") OVERRIDING SYSTEM VALUE VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING" table=categories
2025-07-22T13:33:32.563947529Z TRC logger.go:29 > batching query args=[7,"Storage","Category for Storage products","2025-07-21T10:51:58.166312Z",true,null] commit_position=0/0 module=postgres_bulk_ingest_writer schema=inventory sql="INSERT INTO \"inventory\".\"categories\"(\"category_id\", \"category_name\", \"description\", \"created_at\", \"active\", \"last_updated_at\") OVERRIDING SYSTEM VALUE VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING" table=categories
2025-07-22T13:33:32.564105327Z TRC logger.go:29 > batching query args=[8,"Cleaning","Category for Cleaning products","2025-07-21T10:51:58.166312Z",true,null] commit_position=0/0 module=postgres_bulk_ingest_writer schema=inventory sql="INSERT INTO \"inventory\".\"categories\"(\"category_id\", \"category_name\", \"description\", \"created_at\", \"active\", \"last_updated_at\") OVERRIDING SYSTEM VALUE VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING" table=categories
2025-07-22T13:33:32.564241959Z TRC logger.go:29 > batching query args=[9,"Lighting","Category for Lighting products","2025-07-21T10:51:58.166312Z",true,null] commit_position=0/0 module=postgres_bulk_ingest_writer schema=inventory sql="INSERT INTO \"inventory\".\"categories\"(\"category_id\", \"category_name\", \"description\", \"created_at\", \"active\", \"last_updated_at\") OVERRIDING SYSTEM VALUE VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING" table=categories
2025-07-22T13:33:32.564381294Z TRC logger.go:29 > batching query args=[2,"Office Supplies","Category for Office Supplies products","2025-07-21T10:51:58.166312Z",false,null] commit_position=0/0 module=postgres_bulk_ingest_writer schema=inventory sql="INSERT INTO \"inventory\".\"categories\"(\"category_id\", \"category_name\", \"description\", \"created_at\", \"active\", \"last_updated_at\") OVERRIDING SYSTEM VALUE VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING" table=categories
2025-07-22T13:33:32.564558592Z TRC logger.go:29 > batching query args=[10,"Decor","Category for Decor products","2025-07-21T10:51:58.166312Z",true,-1] commit_position=0/0 module=postgres_bulk_ingest_writer schema=inventory sql="INSERT INTO \"inventory\".\"categories\"(\"category_id\", \"category_name\", \"description\", \"created_at\", \"active\", \"last_updated_at\") OVERRIDING SYSTEM VALUE VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING" table=categories
2025-07-22T13:33:32.564693511Z TRC logger.go:29 > batching query args=[1,"Electronics","Category for Electronics products","2025-07-21T10:51:58.166312Z",true,-1] commit_position=0/0 module=postgres_bulk_ingest_writer schema=inventory sql="INSERT INTO \"inventory\".\"categories\"(\"category_id\", \"category_name\", \"description\", \"created_at\", \"active\", \"last_updated_at\") OVERRIDING SYSTEM VALUE VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING" table=categories
[inventory] Snapshotting data... 0% [ ] ( 0 B/16 kB) [0s:0s]2025-07-22T13:33:32.564897584Z DBG logger.go:33 > 10 rows processed module=postgres_data_snapshot_generator schema=inventory snapshotID=000000CA-00003482-1 table=categories
2025-07-22T13:33:32.565466915Z DBG logger.go:33 > table snapshot completed module=postgres_data_snapshot_generator schema=inventory snapshotID=000000CA-00003482-1 table=categories
[inventory] Snapshotting data... 100% [====================] (16/16 kB, 436 kB/s) [0s]
2025-07-22T13:33:32.565549647Z DBG logger.go:33 > context terminated, draining in flight batch module=postgres_bulk_ingest_writer
2025-07-22T13:33:32.565687257Z DBG logger.go:33 > bulk writing batch batch_size=10 module=postgres_bulk_ingest_writer
2025-07-22T13:33:32.566560168Z DBG logger.go:33 > closing bulk ingest writer module=postgres_bulk_ingest_writer
2025-07-22T13:33:32.566668871Z TRC logger.go:29 > closing batch sender module=postgres_bulk_ingest_writer
2025-07-22T13:33:32.585797083Z TRC logger.go:29 > batch sender closed module=postgres_bulk_ingest_writer
2025-07-22T13:33:32.586423198Z INF logger.go:37 > restoring schema indices and constraints module=postgres_schema_snapshot_generator schemaTables={"inventory":["categories"]}
2025-07-22T13:33:32.723932248Z INF logger.go:37 > snapshot generation completed duration=740.834905ms err=null module=snapshot_generator_adapter
2025-07-22T13:33:32.724065312Z TRC logger.go:29 > set start LSN db_name=simple_inventory_db module=postgres_replication_handler position=2B0/8C0BFAF8 slot_name=pgstream_slot system_id=7480911317629857806
2025-07-22T13:33:32.726225095Z INF logger.go:37 > logical replication started db_name=simple_inventory_db module=postgres_replication_handler position=2957287095032 slot_name=pgstream_slot system_id=7480911317629857806
2025-07-22T13:33:32.726345166Z TRC logger.go:29 > stored new LSN position module=postgres_replication_handler position=2B0/8C0BFAF8
2025-07-22T13:33:32.726535956Z TRC logger.go:29 > module=wal_postgres_listener server_time=2025-07-22T13:33:32.72613Z wal_data= wal_end=2B0/8C0BFAF8
2025-07-22T13:33:32.731026751Z TRC logger.go:29 > module=wal_postgres_listener server_time=2025-07-22T13:33:32.730733Z wal_data="{\"action\":\"I\",\"timestamp\":\"2025-07-22 13:33:31.984935+00\",\"lsn\":\"2B0/8C0C9B90\",\"schema\":\"pgstream\",\"table\":\"snapshot_requests\",\"columns\":[{\"name\":\"req_id\",\"type\":\"integer\",\"value\":1},{\"name\":\"schema_name\",\"type\":\"text\",\"value\":\"inventory\"},{\"name\":\"table_names\",\"type\":\"text[]\",\"value\":\"{categories}\"},{\"name\":\"created_at\",\"type\":\"timestamp with time zone\",\"value\":\"2025-07-22 13:33:31.984232+00\"},{\"name\":\"updated_at\",\"type\":\"timestamp with time zone\",\"value\":\"2025-07-22 13:33:31.984232+00\"},{\"name\":\"status\",\"type\":\"text\",\"value\":\"requested\"},{\"name\":\"errors\",\"type\":\"jsonb\",\"value\":null}]}" wal_end=2B0/8C0C9DE1
2025-07-22T13:33:32.745471798Z TRC logger.go:29 > skipping event module=wal_filter schema=pgstream table=snapshot_requests
- Check the destination DB - you'll see that pgstream created a schema, but the table is empty:
simple_inventory_db=> select * from inventory.categories;
category_id | category_name | description | created_at | active | last_updated_at
-------------+---------------+-------------+------------+--------+-----------------
(0 rows)
- Check the PostgreSQL logs - you'll find the error:
ERROR: COPY from stdin failed: unable to encode -1 into binary format for timestamp (OID 1114): cannot find encode plan"
- Check that the pg_dump/pg_restore tools can process the same table without any issues:
$ pg_dump --format c -h $SRC_DB_ADDR -U $DB_USER -d $DB_NAME --table inventory.categories --verbose -f db.dump
pg_dump: last built-in OID is 16383
pg_dump: reading extensions
pg_dump: identifying extension members
pg_dump: reading schemas
pg_dump: reading user-defined tables
pg_dump: reading user-defined functions
pg_dump: reading user-defined types
pg_dump: reading procedural languages
pg_dump: reading user-defined aggregate functions
pg_dump: reading user-defined operators
pg_dump: reading user-defined access methods
pg_dump: reading user-defined operator classes
pg_dump: reading user-defined operator families
pg_dump: reading user-defined text search parsers
pg_dump: reading user-defined text search templates
pg_dump: reading user-defined text search dictionaries
pg_dump: reading user-defined text search configurations
pg_dump: reading user-defined foreign-data wrappers
pg_dump: reading user-defined foreign servers
pg_dump: reading default privileges
pg_dump: reading user-defined collations
pg_dump: reading user-defined conversions
pg_dump: reading type casts
pg_dump: reading transforms
pg_dump: reading table inheritance information
pg_dump: reading event triggers
pg_dump: finding extension tables
pg_dump: finding inheritance relationships
pg_dump: reading column info for interesting tables
pg_dump: finding table default expressions
pg_dump: flagging inherited columns in subtables
pg_dump: reading partitioning data
pg_dump: reading indexes
pg_dump: flagging indexes in partitioned tables
pg_dump: reading extended statistics
pg_dump: reading constraints
pg_dump: reading triggers
pg_dump: reading rewrite rules
pg_dump: reading policies
pg_dump: reading row-level security policies
pg_dump: reading publications
pg_dump: reading publication membership of tables
pg_dump: reading publication membership of schemas
pg_dump: reading subscriptions
pg_dump: reading subscription membership of tables
pg_dump: reading dependency data
pg_dump: saving encoding = UTF8
pg_dump: saving "standard_conforming_strings = on"
pg_dump: saving "search_path = "
pg_dump: saving database definition
pg_dump: dumping contents of table "inventory.categories"
$ pg_restore -h $DST_DB_ADDR -U $DB_USER -d $DB_NAME --verbose db.dump
pg_restore: connecting to database for restore
pg_restore: creating TABLE "inventory.categories"
pg_restore: creating SEQUENCE "inventory.categories_category_id_seq"
pg_restore: creating SEQUENCE OWNED BY "inventory.categories_category_id_seq"
pg_restore: creating DEFAULT "inventory.categories category_id"
pg_restore: processing data for table "inventory.categories"
pg_restore: executing SEQUENCE SET categories_category_id_seq
pg_restore: creating CONSTRAINT "inventory.categories categories_pkey"
$ psql -h $DST_DB_ADDR -U $DB_USER -d $DB_NAME
psql (17.5)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, compression: off, ALPN: postgresql)
Type "help" for help.
simple_inventory_db=> select * from inventory.categories;
 category_id |  category_name  |              description              |         created_at         | active | last_updated_at
-------------+-----------------+---------------------------------------+----------------------------+--------+-----------------
           3 | Furniture       | Category for Furniture products       | 2025-07-21 10:51:58.166312 | t      |
           4 | Kitchen         | Category for Kitchen products         | 2025-07-21 10:51:58.166312 | t      |
           5 | Tools           | Category for Tools products           | 2025-07-21 10:51:58.166312 | f      |
           6 | Outdoor         | Category for Outdoor products         | 2025-07-21 10:51:58.166312 | t      |
           7 | Storage         | Category for Storage products         | 2025-07-21 10:51:58.166312 | t      |
           8 | Cleaning        | Category for Cleaning products        | 2025-07-21 10:51:58.166312 | t      |
           9 | Lighting        | Category for Lighting products        | 2025-07-21 10:51:58.166312 | t      |
           2 | Office Supplies | Category for Office Supplies products | 2025-07-21 10:51:58.166312 | f      |
          10 | Decor           | Category for Decor products           | 2025-07-21 10:51:58.166312 | t      | -infinity
           1 | Electronics     | Category for Electronics products     | 2025-07-21 10:51:58.166312 | t      | -infinity
(10 rows)