Friday, September 6, 2019

Oracle 12c - AQ Adapter Enqueue and Dequeue

Advantages of using queuing:

  • Oracle’s AQ (Advanced Queuing) is one of the extremely powerful Oracle Database message queuing based on Oracle Streams. This is scalable and reliable as weblogic’s JMS.
  • This allows us to achieve de(or loose)-coupling and agility in real world. De-coupling is the measurement of dependency between the actual service provider and the service consumer(s). For a better function in SOA, this level of dependency should be low as possible. By Queing, the service consumer can publish the request & make them available for the provider which may or may not be available at when the request is published.
  • AQ in-build Packages : There are two in-build packages which we need to know before we jump in
    1. Oracle AQ administration package (DBMS_AQADM) which helps us by providing AQ’s         administrative task like managing (create/delete/alter) queue table, privileged programs (grand/revoke), state of queue (start/stop), etc.
   2. Oracle AQ package (DBMS_AQ) which provides interface for enqueue and de-queue messages into the queue.

Lets start:

Step1: Create the needed db-user and provide the required grants.
    create user shop_keeper identified by shop_keeper;
    grant create session, resource, AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE to shop_keeper;
    grant execute on DBMS_AQADM to shop_keeper;
    grant execute on DBMS_AQ to shop_keeper;
    
Step2: Define the structure of message which is passing through the queue. 
Type is defined as Object (obviously it will have unique name) having multiple fields (defined using varchar, date, timeStamp,etc).

CREATE OR REPLACE TYPE ITEM_TYPE AS OBJECT (
ITEM_ID NUMBER(4),
ITEM_NAME VARCHAR2(30),
ITEM_PUBLISHER VARCHAR2(30)
);
desc ITEM_TYPE

Step3: Create a queue table for persisting the messages and its associated Queue
exec DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'SHOP_KEEPER.news_magazine_t', queue_payload_type => 'SHOP_KEEPER.ITEM_TYPE');

exec DBMS_AQADM.CREATE_QUEUE(queue_name => 'SHOP_KEEPER.news_magazine_q', queue_table => 'SHOP_KEEPER.news_magazine_t');

Step4: Start the queue using DBMS_AQADM package
exec SYS.DBMS_AQADM.START_QUEUE('SHOP_KEEPER.news_magazine_q');
Query to see the table and queue in DB:
select * from all_queues where name like '%NEWS_MAGAZINE_Q%'

Stumbled issue 1:
While executing below command, get the error
exec DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'SHOP_KEEPER.news_magazine_t', queue_payload_type => 'SHOP_KEEPER.ITEM_TYPE');

Error :
Error starting at line : 9 in command -
exec DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'SHOP_KEEPER.news_magazine_t', queue_payload_type => 'SHOP_KEEPER.ITEM_TYPE')
Error report -
ORA-01950: no privileges on tablespace 'USERS'
ORA-06512: at "SYS.DBMS_AQADM", line 115
ORA-06512: at line 1
01950. 00000 -  "no privileges on tablespace '%s'"
*Cause:    User does not have privileges to allocate an extent in the
           specified tablespace.
*Action:   Grant the user the appropriate system privileges or grant the user
           space resource on the tablespace.

Solution:
Just executed this command.
GRANT UNLIMITED TABLESPACE TO <user name>;

Step5: Create a Datasource in admin console.








Step6: Create an AQ  Connection Factory









Stumbled issue  2:
An error occurred during activation of changes, please see the log for details.

Message icon - Error java.lang.IllegalArgumentException: /app/Oracle/Middleware/Oracle_Home/soa/soa/Plan.xml (A file or directory in the path name does not exist.)
Message icon - Error /app/Oracle/Middleware/Oracle_Home/soa/soa/Plan.xml (A file or directory in the path name does not exist.)
Solution:
Move that plan.xml file from node 1 to node 2 and then activate the changes.

Step7: Create Enqueue adapter











XSD used:

<schema targetNamespace="http://xmlns.oracle.com/xdb/SHOP_KEEPER" xmlns="http://www.w3.org/2001/XMLSchema" xmlns:SHOP_KEEPER="http://xmlns.oracle.com/xdb/SHOP_KEEPER" elementFormDefault="unqualified" attributeFormDefault="qualified">
   <complexType name="ITEM_TYPE">
      <sequence>
         <element name="ITEM_ID" type="double" nillable="true" minOccurs="0"/>
         <element name="ITEM_NAME" nillable="true" minOccurs="0">
            <simpleType>
               <restriction base="string">
                  <maxLength value="30"/>
               </restriction>
            </simpleType>
         </element>
         <element name="ITEM_PUBLISHER" nillable="true" minOccurs="0">
            <simpleType>
               <restriction base="string">
                  <maxLength value="30"/>
               </restriction>
            </simpleType>
         </element>
      </sequence>
   </complexType>
   <element name="ITEM_TYPE" type="SHOP_KEEPER:ITEM_TYPE" />
</schema>


Step8: oneway BPEL invoke and transform to send data to AQ




Now Deploy and test.
During deploy we get below Metadata error for connection factory,

Stumbled issue 3:
There was an error deploying the composite on soa_server2: Operation failed - Member(Id=6 Timestamp=2018-12-04 23:44:26.071 Address=10.48.10.134:39640 MachineId=32196 Location=machine:testMachine process:13238290 member:soa_server1 Role=soa_cluster):Error while validating JCA Reference Binding meta data during composite deployment. : JCA deployment validation errors for 'Adapters/aqPublish_aq.jca' 
Solution:
Restart both the SOA servers.

Test From Em console:



Query to manually test/enqueuer the message in the Queue table:

DECLARE
  queue_option dbms_aq.enqueue_options_t;
  msg_properties dbms_aq.message_properties_t;
  payload item_type;
  msg_id VARCHAR(50);
BEGIN
  payload := item_type(1, 'The Times of India', 'The Times Group');
dbms_aq.ENQUEUE(queue_name => 'NEWS_MAGAZINE_Q',
enqueue_options => queue_option,
message_properties => msg_properties,
payload => payload
, msgid => msg_id);
--dbms_output.put_line('Message successfully Enqueued WITH messageId:'|| msg_id);
COMMIT;
END;
/

Step9: Dequeue from the AQ








Test and deploy:




No comments:

Post a Comment

Featured Post

11g to 12c OSB projects migration points

1. Export 11g OSB code and import in 12c Jdeveloper. Steps to import OSB project in Jdeveloper:   File⇾Import⇾Service Bus Resources⇾ Se...