CREATE SCHEMA IF NOT EXISTS fzs;
CREATE TABLE fzs.ddl_command
(
ddl_text TEXT COLLATE pg_catalog."default",
id BIGSERIAL PRIMARY KEY,
EVENT TEXT COLLATE pg_catalog."default",
tag TEXT COLLATE pg_catalog."default",
username CHARACTER VARYING COLLATE pg_catalog."default",
DATABASE CHARACTER VARYING COLLATE pg_catalog."default",
SCHEMA CHARACTER VARYING COLLATE pg_catalog."default",
object_type CHARACTER VARYING COLLATE pg_catalog."default",
object_name CHARACTER VARYING COLLATE pg_catalog."default",
client_address CHARACTER VARYING COLLATE pg_catalog."default",
client_port integer,
event_time timestamp WITH time ZONE,
txid_current CHARACTER VARYING(128) COLLATE pg_catalog."default",
message TEXT COLLATE pg_catalog."default"
);
alter table fzs.ddl_command replica identity full;
CREATE FUNCTION fzs.capture_ddl()
RETURNS event_trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF SECURITY DEFINER
AS $BODY$
declare ddl_text text;
declare max_rows int := 10000;
declare current_rows int;
declare pg_version_95 int := 90500;
declare pg_version_10 int := 100000;
declare current_version int;
declare object_id varchar;
declare alter_table varchar;
declare record_object record;
declare message text;
declare pub RECORD;
DECLARE r RECORD;
begin
select current_query() into ddl_text;
if TG_TAG = 'CREATE TABLE' then
show server_version_num into current_version;
if current_version >= pg_version_95 then
for record_object in (select * from pg_event_trigger_ddl_commands()) loop
if record_object.command_tag = 'CREATE TABLE' then
object_id := record_object.object_identity;
end if;
end loop;
else
select btrim(substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),' \t\r\n\v\f') into object_id;
end if;
if object_id = '' or object_id is null then
message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query();
else
alter_table := 'ALTER TABLE ' || object_id || ' REPLICA IDENTITY FULL';
message := 'alter_sql=' || alter_table;
execute alter_table;
end if;
if current_version >= pg_version_10 then
for pub in (select * from pg_publication where pubname like 'fzs_%') loop
raise notice 'pubname=%',pub.pubname;
BEGIN
execute 'alter publication ' || pub.pubname || ' add table ' || object_id;
EXCEPTION WHEN OTHERS THEN
END;
end loop;
end if;
end if;
SELECT * into r FROM pg_event_trigger_ddl_commands();
insert into fzs.ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message)
values (default,TG_EVENT,TG_TAG,current_user,current_database(),COALESCE(r.schema_name, CURRENT_SCHEMA),r.object_type,
CASE
WHEN POSITION('.' IN r.object_identity) > 0 THEN
SPLIT_PART(r.object_identity, '.', 2)
ELSE
r.object_identity
END, inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message);
select count(id) into current_rows from fzs.ddl_command;
if current_rows > max_rows then
delete from fzs.ddl_command where id in (select min(id) from fzs.ddl_command);
end if;
end
$BODY$;
ALTER FUNCTION fzs.capture_ddl() OWNER TO lightdb;
CREATE EVENT TRIGGER intercept_ddl ON ddl_command_end EXECUTE PROCEDURE fzs.capture_ddl();