From 195161c909313e0b634a427c504e746dab07c375 Mon Sep 17 00:00:00 2001 From: rolefhei Date: Wed, 20 Sep 2017 10:29:44 +0200 Subject: [PATCH 1/6] --data-directory added to Makefiles of samples --- samples/JMSSample/Makefile | 2 +- samples/JmsWithXmlParse/Makefile | 2 +- samples/JmsWithXmlParseBytes/Makefile | 2 +- samples/KafkaConsistentRegionConsumerParallel/Makefile | 2 +- samples/KafkaSASLSample/Makefile | 2 +- samples/KafkaSample/Makefile | 2 +- samples/MqttSample/Makefile | 2 +- samples/RabbitMQSample/Makefile | 2 +- samples/XMSSample/Makefile | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/samples/JMSSample/Makefile b/samples/JMSSample/Makefile index d8f11dc..35ef7cf 100644 --- a/samples/JMSSample/Makefile +++ b/samples/JMSSample/Makefile @@ -4,7 +4,7 @@ STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging -SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} +SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} --data-directory=data SPLC = $(STREAMS_INSTALL)/bin/sc diff --git a/samples/JmsWithXmlParse/Makefile b/samples/JmsWithXmlParse/Makefile index 49e54e6..89948a1 100644 --- a/samples/JmsWithXmlParse/Makefile +++ b/samples/JmsWithXmlParse/Makefile @@ -4,7 +4,7 @@ STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging -SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} +SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} --data-directory=data SPLC = $(STREAMS_INSTALL)/bin/sc diff --git a/samples/JmsWithXmlParseBytes/Makefile b/samples/JmsWithXmlParseBytes/Makefile index b3acc60..61b3739 100644 --- a/samples/JmsWithXmlParseBytes/Makefile +++ b/samples/JmsWithXmlParseBytes/Makefile @@ -4,7 +4,7 @@ STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging -SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} +SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} --data-directory=data SPLC = $(STREAMS_INSTALL)/bin/sc diff --git a/samples/KafkaConsistentRegionConsumerParallel/Makefile b/samples/KafkaConsistentRegionConsumerParallel/Makefile index 60b28f2..b78b199 100644 --- a/samples/KafkaConsistentRegionConsumerParallel/Makefile +++ b/samples/KafkaConsistentRegionConsumerParallel/Makefile @@ -19,4 +19,4 @@ data: mkdir data clean: - $(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE) \ No newline at end of file + $(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE) diff --git a/samples/KafkaSASLSample/Makefile b/samples/KafkaSASLSample/Makefile index 17380ae..c3518dc 100644 --- a/samples/KafkaSASLSample/Makefile +++ b/samples/KafkaSASLSample/Makefile @@ -4,7 +4,7 @@ STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging:$(STREAMS_STUDIO_SPL_PATH) -SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} +SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} --data-directory=data SPLC = $(STREAMS_INSTALL)/bin/sc diff --git a/samples/KafkaSample/Makefile b/samples/KafkaSample/Makefile index d28776d..9a62619 100644 --- a/samples/KafkaSample/Makefile +++ b/samples/KafkaSample/Makefile @@ -4,7 +4,7 @@ STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging:$(STREAMS_STUDIO_SPL_PATH) -SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} +SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} --data-directory=data SPLC = $(STREAMS_INSTALL)/bin/sc diff --git a/samples/MqttSample/Makefile b/samples/MqttSample/Makefile index b19412e..adf8fd5 100644 --- a/samples/MqttSample/Makefile +++ b/samples/MqttSample/Makefile @@ -4,7 +4,7 @@ STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging -SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} +SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} --data-directory=data SPLC = $(STREAMS_INSTALL)/bin/sc diff --git a/samples/RabbitMQSample/Makefile b/samples/RabbitMQSample/Makefile index bd070e0..d3f923e 100644 --- a/samples/RabbitMQSample/Makefile +++ b/samples/RabbitMQSample/Makefile @@ -4,7 +4,7 @@ STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging -SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} +SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} --data-directory=data SPLC = $(STREAMS_INSTALL)/bin/sc diff --git a/samples/XMSSample/Makefile b/samples/XMSSample/Makefile index 8acd499..df6b631 100644 --- a/samples/XMSSample/Makefile +++ b/samples/XMSSample/Makefile @@ -5,7 +5,7 @@ STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging -SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} +SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} --data-directory=data SPLC = $(STREAMS_INSTALL)/bin/sc From fc8d01c6b4966ea062846206388742c9ebf0be7e Mon Sep 17 00:00:00 2001 From: rolefhei Date: Wed, 20 Sep 2017 10:59:43 +0200 Subject: [PATCH 2/6] issue #319 --- .../messaging/jms/JMSConnectionHelper.java | 47 +++++++++++++++---- .../ibm/streamsx/messaging/jms/JMSSource.java | 5 +- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java index be7da17..f134029 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java @@ -4,16 +4,17 @@ *******************************************************************************/ package com.ibm.streamsx.messaging.jms; -import com.ibm.streams.operator.metrics.Metric; -import com.ibm.streamsx.messaging.common.PropertyProvider; - +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; + import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; @@ -24,12 +25,15 @@ import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; + import com.ibm.streams.operator.logging.LogLevel; +import com.ibm.streams.operator.metrics.Metric; +import com.ibm.streamsx.messaging.common.PropertyProvider; /* This class contains all the connection related information, creating maintaining and closing a connection to the JMSProvider * Sending and Receiving JMS messages */ -class JMSConnectionHelper { +class JMSConnectionHelper implements ExceptionListener { // variables required to create connection // connection factory @@ -182,7 +186,8 @@ private Connection getConnect() { // logger to get error messages private Logger logger; - + private Logger tracer = Logger.getLogger(this.getClass().getName()); + // This constructor sets the parameters required to create a connection for // JMSSource JMSConnectionHelper(ConnectionDocumentParser connectionDocumentParser,ReconnectionPolicies reconnectionPolicy, @@ -226,11 +231,27 @@ private Connection getConnect() { } - // Method to create the initial connection + + /** + * Called asynchronously to notify problems with the connection + * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) + */ + @Override + public void onException (JMSException ex) { + tracer.log(LogLevel.ERROR, "onException: " + ex.toString()); + try { + this.recoverSession(); + } catch (JMSException | ConnectionException e) { + tracer.log(LogLevel.ERROR, "onException: " + e.toString()); + } catch (InterruptedException e) { + // ignore interruption of notification thread + } + } + + // Method to create the initial connection public void createInitialConnection() throws ConnectionException, InterruptedException { createConnection(); - return; } @@ -369,7 +390,8 @@ private boolean connect(boolean isProducer) throws JMSException { setConnect(connFactory.createConnection(userPrincipal, userCredential)); else setConnect(connFactory.createConnection()); - + getConnect().setExceptionListener (this); + // Create session from connection; false means // session is not transacted. @@ -394,6 +416,7 @@ private boolean connect(boolean isProducer) throws JMSException { getProducerCR().setTimeToLive(TimeUnit.MILLISECONDS.convert(7L, TimeUnit.DAYS)); getProducerCR().setDeliveryMode(DeliveryMode.PERSISTENT); // start the connection + tracer.log (LogLevel.INFO, "going to start the connection for producer in client acknowledge mode ..."); getConnect().start(); } @@ -415,8 +438,10 @@ private boolean connect(boolean isProducer) throws JMSException { // Its JMSSource, So we will create a consumer setConsumer(getSession().createConsumer(dest, messageSelector)); // start the connection + tracer.log (LogLevel.INFO, "going to start consumer connection ..."); getConnect().start(); } + tracer.log (LogLevel.INFO, "connection successfully created"); // create connection is successful, return true return true; } @@ -506,6 +531,7 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce } } catch (JMSException e) { + tracer.log (LogLevel.ERROR, e.toString()); // If the JMSSource operator was interrupted in middle if (e.toString().contains("java.lang.InterruptedException")) { //$NON-NLS-1$ throw new java.lang.InterruptedException(); @@ -523,7 +549,6 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce synchronized (getSession()) { return (getConsumer().receive(timeout)); } - } } @@ -583,10 +608,13 @@ public void recoverSession() throws JMSException, ConnectionException, Interrupt try { synchronized (getSession()) { + tracer.log(LogLevel.INFO, "recoverSession"); getSession().recover(); + tracer.log(LogLevel.INFO, "recoverSession - session recovered"); } } catch (JMSException e) { + tracer.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); //$NON-NLS-1$ setConnect(null); createConnection(); @@ -594,7 +622,6 @@ public void recoverSession() throws JMSException, ConnectionException, Interrupt synchronized (getSession()) { getSession().recover(); } - } } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index cdf2e09..3b4dd6b 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -9,13 +9,10 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.UnsupportedEncodingException; -import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; -import java.util.Set; -import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.JMSException; @@ -746,7 +743,7 @@ protected void process() throws UnsupportedEncodingException, throw e1; } - long timeout = isInConsistentRegion ? JMSSource.RECEIVE_TIMEOUT : 0; + long timeout = JMSSource.RECEIVE_TIMEOUT; long sessionCreationTime = 0; while (!Thread.interrupted()) { From 2f3518b3e7c0f791f942cb6649bcf664a1a104a4 Mon Sep 17 00:00:00 2001 From: rolefhei Date: Wed, 20 Sep 2017 14:12:28 +0200 Subject: [PATCH 3/6] fixed crash of JMSSink on connection loss --- .../messaging/jms/JMSConnectionHelper.java | 41 ++++++----- .../ibm/streamsx/messaging/jms/JMSSink.java | 69 +++++++++++-------- 2 files changed, 65 insertions(+), 45 deletions(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java index f134029..2b237a0 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java @@ -4,8 +4,6 @@ *******************************************************************************/ package com.ibm.streamsx.messaging.jms; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -186,7 +184,7 @@ private Connection getConnect() { // logger to get error messages private Logger logger; - private Logger tracer = Logger.getLogger(this.getClass().getName()); + private static final Logger tracer = Logger.getLogger(JMSConnectionHelper.class.getName()); // This constructor sets the parameters required to create a connection for // JMSSource @@ -238,11 +236,11 @@ private Connection getConnect() { */ @Override public void onException (JMSException ex) { - tracer.log(LogLevel.ERROR, "onException: " + ex.toString()); + tracer.log(LogLevel.ERROR, "onException: " + ex.toString()); //$NON-NLS-1$ try { this.recoverSession(); } catch (JMSException | ConnectionException e) { - tracer.log(LogLevel.ERROR, "onException: " + e.toString()); + tracer.log(LogLevel.ERROR, "onException: " + e.toString()); //$NON-NLS-1$ } catch (InterruptedException e) { // ignore interruption of notification thread } @@ -416,7 +414,7 @@ private boolean connect(boolean isProducer) throws JMSException { getProducerCR().setTimeToLive(TimeUnit.MILLISECONDS.convert(7L, TimeUnit.DAYS)); getProducerCR().setDeliveryMode(DeliveryMode.PERSISTENT); // start the connection - tracer.log (LogLevel.INFO, "going to start the connection for producer in client acknowledge mode ..."); + tracer.log (LogLevel.INFO, "going to start the connection for producer in client acknowledge mode ..."); //$NON-NLS-1$ getConnect().start(); } @@ -438,10 +436,10 @@ private boolean connect(boolean isProducer) throws JMSException { // Its JMSSource, So we will create a consumer setConsumer(getSession().createConsumer(dest, messageSelector)); // start the connection - tracer.log (LogLevel.INFO, "going to start consumer connection ..."); + tracer.log (LogLevel.INFO, "going to start consumer connection ..."); //$NON-NLS-1$ getConnect().start(); } - tracer.log (LogLevel.INFO, "connection successfully created"); + tracer.log (LogLevel.INFO, "connection successfully created"); //$NON-NLS-1$ // create connection is successful, return true return true; } @@ -531,7 +529,7 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce } } catch (JMSException e) { - tracer.log (LogLevel.ERROR, e.toString()); + tracer.log (LogLevel.ERROR, "receiveMessage - " + e.toString()); //$NON-NLS-1$ // If the JMSSource operator was interrupted in middle if (e.toString().contains("java.lang.InterruptedException")) { //$NON-NLS-1$ throw new java.lang.InterruptedException(); @@ -608,13 +606,13 @@ public void recoverSession() throws JMSException, ConnectionException, Interrupt try { synchronized (getSession()) { - tracer.log(LogLevel.INFO, "recoverSession"); + tracer.log(LogLevel.INFO, "recoverSession"); //$NON-NLS-1$ getSession().recover(); - tracer.log(LogLevel.INFO, "recoverSession - session recovered"); + tracer.log(LogLevel.INFO, "recoverSession - session recovered"); //$NON-NLS-1$ } } catch (JMSException e) { - tracer.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); + tracer.log(LogLevel.INFO, "attempting to reconnect"); //$NON-NLS-1$ logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); //$NON-NLS-1$ setConnect(null); createConnection(); @@ -639,14 +637,25 @@ public void roolbackSession() throws JMSException { } } - // close the connection - public void closeConnection() throws JMSException { + // close and invalidate the connection + public void closeConnection() { if (getSession() != null) { - getSession().close(); + try { + getSession().close(); + } catch (JMSException e) { + // ignore + } } if (getConnect() != null) { - getConnect().close(); + try { + getConnect().close(); + } catch (JMSException e) { + // ignore + } + finally { + setConnect(null); + } } } } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java index 96c808d..7e59478 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java @@ -8,12 +8,8 @@ import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; import java.util.logging.Logger; import javax.jms.Message; @@ -60,6 +56,7 @@ public class JMSSink extends AbstractOperator implements StateHandler{ */ private static Logger logger = Logger.getLogger(LoggerNames.LOG_FACILITY + "." + CLASS_NAME, "com.ibm.streamsx.messaging.jms.JMSMessages"); //$NON-NLS-1$ //$NON-NLS-2$ + private static final Logger tracer = Logger.getLogger(CLASS_NAME); // property names used in message header public static final String OP_CKP_NAME_PROPERTITY = "StreamsOperatorCkpName"; //$NON-NLS-1$ @@ -721,47 +718,61 @@ private void registerForDataGovernance(String providerURL, String destination) { } @Override - public void process(StreamingInput stream, Tuple tuple) - throws InterruptedException, ConnectionException, - UnsupportedEncodingException, ParserConfigurationException, - TransformerException, Exception { + public void process(StreamingInput stream, Tuple tuple) throws Exception { boolean msgSent = false; - - // Create the initial connection for the first time only - // This is only called if the operator is NOT in a consistent region. - if(consistentRegionContext == null) { - if (isInitialConnection) { - jmsConnectionHelper.createInitialConnection(); - isInitialConnection = false; - } - } - - // Construct the JMS message based on the message type taking the - // attributes from the tuple. - Message message = mhandler.convertTupleToMessage(tuple, - jmsConnectionHelper.getSession()); - // Send the message - // If an exception occured while sending , drop the particular tuple. - if(consistentRegionContext == null) { // Operator is not in a consistent region - msgSent = jmsConnectionHelper.sendMessage(message); + if (isInitialConnection) { + jmsConnectionHelper.createInitialConnection(); + isInitialConnection = false; + } + + Message message; + try { + // Construct the JMS message based on the message type taking the + // attributes from the tuple. If the session is closed, we will be thrown out by JMSException. + message = mhandler.convertTupleToMessage(tuple, + jmsConnectionHelper.getSession()); + msgSent = jmsConnectionHelper.sendMessage(message); + } catch (UnsupportedEncodingException | ParserConfigurationException | TransformerException e) { + tracer.log (LogLevel.ERROR, "Failure creating JMS message from input tuple: " + e.toString()); //$NON-NLS-1$ + // no further action; tuple is dropped and sent to error port if present + } catch (/*JMS*/Exception e) { + tracer.log (LogLevel.ERROR, "failure creating or sending JMS message: " + e.toString() + + ". Trying to reconnect and re-send message"); //$NON-NLS-1$ + try { + isInitialConnection = true; + jmsConnectionHelper.closeConnection(); + jmsConnectionHelper.createInitialConnection(); + isInitialConnection = false; + message = mhandler.convertTupleToMessage(tuple, jmsConnectionHelper.getSession()); + msgSent = jmsConnectionHelper.sendMessage(message); + } catch (Exception finalExc) { + tracer.log (LogLevel.ERROR, "Tuple dropped. Final failure re-sending tuple: " + finalExc.toString()); //$NON-NLS-1$ + // no further action; tuple is dropped and sent to error port if present + } + } } else { + // consistent region + // Construct the JMS message based on the message type taking the + // attributes from the tuple. + // propagate all exceptions to the runtime to restart the consistent region in case of failure + Message message = mhandler.convertTupleToMessage(tuple, + jmsConnectionHelper.getSession()); msgSent = jmsConnectionHelper.sendMessageNoRetry(message); } if (!msgSent) { - + if (tracer.isLoggable(LogLevel.FINE)) tracer.log(LogLevel.FINE, "tuple dropped"); //$NON-NLS-1$ logger.log(LogLevel.ERROR, "EXCEPTION_SINK"); //$NON-NLS-1$ if (hasErrorPort) { + // throws Exception sendOutputErrorMsg(tuple, Messages.getString("EXCEPTION_SINK")); //$NON-NLS-1$ } - } - } // Method to send the error message to the error output port if one is From 8ae8907e22fe32a5701eae69c2ee06caed97832e Mon Sep 17 00:00:00 2001 From: rolefhei Date: Thu, 21 Sep 2017 10:24:17 +0200 Subject: [PATCH 4/6] recoverSession() removed from exception handler --- .../streamsx/messaging/jms/JMSConnectionHelper.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java index 2b237a0..7e62ca1 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java @@ -231,19 +231,13 @@ private Connection getConnect() { /** - * Called asynchronously to notify problems with the connection + * Called asynchronously to notify problems with the connection. + * Here we have no implementation except tracing the problem. * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */ @Override public void onException (JMSException ex) { tracer.log(LogLevel.ERROR, "onException: " + ex.toString()); //$NON-NLS-1$ - try { - this.recoverSession(); - } catch (JMSException | ConnectionException e) { - tracer.log(LogLevel.ERROR, "onException: " + e.toString()); //$NON-NLS-1$ - } catch (InterruptedException e) { - // ignore interruption of notification thread - } } // Method to create the initial connection From 863bc17e780b1ebac48dd7bd0a88b6b97d41fe69 Mon Sep 17 00:00:00 2001 From: rolefhei Date: Fri, 29 Sep 2017 10:08:46 +0200 Subject: [PATCH 5/6] connection handling for XMSSource and XMSSink improved --- .../Common/XMSCommon.h | 538 ++++---- .../XMSSink/XMSSink_cpp.cgt | 1227 +++++++++-------- .../XMSSource/XMSSource_cpp.cgt | 147 +- .../XMSSource/XMSSource_h.cgt | 5 +- 4 files changed, 1013 insertions(+), 904 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/Common/XMSCommon.h b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/Common/XMSCommon.h index 7394852..878b55c 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/Common/XMSCommon.h +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/Common/XMSCommon.h @@ -9,45 +9,45 @@ #include "MessagingResource.h" private: - - // Configuration parameters - xms::String * pInitialContext; - xms::String * pConnectionFactory; - xms::String * pDestination; - xms::String * pQueueURI; - xms::String * pTopicURI; - xmsINT iDeliveryMode; - xms::String * pUserID; - xms::String * pPassword; - xms::String * pNamespaceURI; - xmsMESSAGE_TYPE msgType; - - // XMS api objects - xms::ConnectionFactory * pConnFact; - xms::Destination * pDest; - xms::Connection conn; - xms::Session sess; - xms::Destination dest; - xms::MessageProducer producer; - xms::MessageConsumer consumer; - xms::MessageListener * pListener; - - // State - SPL::boolean fatalError; - SPL::boolean connected; - + +// Configuration parameters +xms::String * pInitialContext; +xms::String * pConnectionFactory; +xms::String * pDestination; +xms::String * pQueueURI; +xms::String * pTopicURI; +xmsINT iDeliveryMode; +xms::String * pUserID; +xms::String * pPassword; +xms::String * pNamespaceURI; +xmsMESSAGE_TYPE msgType; + +// XMS api objects +xms::ConnectionFactory * pConnFact; +xms::Destination * pDest; +xms::Connection conn; +xms::Session sess; +xms::Destination dest; +xms::MessageProducer producer; +xms::MessageConsumer consumer; +xms::MessageListener * pListener; + +// State +SPL::boolean fatalError; +SPL::boolean connected; + public: - //To sum up the initialization error messages - ostringstream InitializationErrMsg; - +//To sum up the initialization error messages +ostringstream InitializationErrMsg; + public: /******************************************************************* * Finalize: Close connection and delete resources * *******************************************************************/ void finalizeOperator() { - SPLAPPTRC(L_DEBUG, "Entry: finalizeOperator", "XMSCommon"); + SPLAPPTRC(L_DEBUG, "Entry: finalizeOperator", "XMSCommon"); /* * Close the connection. This will automatically close and delete dependent objects @@ -59,7 +59,7 @@ void finalizeOperator() { // indicate that we don't have a Connection any more connected = false; - + /* * Clean up any allocated resources */ @@ -122,8 +122,8 @@ xmsINT createAdminObjects() { nRC = XMS_E_BAD_PARAMETER; } else{ - SPLAPPTRC(L_TRACE, "Looked up Connection Factory " << *pConnectionFactory, "XMSCommon"); - pConnFact = dynamic_cast (pPC); + SPLAPPTRC(L_TRACE, "Looked up Connection Factory " << *pConnectionFactory, "XMSCommon"); + pConnFact = dynamic_cast (pPC); } // Lookup & create the destination, if we are using an administered object @@ -141,7 +141,7 @@ xmsINT createAdminObjects() { } else SPLAPPTRC(L_TRACE, "Looked up Destination " << *pDestination, "XMSCommon"); - pDest = dynamic_cast (pPC); + pDest = dynamic_cast (pPC); } } catch (xms::Exception & ex) { // Unable to lookup connection factory or destination. @@ -184,10 +184,10 @@ xmsINT createAdminObjects() { * Create XMS objects needed to send messages via XMS * * This assumes that the Administered Objects have been created * *******************************************************************/ - - #define PRODUCER 1 - #define CONSUMER 2 - + +#define PRODUCER 1 +#define CONSUMER 2 +#define OPERATOR_SHUTDOWN_IN_PROGRESS 5555 xmsINT createXMS(const xmsINT producerOrConsumer,const xmsINT reconnectionPolicy, const xmsINT reconnectionBound, const xmsFLOAT period ) { @@ -196,207 +196,236 @@ xmsINT createXMS(const xmsINT producerOrConsumer,const xmsINT reconnectionPolicy xmsINT nRC = XMS_OK; try { - SPLAPPLOG(L_INFO, MSGTK_XMS_CONNECT, "XMSCommon"); - - //set the reconnectionAttemptDelay to period - float reconnectionAttemptDelay = period; - - - nConnectionAttempts++; - updatePerformanceCounters(); - - if ((nRC=connect())!=XMS_OK) - { - //Get the reconnectionPolicy value, 1=Bounded retry, 2= NoRetry, 3= InfiniteRetry + SPLAPPLOG(L_INFO, MSGTK_XMS_CONNECT, "XMSCommon"); - //Check if ReconnectionPolicy is noRetry, then abort - if(reconnectionPolicy==2){ - SPLAPPLOG(L_ERROR, MSGTK_CONNECTION_FAILURE_NORETRY,"XMSCommon"); - if (isErrorPortSpecified==true) - { - InitializationErrMsg<= reconnectionBound) + if(counter >= reconnectionBound) { break; + } + getPE().blockUntilShutdownRequest((double)reconnectionAttemptDelay); + if (getPE().getShutdownRequested()) { + return OPERATOR_SHUTDOWN_IN_PROGRESS; + } + SPLAPPTRC(L_INFO, "createXMS - trying to connect", "XMSCommon"); } + } - if (counter==reconnectionBound){ - // Bounded number of tries has exceeded - throw new xms::Exception(); - } + if (counter==reconnectionBound){ + SPLAPPTRC(L_ERROR, "createXMS - #conectionAttempts exceeds reconnection bound (" << reconnectionBound << ")", "XMSCommon"); + // Bounded number of tries has exceeded + throw new xms::Exception(); + } - else { - //We have got a successful connection - } + else { + //We have got a successful connection + } - } + } - //Check if ReconnectionPolicy is infiniteRetry, then try once in interval defined by period - else if(reconnectionPolicy==3){ - //infinitely reconnect - nConnectionAttempts++; - updatePerformanceCounters(); - while ((nRC=connect())!=XMS_OK){ - nConnectionAttempts++; - updatePerformanceCounters(); - SPL::rstring logmsg = MSGTK_CONNECTION_FAILURE_INFINITERETRY(reconnectionAttemptDelay); - SPLAPPLOG(L_INFO, logmsg,"XMSCommon"); - getPE().blockUntilShutdownRequest((double)reconnectionAttemptDelay); - } + //Check if ReconnectionPolicy is infiniteRetry, then try once in interval defined by period + else if(reconnectionPolicy==3){ + //infinitely reconnect + getPE().blockUntilShutdownRequest((double)reconnectionAttemptDelay); + if (getPE().getShutdownRequested()) { + return OPERATOR_SHUTDOWN_IN_PROGRESS; + } + xmsINT counter=1; + nConnectionAttempts++; + updatePerformanceCounters(); + SPLAPPTRC(L_INFO, "createXMS - trying to connect", "XMSCommon"); + while ((nRC=connect())!=XMS_OK){ + ++counter; + SPLAPPTRC(L_WARN, "createXMS - " << counter << ". connect failed. rc = " << nRC, "XMSCommon"); + nConnectionAttempts++; + updatePerformanceCounters(); + SPL::rstring logmsg = MSGTK_CONNECTION_FAILURE_INFINITERETRY(reconnectionAttemptDelay); + SPLAPPLOG(L_INFO, logmsg,"XMSCommon"); + getPE().blockUntilShutdownRequest((double)reconnectionAttemptDelay); + if (getPE().getShutdownRequested()) { + return OPERATOR_SHUTDOWN_IN_PROGRESS; } + SPLAPPTRC(L_INFO, "createXMS - trying to connect", "XMSCommon"); } + } + } + + SPLAPPTRC(L_INFO, "createXMS - connection successfully created: " << nRC, "XMSCommon"); + SPLAPPLOG(L_INFO, MSGTK_CONNECTION_SUCCESSFUL, "XMSCommon"); + SPLAPPTRC(L_DEBUG, "Now creating other XMS api objects", "XMSCommon"); + + try { + SPLAPPTRC(L_DEBUG, "About to create Session", "XMSCommon"); + + // Create the session + + sess = conn.createSession(xmsFALSE, XMSC_AUTO_ACKNOWLEDGE); + SPLAPPTRC(L_DEBUG, "Session created", "XMSCommon"); - SPLAPPLOG(L_INFO, MSGTK_CONNECTION_SUCCESSFUL, "XMSCommon"); - SPLAPPTRC(L_TRACE, "Now creating other XMS api objects", "XMSCommon"); - - try { - SPLAPPTRC(L_TRACE, "About to create Session", "XMSCommon"); - - // Create the session - - sess = conn.createSession(xmsFALSE, XMSC_AUTO_ACKNOWLEDGE); - SPLAPPTRC(L_TRACE, "Session created", "XMSCommon"); - - try { - // Create the destination if not already retrieved from initial context - - if (pDest != NULL) - dest = *pDest; - else if (pQueueURI != NULL) { - SPLAPPTRC(L_TRACE, "About to create Queue destination", "XMSCommon"); - dest = sess.createQueue(*pQueueURI); - } - else if (pTopicURI != NULL) { - SPLAPPTRC(L_TRACE, "About to create Topic destination", "XMSCommon"); - dest = sess.createTopic(*pTopicURI); - } - else - throw xms::InvalidDestinationException(); - - SPLAPPTRC(L_DEBUG, "Using Destination " << dest.toString(), "XMSCommon"); - - if (producerOrConsumer == PRODUCER){ - try { - SPLAPPTRC(L_TRACE, "About to create Producer", "XMSCommon"); - // Create the producer - producer = sess.createProducer(dest); - - SPLAPPTRC(L_TRACE, "Producer created", "XMSCommon"); - - try { - SPLAPPTRC(L_TRACE, "About to set Producer's Delivery Mode", "XMSCommon"); - producer.setIntProperty(XMSC_DELIVERY_MODE, - iDeliveryMode); - } catch (xms::Exception & ex) { - // Unable to set the delivery mode - SPL::rstring logmsg = MSGTK_SET_DELIVERY_MODE_FAILURE(ex.getErrorString().c_str()); - SPLAPPLOG(L_ERROR, logmsg, "XMSCommon"); - if (isErrorPortSpecified==true) - { - //Added to append to the InitializationErrMsg - InitializationErrMsg<createConnection(*pUserID, *pPassword); } else { - SPLAPPTRC(L_TRACE, "Calling createConnection without parameters", "XMSCommon"); + SPLAPPTRC(L_DEBUG, "Calling createConnection without parameters", "XMSCommon"); conn = pConnFact->createConnection(); } } catch (xms::Exception & ex) { // Unable to create the connection SPL::rstring logmsg = MSGTK_CREATE_CONNECTION_EXCEPTION(ex.getErrorString().c_str()); SPLAPPLOG(L_ERROR, logmsg, "XMSCommon"); + ostringstream ost; + ex.dump (ost); + SPLAPPTRC (L_ERROR, "connect(): " << ost.str(), "XMSCommon"); if (isErrorPortSpecified==true) { //Append to the InitializationErrMsg @@ -429,7 +461,7 @@ xmsINT connect() { } processException(ex); nRC = ex.getErrorCode(); - } + } return nRC; } @@ -439,7 +471,7 @@ xmsINT connect() { * * *******************************************************************/ xmsVOID processException(const xms::Exception & ex) { - dumpError(ex.getHandle()); + dumpError(ex.getHandle()); } @@ -476,59 +508,59 @@ xmsVOID dumpError(xmsHErrorBlock hError) switch (jmsexception) { - case XMS_X_NO_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_NONE"; - break; + case XMS_X_NO_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_NONE"; + break; - case XMS_X_GENERAL_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_GENERALEXCEPTION"; - break; + case XMS_X_GENERAL_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_GENERALEXCEPTION"; + break; - case XMS_X_ILLEGAL_STATE_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_ILLEGALSTATEEXCEPTION"; - break; + case XMS_X_ILLEGAL_STATE_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_ILLEGALSTATEEXCEPTION"; + break; - case XMS_X_INVALID_CLIENTID_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_INVALIDCLIENTIDEXCEPTION"; - break; + case XMS_X_INVALID_CLIENTID_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_INVALIDCLIENTIDEXCEPTION"; + break; - case XMS_X_INVALID_DESTINATION_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_INVALIDDESTINATIONEXCEPTION"; - break; + case XMS_X_INVALID_DESTINATION_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_INVALIDDESTINATIONEXCEPTION"; + break; - case XMS_X_INVALID_SELECTOR_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_INVALIDSELECTOREXCEPTION"; - break; + case XMS_X_INVALID_SELECTOR_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_INVALIDSELECTOREXCEPTION"; + break; - case XMS_X_MESSAGE_EOF_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_MESSAGEEOFEXCEPTION"; - break; + case XMS_X_MESSAGE_EOF_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_MESSAGEEOFEXCEPTION"; + break; - case XMS_X_MESSAGE_FORMAT_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_MESSAGEFORMATEXCEPTION"; - break; + case XMS_X_MESSAGE_FORMAT_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_MESSAGEFORMATEXCEPTION"; + break; - case XMS_X_MESSAGE_NOT_READABLE_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_MESSAGENOTREADABLEEXCEPTION"; - break; + case XMS_X_MESSAGE_NOT_READABLE_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_MESSAGENOTREADABLEEXCEPTION"; + break; - case XMS_X_MESSAGE_NOT_WRITEABLE_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_MESSAGENOTWRITEABLEEXCEPTION"; - break; + case XMS_X_MESSAGE_NOT_WRITEABLE_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_MESSAGENOTWRITEABLEEXCEPTION"; + break; - case XMS_X_RESOURCE_ALLOCATION_EXCEPTION: - pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_RESOURCEALLOCATIONEXCEPTION"; - break; + case XMS_X_RESOURCE_ALLOCATION_EXCEPTION: + pszExceptionType = (xmsCHAR *) "XMS_JMSEXP_TYPE_RESOURCEALLOCATIONEXCEPTION"; + break; - default: - pszExceptionType = (xmsCHAR *) ""; - break; + default: + pszExceptionType = (xmsCHAR *) ""; + break; } - + sprintf(szErrorMsg,"Error Block \n -> JMSException = %d (%s)\n -> Error Code = %d (%s)\n -> Error Data = %s\n\n", - - jmsexception, pszExceptionType, nReason, szText, szData); + + jmsexception, pszExceptionType, nReason, szText, szData); SPL::rstring logmsg = MSGTK_XMS_JMS_EXCEPTION(szErrorMsg); @@ -544,11 +576,11 @@ xmsVOID dumpError(xmsHErrorBlock hError) /* - * Get the next linked error, and act recursively - */ + * Get the next linked error, and act recursively + */ xmsErrorGetLinkedError(hError, &xmsLinkedError); - + dumpError(xmsLinkedError); } } diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt index 9ef3f2d..d4dbfcd 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt @@ -6,9 +6,9 @@ %> <% - my $isInConsistentRegion = $model->getContext()->getOptionalContext("ConsistentRegion"); +my $isInConsistentRegion = $model->getContext()->getOptionalContext("ConsistentRegion"); %> - + <% # Set up commonly used variables my $connTag = 'XMS'; @@ -27,12 +27,12 @@ use File::Basename; use Cwd 'realpath'; my $modelroot = $model->getContext()->getOperatorDirectory(); unshift @INC, dirname($modelroot) . "/Common"; - require Connection; - require Access; - require Edge; - require MessagingResource; +require Connection; +require Access; +require Edge; +require MessagingResource; + - # Set up the objects that interface with this operator's connection specification and access specification. my ($conn, $access) = Edge::connectionSetup($model, $connTag, $accessTag); @@ -85,16 +85,16 @@ else{ } else { if ( $reconnectionPolicy->getValueAt(0)->getSPLExpression() eq "NoRetry" ){ - #reconnectionPolicy is NoRetry, reconnectionBound is 0 +#reconnectionPolicy is NoRetry, reconnectionBound is 0 $reconnectionBound=0; } else { if ( $reconnectionPolicy->getValueAt(0)->getSPLExpression() eq "BoundedRetry" ){ - #reconnectionPolicy is BoundedRetry and reconnectionBound is absent, default value 5 is used +#reconnectionPolicy is BoundedRetry and reconnectionBound is absent, default value 5 is used $reconnectionBound=5; } else { - #reconnectionPolicy is infiniteRetry +#reconnectionPolicy is infiniteRetry $reconnectionBound=-1; } } @@ -135,132 +135,132 @@ using namespace streams_boost; <%SPL::CodeGen::implementationPrologue($model);%> <% - ### Consistent region ERROR message ### - my $crContext = $model->getContext()->getOptionalContext("ConsistentRegion"); - if($crContext && $crContext->isStartOfRegion()) { - SPL::CodeGen::exitln(MessagingResource::MSGTK_OP_CANNOT_BE_IN_CONSISTENT_REGION("XMSSink")); - } +### Consistent region ERROR message ### +my $crContext = $model->getContext()->getOptionalContext("ConsistentRegion"); +if($crContext && $crContext->isStartOfRegion()) { + SPL::CodeGen::exitln(MessagingResource::MSGTK_OP_CANNOT_BE_IN_CONSISTENT_REGION("XMSSink")); +} %> // Constructor MY_OPERATOR::MY_OPERATOR() : nTruncatedInserts(0),nTruncatedInsertsPC( getContext().getMetrics().getCustomMetricByName("nTruncatedInserts")),nFailedInserts(0),nFailedInsertsPC( getContext().getMetrics().getCustomMetricByName("nFailedInserts")),nConnectionAttempts(0),nConnectionAttemptsPC( getContext().getMetrics().getCustomMetricByName("nConnectionAttempts")) { SPLAPPTRC(L_DEBUG, "Entry: Constructor", "XMSSink"); - + <%if ($isInConsistentRegion) {%> _crContext = static_cast (getContext().getOptionalContext(CONSISTENT_REGION)); - + getContext().registerStateHandler(*this); <%}%> - + std::ostringstream ErrMsg; - + //Check if error port is specified and likewise set the isErrorPortSpecified variable <%if ($model->getNumberOfOutputPorts() == 1) { %> - isErrorPortSpecified=true; + isErrorPortSpecified=true; <% } else{%> - isErrorPortSpecified=false; + isErrorPortSpecified=false; <%}%> - + //Initialize member variables using values from the Connection document - <% - - # InitialContext and ConnectionFactory are guaranteed to be there by the XML Schema - my $ic =$conn->getAttributeByName('initial_context'); - - use URI; - - # support relative binding file path. - if(!defined ($ic) || $ic eq '') { - SPL::CodeGen::exitln(MessagingResource::MSGTK_VALUE_MUST_BE_SPECIFIED_FOR_INITIAL_CONTEXT_ATTRIB_IN_CONNECTION_DOC()); - } - - my $url = new URI($ic); - - if($url->scheme eq '') { - SPL::CodeGen::exitln(MessagingResource::MSGTK_INVALID_INITIAL_CONTEXT_FORMAT_DETECTED()); - } - - if($url->scheme eq 'file') { - my $path = $url->path; - - # This is a relative path - if(substr($path, 0, 1) ne '/') { - $ic = $path; - } - # if it ends with a / then append .bindings - if(substr($ic,-1) eq '/') { - $ic .= '.bindings'; - } - } - - # Append .bindings to the URL, if it is a file based one that refers to a directory. The XMS client requires this - # If it starts with file:/// and it ends with a / then append .bindings - #if ((substr($ic,0,8) eq 'file:///') && (substr($ic,-1) eq '/')){ - # $ic .= '.bindings'; - # } - # - %> - - std::string initContext = "<%=$ic %>"; - - // process relative file path - if(initContext.find("file:") == std::string::npos) { - streams_boost::filesystem::path filePath(initContext); - - if(filePath.is_relative()) { - // if relative, convert to absolute path using the application directory as the base - filePath = streams_boost::filesystem::absolute(filePath, getPE().getApplicationDirectory()); - initContext = filePath.string(); - } - - initContext = "file://" + initContext; - } - - - pInitialContext = new xms::String(initContext); - pConnectionFactory = new xms::String("<%=$conn->getAttributeByName('connection_factory'); %>"); - - - // Obtain the User id and password - <% if ($conn->hasAttributeName('user')) { %> - pUserID = new xms::String("<%=$conn->getAttributeByName('user'); %>"); - <% } - else { %> - pUserID = NULL; - <% } %> - - <% if ($conn->hasAttributeName('password')) { %> - pPassword = new xms::String("<%=$conn->getAttributeByName('password'); %>"); - <% } - else { %> - pPassword = NULL; - <% } %> - - - // message_class tells us what type of XMS message is to be output - <% - my $msgType = $access->getAttributeByName('message_class'); - %> - - <% - - my $nparm = $access->getNumberOfNativeSchemaAttributes(); - #Retrieve the attributes from the incoming tuple and create a list of these after some checks. + +# InitialContext and ConnectionFactory are guaranteed to be there by the XML Schema + my $ic =$conn->getAttributeByName('initial_context'); + + use URI; + +# support relative binding file path. + if(!defined ($ic) || $ic eq '') { + SPL::CodeGen::exitln(MessagingResource::MSGTK_VALUE_MUST_BE_SPECIFIED_FOR_INITIAL_CONTEXT_ATTRIB_IN_CONNECTION_DOC()); + } + + my $url = new URI($ic); + + if($url->scheme eq '') { + SPL::CodeGen::exitln(MessagingResource::MSGTK_INVALID_INITIAL_CONTEXT_FORMAT_DETECTED()); + } + + if($url->scheme eq 'file') { + my $path = $url->path; + +# This is a relative path + if(substr($path, 0, 1) ne '/') { + $ic = $path; + } +# if it ends with a / then append .bindings + if(substr($ic,-1) eq '/') { + $ic .= '.bindings'; + } + } + +# Append .bindings to the URL, if it is a file based one that refers to a directory. The XMS client requires this +# If it starts with file:/// and it ends with a / then append .bindings +#if ((substr($ic,0,8) eq 'file:///') && (substr($ic,-1) eq '/')){ +# $ic .= '.bindings'; +# } +# + %> + + std::string initContext = "<%=$ic %>"; + + // process relative file path + if(initContext.find("file:") == std::string::npos) { + streams_boost::filesystem::path filePath(initContext); + + if(filePath.is_relative()) { + // if relative, convert to absolute path using the application directory as the base + filePath = streams_boost::filesystem::absolute(filePath, getPE().getApplicationDirectory()); + initContext = filePath.string(); + } + + initContext = "file://" + initContext; + } + + + pInitialContext = new xms::String(initContext); + pConnectionFactory = new xms::String("<%=$conn->getAttributeByName('connection_factory'); %>"); + + + // Obtain the User id and password + <% if ($conn->hasAttributeName('user')) { %> + pUserID = new xms::String("<%=$conn->getAttributeByName('user'); %>"); + <% } + else { %> + pUserID = NULL; + <% } %> + + <% if ($conn->hasAttributeName('password')) { %> + pPassword = new xms::String("<%=$conn->getAttributeByName('password'); %>"); + <% } + else { %> + pPassword = NULL; + <% } %> + + + // message_class tells us what type of XMS message is to be output + <% + my $msgType = $access->getAttributeByName('message_class'); + %> + + + <% + + my $nparm = $access->getNumberOfNativeSchemaAttributes(); +#Retrieve the attributes from the incoming tuple and create a list of these after some checks. my $parmlist = []; for ( my $i=0; $i < $nparm; $i++ ){ my $parm = {}; - $$parm{_name} = $access->getNativeSchemaAttributeNameAt($i); - $$parm{_type} = $access->getNativeSchemaAttributeTypeAt($i); - $$parm{_length} = $access->getNativeSchemaAttributeLengthAt($i); - push @$parmlist, $parm; + $$parm{_name} = $access->getNativeSchemaAttributeNameAt($i); + $$parm{_type} = $access->getNativeSchemaAttributeTypeAt($i); + $$parm{_length} = $access->getNativeSchemaAttributeLengthAt($i); + push @$parmlist, $parm; } %> - + //Initialize variables that are to do with the access spec pDestination = new xms::String("<%=$access->getAttributeByName('identifier'); %>"); pQueueURI = NULL; @@ -270,26 +270,32 @@ MY_OPERATOR::MY_OPERATOR() : nTruncatedInserts(0),nTruncatedInsertsPC( getContex fatalError = false; connected = false; - + //Set the periodValue periodVal = <%=$periodVal%>; //Now attempt to create the XMS objects - if (createAdminObjects() != XMS_OK) + if (createAdminObjects() != XMS_OK) { fatalError = true; - else if (createXMS(PRODUCER,<%=$reconnectionPolicy%>,<%=$reconnectionBound%>,(xmsFLOAT)periodVal) == XMS_OK) + } + // In the constructor we try to connect with policy NoRetry to avoid long or even endless blocking of + // createXMS (...) if one of the other reconnection policies has been specified. Blocking constructor + // might cause timeouts in the runtime on job submission. + // + // In tuple processing we can wait forever when we reconnect. + else if (createXMS(PRODUCER, /*NoRetry*/2, 0, (xmsFLOAT)periodVal) == XMS_OK) { connected = true; //In case of initial connection failure, we need to set a flag to avoid reconnection when the MQ is down from the beginning, to distinguish the case for transient conection failure isInitialConnectionFailure=false; - + } else { isInitialConnectionFailure=true; } - + SPLAPPTRC(L_DEBUG, "Exit: Constructor", "XMSSink"); } @@ -297,20 +303,20 @@ MY_OPERATOR::MY_OPERATOR() : nTruncatedInserts(0),nTruncatedInsertsPC( getContex //send the output to the error port if error port is specified and tuple information is present void MY_OPERATOR::sendOutputErrorMsg(ostringstream& ErrMsg, Tuple const & tuple){ - + //Append the initialization error message ostringstream PrintMessage; PrintMessage<getAttributes()}) { my $errorAttributeName = $errorAttribute->getName(); my $errorAttributeType = $errorAttribute->getSPLType(); - - # Input Tuple + +# Input Tuple if (SPL::CodeGen::Type::isTuple($errorAttributeType)) { print "// Copy tuple from input data as is;\n"; print "otupleError.set_$errorAttributeName(tuple) ;\n"; @@ -322,7 +328,7 @@ void MY_OPERATOR::sendOutputErrorMsg(ostringstream& ErrMsg, Tuple const & tuple) print" submit(otupleError, 0); \n"; } %> - + } @@ -337,348 +343,353 @@ MY_OPERATOR::~MY_OPERATOR() void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) { ErrMsg.flush(); - SPLAPPTRC(L_TRACE, "Entry: process0(const <%=$inStream->getSPLTupleType()%>", "XMSSink"); + SPLAPPTRC(L_DEBUG, "Entry: process0(const <%=$inStream->getSPLTupleType()%>", "XMSSink"); SPL::rstring logmsg; // = PROCESSING_TUPLE(tuple); //SPLAPPLOG(L_INFO, logmsg, "XMSSink"); - - + + if (fatalError == true){ SPLAPPLOG(L_ERROR, MSGTK_PREVIOUS_ERROR, "XMSSink"); <% if (defined $operatorErrorPort ) { %> - SPLAPPLOG(L_INFO, MSGTK_SEND_TUPLE_ERROR_PORT, "XMSSink"); - sendOutputErrorMsg(ErrMsg,tuple); + SPLAPPLOG(L_INFO, MSGTK_SEND_TUPLE_ERROR_PORT, "XMSSink"); + sendOutputErrorMsg(ErrMsg,tuple); <% }%> } else { if (connected == false) { - - //We need not try to re-connect if its initial connection failure for the first tuple - if(isInitialConnectionFailure==true) { - isInitialConnectionFailure=false; + + // Recreate the XMS objects if we don't have any (this could happen after a connection failure) + // Note, that createXMS can block until shutdown request with reconnectionPolicy == InfiniteRetry + if (createXMS(PRODUCER,<%=$reconnectionPolicy%>,<%=$reconnectionBound%>,(xmsFLOAT)periodVal) == XMS_OK) { + connected = true; } else { - // Recreate the XMS objects if we don't have any (this could happen after a connection failure) - if (createXMS(PRODUCER,<%=$reconnectionPolicy%>,<%=$reconnectionBound%>,(xmsFLOAT)periodVal) == XMS_OK) - connected = true; + if (getPE().getShutdownRequested()) { + SPLAPPTRC(L_INFO, "process (tuple): shutdown requested. Stopping reconnection attempt and dropping tuple", "XMSSink"); + return; + } + // we came out of createXMS(...) with error other than OPERATOR_SHUTDOWN_IN_PROGRESS + // We could not reconnect with configured policy --> throw exception and terminate PE + throw xms::Exception(); } } - - + + // Carry on only if we have managed to get a connection if (connected == false){ <% if (defined $operatorErrorPort ) { %> - ErrMsg << MSGTK_MESSAGE_DROPPED; - sendOutputErrorMsg(ErrMsg,tuple); + ErrMsg << MSGTK_MESSAGE_DROPPED; + sendOutputErrorMsg(ErrMsg,tuple); <% }%> SPLAPPLOG(L_ERROR, MSGTK_MESSAGE_DROPPED, "XMSSink"); nFailedInserts++; updatePerformanceCounters(); SPLAPPTRC(L_TRACE, "Exit: process0(const <%=$inStream->getSPLTupleType()%>)", "XMSSink"); - return; + return; } - - + + /* - * In this version of the operator, the message classes that would be supported are: map, stream, bytes, xml, wbe, wbe22 and empty. - * The data types that would be supported by all of these message classes are uint8, int8, uint16, int16, uint32, int32, int64, float32, float64, boolean and rstring - * The data type blob would be supported by map, stream, xml and bytes. - * The truncation of rstring attribute would happen in accordance to the length specified in the native schema in map, stream, bytes and xml. - */ - + * In this version of the operator, the message classes that would be supported are: map, stream, bytes, xml, wbe, wbe22 and empty. + * The data types that would be supported by all of these message classes are uint8, int8, uint16, int16, uint32, int32, int64, float32, float64, boolean and rstring + * The data type blob would be supported by map, stream, xml and bytes. + * The truncation of rstring attribute would happen in accordance to the length specified in the native schema in map, stream, bytes and xml. + */ + try { - + // Used to increment the performance counter nDroppedMessages boolean truncated = false; - const IPort0Type& t = (const IPort0Type&) tuple; + const IPort0Type& t = (const IPort0Type&) tuple; // Generate the code to create and populate the XMS message. This code depends on the message class - + <% if ( $msgType eq 'map' || $msgType eq 'stream') { my $mesgType = $msgType eq 'map' ? 'Map' : 'Stream'; my $setWrite = $msgType eq 'map' ? 'set' : 'write'; - + %> xms::<%=$mesgType%>Message tempmsg = sess.create<%=$mesgType%>Message(); xms::<%=$mesgType%>Message& msg = tempmsg; - + <% - foreach my $attribute (@$parmlist) { - my $name = $$attribute{_name}; - my $type = $$attribute{_type}; - my $length = $$attribute{_length}; - + foreach my $attribute (@$parmlist) { + my $name = $$attribute{_name}; + my $type = $$attribute{_type}; + my $length = $$attribute{_length}; + my $optName = $msgType eq 'map' ? "\"$name\"," : ""; print "SPLAPPTRC(L_TRACE, \"Processing attribute $name\", \"XMSSink\");\n"; - - #Special code is needed to handle byte arrays + +#Special code is needed to handle byte arrays if ($type eq "Bytes"){ %> - { - const SPL::blob& b = t.get_<%=$name%>(); - uint64_t size; - const unsigned char *data = b.getData (size); - - //Handle zero length in native schema - <% if($length==0){ %> - size = 0; + { + const SPL::blob& b = t.get_<%=$name%>(); + uint64_t size; + const unsigned char *data = b.getData (size); + + //Handle zero length in native schema + <% if($length==0){ %> + size = 0; + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } + <%}%> + + if(size>0){ + <% if ($length > 0){ %> + if(size><%=$length%>){ if(!truncated){ nTruncatedInserts++; truncated = true; } - <%}%> - - if(size>0){ - <% if ($length > 0){ %> - if(size><%=$length%>){ - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - size = (size > <%=$length%>) ? <%=$length%>: size; - <% } %> - SPLAPPTRC(L_TRACE, "Processing blob of length "<Bytes(<%=$optName%>(xmsSBYTE *)data,(xmsINT)size); - } - } + } + size = (size > <%=$length%>) ? <%=$length%>: size; + <% } %> + SPLAPPTRC(L_TRACE, "Processing blob of length "<Bytes(<%=$optName%>(xmsSBYTE *)data,(xmsINT)size); + } + } <% } - #special code is needed to handle strings +#special code is needed to handle strings elsif($type eq "String"){ %> - { - const SPL::rstring data = t.get_<%=$name%>(); - uint64_t size = data.string().length(); - <% if ($length >= 0){ %> - if(size><%=$length%>){ - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - size = (size > <%=$length%>) ? <%=$length%>: size; - <% } %> - - string Strdata = (rstring)(data.string().substr(0,size)); - const SPL::rstring data2 = rstring(Strdata); - SPLAPPTRC(L_TRACE, "Processing string of length "<String(<%=$optName%>data2); + { + const SPL::rstring data = t.get_<%=$name%>(); + uint64_t size = data.string().length(); + <% if ($length >= 0){ %> + if(size><%=$length%>){ + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } } + size = (size > <%=$length%>) ? <%=$length%>: size; + <% } %> + + string Strdata = (rstring)(data.string().substr(0,size)); + const SPL::rstring data2 = rstring(Strdata); + SPLAPPTRC(L_TRACE, "Processing string of length "<String(<%=$optName%>data2); + } <% } - - # Other datatypes can be simply copied across + +# Other datatypes can be simply copied across else{ %> - msg.<%=$setWrite%><%=$type%>(<%=$optName%> t.get_<%=$name%>()); + msg.<%=$setWrite%><%=$type%>(<%=$optName%> t.get_<%=$name%>()); <% } } } - - - - elsif ($msgType eq 'bytes') + + + + elsif ($msgType eq 'bytes') { print 'xms::BytesMessage tempmsg = sess.createBytesMessage();'; print 'xms::BytesMessage& msg = tempmsg;'; - + foreach my $attribute (@$parmlist) { my $name = $$attribute{_name}; my $type = $$attribute{_type}; my $length = $$attribute{_length}; - + print "SPLAPPTRC(L_TRACE, \"Processing attribute $name\", \"XMSSink\");\n"; - - - # Special code is needed to handle byte arrays + + +# Special code is needed to handle byte arrays if ($type eq "Bytes"){ %> - { - const SPL::blob& b = t.get_<%=$name%>(); - uint64_t length; - const unsigned char *data = b.getData (length); - - //Handle zero length in native schema - <% if($length==0){ %> - length = 0; - if(!truncated){ + { + const SPL::blob& b = t.get_<%=$name%>(); + uint64_t length; + const unsigned char *data = b.getData (length); + + //Handle zero length in native schema + <% if($length==0){ %> + length = 0; + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } + <% } %> + + if(length>0){ + <% if ($length > 0){ %> + if(length> <%=$length%>){ + if(!truncated){ nTruncatedInserts++; - truncated = true; + truncated = true; + } + } + length = (length > <%=$length%>) ? <%=$length%> : length; + <% } %> + + + <% if($length==-2){ %> + if(length >65535){ + length = 65535; + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } + } + msg.writeShort(length); + <% } %> + + <% if($length==-4){ %> + if(length >4294967295){ + length = 4294967295; + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } + } + msg.writeInt(length); + <% } %> + + <% if($length==-8){ %> + if(length > 18446744073709551615L){ + length = 18446744073709551615L; + if(!truncated){ + nTruncatedInserts++; + truncated = true; } - <% } %> - - if(length>0){ - <% if ($length > 0){ %> - if(length> <%=$length%>){ - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - length = (length > <%=$length%>) ? <%=$length%> : length; - <% } %> - - - <% if($length==-2){ %> - if(length >65535){ - length = 65535; - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - msg.writeShort(length); - <% } %> - - <% if($length==-4){ %> - if(length >4294967295){ - length = 4294967295; - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - msg.writeInt(length); - <% } %> - - <% if($length==-8){ %> - if(length > 18446744073709551615L){ - length = 18446744073709551615L; - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - msg.writeLong(length); - <% } %> - SPLAPPTRC(L_TRACE, "Processing blob of length "< 0) { %> - for (int i=length; i<<%=$length%>; i++) - msg.writeByte(' '); - <% } %> - } + msg.writeLong(length); + <% } %> + SPLAPPTRC(L_TRACE, "Processing blob of length "< 0) { %> + for (int i=length; i<<%=$length%>; i++) + msg.writeByte(' '); + <% } %> + } + } <% } - - - #Special code is needed to handle strings + + +#Special code is needed to handle strings elsif ($type eq 'String'){ %> - { - uint64_t length; - //const char *cStr = (t.get_<%=$name%>()).c_str(); - //length = strlen(cStr); - const SPL::rstring& str = t.get_<%=$name%>(); - const char *cStr = str.c_str(); - length = str.size(); - - - //Handle zero length in native schema - <% if($length==0){ %> - length = 0; + { + uint64_t length; + //const char *cStr = (t.get_<%=$name%>()).c_str(); + //length = strlen(cStr); + const SPL::rstring& str = t.get_<%=$name%>(); + const char *cStr = str.c_str(); + length = str.size(); + + + //Handle zero length in native schema + <% if($length==0){ %> + length = 0; + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } + <% } %> + + if (length>0){ + <% if ($length > 0){ %> + if(length><%=$length%>){ + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } + } + length = (length > <%=$length%>) ? <%=$length%> : length; + <% } %> + + + + <% if($length==-2){ %> + if(length >65535){ + length = 65535; + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } + } + msg.writeShort(length); + <% }%> + + <% if($length==-4){ %> + if(length >4294967295){ + length = 4294967295; + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } + } + msg.writeInt(length); + <% }%> + + <% if($length==-8){%> + if(length > 18446744073709551615L){ + length = 18446744073709551615L; if(!truncated){ nTruncatedInserts++; truncated = true; } + } + msg.writeLong(length); <% } %> - if (length>0){ - <% if ($length > 0){ %> - if(length><%=$length%>){ - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - length = (length > <%=$length%>) ? <%=$length%> : length; - <% } %> - - - - <% if($length==-2){ %> - if(length >65535){ - length = 65535; - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - msg.writeShort(length); - <% }%> - - <% if($length==-4){ %> - if(length >4294967295){ - length = 4294967295; - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - msg.writeInt(length); - <% }%> - - <% if($length==-8){%> - if(length > 18446744073709551615L){ - length = 18446744073709551615L; - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - msg.writeLong(length); - <% } %> - - SPLAPPTRC(L_TRACE, "Processing string of length "< 0){ %> - for (int i=length; i<<%=$length%>; i++) - msg.writeByte(' '); - <% } %> - } - } + SPLAPPTRC(L_TRACE, "Processing string of length "< 0){ %> + for (int i=length; i<<%=$length%>; i++) + msg.writeByte(' '); + <% } %> + } + } <% } - - #other datatypes can be copied across + +#other datatypes can be copied across else { %> - msg.write<%=$type%>(t.get_<%=$name%>()); + msg.write<%=$type%>(t.get_<%=$name%>()); <% } } } - - - elsif ($msgType eq 'wbe' || $msgType eq 'wbe22') + + + elsif ($msgType eq 'wbe' || $msgType eq 'wbe22') { - # Syntax for WBE message is msg.setType("name", tuple.get_name()); +# Syntax for WBE message is msg.setType("name", tuple.get_name()); print 'xms::TextMessage msg = sess.createTextMessage();'; - print 'WBEMessage *wmsg = new WBEMessage("',$inStream->getCppTupleName(),'");'; + print 'WBEMessage *wmsg = new WBEMessage("',$inStream->getCppTupleName(),'");'; foreach my $attribute (@$parmlist) { my $name = $$attribute{_name}; my $type = $$attribute{_type}; print "SPLAPPTRC(L_TRACE, \"Processing attribute $name\", \"XMSSink\");\n"; %> wmsg->set<%=$type%>("<%=$name%>",t.get_<%=$name%>()); - <%} + <%} print 'msg.setText(wmsg->toString());'; - } - - + } + + elsif ($msgType eq 'xml') { - # Syntax for XML message is msg.setType("name", tuple.get_name()); +# Syntax for XML message is msg.setType("name", tuple.get_name()); print 'xms::TextMessage tempmsg = sess.createTextMessage();'; print 'xms::TextMessage& msg = tempmsg;'; - + print 'XMLMessage *xmsg = new XMLMessage();'; - + foreach my $attribute (@$parmlist) { my $name = $$attribute{_name}; my $type = $$attribute{_type}; @@ -687,136 +698,140 @@ void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) print "SPLAPPTRC(L_TRACE, \"Processing attribute $name\", \"XMSSink\");\n"; if($type eq "Bytes"){ %> - { - const SPL::blob& b = t.get_<%=$name%>(); - uint64_t size; - const unsigned char *data = b.getData (size); - - //Handle zero length in native schema - <% if($length==0){ %> - size = 0; + { + const SPL::blob& b = t.get_<%=$name%>(); + uint64_t size; + const unsigned char *data = b.getData (size); + + //Handle zero length in native schema + <% if($length==0){ %> + size = 0; + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } + <% } %> + + if(size>0){ + <% if ($length > 0){ %> + if(size><%=$length%>){ if(!truncated){ nTruncatedInserts++; truncated = true; } - <% } %> - - if(size>0){ - <% if ($length > 0){ %> - if(size><%=$length%>){ - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - size = (size > <%=$length%>) ? <%=$length%> : size; - <% } %> - SPLAPPTRC(L_TRACE, "Processing blob of length "<set<%=$type%>("<%=$name%>","<%=$SPLType%>",(xmsSBYTE *) data,(xmsINT)size); } + size = (size > <%=$length%>) ? <%=$length%> : size; + <% } %> + SPLAPPTRC(L_TRACE, "Processing blob of length "<set<%=$type%>("<%=$name%>","<%=$SPLType%>",(xmsSBYTE *) data,(xmsINT)size); } + } <% } - - + + elsif ($type eq 'String'){ %> - { - //const char *cStr = (t.get_<%=$name%>()).c_str(); - //int length = strlen(cStr); - const SPL::rstring& str = t.get_<%=$name%>(); - const char *cStr = str.c_str(); - uint64_t length = str.size(); - - <% if ($length >= 0){ %> - if(length><%=$length%>){ - if(!truncated){ - nTruncatedInserts++; - truncated = true; - } - } - length = (length > <%=$length%>) ? <%=$length%> : length; - <%}%> - - string data = cStr; - data = data.substr(0,length); - const char * data2 = data.c_str(); - SPLAPPTRC(L_TRACE, "Processing string of length "<set<%=$type%>("<%=$name%>","<%=$SPLType%>",data2); + { + //const char *cStr = (t.get_<%=$name%>()).c_str(); + //int length = strlen(cStr); + const SPL::rstring& str = t.get_<%=$name%>(); + const char *cStr = str.c_str(); + uint64_t length = str.size(); + + <% if ($length >= 0){ %> + if(length><%=$length%>){ + if(!truncated){ + nTruncatedInserts++; + truncated = true; + } } + length = (length > <%=$length%>) ? <%=$length%> : length; + <%}%> + + string data = cStr; + data = data.substr(0,length); + const char * data2 = data.c_str(); + SPLAPPTRC(L_TRACE, "Processing string of length "<set<%=$type%>("<%=$name%>","<%=$SPLType%>",data2); + } <% } - + else{ %> - xmsg->set<%=$type%>("<%=$name%>","<%=$SPLType%>", t.get_<%=$name%>()); + xmsg->set<%=$type%>("<%=$name%>","<%=$SPLType%>", t.get_<%=$name%>()); <% } }%> msg.setText(xmsg->toString()); - <%} - - - else + <%} + + + else { - # Default to a standard message with no message body. +# Default to a standard message with no message body. print 'xms::Message msg = sess.createMessage();'; } %> - + //Now send the message + SPLAPPTRC (L_DEBUG, "going to send message ...", "XMSSink"); producer.send(msg); logmsg = MSGTK_SENT_MESSAGE(msg.getJMSMessageID().c_str()); SPLAPPLOG(L_INFO, logmsg, "XMSSink"); - + } catch (xms::Exception & ex) { // Unable to send + ostringstream ost; + ex.dump (ost); + SPLAPPTRC (L_ERROR, "Sending message failed: " << ost.str(), "XMSSink"); nFailedInserts++; logmsg = MSGTK_EXCEPTION(ex.getErrorString().c_str(),ex.getErrorCode()); SPLAPPLOG(L_ERROR, logmsg, "XMSSink"); processException(ex); <% if (defined $operatorErrorPort ) { %> - ErrMsg << logmsg; - sendOutputErrorMsg(ErrMsg,tuple); + ErrMsg << logmsg; + sendOutputErrorMsg(ErrMsg,tuple); <% }%> - + // Close the connection. This will automatically close and delete dependent objects - try { + try { conn.close(); } catch (...) { SPLAPPTRC(L_WARN, "The connection could not be closed. An exception occured during close of connection", "XMSSink"); } connected = false; // indicate that we don't have a Connection any more - + } catch (DistilleryException & ex) { nFailedInserts++; // Streams runtime exception logmsg = MSGTK_STREAMS_EXCEPTION(ex.what(),ex.getExplanation()); <% if (defined $operatorErrorPort ) { %> - ErrMsg << logmsg; - sendOutputErrorMsg(ErrMsg,tuple); + ErrMsg << logmsg; + sendOutputErrorMsg(ErrMsg,tuple); <% }%> - + SPLAPPLOG(L_ERROR, logmsg, "XMSSink"); - + } catch (std::exception & ex) { nFailedInserts++; // Some other exception logmsg = MSGTK_OTHER_EXCEPTION(ex.what()); <% if (defined $operatorErrorPort ) { %> - ErrMsg << logmsg; - sendOutputErrorMsg(ErrMsg,tuple); + ErrMsg << logmsg; + sendOutputErrorMsg(ErrMsg,tuple); <% }%> SPLAPPLOG(L_ERROR, logmsg, "XMSSink"); - + } catch (...) { nFailedInserts++; // Some unknown exception <% if (defined $operatorErrorPort ) { %> - ErrMsg << MSGTK_UNKNOWN_EXCEPTION; - sendOutputErrorMsg(ErrMsg,tuple); + ErrMsg << MSGTK_UNKNOWN_EXCEPTION; + sendOutputErrorMsg(ErrMsg,tuple); <% }%> SPLAPPLOG(L_ERROR, MSGTK_UNKNOWN_EXCEPTION, "XMSSink"); } } - + // Now update our performance metrics. updatePerformanceCounters(); SPLAPPTRC(L_TRACE, "Exit: process0(const <%=$inStream->getSPLTupleType()%>)", "XMSSink"); @@ -842,75 +857,75 @@ void MY_OPERATOR::updatePerformanceCounters() { * Class to help build the special XML format used for WBE 2.2 messages * ***********************************************************************/ - MY_OPERATOR::WBEMessage::WBEMessage(const char* eventName) { - closed = false; - result<<""; - } +MY_OPERATOR::WBEMessage::WBEMessage(const char* eventName) { + closed = false; + result<<""; +} - void MY_OPERATOR::WBEMessage::setFloat(const char* name,const xmsFLOAT value) { - if (closed == false) { - std::ostringstream ostr; - SPL::serializeWithPrecision(ostr,value); - result<<""<"; - } +void MY_OPERATOR::WBEMessage::setFloat(const char* name,const xmsFLOAT value) { + if (closed == false) { + std::ostringstream ostr; + SPL::serializeWithPrecision(ostr,value); + result<<""<"; } +} - void MY_OPERATOR::WBEMessage::setDouble(const char* name,const xmsDOUBLE value) { - if (closed == false) { - std::ostringstream ostr; - SPL::serializeWithPrecision(ostr,value); - result<<""<"; - } +void MY_OPERATOR::WBEMessage::setDouble(const char* name,const xmsDOUBLE value) { + if (closed == false) { + std::ostringstream ostr; + SPL::serializeWithPrecision(ostr,value); + result<<""<"; } +} - void MY_OPERATOR::WBEMessage::setByte(const char* name,const unsigned int value) { - if (closed == false) { - result<<""<"; - } +void MY_OPERATOR::WBEMessage::setByte(const char* name,const unsigned int value) { + if (closed == false) { + result<<""<"; } +} - void MY_OPERATOR::WBEMessage::setShort(const char* name,const xmsSHORT value) { - if (closed == false) { - result<<""<"; - } +void MY_OPERATOR::WBEMessage::setShort(const char* name,const xmsSHORT value) { + if (closed == false) { + result<<""<"; } +} - void MY_OPERATOR::WBEMessage::setInt(const char* name,const xmsINT value) { - if (closed == false) { - result<<""<"; - } +void MY_OPERATOR::WBEMessage::setInt(const char* name,const xmsINT value) { + if (closed == false) { + result<<""<"; } +} - void MY_OPERATOR::WBEMessage::setLong(const char* name,const xmsLONG value) { - if (closed == false) { - result<<""<"; - } +void MY_OPERATOR::WBEMessage::setLong(const char* name,const xmsLONG value) { + if (closed == false) { + result<<""<"; } +} - void MY_OPERATOR::WBEMessage::setString(const char* name,const rstring& value) { - if (closed == false) { - result<<""<"; - } +void MY_OPERATOR::WBEMessage::setString(const char* name,const rstring& value) { + if (closed == false) { + result<<""<"; } +} - void MY_OPERATOR::WBEMessage::setBoolean(const char* name,const xmsBOOL value) { - if (closed == false) { - if (value==true) - result<<""<<"true"<<""; +void MY_OPERATOR::WBEMessage::setBoolean(const char* name,const xmsBOOL value) { + if (closed == false) { + if (value==true) + result<<""<<"true"<<""; - else - result<<""<"; + else + result<<""<"; - } } +} - xms::String MY_OPERATOR::WBEMessage::toString() { - if (closed == false) { - result<<""; - closed = true; - } - return (result.str()); +xms::String MY_OPERATOR::WBEMessage::toString() { + if (closed == false) { + result<<""; + closed = true; } + return (result.str()); +} <% } @@ -920,75 +935,75 @@ elsif ($msgType eq 'wbe') { %> * Class to help build the special XML format used for WBE 6.2 messages * ***********************************************************************/ - MY_OPERATOR::WBEMessage::WBEMessage(const char* eventName) { - closed = false; - evName = eventName; - result<<"<"<"; - } +MY_OPERATOR::WBEMessage::WBEMessage(const char* eventName) { + closed = false; + evName = eventName; + result<<"<"<"; +} - void MY_OPERATOR::WBEMessage::setFloat(const char* name,const xmsFLOAT value) { - if (closed == false) { - std::ostringstream ostr; - SPL::serializeWithPrecision(ostr,value); - result<<"<"<"<"; - } +void MY_OPERATOR::WBEMessage::setFloat(const char* name,const xmsFLOAT value) { + if (closed == false) { + std::ostringstream ostr; + SPL::serializeWithPrecision(ostr,value); + result<<"<"<"<"; } - - void MY_OPERATOR::WBEMessage::setDouble(const char* name,const xmsDOUBLE value) { - if (closed == false) { - std::ostringstream ostr; - SPL::serializeWithPrecision(ostr,value); - result<<"<"<"<"; - } +} + +void MY_OPERATOR::WBEMessage::setDouble(const char* name,const xmsDOUBLE value) { + if (closed == false) { + std::ostringstream ostr; + SPL::serializeWithPrecision(ostr,value); + result<<"<"<"<"; } +} - void MY_OPERATOR::WBEMessage::setByte(const char* name,const unsigned int value) { - if (closed == false) { - result<<"<"<"<"; - } +void MY_OPERATOR::WBEMessage::setByte(const char* name,const unsigned int value) { + if (closed == false) { + result<<"<"<"<"; } +} - void MY_OPERATOR::WBEMessage::setShort(const char* name,const xmsSHORT value) { - if (closed == false) { - result<<"<"<"<"; - } +void MY_OPERATOR::WBEMessage::setShort(const char* name,const xmsSHORT value) { + if (closed == false) { + result<<"<"<"<"; } +} - void MY_OPERATOR::WBEMessage::setInt(const char* name,const xmsINT value) { - if (closed == false) { - result<<"<"<"<"; - } +void MY_OPERATOR::WBEMessage::setInt(const char* name,const xmsINT value) { + if (closed == false) { + result<<"<"<"<"; } +} - void MY_OPERATOR::WBEMessage::setLong(const char* name,const xmsLONG value) { - if (closed == false) { - result<<"<"<"<"; - } +void MY_OPERATOR::WBEMessage::setLong(const char* name,const xmsLONG value) { + if (closed == false) { + result<<"<"<"<"; } +} - void MY_OPERATOR::WBEMessage::setString(const char* name,const rstring& value) { - if (closed == false) { - result<<"<"<"<"; - } +void MY_OPERATOR::WBEMessage::setString(const char* name,const rstring& value) { + if (closed == false) { + result<<"<"<"<"; } +} - void MY_OPERATOR::WBEMessage::setBoolean(const char* name,const xmsBOOL value) { - if (closed == false) { - if (value==true) - result<<"<"<true"; - else - result<<"<"<false"; - } +void MY_OPERATOR::WBEMessage::setBoolean(const char* name,const xmsBOOL value) { + if (closed == false) { + if (value==true) + result<<"<"<true"; + else + result<<"<"<false"; } +} - xms::String MY_OPERATOR::WBEMessage::toString() { - if (closed == false) { - result<<""; - closed = true; - } - return (result.str()); +xms::String MY_OPERATOR::WBEMessage::toString() { + if (closed == false) { + result<<""; + closed = true; } - + return (result.str()); +} + <% } @@ -997,92 +1012,92 @@ elsif ($msgType eq 'xml') { %> * Class to help build the "generic" XML format messages * ***********************************************************************/ - MY_OPERATOR::XMLMessage::XMLMessage() { - closed = false; - result<<""; - } +MY_OPERATOR::XMLMessage::XMLMessage() { + closed = false; + result<<""; +} - void MY_OPERATOR::XMLMessage::setFloat(const char* name,const char* type,const xmsFLOAT value) { - if (closed == false) { - std::ostringstream ostr; - SPL::serializeWithPrecision(ostr,value); - result<<""<"; - } +void MY_OPERATOR::XMLMessage::setFloat(const char* name,const char* type,const xmsFLOAT value) { + if (closed == false) { + std::ostringstream ostr; + SPL::serializeWithPrecision(ostr,value); + result<<""<"; } - - void MY_OPERATOR::XMLMessage::setDouble(const char* name, const char* type,const xmsDOUBLE value) { - if (closed == false) { - std::ostringstream ostr; - SPL::serializeWithPrecision(ostr,value); - result<<""<"; - } +} + +void MY_OPERATOR::XMLMessage::setDouble(const char* name, const char* type,const xmsDOUBLE value) { + if (closed == false) { + std::ostringstream ostr; + SPL::serializeWithPrecision(ostr,value); + result<<""<"; } +} - void MY_OPERATOR::XMLMessage::setByte(const char* name, const char* type,const unsigned int value) { - if (closed == false) { - result<<""<"; - } +void MY_OPERATOR::XMLMessage::setByte(const char* name, const char* type,const unsigned int value) { + if (closed == false) { + result<<""<"; } +} - void MY_OPERATOR::XMLMessage::setBytes(const char* name, const char* type,const xmsSBYTE* value, const xmsINT length) { - if (closed == false) { - char pVal[length*2]; - char nvalue[length]; - char b; - for(int y=0, x=0; y> 4)); - pVal[x] = (char)(b > 9 ? b + 0x37 : b + 0x30); - b = ((nvalue[y] & 0xF)); - pVal[++x] = (char)(b > 9 ? b + 0x37 : b + 0x30); - } - string modStrVal = pVal; - string newStr = modStrVal.substr(0,length*2); - result<<""<"; +void MY_OPERATOR::XMLMessage::setBytes(const char* name, const char* type,const xmsSBYTE* value, const xmsINT length) { + if (closed == false) { + char pVal[length*2]; + char nvalue[length]; + char b; + for(int y=0, x=0; y> 4)); + pVal[x] = (char)(b > 9 ? b + 0x37 : b + 0x30); + b = ((nvalue[y] & 0xF)); + pVal[++x] = (char)(b > 9 ? b + 0x37 : b + 0x30); } + string modStrVal = pVal; + string newStr = modStrVal.substr(0,length*2); + result<<""<"; } - - void MY_OPERATOR::XMLMessage::setShort(const char* name, const char* type,const xmsSHORT value) { - if (closed == false) { - result<<""<"; - } +} + +void MY_OPERATOR::XMLMessage::setShort(const char* name, const char* type,const xmsSHORT value) { + if (closed == false) { + result<<""<"; } +} - void MY_OPERATOR::XMLMessage::setInt(const char* name, const char* type,const xmsINT value) { - if (closed == false) { - result<<""<"; - } +void MY_OPERATOR::XMLMessage::setInt(const char* name, const char* type,const xmsINT value) { + if (closed == false) { + result<<""<"; } +} - void MY_OPERATOR::XMLMessage::setLong(const char* name, const char* type,const xmsLONG value) { - if (closed == false) { - result<<""<"; - } +void MY_OPERATOR::XMLMessage::setLong(const char* name, const char* type,const xmsLONG value) { + if (closed == false) { + result<<""<"; } +} - void MY_OPERATOR::XMLMessage::setString(const char* name, const char* type,const rstring& value) { - if (closed == false) { - result<<""<"; - } - } - - void MY_OPERATOR::XMLMessage::setBoolean(const char* name, const char* type,const xmsBOOL value) { - if (closed == false) { - if (value==true) - result<<""<<"true"<<""; - else - result<<""<<"false"<<""; - } +void MY_OPERATOR::XMLMessage::setString(const char* name, const char* type,const rstring& value) { + if (closed == false) { + result<<""<"; + } +} + +void MY_OPERATOR::XMLMessage::setBoolean(const char* name, const char* type,const xmsBOOL value) { + if (closed == false) { + if (value==true) + result<<""<<"true"<<""; + else + result<<""<<"false"<<""; } +} - string MY_OPERATOR::XMLMessage::toString() { - if (closed == false) { - result<<""; - closed = true; - } - return (result.str()); +string MY_OPERATOR::XMLMessage::toString() { + if (closed == false) { + result<<""; + closed = true; } + return (result.str()); +} <% } %> @@ -1095,16 +1110,16 @@ void MY_OPERATOR::checkpoint(Checkpoint & ckpt) void MY_OPERATOR::reset(Checkpoint & ckpt) { - SPLAPPTRC(L_TRACE, "Reset: " << ckpt.getSequenceId(), "CONSISTENT"); + SPLAPPTRC(L_TRACE, "Reset: " << ckpt.getSequenceId(), "CONSISTENT"); } void MY_OPERATOR::resetToInitialState() { - SPLAPPTRC(L_TRACE, "Reset to Initial State. ", "CONSISTENT"); + SPLAPPTRC(L_TRACE, "Reset to Initial State. ", "CONSISTENT"); } void MY_OPERATOR::drain() { - SPLAPPTRC(L_TRACE, "Drain Operator", "CONSISTENT"); + SPLAPPTRC(L_TRACE, "Drain Operator", "CONSISTENT"); } void MY_OPERATOR::retireCheckpoint(int64_t id) { diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt index f355a85..03ef6b0 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt @@ -350,29 +350,13 @@ MY_OPERATOR::MY_OPERATOR(): nMessagesReadPC( getContext().getMetrics().getCustom pQueueURI = NULL; pTopicURI = NULL; pDest = NULL; - - iDeliveryMode = XMSC_DELIVERY_NOT_PERSISTENT; - - fatalError = false; connected = false; - + iDeliveryMode = XMSC_DELIVERY_NOT_PERSISTENT; periodVal = <%=$periodVal%>; //Now attempt to create the XMS objects - if (createAdminObjects() != XMS_OK) - fatalError = true; - else if (createXMS(CONSUMER,<%=$reconnectionPolicy%>,<%=$reconnectionBound%>,(xmsFLOAT)periodVal) == XMS_OK) - { - connected = true; - //In case of initial connection failure, we need to set a flag to avoid reconnection when the MQ is down from the beginning, to distinguish the case for transient conection failure - isInitialConnectionFailure=false; - - } - else - { - isInitialConnectionFailure=true; - } + fatalError = createAdminObjects() != XMS_OK; SPLAPPTRC(L_DEBUG, "Exit: Constructor", "XMSSource"); - + if (fatalError) throw xms::Exception(); } @@ -399,10 +383,7 @@ void MY_OPERATOR::allPortsReady() { // Notifies that all ports are ready. No tuples should be submitted before // this. Source operators can use this method to spawn threads. - - createThreads(1); // Create source thread - } @@ -418,39 +399,45 @@ void MY_OPERATOR::initialize() // Processing for source and threaded operators void MY_OPERATOR::process(uint32_t idx) { + SPLAPPTRC(L_TRACE, "Entry: process()", "XMSSource"); SPL::rstring logmsg; ostringstream ErrMsg; initialize(); - SPLAPPTRC(L_TRACE, "Entry: process()", "XMSSource"); - - if (fatalError == true){ - - SPLAPPLOG(L_ERROR, MSGTK_PREVIOUS_ERROR, "XMSSource"); - <% if (defined $operatorErrorPort ) { %> - ErrMsg< - } - - else{ - try - { - consumer.setMessageListener(this); - conn.start(); - SPLAPPTRC(L_TRACE, "Message Listener attached", "XMSSource"); - getPE().blockUntilShutdownRequest(); + if (!getPE().getShutdownRequested()) { + try { + // createXMS(...) can block for a long time dependent on reconnection policy; + // in case of infinite retry even until a shutdown has been requested. + // Thats why we check for shutdown request after return of this method. + xmsINT rc = createXMS(CONSUMER,<%=$reconnectionPolicy%>,<%=$reconnectionBound%>,(xmsFLOAT)periodVal); + if (!getPE().getShutdownRequested()) { + if (rc == XMS_OK) { + connected = true; + conn.setExceptionListener (this); + SPLAPPTRC(L_DEBUG, "Exception Listener attached", "XMSSource"); + consumer.setMessageListener(this); + SPLAPPTRC(L_DEBUG, "Message Listener attached", "XMSSource"); + conn.start(); + SPLAPPTRC(L_DEBUG, "Connection started", "XMSSource"); + getPE().blockUntilShutdownRequest(); + } + else { + isInitialConnectionFailure=true; + throw xms::Exception(); + } + } } catch (xms::Exception & ex) { logmsg = MSGTK_MESSAGE_LISTENER_ERROR(ex.getErrorString().c_str()); SPLAPPLOG(L_ERROR, logmsg, "XMSSource"); <% if (defined $operatorErrorPort ) { %> - ErrMsg< processException(ex); + throw; } - SPLAPPTRC(L_TRACE, "Exit: process()", "XMSSource"); - } + } + SPLAPPTRC(L_TRACE, "Exit: process()", "XMSSource"); } @@ -469,6 +456,78 @@ inline std::string MY_OPERATOR::xms2std(const xms::String & pString) return std::string(pString.c_str()); } +// ExceptionListener implementation. +xmsVOID MY_OPERATOR::onException (xms::Exception * pExp) { + ostringstream ost; + pExp->dump (ost); + SPLAPPTRC (L_ERROR, "onException: " << ost.str(), "XMSSource"); + processException (*pExp); + delete pExp; + + // immediate reconnection would fail - sleep for reconnection or initial period + double sleepTime = <%=$initDelayParmValue%> > 0? <%=$initDelayParmValue%>: periodVal; + SPLAPPTRC (L_INFO, "onException: waiting for "<< sleepTime << " seconds before reconnection attempt", "XMSSource"); + getPE().blockUntilShutdownRequest (sleepTime); + if (getPE().getShutdownRequested()) { + return; + } + + try { + conn.stop(); + } catch (xms::Exception & ex) { /*ignore*/ } + try { + conn.close(); + } catch (xms::Exception & ex) { /*ignore*/ } + // createXMS can block a longer - time dependent on reconnection policy, + // in case of infinite retry until a shutdown has been requested. + // Thats why we check for shutdown request after return of this method. + xmsINT resultCode = createXMS (CONSUMER,<%=$reconnectionPolicy%>,<%=$reconnectionBound%>,(xmsFLOAT)periodVal); + + if (getPE().getShutdownRequested()) { + return; + } + + if (resultCode == XMS_OK) { + try { + conn.setExceptionListener (this); + SPLAPPTRC(L_DEBUG, "Exception Listener attached", "XMSSource"); + consumer.setMessageListener(this); + SPLAPPTRC(L_DEBUG, "Message Listener attached", "XMSSource"); + conn.start(); + SPLAPPTRC(L_DEBUG, "Connection started", "XMSSource"); + } + catch (xms::Exception & ex) { + SPL::rstring logmsg; + ostringstream ErrMsg; + ex.dump (ErrMsg); + logmsg = MSGTK_MESSAGE_LISTENER_ERROR(ex.getErrorString().c_str()); + SPLAPPLOG(L_ERROR, logmsg, "XMSSource"); + SPLAPPTRC(L_ERROR, "exception handling failed: " << ErrMsg.str(), "XMSSource"); + <% if (defined $operatorErrorPort ) { %> + ErrMsg< + processException(ex); + } + } + else { + SPL::rstring logmsg; + ostringstream ErrMsg; + logmsg = MSGTK_PREVIOUS_ERROR; + SPLAPPLOG(L_ERROR, logmsg, "XMSSource"); + SPLAPPTRC(L_ERROR, logmsg << ": " << resultCode, "XMSSource"); + <% if (defined $operatorErrorPort ) { %> + ErrMsg< + // Reconnecting failed. The operator will never again receive data from QM due to an error condition. + // The PE would appear healthy, but this operator would not be functional any more. + // To indicate this serious problem, shutdown the PE by throwing an exception. + throw xms::Exception(); + } +} + + /* * SYNOPSIS: Overloaded virtual method for the message listener. This * method will be called when a message is received. diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_h.cgt b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_h.cgt index 6b0d9cb..ddf4884 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_h.cgt +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_h.cgt @@ -11,7 +11,7 @@ <%SPL::CodeGen::headerPrologue($model);%> -class MY_OPERATOR : public MY_BASE_OPERATOR ,public xms::MessageListener { +class MY_OPERATOR : public MY_BASE_OPERATOR ,public xms::MessageListener, public xms::ExceptionListener { public: @@ -40,6 +40,9 @@ public: */ virtual xmsVOID onMessage(xms::Message * pMsg); + // Exception listener implementation + virtual xmsVOID onException (xms::Exception * pExp); + // Performance Metrics SPL::int64 nMessagesRead; Metric & nMessagesReadPC; From 9aeb839d4c71a3c4f24d8915680c8c6e4257fed5 Mon Sep 17 00:00:00 2001 From: Norbert Schulz Date: Thu, 5 Oct 2017 11:55:17 +0200 Subject: [PATCH 6/6] Bump version to v5.3.3 --- com.ibm.streamsx.messaging/info.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index bd2dae7..7c7557c 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -680,7 +680,7 @@ The <attribute> element has three possible attributes: * composite types * xml - 5.3.2 + 5.3.3 4.2.0.0