Доп требование к очереди - ее архивное хранение и ограничение по используемой СУБД - Oracle.
Путей решения виделось несколько:
1. Использование готового решения от Оракл - AQ
2. Создание сервера приложений - эдакого контроллера очереди, посредством которого доступ к данным будет сериализован
3. Использование механизма конкурентного доступа и блокировок самой СУБД.
По ряду обстоятельств(технические сложности работы а AQ и полнейшее с ним незнакомство, трудоемкость создания доп слоя...) был выбран третий путь(учитывая FB-шное прошлое он показался более близким, чтоль:)
Суть метода в том, что обработчик помечает пакет записей заблокированными на обработку - в это время никто не должен иметь к ним доступа, а после обработки блокировка снимается и сообщения помечается обработанным.
В FB такое можно сделать посредством подобной селективной процедуры, привычно ставя блокировку холостым апдейтом.
CREATE PROCEDURE QUERY_SEL(row_count integer) RETURNS (ID_row INTEGER)
AS
begin
for select first(:row_count) sq.id_row,
from query sq
where sq.id_send_state=1
into :id_row do
begin
update sgt_query sq set sq.id_mess_state=2
where sq.id_row=:id_row;
suspend;
when any do
begin
row_count = row_count +1;
end
end
end
Оракл оказался богаче - недокументированный for update skip locked; подходил идеально(похоже это и есть механизм реализации AQ), но слушая Кайта с негодованием отметаем подобный "ugly hack" и пытаемся сделать это вручную, посредством pipeline функции.
Столкнувшись с невозможностью запуска апдейтов внутри селекта, познакомился с автономными транзакциями.
Выделив апдейт(блокировку) в отдельную процедуру задача в общих чертах решилась.
create or replace procedure sgt_queue_lock(
row_count in integer default 10,
locker_id_ in integer )
as
lockedCount integer := 0;
ERROR_LOCK EXCEPTION;
PRAGMA EXCEPTION_INIT(ERROR_LOCK, -54);
PRAGMA AUTONOMOUS_TRANSACTION;
begin
-- receive all unlocked and unsended messages and lock them
-- from competitive lockers
for r1 in (select id_mess from SGT_QUEUE t where t.ID_mess_STATE=1
for update nowait)
loop
begin
-- long time lock for processing time
update SGT_QUEUE t set t.ID_mess_STATE=2,t.LOCKER_ID=locker_id_
where t.ID_mess=r1.id_mess;
lockedCount := lockedCount+1;
if(row_count=lockedCount) then
exit;
end if;
-- if row is locked - skip it
exception when error_lock then
null;
end;
end loop;
commit;
end sgt_queue_lock;
Конечно оборачивание всего в функцию необязательно, но чем меньше вариантов использования для прикладников - тем лучше, не так ли?
create or replace function queue_get
(row_count integer, locker_id integer)
return queue_table_type PIPELINED
as
suspendedCount integer := 0;
out_rec queue_type := queue_type(null,null,null,null);
begin
-- долговременно и монопольно блокируем пакет записей для
-- обработки определенным процессом
BEGIN
-- проверяем, есть ли еще необработанные/не отосланные сообщения в очереди
-- если есть - то новых блокировать не будем
-- из-за отсутсвия exists проверям наличие записей очень извратно
select 0 into suspendedCount from SGT_QUEUE t
where t.LOCKER_ID=locker_id and
t.ID_MESS_STATE=2 and rownum=1;
EXCEPTION WHEN NO_DATA_FOUND THEN
begin
SGT_QUEUE_LOCK(row_count,locker_id);
end;
END;
-- выше использовали как временную заглушку
suspendedCount := 0;
-- выбираем записи помеченные на обработку для этого процесса
-- (не провайдера, а именно процесса)
for r1 in (select id_MESS, MESS_DATA from SGT_QUEUE t
where t.LOCKER_ID=locker_id and t.ID_MESS_STATE=2
order by t.MESS_PRIORITY DESC,t.RECEIVE_TIME ASC)
loop
out_rec.id_mess := r1.id_mess;
out_rec.mess_data := r1.mess_data;
PIPE ROW(out_rec);
suspendedCount := suspendedCount+1;
if(row_count=suspendedCount) then
exit;
end if;
end loop;
return;
end queue_get;
Из недостатков видится не максимальная скорость работы, но учитывая то, что записи на обработку будут забираться пакетно - особой проблемы это не составит.
Может кто еще какие сложности/ошбики видит?
Да, как оказалось, подобные вопросы встречаются регулярно :)
Выбрать первую незаблокированную запись из таблицы, как?
http://www.sql.ru/forum/actualthread.aspx?tid=389300&hl=aq
Таблица oracle как очередь
http://www.sql.ru/forum/actualthread.aspx?tid=309047&hl=aq
Параллельная работа с записями.
http://www.sql.ru/forum/actualthread.aspx?tid=307911&pg=-1&hl=aq
Select for update - как получить незаблокированные данные
http://www.sql.ru/forum/actualthread.aspx?tid=264566&pg=-1&hl=aq
Таблица-буфер. Уникальная выборка строк для каждого процесса.
http://www.sql.ru/forum/actualthread.aspx?tid=239337&hl=aq