вторник, 26 июня 2007 г.

Долговременная очередь в таблице Oracle

Возникла задача организации долговременной очереди, используемой несколькими параллельными конкурирующими обработчиками, которые, в общем случае, могут находиться на разных машинах.
Доп требование к очереди - ее архивное хранение и ограничение по используемой СУБД - Oracle.

Путей решения виделось несколько:
1. Использование готового решения от Оракл - AQ
2. Создание сервера приложений - эдакого контроллера очереди, посредством которого доступ к данным будет сериализован
3. Использование механизма конкурентного доступа и блокировок самой СУБД.

По ряду обстоятельств(технические сложности работы а AQ и полнейшее с ним незнакомство, трудоемкость создания доп слоя...) был выбран третий путь(учитывая FB-шное прошлое он показался более близким, чтоль:)

Суть метода в том, что обработчик помечает пакет записей заблокированными на обработку - в это время никто не должен иметь к ним доступа, а после обработки блокировка снимается и сообщения помечается обработанным.

В FB такое можно сделать посредством подобной селективной процедуры, привычно ставя блокировку холостым апдейтом.

CREATE PROCEDURE QUERY_SEL(row_count integer) RETURNS (ID_row INTEGER)
AS
begin
for select first(:row_count) sq.id_row,
from query sq
where sq.id_send_state=1
into :id_row do
begin
update sgt_query sq set sq.id_mess_state=2
where sq.id_row=:id_row;
suspend;
when any do
begin
row_count = row_count +1;
end
end
end

Оракл оказался богаче - недокументированный for update skip locked; подходил идеально(похоже это и есть механизм реализации AQ), но слушая Кайта с негодованием отметаем подобный "ugly hack" и пытаемся сделать это вручную, посредством pipeline функции.

Столкнувшись с невозможностью запуска апдейтов внутри селекта, познакомился с автономными транзакциями.
Выделив апдейт(блокировку) в отдельную процедуру задача в общих чертах решилась.

create or replace procedure sgt_queue_lock(
row_count in integer default 10,
locker_id_ in integer )
as
lockedCount integer := 0;
ERROR_LOCK EXCEPTION;
PRAGMA EXCEPTION_INIT(ERROR_LOCK, -54);
PRAGMA AUTONOMOUS_TRANSACTION;
begin
-- receive all unlocked and unsended messages and lock them
-- from competitive lockers
for r1 in (select id_mess from SGT_QUEUE t where t.ID_mess_STATE=1
for update nowait)
loop
begin
-- long time lock for processing time
update SGT_QUEUE t set t.ID_mess_STATE=2,t.LOCKER_ID=locker_id_
where t.ID_mess=r1.id_mess;
lockedCount := lockedCount+1;

if(row_count=lockedCount) then
exit;
end if;

-- if row is locked - skip it
exception when error_lock then
null;
end;
end loop;
commit;
end sgt_queue_lock;

Конечно оборачивание всего в функцию необязательно, но чем меньше вариантов использования для прикладников - тем лучше, не так ли?


create or replace function queue_get
(row_count integer, locker_id integer)
return queue_table_type PIPELINED
as
suspendedCount integer := 0;
out_rec queue_type := queue_type(null,null,null,null);
begin
-- долговременно и монопольно блокируем пакет записей для
-- обработки определенным процессом
BEGIN
-- проверяем, есть ли еще необработанные/не отосланные сообщения в очереди
-- если есть - то новых блокировать не будем
-- из-за отсутсвия exists проверям наличие записей очень извратно
select 0 into suspendedCount from SGT_QUEUE t
where t.LOCKER_ID=locker_id and
t.ID_MESS_STATE=2 and rownum=1;
EXCEPTION WHEN NO_DATA_FOUND THEN
begin
SGT_QUEUE_LOCK(row_count,locker_id);
end;
END;

-- выше использовали как временную заглушку
suspendedCount := 0;
-- выбираем записи помеченные на обработку для этого процесса
-- (не провайдера, а именно процесса)
for r1 in (select id_MESS, MESS_DATA from SGT_QUEUE t
where t.LOCKER_ID=locker_id and t.ID_MESS_STATE=2
order by t.MESS_PRIORITY DESC,t.RECEIVE_TIME ASC)
loop
out_rec.id_mess := r1.id_mess;
out_rec.mess_data := r1.mess_data;

PIPE ROW(out_rec);

suspendedCount := suspendedCount+1;
if(row_count=suspendedCount) then
exit;
end if;

end loop;
return;
end queue_get;

Из недостатков видится не максимальная скорость работы, но учитывая то, что записи на обработку будут забираться пакетно - особой проблемы это не составит.
Может кто еще какие сложности/ошбики видит?

Да, как оказалось, подобные вопросы встречаются регулярно :)

Выбрать первую незаблокированную запись из таблицы, как?
http://www.sql.ru/forum/actualthread.aspx?tid=389300&hl=aq

Таблица oracle как очередь
http://www.sql.ru/forum/actualthread.aspx?tid=309047&hl=aq

Параллельная работа с записями.
http://www.sql.ru/forum/actualthread.aspx?tid=307911&pg=-1&hl=aq

Select for update - как получить незаблокированные данные
http://www.sql.ru/forum/actualthread.aspx?tid=264566&pg=-1&hl=aq

Таблица-буфер. Уникальная выборка строк для каждого процесса.
http://www.sql.ru/forum/actualthread.aspx?tid=239337&hl=aq

2 комментария:

i.den комментирует...

Что ты делал на sql.ru? ;)

pnv82 комментирует...

Не, ну хоть в пятницу я могу себе позволить что-то развлекательное? :))))
На самом деле по Ораклу там вполне приличный форум, не чета ФБ-шному :(