时间:2021-07-01 10:21:17 帮助过:12人阅读
首先 PostgreSQL服务器必须正确配置才能够支持逻辑解码︰
wal_level = ‘logical‘ # one per database needed on (provider/subscriber)provider node max_worker_processes = 10 # one per node needed on provider node max_replication_slots = 10 # one per node needed on provider node max_wal_senders = 10 shared_preload_libraries = ‘pglogical‘
如果你想要处理解决与上一次/第一次更新之间的冲突 wins(参阅冲突章节), 你的数据库版本需要为PostgreSQL 9.5+ (在9.4中无效) 您可以向 PostgreSQL.conf 添加此额外的选项:
# needed for last/first update wins conflict resolution property available in Postgre track_commit_timestamp = on
pg_hba.conf 需要配置成允许从本地主机复制,用户拥有有复制权限,连接权限;并重启数据库服务
host replication postgres 网段ip/24 trust
在所有节点上所对应数据库安装pglogical拓展模块:
CREATE EXTENSION pglogical;
现有实验环境
数据库版本 | IP | 角色 |
---|---|---|
psql (PostgreSQL) 9.6.0 | 192.168.1.221 | provider |
psql (PostgreSQL) 10.5 | 192.168.1.235 | subscriber |
服务器时间同步(主备库都需操作)
echo "*/20 * * * * /usr/sbin/ntpdate -u ntp.api.bz >/dev/null" >> /var/spool/cron/root
在一个数据库里创建提供者节点
# 创建节点 SELECT pglogical.create_node( node_name := ‘provider1‘, dsn := ‘host=192.168.1.221 port=5432 dbname=lottu‘ );
将public架构中的所有表添加到default复制集中
SELECT pglogical.replication_set_add_all_tables(‘default‘, ARRAY[‘public‘]);
复制集default的表都必需要primary key
在另一个数据库创建订阅者节点
SELECT pglogical.create_node( node_name := ‘subscriber1‘, dsn := ‘host=192.168.1.235 port=5432 dbname=lottu‘ );
订阅提供者节点,该订阅将在后台启动同步和复制过程
SELECT pglogical.create_subscription( subscription_name := ‘subscription1‘, provider_dsn := ‘host=192.168.1.221 port=5432 dbname=lottu‘ );
前面我们已经完成安装/配置 pglogical 操作。
create table tbl_lottu01(id int primary key, name text, reg_time timestamp);
由于需要验证insert/update/delete/truncate操作是否同步;所以创建的表要有主键。当然只对发布者必须要主键约束。
lottu=# insert into tbl_lottu01 select generate_series(1,10000),‘lottu‘,now(); INSERT 0 10000
对新建的表;并没有为其分配对应的复制集;需要手动添加。当然可以利用触发器自动添加;后续补充。
lottu=# select * from pglogical.replication_set_table ; set_id | set_reloid | set_att_list | set_row_filter --------+------------+--------------+---------------- (0 rows)
前面讲解创建复制集中;3.2.2中“将public架构中的所有表添加到default复制集中”
SELECT pglogical.replication_set_add_all_tables(‘default‘, ARRAY[‘public‘]);
将表添加到对应的复制集中;详细介绍可以查看前面文档。
pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text [],row_filter text)
两种方法都可以;我们采用第二种方法。
lottu=# select pglogical.replication_set_add_table( set_name := ‘default‘, relation := ‘tbl_lottu01‘,synchronize_data := true); replication_set_add_table --------------------------- t (1 row)
我们查看复制集
lottu=# select * from pglogical.replication_set_table ; set_id | set_reloid | set_att_list | set_row_filter -----------+-------------+--------------+---------------- 290045701 | tbl_lottu01 | | (1 row)
同时,数据也同步到 subscriber 节点。因为在第二种方法有 同步 的操作。若使用第一种方法;还需要在subscriber 节点同步表的操作。
#重新同步一个表 pglogical.alter_subscription_resynchronize_table(subscription_name name, relation regclass) #将所有的表都同步 pglogical.alter_subscription_synchronize(subscription_name name, truncate bool)
查看表 tbl_lottu01 信息
lottu=# select * from pglogical.show_subscription_table(‘subscription1‘,‘tbl_lottu01‘); nspname | relname | status ---------+-------------+-------------- public | tbl_lottu01 | synchronized (1 row) lottu=# select count(1) from tbl_lottu01; count ------- 10000 (1 row)
在复制集default中: update/delete/truncate 操作也是同步复制。不作演示
复制集 | INSERT | UPDATE | DELETE | TRUNCATE |
---|---|---|---|---|
default | √ | √ | √ | √ |
default_insert_only | √ | × | × | × |
pglogical.create_subscription(subscription_name name, provider_dsn text, replication_sets text[], synchronize_structure boolean, synchronize_data boolean, forward_origins text[], apply_delay interval)
参数:
示例:数据表结构同步;且延迟复制1分钟
SELECT pglogical.create_subscription( subscription_name := ‘subscription1‘, provider_dsn := ‘host=192.168.1.221 port=5432 dbname=lottu‘, synchronize_structure := true, apply_delay := ‘00:01:00‘::interval );
过滤机制需要 PostgreSQL 9.5 +
pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text [],row_filter text)
参数:
**
示例:对表tbl_lottu02中字段{id, name, job} 字段列过滤;且对条件 ‘id > 10’ 进行行过滤 ** # provider 节点 创建表并插入测试数据 create table tbl_lottu02 (id int primary key, name text, job text, reg_time timestamp ); insert into tbl_lottu02 select generate_series(1,20) id,‘lottu‘||generate_series(1,20),‘pg‘, now(); # subscriber节点创建表; 可以只创建复制的列的数据表 create table tbl_lottu02 (id int primary key, name text, job text, reg_time timestamp ); # or create table tbl_lottu02 (id int primary key, name text, job text); #provider 节点 将表加入复制集中;并同步记录 lottu=# select pglogical.replication_set_add_table(set_name := ‘default‘, relation := ‘tbl_lottu02‘, synchronize_data := true, columns := ‘{id, name, job}‘,row_filter := ‘id < 10‘); replication_set_add_table --------------------------- t (1 row) # subscriber节点查看表tbl_lottu02记录 lottu=# select * from tbl_lottu02; id | name | job ----+--------+----- 1 | lottu1 | pg 2 | lottu2 | pg 3 | lottu3 | pg 4 | lottu4 | pg 5 | lottu5 | pg 6 | lottu6 | pg 7 | lottu7 | pg 8 | lottu8 | pg 9 | lottu9 | pg (9 rows)
事件触发器工具可用于描述为新创建的表定义复制集的规则。
CREATE OR REPLACE FUNCTION pglogical_assign_repset() RETURNS event_trigger AS $$ DECLARE obj record; BEGIN FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands() LOOP IF obj.object_type = ‘table‘ THEN IF obj.schema_name = ‘config‘ THEN PERFORM pglogical.replication_set_add_table(‘configuration‘, obj.objid); ELSIF NOT obj.in_extension THEN PERFORM pglogical.replication_set_add_table(‘default‘, obj.objid); END IF; END IF; END LOOP; END; $$ LANGUAGE plpgsql; CREATE EVENT TRIGGER pglogical_assign_repset_trg ON ddl_command_end WHEN TAG IN (‘CREATE TABLE‘, ‘CREATE TABLE AS‘) EXECUTE PROCEDURE pglogical_assign_repset();
冲突检测需要 PostgreSQL 9.5 +
如果节点订阅多个提供程序,或当本地写入在订阅服务器上发生,可能会发生冲突,尤其是对传入的变化。这些都自动检测,并可以就此采取行动取决于配置。
解决冲突的办法是通过配置 pglogical.conflict_resolution 参数。
pglogical.conflict_resolution 支持的配置参数选项为︰
当参数track_commit_timestamp被禁用时,唯一允许的配置值是 apply_remote。 PostgreSQL 9.4 不支持 track_commit_timestamp 配置参数只能配置参数apply_remote(该参数是默认值)。
# 在 订阅者 节点配置;我们保留最新的数据 track_commit_timestamp = on pglogical.conflict_resolution = ‘last_update_wins‘ # 在 订阅者 节点创建测试表tbl_lottu03 lottu=# create table tbl_lottu03(id int primary key, name text); CREATE TABLE lottu=# insert into tbl_lottu03 values (1001,‘subscriber‘); INSERT 0 1 # 在 发布者 节点 创建测试表 create table tbl_lottu03(id int primary key, name text); select pglogical.replication_set_add_table( set_name := ‘default‘, relation := ‘tbl_lottu03‘,synchronize_data := true); insert into tbl_lottu03 values (1001,‘provider‘); # 在 订阅者 节点 查看数据 lottu=# select * from tbl_lottu03; id | name ------+---------- 1001 | provider
后记: 在订阅者的表需要主键约束;不然检测不到冲突;是否需要主键约束当然这个也是根据需求而定。
发布者跟订阅者的关系;一个发布者可以被多个订阅者订阅。多个发布者可以被同一个订阅者订阅。
数据库版本 | IP | 数据库 | 角色 |
---|---|---|---|
psql (PostgreSQL) 9.6.0 | 192.168.1.221 | lottu | provider1 |
psql (PostgreSQL) 9.6.0 | 192.168.1.221 | lottu02 | provider2 |
psql (PostgreSQL) 10.5 | 192.168.1.235 | lottu | subscriber |
为了加以区分;我们定制SQL提示符;例如
lottu=# \set PROMPT1 ‘%`echo provider1=`‘ provider1=
# 每个节点创建测试表; 订阅者创建的表可以无主键;若订阅者有主键,可利用序列自增来解决冲突。(例如:本例是两个发布者,则发布者1可取奇数;发布者二可取偶数)。若无主键;数据不受影响。 provider1=create table tbl_lottu05(id int primary key,name text); CREATE TABLE provider1=CREATE SEQUENCE seq_lottu05_id INCREMENT BY 2 START WITH 1; CREATE SEQUENCE provider2=create table tbl_lottu05(id int primary key,name text); CREATE TABLE provider2=CREATE SEQUENCE seq_lottu05_id INCREMENT BY 2 START WITH 2; CREATE SEQUENCE subscriber=create table tbl_lottu05(id int primary key,name text); CREATE TABLE
更多介绍查看第三节;或者查考《PostgreSQL 逻辑复制文档 (pglogical 文档 )》
# provider 节点1 provider1=SELECT pglogical.create_node(node_name := ‘provider1‘, dsn := ‘host=192.168.1.221 port=5432 dbname=lottu‘); create_node ------------- 2976894835 provider1=select pglogical.replication_set_add_table( set_name := ‘default‘, relation := ‘tbl_lottu05‘,synchronize_data := true); replication_set_add_table --------------------------- t # provider 节点2 provider2=SELECT pglogical.create_node(node_name := ‘provider2‘, dsn := ‘host=192.168.1.221 port=5432 dbname=lottu02‘); create_node ------------- 1828187473 provider2=select pglogical.replication_set_add_table( set_name := ‘default‘, relation := ‘tbl_lottu05‘,synchronize_data := true); replication_set_add_table --------------------------- t # subscriber 节点 subscriber=SELECT pglogical.create_node(node_name := ‘subscriber‘, dsn := ‘host=192.168.1.235 port=5432 dbname=lottu‘); create_node ------------- 2941155235 subscriber=SELECT pglogical.create_subscription(subscription_name := ‘subscription1‘, provider_dsn := ‘host=192.168.1.221 port=5432 dbname=lottu‘); create_subscription --------------------- 1763399739 subscriber=SELECT pglogical.create_subscription(subscription_name := ‘subscription2‘, provider_dsn := ‘host=192.168.1.221 port=5432 dbname=lottu02‘); create_subscription --------------------- 1871150101
provider1=insert into tbl_lottu05 select nextval(‘seq_lottu05_id‘),‘lottu‘ || generate_series(1,10,2); INSERT 0 5 provider2=insert into tbl_lottu05 select nextval(‘seq_lottu05_id‘),‘lottu‘ || generate_series(1,10,2); INSERT 0 5 subscriber=select * from tbl_lottu05; id | name ----+-------- 1 | lottu1 3 | lottu3 5 | lottu5 7 | lottu7 9 | lottu9 2 | lottu1 4 | lottu3 6 | lottu5 8 | lottu7 10 | lottu9 (10 rows)
pglogical 对 PostgreSQL 版本升级是一个很实用的工具。能实现以几乎为零的停机时间迁移和升级PostgreSQL。局限性在于pglogical支持的 PostgreSQL 版本。
本例简单模拟下pglogical 对 PostgreSQL 版本升级;忽略插件、存储空间、表空间、以及业务SQL和自定义函数创建。
数据库版本 | IP | 数据库 | 角色 |
---|---|---|---|
psql (PostgreSQL) 9.6.0 | 192.168.1.221 | lottu | provider |
psql (PostgreSQL) 10.5 | 192.168.1.235 | lottu | subscriber |
以一个全新的数据库进行操作
PG10-235=drop database if exists lottu; NOTICE: database "lottu" does not exist, skipping DROP DATABASE PG10-235=create database lottu owner lottu; CREATE DATABASE
本环境已经安装pglogical;只要到对应数据库创建pglogical插件即可
PG10-235=CREATE EXTENSION pglogical; CREATE EXTENSION PG10-235=\dx List of installed extensions Name | Version | Schema | Description -----------+---------+------------+-------------------------------- pglogical | 2.2.0 | pglogical | PostgreSQL Logical Replication plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language (2 rows)
这个要根据真实环境来设置;考虑到真实环境数据库中表不一定都有主键约束,可将表放到复制集 "default_insert_only"。
PG96-221=SELECT pglogical.create_node(node_name := ‘provider‘, dsn := ‘host=192.168.1.221 port=5432 dbname=lottu‘); create_node ------------- 3171898924 (1 row) PG96-221=SELECT pglogical.replication_set_add_all_tables(‘default_insert_only‘, ARRAY[‘public‘]); replication_set_add_all_tables -------------------------------- t (1 row)
该函数可实现主键和非主键分别放到‘default‘和‘default_insert_only‘复制集
CREATE OR REPLACE FUNCTION "public"."pglogical_relhaspkey_repset"() RETURNS "pg_catalog"."void" AS $BODY$ DECLARE obj record; BEGIN FOR obj IN (SELECT n.nspname, c.relname, c.relhaspkey FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind = ‘r‘ AND n.nspname <> ‘pg_catalog‘ AND n.nspname <> ‘information_schema‘ AND n.nspname !~ ‘^pg_toast‘ AND pg_catalog.pg_table_is_visible(c.oid) ORDER BY 1, 2) LOOP IF obj.relhaspkey THEN PERFORM pglogical.replication_set_add_table(set_name := ‘default‘, relation := obj.relname :: regclass); ELSE PERFORM pglogical.replication_set_add_table(set_name := ‘default_insert_only‘, relation := obj.relname :: regclass); END IF; END LOOP; END; $BODY$ LANGUAGE plpgsql VOLATILE COST 100
PG10-235=SELECT pglogical.create_node(node_name := ‘subscriber‘, dsn := ‘host=192.168.1.235 port=5432 dbname=lottu‘);
create_node
-------------
2941155235
pglogical 可以同步表/序列结构;在创建订阅者 ‘pglogical.create_subscription‘ ; 里面参数synchronize_structure - 指定是否将提供者与订阅者之间的结构同步,默认为false。可以同步表/序列/索引。
PG10-235=SELECT pglogical.create_subscription(subscription_name := ‘subscription‘, provider_dsn := ‘host=192.168.1.221 port=5432 dbname=lottu‘, synchronize_structure := true, synchronize_data := false); create_subscription --------------------- 2875150205 (1 row)
上一步我们没同步数据。所以参数synchronize_data我们选择false。虽然把表/序列/索引结构同步过来;但是业务代码(函数/插件)没同步过来;还要考虑这些业务代码是否需要改写优化。因为新的版本往往有新特性。
pglogical有将所有未同步表都在单个操作中同步
语法:
pglogical.alter_subscription_synchronize(subscription_name name, truncate bool)
参数:
PG10-235=SELECT pglogical.alter_subscription_synchronize(subscription_name := ‘subscription‘, truncate := false); alter_subscription_synchronize -------------------------------- t (1 row)
经过上一步,两个数据库数据达到一致。
PG10-235=select * from pglogical.show_subscription_table(subscription_name := ‘subscription‘, relation := ‘tbl_lottu01‘::regclass); nspname | relname | status ---------+-------------+-------------- public | tbl_lottu01 | synchronized (1 row)
PG96-221=select count(1) from tbl_lottu01; count ------- 10000 (1 row) PG10-235=select count(1) from tbl_lottu01; count ------- 10000 (1 row)
比对数据一致;可以将业务切换到升级后的数据库。
这步是可选的;保证升级后的数据库正常支持业务。不存在数据丢失的情况下。可以删除pglogical配置。
删除步骤:
PG10-235=select pglogical.drop_subscription(subscription_name := ‘subscription‘,ifexists := true); drop_subscription ------------------- 1 (1 row) PG10-235=select pglogical.drop_node(node_name := ‘subscriber‘, ifexists := true); drop_node ----------- t (1 row) PG96-221=select pglogical.drop_node(node_name := ‘provider‘, ifexists := true); drop_node ----------- t (1 row)
PostgreSQL逻辑复制之pglogical篇
标签:nextval property sign 奇数 系统 style RoCE 之间 syn