Message Queueing


Overview

One of the bugbears of application users is simply how long it takes to do things. Putting it simply, users get bored. They always will. I know I do. So, as developers, anything we can do to improve the performance of applications has to be a good thing. The problem is that, in any given environment, there is always a minimum time to do anything. It takes a finite amount of time to INSERT into a table, UPDATE a column, etc., and most business logic involves many such steps.

So, with systems where business logic takes time, what can we do to improve performance? A common and effective approach is to split the work done by an application into the components which must be completed before the user can carry on with the workflow, and those which can be done "behind the scenes".

It's a simple approach really: if there's a large amount of work which needs to be done, but whose results the user does not need to see, then simply scheduling (or deferring) that work, rather than doing it immediately, increases the perceived performance of the application. A good example is a web page where the user submits some details and subsequent processes create users, send emails etc. The user does not want to wait until all this is finished. They're fine with a "your request has been received, an email will be sent when your account has been authorised"-type message.

This is where techniques such as Message Queueing (MQ) are used. The premise is simple:
  1. The application creates (or enqueues) a message containing the information needed by the business logic
  2. A separate process receives (or dequeues) the message and executes the business logic
There are many mechanisms for implementing this type of approach. This section will cover the most common.

Simple INSERT

This is perhaps the simplest approach, and one that, with the advent of Autonomous Transactions, becomes very easy to implement. The application only has to insert a row into a messages table (i.e. enqueue), and a background job selects the row (i.e. dequeue) and executes the business logic.

In reality, of course, the job should lock the row upon selecting it, to ensure other jobs don't get the same message.

There is a SQL statement which makes selecting non-locked rows extremely easy: SELECT ... FROM ... FOR UPDATE SKIP LOCKED (usually in conjunction with AND ROWNUM = 1 ;-)). This clause (which was undocumented until 9i, but is certainly available in 8i) is the reason why Advanced Queueing (AQ) is so efficient, and it allows you to build much more light-weight mechanisms than the alternatives, such as looping through the rows in the messages table using the NOWAIT syntax (which, of course, raises an exception if the row is locked).
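
As a rough sketch of the Simple INSERT approach described above, the following shows one way the enqueue and dequeue sides might look. The table simple_messages, the sequence simple_messages_seq and both procedure names are hypothetical and exist only for this illustration. The dequeue side fetches a single row from a FOR UPDATE SKIP LOCKED cursor rather than filtering on ROWNUM, on the assumption that this is the safer way to pick up the first row not locked by another session.

-- Hypothetical messages table and sequence for this sketch
CREATE TABLE simple_messages
( id    NUMBER        PRIMARY KEY,
  body  VARCHAR2(50)  NOT NULL );

CREATE SEQUENCE simple_messages_seq;

-- Enqueue : insert the message and commit independently of the caller's transaction
CREATE OR REPLACE PROCEDURE p_simple_enqueue(msg IN VARCHAR2)
AS
  PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
  INSERT INTO simple_messages ( id, body )
  VALUES ( simple_messages_seq.NEXTVAL, msg );
  COMMIT;
END;
/

-- Dequeue : take one message that no other session has locked, process it, delete it
CREATE OR REPLACE PROCEDURE p_simple_dequeue
AS
  -- No ORDER BY, so messages are not necessarily processed in insertion order
  CURSOR c_msg IS
    SELECT id, body
    FROM   simple_messages
    FOR UPDATE SKIP LOCKED;
  r_msg  c_msg%ROWTYPE;
BEGIN
  OPEN c_msg;
  FETCH c_msg INTO r_msg;
  IF c_msg%FOUND THEN
    -- The business logic would go here; this sketch just prints the message
    dbms_output.put_line('Message : ' || r_msg.body);
    DELETE FROM simple_messages WHERE id = r_msg.id;
  END IF;
  CLOSE c_msg;
  COMMIT;  -- releases the row lock and makes the delete permanent
END;
/

Several copies of p_simple_dequeue can run at once under this scheme, since a row locked by one session is simply skipped by the others.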

Advanced Queueing

Advanced Queueing (AQ) is the umbrella term for a series of techniques, the components of which are provided by Oracle. The basic concept is the same as the Simple INSERT, but the capabilities have been vastly extended, such that a message can be read by multiple recipients (via the publish-subscribe concept) and messages can be prioritised. It has the additional advantages of being callable from PL/SQL, Java and C, and can even hook into other message queueing systems via the Oracle Messaging Gateway.

Messages are the basic unit of AQ (as you might expect). Each message consists of a header, which contains metadata such as message priority, ordering information and expiry times, and a body, which contains user data in the form of an OBJECT type.

Queues are simply repositories of messages. Under the Advanced Queueing umbrella, there are two types of queue: user queues and exception queues. A user queue is for normal messaging functionality; messages which cannot be processed or retrieved from a user queue are transferred to the exception queue.

Queue Tables are the place where queues are stored. A given queue table may contain one or many queues, and each queue table contains a default exception queue. Each row in a queue table represents an individual message, and the Q_NAME column identifies the queue the message belongs to.

Agents are the users of a queue. There are two types of Agent: Producers and Consumers. Producers put messages onto the queue (or Enqueue); Consumers read the messages (or Dequeue). The packages used to enable the AQ functionality are DBMS_AQ and DBMS_AQADM, and it's surprisingly easy to set up the environment such that your application can take advantage of this functionality.

Time Manager is the process responsible for managing the expiration of messages. The number of Time Managers is controlled by the PFILE parameter AQ_TM_PROCESSES, which can be set to a value between 1 and 10.

Messages are created using the DBMS_AQ.ENQUEUE procedure, and are read by the DBMS_AQ.DEQUEUE procedure.

There are two roles associated with AQ: AQ_USER_ROLE and AQ_ADMINISTRATOR_ROLE. The difference is that AQ_USER_ROLE only has execute privileges on DBMS_AQ, allowing ENQUEUE and DEQUEUE type operations, whereas AQ_ADMINISTRATOR_ROLE has execute privileges on both DBMS_AQ and DBMS_AQADM, allowing creation of Queue Tables etc.

So, as AQ administrator, create a queue setup, i.e.

Create the OBJECT type (note, advanced setups will require a much more complex OBJECT type, this is for example only)

SQL> CREATE TYPE message_type AS OBJECT ( message_body  VARCHAR2(50) );
  2  /

Type created.


Now, create the queue table using the new message_type OBJECT :

SQL> BEGIN
  2    dbms_aqadm.create_queue_table(queue_table => 'queue_table',
  3                                  queue_payload_type => 'message_type');
  4  END;
  5  /

PL/SQL procedure successfully completed.


Just prove the queue_table table has been created with our custom payload :

SQL> DESC queue_table
 Name                                      Null?    Type
 ----------------------------------------- -------- ---------------------------
 Q_NAME                                             VARCHAR2(30)
 MSGID                                     NOT NULL RAW(16)
 CORRID                                             VARCHAR2(128)
 PRIORITY                                           NUMBER
 STATE                                              NUMBER
 DELAY                                              DATE
 EXPIRATION                                         NUMBER
 TIME_MANAGER_INFO                                  DATE
 LOCAL_ORDER_NO                                     NUMBER
 CHAIN_NO                                           NUMBER
 CSCN                                               NUMBER
 DSCN                                               NUMBER
 ENQ_TIME                                           DATE
 ENQ_UID                                            NUMBER
 ENQ_TID                                            VARCHAR2(30)
 DEQ_TIME                                           DATE
 DEQ_UID                                            NUMBER
 DEQ_TID                                            VARCHAR2(30)
 RETRY_COUNT                                        NUMBER
 EXCEPTION_QSCHEMA                                  VARCHAR2(30)
 EXCEPTION_QUEUE                                    VARCHAR2(30)
 STEP_NO                                            NUMBER
 RECIPIENT_KEY                                      NUMBER
 DEQUEUE_MSGID                                      RAW(16)
 SENDER_NAME                                        VARCHAR2(30)
 SENDER_ADDRESS                                     VARCHAR2(1024)
 SENDER_PROTOCOL                                    NUMBER
 USER_DATA                                          MESSAGE_TYPE


Now, create a queue using the queue table :

SQL> BEGIN
  2    dbms_aqadm.create_queue( queue_name => 'example_queue',
  3                             queue_table => 'queue_table' );
  4  END;
  5  /

PL/SQL procedure successfully completed.


Now, start the queue :

SQL> BEGIN
  2    dbms_aqadm.start_queue(queue_name=>'example_queue');
  3  END;
  4  /

PL/SQL procedure successfully completed.


Enqueue a message using a procedure similar to this. Note, since roles are not enabled within stored procedures, you will need direct EXECUTE grants on DBMS_AQ to the users, rather than just AQ_USER_ROLE.

SQL> CREATE OR REPLACE PROCEDURE p_enqueue(msg IN VARCHAR2)
  2  AS
  3    PRAGMA AUTONOMOUS_TRANSACTION;
  4    enqueue_options       dbms_aq.enqueue_options_t;
  5    message_properties    dbms_aq.message_properties_t;
  6    message_handle        RAW(16);
  7  BEGIN
  8    dbms_aq.enqueue( queue_name            => 'example_queue',
  9                     enqueue_options       => enqueue_options,
 10                     message_properties    => message_properties,
 11                     payload               => message_type(msg),
 12                     msgid                 => message_handle);
 13    COMMIT;
 14  END;
 15  /

Procedure created.


Dequeue a message using a procedure similar to this :

SQL> CREATE OR REPLACE PROCEDURE p_dequeue
  2  AS
  3    PRAGMA AUTONOMOUS_TRANSACTION;
  4    dequeue_options      dbms_aq.dequeue_options_t;
  5    message_properties   dbms_aq.message_properties_t;
  6    message_handle       RAW(16);
  7    message              message_type;
  8  BEGIN
  9    dbms_aq.dequeue( queue_name => 'example_queue',
 10                     dequeue_options       => dequeue_options,
 11                     message_properties    => message_properties,
 12                     payload               => message,
 13                     msgid                 => message_handle);
 14    dbms_output.put_line('Message : ' || message.message_body);
 15    COMMIT;
 16  END p_dequeue;
 17  /

Procedure created.


Now, test the queues :

SQL> set serverout on

SQL> EXEC p_enqueue('THIS IS A TEST MESSAGE');

PL/SQL procedure successfully completed.

SQL> EXEC p_dequeue
Message : THIS IS A TEST MESSAGE

PL/SQL procedure successfully completed.


So, as you can see, the basic functionality of DBMS_AQ is easy to set up and use.

Dequeueing Modes

AQ gives you the ability to dequeue a message with or without deleting it from the queue. Basically, you can browse a message (i.e. read it but don't remove it from the queue) or consume the message (i.e. process and remove it). The mode is specified using the dequeue_mode attribute of the dbms_aq.dequeue_options_t object :

SQL> CREATE OR REPLACE PROCEDURE p_dequeue
  2  AS
  3    PRAGMA AUTONOMOUS_TRANSACTION;
  4    dequeue_options      dbms_aq.dequeue_options_t;
  5    message_properties   dbms_aq.message_properties_t;
  6    message_handle       RAW(16);
  7    message              message_type;
  8  BEGIN
  9    dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
 10    dbms_aq.dequeue( queue_name => 'example_queue',
 11                     dequeue_options       => dequeue_options,
 12                     message_properties    => message_properties,
 13                     payload               => message,
 14                     msgid                 => message_handle);
 15    dbms_output.put_line('Message : ' || message.message_body);
 16    COMMIT;
 17  END p_dequeue;


This is a tabular list of possible dequeue_modes and their meanings :

Dequeue Mode           Meaning
---------------------  ----------------------------------------------------------------
DBMS_AQ.LOCKED         Message is read and locked by the dequeue operation. No other process can see the message until a COMMIT or ROLLBACK occurs.
DBMS_AQ.BROWSE         Message is read, but other processes can still see the message. There is no guarantee that a message will be BROWSEd and then available afterwards due to the fact that another session may have removed it.
DBMS_AQ.REMOVE         Message is read and removed immediately (but can be retained depending upon the retention properties of the queue). This is the default.
DBMS_AQ.REMOVE_NODATA  Message is "marked" as read and updated or deleted. This is useful if you've already BROWSEd or LOCKED the message, and now want to remove it, but without the overhead of receiving the full message.
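
To illustrate how two of these modes can be combined, here is a minimal sketch that BROWSEs a message and then removes the same message with REMOVE_NODATA, identifying it by the message id returned from the first call. The procedure name p_browse_then_remove is hypothetical; the queue and payload type are the example_queue and message_type created earlier. As noted for BROWSE, another session may remove the message in between, so the second call is not guaranteed to succeed.

CREATE OR REPLACE PROCEDURE p_browse_then_remove
AS
  dequeue_options      dbms_aq.dequeue_options_t;
  message_properties   dbms_aq.message_properties_t;
  message_handle       RAW(16);
  message              message_type;
BEGIN
  -- Step 1 : read the first available message, but leave it on the queue
  dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
  dbms_aq.dequeue( queue_name            => 'example_queue',
                   dequeue_options       => dequeue_options,
                   message_properties    => message_properties,
                   payload               => message,
                   msgid                 => message_handle);
  dbms_output.put_line('Browsed : ' || message.message_body);

  -- Step 2 : remove that same message by id, without receiving the payload again
  dequeue_options.dequeue_mode := DBMS_AQ.REMOVE_NODATA;
  dequeue_options.msgid        := message_handle;
  dbms_aq.dequeue( queue_name            => 'example_queue',
                   dequeue_options       => dequeue_options,
                   message_properties    => message_properties,
                   payload               => message,
                   msgid                 => message_handle);
  COMMIT;
END p_browse_then_remove;
/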

For further information, see the Oracle docs at
http://download-west.oracle.com/docs/cd/B10501_01/appdev.920/a96612/t_aq2.htm#1012116.

Dequeueing Wait Times

A decision has to be made about what happens if a dequeue request is made but there are no messages which fulfil the criteria (this, of course, assumes that PL/SQL notification is not being used). Under AQ, there are three possibilities, which can be specified by the wait attribute of the dbms_aq.dequeue_options_t object :

Wait Mode          Meaning
-----------------  ----------------------------------------------------------------
DBMS_AQ.FOREVER    Dequeue operation waits until a message of the correct criteria is enqueued. This is the default.
DBMS_AQ.NO_WAIT    Does not wait at all. Note, the exception -25228 will be raised if no messages exist. You will probably want to handle this in the dequeue procedure.
Number of Seconds  Waits a specified number of seconds before raising the -25228 exception if no messages exist.
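
As a sketch of how the -25228 exception might be handled, the following variant of the dequeue procedure uses NO_WAIT and maps the error onto a named exception with PRAGMA EXCEPTION_INIT. The procedure name p_dequeue_nowait and the exception name no_messages are hypothetical; the queue is the example_queue created earlier.

CREATE OR REPLACE PROCEDURE p_dequeue_nowait
AS
  PRAGMA AUTONOMOUS_TRANSACTION;
  -- Give ORA-25228 a name so it can be caught explicitly
  no_messages          EXCEPTION;
  PRAGMA EXCEPTION_INIT(no_messages, -25228);
  dequeue_options      dbms_aq.dequeue_options_t;
  message_properties   dbms_aq.message_properties_t;
  message_handle       RAW(16);
  message              message_type;
BEGIN
  -- Return immediately if the queue is empty (use a number of seconds for a timed wait)
  dequeue_options.wait := DBMS_AQ.NO_WAIT;
  dbms_aq.dequeue( queue_name            => 'example_queue',
                   dequeue_options       => dequeue_options,
                   message_properties    => message_properties,
                   payload               => message,
                   msgid                 => message_handle);
  dbms_output.put_line('Message : ' || message.message_body);
  COMMIT;
EXCEPTION
  WHEN no_messages THEN
    -- Nothing was dequeued, so there is nothing to commit
    dbms_output.put_line('No messages available');
END p_dequeue_nowait;
/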

For further information, see the Oracle docs at
http://download-west.oracle.com/docs/cd/B10501_01/appdev.920/a96612/t_aq2.htm#1012116.

Mechanisms for Enqueue and Dequeue

Obviously, when using AQ, you could come up with dozens of ways to both enqueue and dequeue messages. Enqueueing is almost always a function of the client: it calls p_enqueue (either directly or indirectly). Dequeueing, however, is a different kettle of fish. What process do we use to dequeue messages and subsequently execute the relevant processes? The most common approaches are :

Dequeue using DBMS_JOB

A common practice is to utilise polling DBMS_JOBs, constantly calling the dequeue mechanism until a message appears.

So, while the DBMS_JOB approach is easy, it's not really the approach to use (in my opinion).
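
For reference, a polling job of the kind described above might be submitted along these lines. This is only a sketch: it assumes the p_dequeue procedure created earlier, a one-minute interval chosen purely for illustration, and that JOB_QUEUE_PROCESSES is set high enough for jobs to run. In practice the dequeue called from a job would probably use NO_WAIT or a short wait, so that the job does not block when the queue is empty.

DECLARE
  l_job  BINARY_INTEGER;  -- receives the job number assigned by Oracle
BEGIN
  dbms_job.submit( job       => l_job,
                   what      => 'p_dequeue;',
                   next_date => SYSDATE,
                   interval  => 'SYSDATE + 1/1440' );  -- re-run roughly every minute
  COMMIT;  -- the job is only picked up by the job queue once committed
  dbms_output.put_line('Submitted job ' || l_job);
END;
/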

"Daemon"-type process

One common practice (and until 9i, in my opinion, the best option) is to simply create N PL/SQL processes which are constantly running and which follow the pseudo-code :

LOOP
  Attempt dequeue with wait FOREVER which blocks until message appears;
  process_message;
END LOOP;

In my opinion, this is the only real option at 8i and below.
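
The pseudo-code above could be fleshed out roughly as follows. This is a sketch only: the procedure name p_dequeue_daemon is hypothetical, the INSERT into output_table (assumed to have a single MSG column) stands in for process_message, and there is no error handling or shutdown mechanism, both of which a real daemon would need.

CREATE OR REPLACE PROCEDURE p_dequeue_daemon
AS
  dequeue_options      dbms_aq.dequeue_options_t;
  message_properties   dbms_aq.message_properties_t;
  message_handle       RAW(16);
  message              message_type;
BEGIN
  -- Block inside dbms_aq.dequeue until a message appears
  dequeue_options.wait := DBMS_AQ.FOREVER;
  LOOP
    dbms_aq.dequeue( queue_name            => 'example_queue',
                     dequeue_options       => dequeue_options,
                     message_properties    => message_properties,
                     payload               => message,
                     msgid                 => message_handle);

    -- Stand-in for process_message : record the message body
    INSERT INTO output_table ( msg ) VALUES ( message.message_body );

    -- Commit each message so the dequeue and its processing succeed or fail together
    COMMIT;
  END LOOP;
END p_dequeue_daemon;
/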

9i PL/SQL Notification (Callback)

9i added a brilliant new feature which enables a much better mechanism for queueing than previous versions.

The problem with the pre-9i mechanisms, i.e. DBMS_JOB / daemon, is that you would never develop a GUI application using the same concepts. Imagine it. I have a form with a button on it. I need to run some code when the button is pressed. Using the tenets of the pre-9i methods, I would need a timer / routine which is constantly executing, checking for the state of the button to move from "unpressed" to "pressed", and then executing the relevant code. This, of course, is how it would have to be done in a conventional "procedural" language such as C: polling for an event to occur.

Callback routines are the equivalent (in a sense) of event-driven programming. In PL/SQL, AQ can be used to automatically call a procedure upon successful enqueue. This, in my opinion, is by far the best mechanism for utilising AQ, since it removes the need for any daemon or job-based mechanisms for dequeueing, thereby SIGNIFICANTLY simplifying the work done in both development and by Oracle in production.

Here's an example of how to use DBMS_AQ with callback :

Note, the queue table HAS to be created using multiple consumers, i.e.

SQL> BEGIN
  2    dbms_aqadm.create_queue_table(queue_table => 'queue_table',
  3                                   queue_payload_type => 'message_type',
  4                                   multiple_consumers=>TRUE);
  5   END;
  6  /



*DEV NOTE*
Attempting this with a user who has a large number of roles (either direct or indirect) assigned (say a DBA user etc) may result in the following error :

SQL> BEGIN
  2    dbms_aqadm.create_queue_table(queue_table => 'queue_table',
  3                                  queue_payload_type => 'message_type',
  4                                  multiple_consumers=>TRUE);
  5  END;
  6  /
BEGIN
*
ERROR at line 1:
ORA-24166: evaluation context ORAUSER.AQ$_QUEUE_TABLE_V has errors
ORA-01925: maximum of 30 enabled roles exceeded
ORA-06512: at "SYS.DBMS_AQADM_SYS", line 2224
ORA-06512: at "SYS.DBMS_AQADM", line 58
ORA-06512: at line 2

In this case, you need to up the MAX_ENABLED_ROLES initialisation parameter, or reduce the number of roles that are assigned to the user.

Firstly, create a "dequeue" procedure which will get called automatically. Note, the parameter names / types are important and have to be like this.

CREATE OR REPLACE PROCEDURE p_dequeue ( context raw,
                                        reginfo sys.aq$_reg_info,
                                        descr sys.aq$_descriptor,
                                        payload raw,
                                        payloadl number)
as
 dequeue_options    dbms_aq.dequeue_options_t;
 message_properties dbms_aq.message_properties_t;
 message_handle     RAW(16);
 message            message_type;
BEGIN
   dequeue_options.msgid         := descr.msg_id;
   dequeue_options.consumer_name := descr.consumer_name;

   dbms_aq.dequeue(queue_name => descr.queue_name,
                   dequeue_options => dequeue_options,
                   message_properties => message_properties,
                   payload => message,
                   msgid => message_handle);

   /* Add the INSERT into an output table. This is so we can see it working.
      In reality, the logic to execute the process would be here */

   INSERT INTO output_table VALUES ( message.message_body );

   COMMIT;
END;
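
The procedure above writes to output_table, which is not created anywhere in this section. A minimal definition, assuming a single MSG column (the column name matches the query output shown later; the length of 100 is a guess), might be :

-- Assumed definition of the table used to show the callback working
CREATE TABLE output_table
( msg  VARCHAR2(100) );

The table needs to exist before the procedure is compiled, otherwise the INSERT inside it will not resolve.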


Now, add the receiving subscriber to the queue. Note that these can be left "as is"; they're more of a formality than anything.

SQL> BEGIN
  2    dbms_aqadm.add_subscriber
  3      ( queue_name => 'EXAMPLE_QUEUE',
  4        subscriber => sys.aq$_agent('RECIPIENT', NULL, NULL) );
  5  END;
  6  /

PL/SQL procedure successfully completed.


Now, register the subscriber / procedure combination

SQL> BEGIN
  2    dbms_aq.register
  3      ( sys.aq$_reg_info_list (
  4          sys.aq$_reg_info ( 'EXAMPLE_QUEUE:RECIPIENT',
  5                             dbms_aq.namespace_aq,
  6                             'plsql://p_dequeue',
  7                             HEXTORAW('FF')
  8                           )
  9        ),
 10        1
 11      );
 12* END;
SQL> /

PL/SQL procedure successfully completed.


Now, we can enqueue and see the output (via output_table) immediately, proving that the p_dequeue process has been executed without the need for a separate polling process.

SQL> EXEC p_enqueue('THIS IS A TEST');

PL/SQL procedure successfully completed.

SQL> SELECT * FROM output_table;

MSG
----------------------------------------------------------------------------------------------------
THIS IS A TEST


*DEV NOTE*
You can register notification off a queue for multiple processes (which can be plsql://, mailto://, web services etc) :

DECLARE
  reginfolist  sys.aq$_reg_info_list := sys.aq$_reg_info_list();
BEGIN
  reginfolist.EXTEND(3);
  reginfolist(1) := sys.aq$_reg_info ( 'EXAMPLE_QUEUE:RECIPIENT', dbms_aq.namespace_aq,'plsql://p_dequeue', HEXTORAW('FF') );
  reginfolist(2) := sys.aq$_reg_info ( 'EXAMPLE_QUEUE:RECIPIENT', dbms_aq.namespace_aq,'mailto://[email protected]', HEXTORAW('FF') );
  reginfolist(3) := sys.aq$_reg_info ( 'EXAMPLE_QUEUE:RECIPIENT', dbms_aq.namespace_aq,'http://webserver/servlet', HEXTORAW('FF') );

-- do the registration
  sys.dbms_aq.register(reginfolist, 3);
END;



Removing queues

It's quite simple really: stop the queue, drop the queue, then drop the queue table, i.e.

SQL> begin
  2    dbms_aqadm.stop_queue('EXAMPLE_QUEUE');
  3    dbms_aqadm.drop_queue('EXAMPLE_QUEUE');
  4    dbms_aqadm.drop_queue_table('queue_table');
  5  end;
  6  /

PL/SQL procedure successfully completed.



Monitoring Queues

You can utilise the DBA_QUEUES, DBA_QUEUE_TABLES and V$AQ views to monitor the state of your queues (and all other queues in the Oracle database). The following query will show you basic information about the queues (in this case, the EXAMPLE_QUEUE that we created earlier) :

SQL> SELECT
  2    a.owner,
  3    a.name,
  4    a.queue_type,
  5    a.queue_table,
  6    a.retention,
  7    a.enqueue_enabled,
  8    a.dequeue_enabled,
  9     b.waiting,
 10     b.ready,
 11     b.expired,
 12     b.total_wait,
 13     b.average_wait
 14   FROM
 15     dba_queues a,
 16     v$aq b
 17   WHERE a.qid = b.qid
 18   AND  a.name = 'EXAMPLE_QUEUE'
 19  /

OWNER                          NAME                           QUEUE_TYPE           QUEUE_TABLE
------------------------------ ------------------------------ -------------------- -----------------
RETENTION                                ENQUEUE DEQUEUE    WAITING      READY    EXPIRED TOTAL_WAIT AVERAGE_WAIT
---------------------------------------- ------- ------- ---------- ---------- ---------- ----------
ORAUSER                        EXAMPLE_QUEUE                  NORMAL_QUEUE         QUEUE_TABLE
0                                          YES     YES            0          0          0          0            0


In order to understand what is being seen here, you have to understand AQ message states. Basically, each queue table you create has a STATE column which shows the state of each message.

The full list of message states is explained in Metalink Note 102330.1.

However, as an overview, the STATE column can have a value of 0, 1, 2 or 3. The list is :

State  Value                  Explanation
-----  ---------------------  ----------------------------------------------------------------
0      READY                  The message is ready to be processed, i.e. either the message delay time has passed, or there was none specified
1      WAITING                The delay specified by message_properties_t.delay has not been reached
2      RETAINED or PROCESSED  The message has been successfully dequeued, and will remain in the queue for as long as was specified as the retention time when creating the queue
3      EXPIRED                The message has not been successfully dequeued, because either
                                1. the time specified by message_properties_t.expiration while executing dbms_aq.enqueue has elapsed, or
                                2. the maximum number of dequeue attempts (max_retries) specified for the queue while executing dbms_aqadm.create_queue has been reached

*IMPORTANT*
Messages with a STATE of 2 will not appear in V$AQ.
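
Since the queue table is an ordinary table with Q_NAME and STATE columns (see the DESC output earlier), one way to see all the states, including processed messages, is to query it directly. A minimal sketch, run as the owner of queue_table :

SELECT q_name,
       DECODE(state, 0, 'READY',
                     1, 'WAITING',
                     2, 'PROCESSED',
                     3, 'EXPIRED',
                        'UNKNOWN') AS state_name,
       COUNT(*) AS messages
FROM   queue_table
GROUP  BY q_name, state
ORDER  BY q_name, state;

This returns one row per queue and state, with the number of messages currently held in that state.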

Queue Performance

It's very difficult to precisely quantify the "performance" of AQ, since every situation will be so different, that one set of benchmarks will probably have no relevance to another situation. However, given what we currently know, there are a few general points which we can make which highlight why AQ is such a good solution :

Session Handling within Queues

It's very worthwhile knowing various things about sessions when dealing with system processes such as DBMS_JOBs and DBMS_AQs.

The common way to get your session's "session id" is to use one of the following context calls :

SQL> SELECT userenv('sessionid'), sys_context('USERENV', 'SESSIONID')
  2  FROM dual;

USERENV('SESSIONID') SYS_CONTEXT('USERENV','SESSIONID')
-------------------- ---------------------------------------------------
                3006 3006


Note the differing return datatypes of the functions for sessionid: USERENV returns a number, SYS_CONTEXT returns a string.
SYS_CONTEXT is Oracle's preferred method going forward; just remember to TO_NUMBER it where appropriate.

The sessionid corresponds to the entry in the AUDSID column in V$SESSION for your session, i.e.

SQL> SELECT sid, serial#, audsid, username, program
  2  FROM   v$session
  3  /

     SID    SERIAL#     AUDSID USERNAME   PROGRAM
-------- ---------- ---------- ---------- -------------------
       1          1          0            ORACLE.EXE
       2          1          0            ORACLE.EXE
       3          1          0            ORACLE.EXE
       4          1          0            ORACLE.EXE
       5          1          0            ORACLE.EXE
       6          1          0            ORACLE.EXE
       7          1          0            ORACLE.EXE
       8          1          0            ORACLE.EXE
      11         54          0            ORACLE.EXE
      13        875       3006 ORAUSER    sqlplusw.exe

10 rows selected.


However, as you can see from the above query, all system-type processes run with an AUDSID of 0, and therefore, this isn't much use as a "unique session identifier" which is useful when trying to communicate between distinct processes.

The other thing to bear in mind is that DBMS_JOBs, DBMS_AQs etc. run as SYS, i.e. these packages are definer's rights packages in the SYS schema. We can show all this by modifying the P_DEQUEUE procedure that we've got registered against the queue, so that the INSERT into the output table records the user / session id / sid, to see what's actually going on :

CREATE OR REPLACE PROCEDURE p_dequeue ( context raw,
                                        reginfo sys.aq$_reg_info,
                                        descr sys.aq$_descriptor,
                                        payload raw,
                                        payloadl number)
as
 dequeue_options    dbms_aq.dequeue_options_t;
 message_properties dbms_aq.message_properties_t;
 message_handle     RAW(16);
 message            message_type;
BEGIN
   dequeue_options.msgid         := descr.msg_id;
   dequeue_options.consumer_name := descr.consumer_name;

   DBMS_AQ.DEQUEUE(queue_name => descr.queue_name,
                   dequeue_options => dequeue_options,
                   message_properties => message_properties,
                   payload => message,
                   msgid => message_handle);



   INSERT INTO output_table ( msg )
   SELECT
     user || ' : ' ||
     SYS_CONTEXT('userenv', 'sessionid') || ' : ' ||
     sid
   FROM v$mystat
   WHERE rownum = 1;

   COMMIT;
END;
/

SQL> SELECT *
  2    FROM output_table;

MSG
--------------------------------
SYS : 0 : 18


So, what options do we have? Well, the obvious one, I suppose, is to include the client's session id in the enqueued message, i.e. the object type carries a session_id attribute. This can then be used by the dequeueing process to identify any session-specific information which, for example, is stored in a "parameters" table. This does have the disadvantage of increasing message size, but typically AUDSID (or SID, which may be better) wouldn't take up a huge amount of space in the object TYPE anyway.
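
A minimal sketch of that idea follows. The type session_message_type, the queue session_queue and the procedure p_enqueue_with_session are all hypothetical names; the sketch assumes the queue has already been created and started on a queue table whose queue_payload_type is session_message_type, in the same way as example_queue was set up earlier.

-- Hypothetical payload type carrying the enqueuing session's id
CREATE TYPE session_message_type AS OBJECT
( session_id    NUMBER,
  message_body  VARCHAR2(50) );
/

CREATE OR REPLACE PROCEDURE p_enqueue_with_session(msg IN VARCHAR2)
AS
  PRAGMA AUTONOMOUS_TRANSACTION;
  enqueue_options       dbms_aq.enqueue_options_t;
  message_properties    dbms_aq.message_properties_t;
  message_handle        RAW(16);
BEGIN
  -- Capture the client's AUDSID at enqueue time, while still in the client's session
  dbms_aq.enqueue( queue_name            => 'session_queue',
                   enqueue_options       => enqueue_options,
                   message_properties    => message_properties,
                   payload               => session_message_type(
                                              TO_NUMBER(SYS_CONTEXT('USERENV', 'SESSIONID')),
                                              msg ),
                   msgid                 => message_handle);
  COMMIT;
END p_enqueue_with_session;
/

The dequeueing procedure can then read message.session_id from the payload and use it to look up whatever session-specific data it needs.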