Introdução ao Oracle Advanced Queue
Olá pessoal,
Hoje falarei um pouco sobre o Oracle AQ (Advanced Queue). Recurso para troca de mensagens do Oracle que não é muito encontrado na maioria dos projetos, mas que pode ser útil. Trata-se de um usuário\processo que “cadastra” mensagem em um fila para que ela seja consumida por um ou mais usuários\processos. Irei mostrar um exemplo simples de utilização.
Primeiro precisamos criar um owner para os objetos da nossa fila AQ. Ele precisará dos privilégios:
GRANT MANAGE ANY QUEUE TO admin_aq;
GRANT EXECUTE ON sys.dbms_aqadm TO admin_aq;
Ele criará indiretamente segmentos que serão mantidos pela utilização da fila:
GRANT UNLIMITED TABLESPACE TO admin_aq;
Antes de criar nossa fila de mensagens, precisamos definir o tipo de mensagem que será enfileirada, como por exemplo:
CREATE OR REPLACE TYPE admin_aq.tp_msg_aq IS OBJECT(
cod NUMBER(3),
dsc VARCHAR2(50));
/
Com isso já podemos criar a fila que terá uma tabela (queue_table) onde as mensagens serão armazenadas. Também será criada uma fila de exceção para as mensagens expiradas:
EXECUTE dbms_aqadm.create_queue_table(queue_table => 'tb_msg_aq', queue_payload_type => 'tp_msg_aq', multiple_consumers => TRUE, comment => 'Tabela de mensagens');
EXECUTE dbms_aqadm.create_queue(queue_name => 'fila_msg_aq', queue_table => 'tb_msg_aq');
EXECUTE dbms_aqadm.create_queue(queue_name => 'fila_msg_aq_excep', queue_table => 'tb_msg_aq', queue_type => dbms_aqadm.EXCEPTION_QUEUE);
SELECT * FROM User_Queues;
Quando iniciamos uma fila, por padrão, ela permite que as mensagens sejam enfileiradas e consumidas, porém para a fila de exceção só precisamos habilitá-la para ser consumida, pois o enfileiramento será automático pelo AQ:
EXECUTE dbms_aqadm.start_queue(queue_name => 'fila_msg_aq');
EXECUTE dbms_aqadm.start_queue(queue_name => 'fila_msg_aq_excep', enqueue => FALSE);
Com a fila criada e disponível podemos começar a utilizá-la. Irei cadastrar dois consumidores nessa fila:
DECLARE
subs1 sys.aq$_agent := sys.aq$_agent('cons1', null, null);
subs2 sys.aq$_agent := sys.aq$_agent('cons2', null, null);
BEGIN
--
dbms_aqadm.add_subscriber(queue_name => 'fila_msg_aq',subscriber => subs1);
--
dbms_aqadm.add_subscriber(queue_name => 'fila_msg_aq',subscriber => subs2);
--
END;
/
SELECT * FROM user_queue_subscribers;
Agora iremos simular um processo que irá enfileirar uma mensagem, para isso é necessário de usuário com os grants abaixo:
GRANT EXECUTE ON admin_aq.tp_msg_aq TO user_aq;
GRANT EXECUTE ON sys.dbms_aq TO user_aq;
BEGIN
dbms_aqadm.grant_queue_privilege (
privilege => 'ALL',
queue_name => 'admin_aq.fila_msg_aq',
grantee => 'user_aq',
grant_option => FALSE);
END;
/
Com o usuário configurado, podemos enfileirar a primeira mensagem:
DECLARE
--
v_msg admin_aq.tp_msg_aq;
eq_opt dbms_aq.enqueue_options_t;
msg_prop dbms_aq.message_properties_t;
msg_handle raw(16);
--
BEGIN
--
-- Mensagem
v_msg := admin_aq.tp_msg_aq(1, 'Um');
--
msg_prop.DELAY := DBMS_AQ.NO_DELAY;
msg_prop.expiration := 15; -- segundos
msg_prop.exception_queue := 'admin_aq.fila_msg_aq_excep';
--
DBMS_AQ.ENQUEUE(queue_name => 'admin_aq.fila_msg_aq',
enqueue_options => eq_opt,
message_properties=> msg_prop,
payload => v_msg,
msgid => msg_handle);
--
COMMIT;
--
END;
/
SELECT a.queue, a.msg_state, a.expiration, a.enq_time, a.deq_time
, a.user_data.cod||' - '||a.user_data.dsc msg, a.consumer_name
FROM admin_aq.aq$tb_msg_aq a;
Essa mensagem fica como lida para todos consumidores cadastrados na fila e disponível para ser processada, como no exemplo abaixo pelo cons1:
DECLARE
--
dopt dbms_aq.dequeue_options_t;
msg_prop dbms_aq.message_properties_t;
msg admin_aq.tp_msg_aq;
msgid RAW(16);
--
BEGIN
--
dopt.consumer_name := 'cons1';
dopt.visibility := dbms_aq.immediate;
dopt.navigation := dbms_aq.first_message;
--
dbms_aq.dequeue(queue_name => 'admin_aq.fila_msg_aq',
dequeue_options => dopt,
message_properties => msg_prop,
payload => msg,
msgid => msgid);
--
dbms_output.put_line(msg.cod||' - '||msg.dsc);
--
END;
/
Repare o status da mensagem com a consulta abaixo. Ela foi processada por apenas um dos consumidores da fila e irá expirar para o outro consumidor:
SELECT a.queue, a.msg_state, a.expiration, a.enq_time, a.deq_time
, a.user_data.cod||' - '||a.user_data.dsc msg, a.consumer_name
FROM admin_aq.aq$tb_msg_aq a;
Caso a mensagem fosse processada por todos os consumidores ela seria removida da fila e não transferida para fila de exceção.
Para retirar um consumidor usamos:
DECLARE
subs2 sys.aq$_agent := sys.aq$_agent('cons2', null, null);
BEGIN
--
dbms_aqadm.remove_subscriber(queue_name => 'admin_aq.fila_msg_aq',
subscriber => subs2);
--
END;
/
Bom Pessoal, esse foi um exemplo bem simples de utilização do AQ. É possível fazer diversas configurações para utilizá-lo de forma que atenda a sua necessidade, como adicionar regras para consumo de mensagens, definir prioridade, utilizar XML entre outros. Para AQ com RAC é preciso tomar alguns cuidados com parametrização (aq_tm_processes) e aplicação de patchs de correções para algumas versões, quantidades de instâncias e etc.
Espero ter despertado interesse aos que nunca tinham visto utilização do AQ.
Abraços
Olá,
Existe alguma maneira de eu ver os dados que estão dentro da fila?
Obrigada!
Olá,
Tem sim, no exemplo a consulta abaixo retorna os dados:
SELECT a.queue, a.msg_state, a.expiration, a.enq_time, a.deq_time
, a.user_data.cod||‘ – ‘||a.user_data.dsc msg, a.consumer_name
FROM admin_aq.aq$tb_msg_aq a;
Olá Danilo, tudo bem?
Primeiramente parabéns pelo blog.
Cara, estou com um problema da tabela da minha queue.
Eu coloco 5 mensagens na tabela da minha queue e essas mensagens são processadas(removida) uma por vez. Porém o correto não seria ele processa tudo de uma vez? Existe algum parâmetro para controlar isso?
Obrigado.
Oi Antonio,
Obrigado =)
Como são cinco mensagens, você deve executar o dbms_aq.dequeue cinco vezes. Verifique se a propriedade expiration do dbms_aq.message_properties_t atende à sua regra de negócio para que o subscriber não “precise” consumir todas as mensagens da fila.
Abraço