Hi,
I try to share my experience/knowledge on reading the messages from (IBM) MQ in client mode, while I was working I try to gather the details through different sources but get very less information in order to finish my task on MQ part. Initially I struggle a lot it made me to write this blog, if you want read the IBM MQ messages using java client you can use this following code...
Reading the MQ Messages in Client mode:
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQManagedObject;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
public class QueueReader{
private MQQueueManager _queueManager = null;
public String qManager = null;
public String inputQName = null;
static Properties prop = null;
//For Backout queue
private int[] selectors = {CMQC.MQIA_BACKOUT_THRESHOLD,CMQC.MQCA_BACKOUT_REQ_Q_NAME };
private int[] intAttrs = new int[1];
private byte[] charAttrs = new byte[CMQC.MQ_Q_NAME_LENGTH];
static final Logger logger = Logger.getLogger(QueueReader.class);
static{
InputStream input = null;
try{
PropertyConfigurator.configure("log4j.properties");
input = new FileInputStream("config.properties");
prop = new Properties();
prop.load(input);
logger.info("dbFeed configurations loaded ");
}
catch(FileNotFoundException fnfe){
logger.error("Error while Loading config.properties:"+fnfe.getMessage());
}
catch(IOException ioe){
logger.error("Error while reading config.properties:"+ioe.getMessage());
}
}
public QueueReader{
super();
}
private void init(Properties props) throws IllegalArgumentException {
MQEnvironment.hostname=props.getProperty("HOSTNAME");
MQEnvironment.channel=props.getProperty("CHANNEL");
MQEnvironment.port=Integer.parseInt(props.getProperty("PORT"));
MQEnvironment.properties.put(CMQC.TRANSPORT_PROPERTY, CMQC.TRANSPORT_MQSERIES);
inputQName= props.getProperty("QUEUENAME");
logger.info("QName:"+inputQName);
qManager= props.getProperty("QUEUEMANAGER");
logger.info("dbFeed configurations loaded: qManager "+qManager);
}
public static void main(String[] args){
QueueReaderNew readQ = new QueueReaderNew();
try{
MQException.log = null;
logger.info(" before initialization");
readQ.init(prop);
readQ.selectQMgr();
logger.info(" Before calling the read()");
readQ.read(prop);
logger.info("after calling read from Queue ::");
}
catch(MQException e) {
logger.error("Error Message::"+e.getMessage());
}
catch(Exception ex) {
logger.info("Got the exception:"+ex.getMessage());
}
}
private void selectQMgr() throws MQException {
logger.info("qManager:"+qManager);
_queueManager = new MQQueueManager(qManager);
}
private void read(Properties props) throws MQException,Exception {
byte byteMsg[]=null;
String STATUS="";
logger.info("::: In MQ Read method :::");
int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF | CMQC.MQOO_INQUIRE | CMQC.MQOO_SAVE_ALL_CONTEXT;
MQQueue queue = _queueManager.accessQueue(inputQName, openOptions, null, null, null);
logger.info("MQRead v1.0 connected.");
int depth = queue.getCurrentDepth();
logger.info("Current Queue:" +depth );
MQManagedObject moMyQueue = (MQManagedObject)queue;
moMyQueue.inquire(selectors, intAttrs, charAttrs);
String myPutQname = new String(charAttrs);
logger.info("Backout Queue name: " + myPutQname);
logger.info(".... Reading messages one by one ....");
logger.info(".... Reading messages one by one ....");
MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.options = CMQC.MQGMO_SYNCPOINT | CMQC.MQGMO_WAIT ;
getOptions.waitInterval = CMQC.MQWI_UNLIMITED;
MQMessage message = new MQMessage();
while(true){
try {
message.clearMessage();
message.correlationId = CMQC.MQCI_NONE;
message.messageId = CMQC.MQMI_NONE;
depth = queue.getCurrentDepth();
logger.info("current depth"+depth);
queue.get(message, getOptions);
logger.info("Before RedFully");
byteMsg = new byte[message.getDataLength()];
message.readFully(byteMsg);
logger.info("Message read into byte array");
/* PROCESS THE MQ MESSAGE by calling any other method[processMQMessage()], after processing the message
this method will return the Flag like SUCCESS or FAIL, if STATUS flag is SUCCESS commit the message on
Queue or else(FAIL) message will be send to backout queue*/
STATUS = processMQMessage(byteMsg);
logger.info(" STATUS in QueueReader ::"+STATUS);
if(STATUS!=null && !STATUS.equals("SUCCESS") ){
/*commit the message on Main queue, to consume the message from queue and
sending the message to backout queue. */
_queueManager.commit();
logger.info("Message is poison, moving to " + myPutQname);
int openPutOptions = CMQC.MQOO_OUTPUT+CMQC.MQOO_FAIL_IF_QUIESCING+CMQC.MQOO_PASS_ALL_CONTEXT;
MQQueue boQueue = _queueManager.accessQueue(myPutQname, openPutOptions, null, null, null);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = pmo.options | CMQC.MQPMO_SYNCPOINT|CMQC.MQPMO_PASS_ALL_CONTEXT;
pmo.contextReference = queue;
boQueue.put(message, pmo);
_queueManager.commit();
boQueue.close();
logger.info("messages sent to backout");
}
else{
_queueManager.commit();
logger.info("...messages processed successfuly....");
}
}//end of try
catch (MQException e) {
logger.info("Error Message::"+e.getMessage());
if (e.completionCode == 2 && e.reasonCode == MQException.MQRC_NO_MSG_AVAILABLE) {
if (depth > 0)
logger.info("All messages read.");
}
else
logger.info("MQException: Completion Code = " + e.completionCode + " : Reason Code = " + e.reasonCode);
break;
}
byteMsg=null;
}
/** As this process is going to running continuously no need of closing the Q & Q manager */
queue.close();
_queueManager.commit();
_queueManager.disconnect();
}
}
**
This will read the MQ messages continually even if the queue depth is zero this will wait for the next message come-in to process it. If you terminate the process when the queue depth is zero you can replace the following two lines..
getOptions.options = CMQC.MQGMO_SYNCPOINT | CMQC.MQGMO_WAIT ; getOptions.waitInterval = CMQC.MQWI_UNLIMITED
with the following..
getOptions.options = CMQC.MQGMO_SYNCPOINT | CMQC.MQGMO_NO_WAIT ;
with the following..
getOptions.options = CMQC.MQGMO_SYNCPOINT | CMQC.MQGMO_NO_WAIT ;
if you have any comments let know...
Comments
Post a Comment