Contents
This topic describes the StreamBase embedded adapters for JMS, which allow StreamBase to integrate with a JMS-compliant message bus. The Java Message Service (JMS) API is a Java message-oriented middleware API for sending messages between two or more clients.
Note
The StreamBase JMS adapter installation kit installs four adapters:
-
JMS Embedded Input Adapter
-
JMS Embedded Output Adapter
-
JMS External Input Adapter
-
JMS External Output Adapter
This page documents the JMS Embedded Input and Output Adapters. For information on the two external adapters, see JMS External Adapters.
The StreamBase JMS adapters work with a JMS-compliant message bus. Java Message Service (JMS) is an API defined by Sun Microsystems for invoking operations on enterprise messaging systems. The JMS specification allows for multiple implementations called JMS providers. The StreamBase JMS adapter works with the JMS implementation of any JMS provider, but must be configured differently for each provider. This document uses configuration examples from two JMS providers:
-
TIBCO Enterprise Messaging Service™ (EMS)
-
Apache ActiveMQ
If you are connecting to a JMS server from another JMS provider, you must adapt the configuration examples and discussion on this page for the particulars of your JMS provider.
The JMS Embedded Input Adapter is called the JMS Reader on this page. The JMS Reader is an embedded adapter that subscribes for messages from a JMS message bus.
The JMS Embedded Output Adapter is called the JMS Writer on this page. The JMS Writer is an embedded adapter that publishes messages to a JMS message bus.
An embedded adapter is an adapter that runs in the same process as a StreamBase Server. The JMS Reader listens for messages from a JMS Message bus. When it receives a message, the JMS Reader creates a tuple from the message, and then sends that tuple to the operator that is downstream from it in its StreamBase application. Conversely, the JMS Writer receives tuples from a StreamBase Server, creates a JMS message from it and publishes it to the JMS Message bus.
The JMS Embedded Adapters serialize the messages they receive. That is, the StreamBase Server will process messages in the same order as they are initially received by the JMS Reader, and the JMS broker will be sent messages in the same order that the corresponding tuples are received by the JMS Writer.
Note
These adapters are not provided in the base StreamBase kit, but are available in a separate installation kit. Please contact StreamBase Systems if you are interested in the kit.
When the JMS Reader receives a message, it translates this message into a Tuple and
when the JMS Writer receives a Tuple, it translates it into a JMS message. There
are five JMS message types: BytesMessage,
MapMessage, ObjectMessage, StreamMessage
and TextMessage. Of these types, MapMessage is the one into which tuples translate most readily.
Like a tuple, a MapMessage is a set of name-value
pairs, making the translation of these two message types relatively
straightforward. The JMS Reader includes a class that converts JMS MapMessages to StreamBase tuples, DefaultFromJMSMapMessageConverter. Similarly the JMS Writer
adapter provides DefaultToJMSMapMessageConverter to
translate from a Tuple to a JMS MapMessages.
StreamBase tuples and JMS MapMessages are essentially collections of
name-value pairs. The JMS Reader pairs the fields held by a JMS MapMessage with fields in a StreamBase schema that
have the same names and compatible types. If the name of a field in the
MapMessage doesn't exist in the
StreamBase schema then that field is dropped. Similarly, if the name
of a field in the Schema doesn't exist in the MapMessage then there is no input
value for that field and it is nulled.
In the case of the JMS Writer, all fields in the incoming Tuple are added to the MapMessage.
An interface is used to represent a class that can convert a JMS Message to a
StreamBase tuple. This interface, com.streambase.sb.adapter.common.jms.enqueue.FromJMSMessageConverter,
includes a method, Object convertFromJMSMessage(Message
message) throws StreamBaseException that provides the implementation of the
translation. A different translation class can be implemented, and the JMS Reader
can then be configured to use this implementation. In this way, a converter to
translate one of the other JMS message types can be specified, or an alternate
implementation of a MapMessage converter can be
specified.
As expected, the JMS Writer also provides a similar interface, com.streambase.sb.adapter.common.jms.dequeue.ToJMSMessageConverter
which includes the Message convertToJMSMessage(Object
object) throws StreamBaseException method to do the opposite conversion.
A JMS message consists of two primary pieces, a header and a payload (body). In addition, properties can be set on a JMS message.
By default, the JMS Embedded Adapters map the fields of a MapMessage to the fields of a StreamBase schema. The JMS Reader looks for entries in the payload of a MapMessage that have the same names as the fields of the StreamBase schema. The JMS Writer simply creates entries in the outgoing Message to match all the fields of the incoming StreamBase schema.
The JMS Embedded Adapters also provide facilities for handling other mappings, specified in the configuration file. For example, a name map can be used to map a field on a MapMessage to a field on a StreamBase schema that has a different name. Similarly, the fields in the header of a JMS message can be mapped to the fields of a StreamBase schema. In the same way, JMS properties can also be mapped to the fields of a StreamBase schema.
Note
Some JMS message headers are read-only. As such, the JMS Writer will not be able to change them even if there is an entry to that effect in the header map. When configuring header maps for use by the JMS Writer, keep in mind that only the following headers are mutable:
-
JMSCorrelationID
-
JMSReplyTo
-
JMSType
An attempt to map any other headers for the JMS Writer will result in an error.
Because the JMS Reader simply reads the JMS headers, all headers can be mapped.
The JMS Reader consumes messages from JMS destinations. The manner in which these messages are acknowledged is configurable within the JMS Reader. The setting for the acknowledge mode can impact the performance of the reader.
The JMS Reader supports acknowledge modes of AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE. The reader also supports the use of
Tibco's proprietary acknowledge mode, NO_ACKNOWLEDGE. Of course, use of this acknowledge mode implies
that Tibco's EMS message bus is being used with the reader. The acknowledge mode
for a given JMS server is specified in the jms-server section of the configuration file, using the
attribute acknowledge-mode.
Note also that the JMS Reader makes no effort to prevent or in any way handle the delivery of duplicate JMS messages. If a JMS server delivers the same message twice, the JMS Reader will simply consume it twice.
The JMS Writer will create JMS messages and deliver them to a JMS message bus. JMS
messages can be delivered with either of two delivery modes, DeliveryMode.PERSISTENT or DeliveryMode.NON_PERSISTENT. The delivery mode for a JMS
destination is specified in the destination section of
the configuration file, using the delivery-mode
attribute.
The semantics of JMS delivery modes is beyond the scope of this discussion. In
general, a delivery mode of PERSISTENT implies a
higher quality of service for the JMS destination. Some performance penalty is also
implied, however. In particular, if a delivery mode of PERSISTENT is specified, and there are durable subscribers for
the message, then the JMS message will need to be persisted before the JMS server
can acknowledge the message producer.
Note
for TIBCO EMS users: an additional EMS-specific delivery mode is supported,
RELIABLE_DELIVERY.
The JMS Writer dequeues tuples from StreamBase streams, translates
them into JMS messages, and routes them to JMS destinations. The adapter relies on
two fields in the dequeued tuple to determine which destination on which JMS server
to send the translated message. The two fields must be called __JMSServerName and __JMSDestinationName. These fields must contain the name of a
configured JMS server and destination, respectively. These fields are optional if
only one destination is specified in the configuration; they are required if the
configuration specifies more than one destination (i.e., more than one server or
more than one destination in a single server).
When one of the JMS Embedded Adapters executes, it needs to connect to an executing instance of a JMS message bus. Typically, this bus will be from one of the JMS providers. In order to connect to the JMS provider's JMS bus, JAR files provided by the JMS provider must be made available to the JMS Reader.
This is true whether the adapter is executing from within StreamBase Studio or from within StreamBase Server:
-
If the adapter is started from Studio, then the vendor's JMS JAR file or files need to be imported into the Studio project.
-
If the adapter is executed from StreamBase Server, then the vendor's JAR file or files must be specified in the
java-vmsection of the server's configuration file.
- Examples for ActiveMQ users:
-
The following JAR files must be made available to StreamBase Studio and/or to StreamBase Server in order to successfully connect to an ActiveMQ server:
-
activemq_installdir/lib/activemq-core-5.0.0.jar -
activemq_installdir/lib/geronimo-j2ee-management_1.0_spec-1.0.jar -
activemq_installdir/lib/commons-logging-1.1.jar
-
- Examples for TIBCO EMS users:
-
The following JAR file must be made available to StreamBase Studio and/or to StreamBase Server in order to successfully connect to a TIBCO EMS server:
-
TIBCO_EMS_installdir/ems/clients/java/tibjms.jar
-
There is only one property for either of the JMS Embedded Adapters: its configuration file. Configuration of the adapter is fairly involved, and so is not accomplished through its properties, but rather through its configuration file. Keep in mind that just the name of the configuration file, rather than its path, should be specified in the property. The named file will be searched for using the path specified in the configuration for StreamBase itself.
The JMS Reader is similar to an input stream that gets its input from a JMS Message bus. As with an input stream, a Schema needs to be specified for the JMS Reader. The Schema used by the JMS Reader is specified in the Edit Schema tab of the Properties View in StreamBase Studio.
| Property | Description | Default |
|---|---|---|
| Configuration File (required string) | The name of the adapter configuration file. | None |
The JMS Embedded Adapters are configured using an XML configuration file. This configuration file consists of two types of information, JMS configuration and configuration concerning the translation of JMS messages to tuples and back.
It may be that many JMS destinations need to be specified in the configuration file for the JMS Embedded Adapters. Where applicable, it is possible to specify a default value for a configuration attribute of a destination. The default can then be overridden by specifying a value for a particular destination.
A sample configuration file, skeleton.sbconf is
included with the JMS Reader sample that is included in the StreamBase
distribution. This file includes comments describing the various XML elements and
attributes that make up the configuration. This sample configuration file can be
used as a template for configuration files. The skeleton configuration file can
also be obtained by executing the jmsreader.jar JAR
file. For Linux:
java -jar streambase-install-dir/lib/adapter/jmsreader.jar
For Windows:
java -jar "streambase-install-dir\lib\adapter\jmsreader.jar"
Similarly, the JMS Writer has a sample configuration file which can be displayed by
executing the jmswriter.jar JAR file. For Linux:
java -jar streambase-install-dir/lib/adapter/jmswriter.jar
For Windows:
java -jar "streambase-install-dir\lib\adapter\jmswriter.jar"
Here are some typical settings for connecting to JMS servers from TIBCO EMS and ActiveMQ:
TIBCO EMS (using JNDI)
<jms-servers>
<jms-server name="EMSServer"
provider-context-factory="com.naming.InitialContextFactory"
provider-url="tibjmsnaming://localhost:7222"
connection-factory-name="GenericConnectionFactory"
acknowledge-mode="NO_ACKNOWLEDGE"/>
</jms-servers>
ActiveMQ version 5.x (using JNDI)
<jms-servers>
<jms-server name="ActiveMQServer"
provider-context-factory="org.apache.activemq.jndi.ActiveMQInitialContextFactory"
provider-url="tcp://localhost:61616"
connection-factory-name="org.apache.activemq.ActiveMQConnectionFactory"
acknowledge-mode="NO_ACKNOWLEDGE"/>
</jms-servers>
The JMS Reader performs two checks during typechecking. The JMS Reader ensures that a Schema has been defined and that a value for its configuration file has been specified. The JMS Reader will not attempt to connect to a JMS server until it is started.
The JMS Writer performs two checks during typechecking. It ensures that a value for
its configuration file has been specified and that the input schema contains the
fields __JMSServerName and __JMSDestinationName, if they're required. The JMS Writer will not
attempt to connect to a JMS server until it is started.
On suspend, if either of the JMS Embedded Adapters is currently processing a message, it finishes processing that message or tuple to completion before suspending. That is, the StreamBase application itself finishes processing the message or tuple before the JMS adapter returns from a suspend command. Once this has occurred, the JMS Reader unsubscribes for messages. If any messages (or tuples for the JMS Writer) are delivered while the JMS adapter is suspended, it drops the messages or tuples and fails to acknowledge them.
On resume, the JMS Reader resubscribes for messages. The JMS adapters begin processing newly delivered messages/tuples once it has returned from the resume command.
