Advantages of using queuing:
2. Oracle AQ package (DBMS_AQ) which provides interface for enqueue and de-queue messages into the queue.
- Oracle’s AQ (Advanced Queuing) is one of the extremely powerful Oracle Database message queuing based on Oracle Streams. This is scalable and reliable as weblogic’s JMS.
- This allows us to achieve de(or loose)-coupling and agility in real world. De-coupling is the measurement of dependency between the actual service provider and the service consumer(s). For a better function in SOA, this level of dependency should be low as possible. By Queing, the service consumer can publish the request & make them available for the provider which may or may not be available at when the request is published.
- AQ in-build Packages : There are two in-build packages which we need to know before we jump in
2. Oracle AQ package (DBMS_AQ) which provides interface for enqueue and de-queue messages into the queue.
Lets start:
Step1: Create the needed db-user and provide the required grants.
create user shop_keeper identified by shop_keeper;
grant create session, resource, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to shop_keeper;
grant execute on DBMS_AQADM to shop_keeper;
grant execute on DBMS_AQ to shop_keeper;
Step2: Define the structure of message which is passing through the queue.
Type is defined as Object (obviously it will have unique name) having multiple fields (defined using varchar, date, timeStamp,etc).
CREATE OR REPLACE TYPE ITEM_TYPE AS OBJECT (
ITEM_ID NUMBER(4),
ITEM_NAME VARCHAR2(30),
ITEM_PUBLISHER VARCHAR2(30)
);
desc ITEM_TYPE
Step3: Create a queue table for persisting the messages and its associated Queue
exec DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'SHOP_KEEPER.news_magazine_t', queue_payload_type => 'SHOP_KEEPER.ITEM_TYPE');
exec DBMS_AQADM.CREATE_QUEUE(queue_name => 'SHOP_KEEPER.news_magazine_q', queue_table => 'SHOP_KEEPER.news_magazine_t');
Step4: Start the queue using DBMS_AQADM package
exec SYS.DBMS_AQADM.START_QUEUE('SHOP_KEEPER.news_magazine_q');
Query to see the table and queue in DB:
select * from all_queues where name like '%NEWS_MAGAZINE_Q%'
Stumbled issue 1:
While executing below command, get the error
exec DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'SHOP_KEEPER.news_magazine_t', queue_payload_type => 'SHOP_KEEPER.ITEM_TYPE');
Error :
Error starting at line : 9 in command -
exec DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'SHOP_KEEPER.news_magazine_t', queue_payload_type => 'SHOP_KEEPER.ITEM_TYPE')
Error report -
ORA-01950: no privileges on tablespace 'USERS'
ORA-06512: at "SYS.DBMS_AQADM", line 115
ORA-06512: at line 1
01950. 00000 - "no privileges on tablespace '%s'"
*Cause: User does not have privileges to allocate an extent in the
specified tablespace.
*Action: Grant the user the appropriate system privileges or grant the user
space resource on the tablespace.
Solution:
Just executed this command.
GRANT UNLIMITED TABLESPACE TO <user name>;
Step6: Create an
AQ Connection Factory
Stumbled issue 2:
An error occurred during activation of changes, please see
the log for details.
Message icon - Error java.lang.IllegalArgumentException: /app/Oracle/Middleware/Oracle_Home/soa/soa/Plan.xml
(A file or directory in the path name does not exist.)
Message icon - Error
/app/Oracle/Middleware/Oracle_Home/soa/soa/Plan.xml (A file or directory in the
path name does not exist.)
Solution:
Move that plan.xml file from node 1 to node 2
and then activate the changes.
Step7: Create Enqueue adapter
XSD used:
<schema targetNamespace="http://xmlns.oracle.com/xdb/SHOP_KEEPER" xmlns="http://www.w3.org/2001/XMLSchema" xmlns:SHOP_KEEPER="http://xmlns.oracle.com/xdb/SHOP_KEEPER" elementFormDefault="unqualified" attributeFormDefault="qualified">
<complexType name="ITEM_TYPE">
<sequence>
<element name="ITEM_ID" type="double" nillable="true" minOccurs="0"/>
<element name="ITEM_NAME" nillable="true" minOccurs="0">
<simpleType>
<restriction base="string">
<maxLength value="30"/>
</restriction>
</simpleType>
</element>
<element name="ITEM_PUBLISHER" nillable="true" minOccurs="0">
<simpleType>
<restriction base="string">
<maxLength value="30"/>
</restriction>
</simpleType>
</element>
</sequence>
</complexType>
<element name="ITEM_TYPE" type="SHOP_KEEPER:ITEM_TYPE" />
</schema>
Step8: oneway BPEL invoke and transform to send data to AQ
Now Deploy and test.
During deploy we get below Metadata error for connection
factory,
Stumbled issue 3:
There was an error deploying the composite on soa_server2:
Operation failed - Member(Id=6 Timestamp=2018-12-04 23:44:26.071
Address=10.48.10.134:39640 MachineId=32196
Location=machine:testMachine process:13238290 member:soa_server1
Role=soa_cluster):Error while validating JCA Reference Binding meta data during
composite deployment. : JCA deployment validation errors for
'Adapters/aqPublish_aq.jca'
Solution:
Restart
both the SOA servers.
Test From Em console:
Query to manually test/enqueuer the message in the Queue table:
DECLARE
queue_option dbms_aq.enqueue_options_t;
msg_properties dbms_aq.message_properties_t;
payload item_type;
msg_id VARCHAR(50);
BEGIN
payload := item_type(1, 'The Times of India', 'The Times Group');
dbms_aq.ENQUEUE(queue_name => 'NEWS_MAGAZINE_Q',
enqueue_options => queue_option,
message_properties => msg_properties,
payload => payload
, msgid => msg_id);
--dbms_output.put_line('Message successfully Enqueued WITH messageId:'|| msg_id);
COMMIT;
END;
/
Step9: Dequeue from the AQ