代码拉取完成,页面将自动刷新
同步操作将从 宏哥/mq2 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
--Message queue implemented by PG
--@Author Anthony Chen
--@Copyright reserved
--2016-12-30
CREATE TABLE mq_config (
table_name name not null unique,
channel name primary key
);
CREATE TABLE mq_base (
msg_id bigserial not null,
type_id bigint not null default 0,
sent boolean not null default false ,
sent_at timestamp with time zone not null default now(),
payload jsonb not null,
delivered_at timestamp with time zone
,CONSTRAINT pk_mq_base PRIMARY KEY (msg_id)
);
--Use of this index SET "enable_seqscan = OFF" should be executed to avoid seq scan
CREATE UNIQUE INDEX idx_mq_base_not_sent
ON mq_base USING btree (msg_id,type_id) WHERE sent = false;
CREATE OR REPLACE FUNCTION mq_trigger_notify() RETURNS TRIGGER
LANGUAGE PLPGSQL AS
$$
DECLARE t_channel name;
BEGIN
SELECT channel INTO t_channel FROM mq_config
WHERE table_name = TG_RELNAME;
EXECUTE 'NOTIFY ' || quote_ident(t_channel) || ', '
|| quote_literal(NEW.msg_id);
RETURN NEW;
END;
$$;
CREATE OR REPLACE FUNCTION mq_create_queue
(in_channel text)
RETURNS mq_config
LANGUAGE PLPGSQL VOLATILE SECURITY DEFINER AS $$
DECLARE
out_val mq_config%ROWTYPE;
t_table_name name;
BEGIN
t_table_name := 'mq_queue_' || in_channel;
INSERT INTO mq_config (table_name, channel)
VALUES (t_table_name, in_channel) returning * into out_val;
EXECUTE 'CREATE TABLE ' || quote_ident(t_table_name) || '(
like mq_base INCLUDING ALL )';
EXECUTE ' GRANT ALL ON TABLE '||quote_ident(t_table_name) || ' TO public';
EXECUTE 'CREATE TRIGGER tr_mq_notify
AFTER INSERT ON ' || quote_ident(t_table_name) || '
FOR EACH ROW EXECUTE PROCEDURE mq_trigger_notify()';
RETURN out_val;
END;
$$;
REVOKE EXECUTE ON FUNCTION mq_create_queue(text) FROM public;
CREATE OR REPLACE FUNCTION mq_drop_queue(in_channel name) RETURNS bool
LANGUAGE plpgsql VOLATILE SECURITY DEFINER AS $$
declare t_table_name name;
BEGIN
SELECT table_name INTO t_table_name FROM mq_config
WHERE channel = in_channel;
EXECUTE 'DROP TABLE ' || quote_ident(t_table_name) || ' CASCADE';
DELETE FROM mq_config WHERE channel = in_channel;
RETURN FOUND;
END;
$$;
REVOKE EXECUTE ON FUNCTION mq_drop_queue(in_channel name) FROM public;
CREATE OR REPLACE FUNCTION mq_send
(in_channel text, in_payload jsonb,in_type bigint DEFAULT 0 )
RETURNS mq_base
LANGUAGE PLPGSQL VOLATILE
AS $$
DECLARE channel_entry mq_config%ROWTYPE;
out_val mq_base%ROWTYPE;
BEGIN
SELECT * INTO channel_entry FROM mq_config
WHERE channel = in_channel;
IF NOT FOUND THEN
RAISE EXCEPTION 'Channel Not Found';
END IF;
EXECUTE 'INSERT INTO ' || quote_ident(channel_entry.table_name)
|| ' (type_id, payload) VALUES ( '
|| in_type ||','
|| quote_literal(in_payload) || '::jsonb )
RETURNING msg_id,type_id,sent, sent_at,payload, delivered_at'
INTO out_val ;
RETURN out_val;
END;
$$;
CREATE OR REPLACE FUNCTION mq_recv(
in_channel name, --Channel Name
in_num_msgs bigint DEFAULT 1, --Number of message to get
in_type_id bigint DEFAULT NULL::bigint --Message type id
)
RETURNS SETOF mq_base LANGUAGE PLPGSQL VOLATILE
AS $$
DECLARE channel_entry mq_config%ROWTYPE;
BEGIN
SELECT * INTO channel_entry FROM mq_config
WHERE channel = in_channel;
IF in_type_id is NULL THEN
RETURN QUERY EXECUTE
$e$ UPDATE $e$ || quote_ident(channel_entry.table_name) || $e$
SET delivered_at = now(),sent=true
WHERE msg_id IN (SELECT msg_id
FROM $e$ || quote_ident(channel_entry.table_name) ||
$e$ WHERE sent=false
LIMIT $e$ || in_num_msgs || $e$
FOR UPDATE SKIP LOCKED )
RETURNING msg_id, type_id ,sent ,sent_at, payload, delivered_at $e$;
ELSE
RETURN QUERY EXECUTE
$e$ UPDATE $e$ || quote_ident(channel_entry.table_name) || $e$
SET delivered_at = now(),sent=true
WHERE msg_id IN (SELECT msg_id
FROM $e$ || quote_ident(channel_entry.table_name) ||
$e$ WHERE sent=false and type_id= $e$ || in_type_id || $e$
LIMIT $e$ || in_num_msgs || $e$
FOR UPDATE SKIP LOCKED
)
RETURNING msg_id, type_id ,sent ,sent_at, payload, delivered_at $e$;
END IF;
END;
$$;
COMMENT ON FUNCTION mq_send(text, jsonb, bigint) IS 'SET enable_seqscan = OFF; --TO BOOST PERFORMANCE--';
CREATE OR REPLACE FUNCTION mq_recv(
in_channel name, --Channel Name
in_type_rs text, --Message type id
in_num_msgs bigint DEFAULT 1 --Number of message to get
)
RETURNS SETOF mq_base LANGUAGE PLPGSQL VOLATILE
AS $$
DECLARE channel_entry mq_config%ROWTYPE;
BEGIN
SELECT * INTO channel_entry FROM mq_config
WHERE channel = in_channel;
RETURN QUERY EXECUTE
$e$ UPDATE $e$ || quote_ident(channel_entry.table_name) || $e$
SET delivered_at = now(),sent=true
WHERE msg_id IN (SELECT msg_id
FROM $e$ || quote_ident(channel_entry.table_name) ||
$e$ WHERE sent=false and type_id in $e$ || in_type_rs || $e$
LIMIT $e$ || in_num_msgs || $e$
FOR UPDATE SKIP LOCKED
)
RETURNING msg_id, type_id ,sent ,sent_at, payload, delivered_at $e$;
END;
$$;
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。