Очереди


Мне неактуально, разбирайтесь сами. Если готовы поразбираться и поделиться результатами, welcome!


https://t.me/oracle_dba_ru/6090


-- Создаём пользователя (можно пропустить)
create tablespace testqueue_tablespace DATAFILE 'D:\ORACLE\TESTQUEUE.DAT'
SIZE 20M REUSE AUTOEXTEND ON NEXT 2M MAXSIZE 1000M;

create USER testqueue IDENTIFIED BY "testqueue"
default tablespace testqueue_tablespace;

-- Даём гранты
grant create session to testqueue;
alter user testqueue quota unlimited on testqueue_tablespace;
grant create table to testqueue;
grant CREATE TRIGGER to testqueue;
grant create synonym to testqueue;
grant create sequence to testqueue;
grant create procedure to testqueue;
grant create type to testqueue;
grant create view to testqueue;
grant create materialized view to testqueue;
-- Работа с очередью
grant execute on DBMS_AQADM to testqueue;
grant execute on DBMS_AQ to testqueue;

/
-- Создаём тип для очереди
create or replace type QueueMessageType as object
(
  id     number,
  Param1 number,
  param2 number
)
/
begin
  -- Создаём таблицу для очереди
  dbms_aqadm.create_queue_table(queue_table => 'QUEUETABLE',
                                queue_payload_type => 'TESTQUEUE.QUEUEMESSAGETYPE');

  -- Создаём саму очередь
  dbms_aqadm.create_queue(queue_name => 'MSG_QUEUE',
                          queue_table => 'QUEUETABLE');

  -- заупускаем очередь
  dbms_aqadm.start_queue('MSG_QUEUE');
end;
/
-- Создаём обычную таблицу, где будем хранить все сообщения
create table MSGDATA
(
  id      NUMBER generated always as identity,
  paramid NUMBER not null,                       -- Параметры входящего сообщения
  param1  NUMBER not null,                       -- Параметры входящего сообщения
  param2  NUMBER not null,                       -- Параметры входящего сообщения
  result  NUMBER,                                -- Результат работы
  status  VARCHAR2(1) not null,                  -- Статус записи
  updated DATE not null,
  created DATE default sysdate not null
)
/
-- Создаем триггер на нашу таблицу
create or replace trigger msgdata_BI
  before insert or update on msgdata
  for each row
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            queuemessagetype;
begin
  -- Время обновления записи
  :new.updated := sysdate;

  if inserting then
    :new.status  := 'N'; -- New

    -- Создаём сообщение для очерерди
    message := queuemessagetype(:new.paramid, :new.param1, :new.param2);

    -- Добавляем сообщение в очередь
    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
  end if;
end msgdata_BI;
/
-- Создаём процедуру где будем работать с очередью
create or replace procedure DoWork is
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            queuemessagetype;
  vResult            number;
BEGIN
  -- Достаём запись из очереди
  DBMS_AQ.DEQUEUE(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  -- Здесь обрабатываем наше сообщение
  -- ... много много логики
  vResult := message.param1 + message.param2;

  -- Сохраняем результат, обновляем статус, о том что сообщение обработано
  update msgdata
     set result = vResult,
         status = 'D' -- Done
   where paramid = message.id;

end DoWork;
/
-- Вставляем записи в нашу таблицу, они автоматически добавятся в очередь.
insert into msgdata(paramid, param1, param2) values(1, 10, 15);
insert into msgdata(paramid, param1, param2) values(2, 30, 30);
insert into msgdata(paramid, param1, param2) values(3, 100, 100);
/
-- Обрабатываем две записи из трёх
begin
  DoWork;
  DoWork;
end;
/
-- Проверяем результат
select * from msgdata;


https://gist.github.com/dbobylev/64206405cc800fd9dc27077a50076a9b