Contents
This topic describes the StreamBase External Adapters for JMS, which allow StreamBase to integrate with a JMS-compliant message bus. The Java Message Service (JMS) API is a Java message-oriented middleware API for sending messages between two or more clients.
Note
The StreamBase JMS adapter installation kit installs four adapters:
-
JMS Embedded Input Adapter
-
JMS Embedded Output Adapter
-
JMS External Input Adapter
-
JMS External Output Adapter
This page documents the two JMS external adapters. For information on the embedded JMS adapters, see JMS Input and Output Adapters.
The StreamBase JMS adapter works with a JMS-compliant message bus. Java Message Service (JMS) is an API defined by Sun Microsystems for invoking operations on enterprise messaging systems. The JMS specification allows for multiple implementations called JMS providers. The StreamBase JMS adapter works with the JMS implementation of any JMS provider, but must be configured differently for each provider. This document uses configuration examples from two JMS providers:
-
TIBCO Enterprise Messaging Service™ (EMS)
-
Apache ActiveMQ
If you are connecting to a JMS server from another JMS provider, you must adapt the configuration examples and discussion on this page for the particulars of your JMS provider.
There are two elements to the StreamBase JMS adapter, an enqueue (input) adapter and a dequeue (output) adapter. The enqueue adapter consumes JMS messages and enqueues them to StreamBase Servers. Conversely, the dequeue adapter dequeues messages from StreamBase Servers and publishes them to JMS message buses.
The StreamBase JMS adapter performs two primary functions: it translates messages and it routes messages.
The enqueue adapter subscribes to JMS destinations. When the enqueue adapter receives a message from a JMS destination, it translates it from a JMS message to a StreamBase tuple. The enqueue adapter then enqueues this tuple to one or more StreamBase streams.
Similarly, the dequeue adapter subscribes to StreamBase streams. When the adapter dequeues a StreamBase tuple, it translates this tuple into a JMS message. Once this is done, the dequeue adapter sends this JMS message to one or more JMS destinations.
For the JMS adapter, StreamBase streams and JMS destinations have a many-to-many relationship. A StreamBase tuple can be dequeued from a StreamBase stream and eventually be routed to zero, one, or many JMS destinations. Similarly, a JMS message can be received from a JMS destination and eventually be routed to zero, one, or many StreamBase streams.
A routing from a StreamBase stream to a JMS destination is referred to here as a subscription. In the configuration file for the JMS dequeue adapter, a route from a stream to a destination is created by creating a subscription that relates the stream to the destination. Once this has been established, any messages dequeued from the stream will be sent to the specified destination.
The same is true on the enqueue side: a subscription creates a routing from a JMS destination to a StreamBase stream.
The JMS adapter kits installs startup scripts for both the enqueue and dequeue
adapters, named sb-ems-enqueue and sb-ems-dequeue respectively. They can be found in the bin directory of the StreamBase installation. Both the
JMS enqueue and dequeue adapters require only one argument on startup: the location
of their configuration file. Executing either script with a –h option displays usage text.
You must add your JMS provider's JAR files to the Java classpath when invoking either the enqueue or dequeue adapter. It may also be useful to extend the classpath so that a customized log4j.properties file can be provided. Make sure these JARs are specified in the CLASSPATH environment variable, or in the Java command line that invokes either adapter.
- Examples for ActiveMQ users:
-
The classpath must contain the following entries to successfully connect to an ActiveMQ server:
-
activemq_installdir/lib/activemq-core-5.0.0.jar -
activemq_installdir/lib/geronimo-j2ee-management_1.0_spec-1.0.jar -
activemq_installdir/lib/commons-logging-1.1.jar
-
- Examples for TIBCO EMS users:
-
The classpath must contain the following entry to successfully connect to a TIBCO EMS server:
-
TIBCO_EMS_installdir/ems/clients/java/tibjms.jar
-
One of the primary functions of the JMS adapter is to translate messages. Briefly, the adapter translates JMS messages to StreamBase tuples and StreamBase tuples to JMS messages.
There are five JMS message types:
-
BytesMessage
-
MapMessage
-
ObjectMessage
-
StreamMessage
-
TextMessage
Of these types, MapMessage is the one into which tuples translate most readily. Like a tuple, a MapMessage is a set of name-value pairs, making the translation of these two message types relatively straightforward.
Except for MapMessage, it is difficult to provide a default implementation for the JMS message types. The JMS adapter supports the other JMS message types through a plug-in architecture. A class that implements the interface com.streambase.sb.adapter.common.jms.enqueue.FromJMSMessageConverter can be used to convert a JMS message to a StreamBase tuple. This interface allows callers to translate a JMS message using just one method, with the following signature:
Object convertFromJMSMessage(javax.jms.Message message)
throws StreamBaseException
Similarly, com.streambase.sb.adapter.common.jms.dequeue.ToJMSMessageConverter specifies an interface that can be implemented in order to translate StreamBase tuples (or other objects) into JMS messages. To accomplish this the interface includes the following method:
javax.jms.Message convertToJMSMessage(Object object) throws
StreamBaseException
In addition to these conversion methods, both interfaces have methods used to set and
get parameters specified in the adapter configuration file. These parameters are
placed in a class called com.streambase.sb.adapter.common.jms.dequeue.ToJMSMessageConverter
for the dequeue adapter and com.streambase.sb.adapter.common.jms.enqueue.FromJMSMessageConverter
for the enqueue adapter. The methods are simple accessors for instances of those
classes.
For the dequeue adapter:
public ToJMSMessageConverterConfig getConfig()
public void setConfig(ToJMSMessageConverterConfig
config)
For the enqueue adapter:
public FromJMSMessageConverterConfig getConfig()
public void setConfig(FromJMSMessageConverterConfig
config)
To provide an implementation of either of these interfaces, you must first specify its type in the configuration file. The implementation must then be available to the JMS adapter when it starts. That is, it must be available in a jar or as a class file on the classpath. A message translator is specified in the configuration file as part of the subscription definition. Recall that a subscription associates a JMS destination with a StreamBase stream, or vice versa. The assumption is that different JMS destinations may receive messages with different formats, and different StreamBase streams may have different schemas. Therefore, a translation at the granularity of a subscription is required.
StreamBase tuples and JMS MapMessages are essentially collections of name-value pairs. The JMS enqueue adapter pairs the fields held by a StreamBase stream with fields in a JMS MapMessage that have the same names and compatible types. Similarly, the JMS dequeue adapter creates MapMessages with the same set of fields that exist in a given tuple. A best effort will be made to convert StreamBase data types to compatible JMS data types. Also, field names will be the same unless specified otherwise in a name map (see next section for details.)
A JMS message consists of two primary pieces, a header and a payload (body). In addition, properties can be set on a JMS message.
By default, the JMS adapter maps the fields of a StreamBase stream to entries in the body of a MapMessage. The JMS adapter looks for entries in the payload of a MapMessage that have the same names as the fields of the StreamBase stream.
The JMS adapter also provides facilities for handling other mappings, specified in the configuration file. For example, a name map can be used to map a field on a StreamBase stream to an entry in a JMS MapMessage that has a different name. Similarly, the fields in the header of a JMS message can be mapped to and from the fields of a StreamBase stream. In the same way, JMS properties can also be mapped to and from the fields of a StreamBase stream.
Note
Some JMS message headers are read-only. As such, the dequeue adapter will not be
able to change them even if there is an entry to that effect in the jms-header-maps section. When configuring header maps for use
by the dequeue adapter, keep in mind that only the following headers are mutable:
-
JMSCorrelationID
-
JMSReplyTo
-
JMSType
An attempt to map any other headers for the dequeue adapter will result in an error.
Because the enqueue adapter simply reads the JMS headers, all headers can be mapped.
The StreamBase JMS adapter supports dynamically adding and removing subscriptions. This facility is referred to here as dynamic subscription management. A StreamBase application is used to manage subscriptions dynamically.
Dynamic subscription management is accomplished by enqueuing StreamBase tuples to a stream on one of the configured StreamBase Servers. The StreamBase Server and stream to be used for subscription management are specified in the configuration file. A tuple specifying the subscription to add or remove is enqueued onto this stream. One, or both, of the JMS adapters will subscribe to this stream. It will dequeue this tuple and add or remove the specified subscription.
A StreamBase tuple used for subscription management must have a specific set of fields. In particular, the JMS adapter will look for string fields in the tuple with the following names:
| Field Name | Description | Required |
|---|---|---|
| command |
Indicates whether the subscription should be added or removed. Its valid
values are simply add and remove.
|
Yes |
| jmsServerName |
Name of the JMS server on which the destination can be found. The name should
match one specified in the jms-servers
section of the configuration file.
|
Yes |
| destinationName |
The name of the JMS destination to which to subscribe. The name should match
one specified in the jms-endpoints section of
the configuration file.
|
Yes |
| sbServerName |
The name of the StreamBase Server instance that holds the
stream. The name should match one specified in the sb-servers section of the configuration file.
|
Yes |
| streamName | The name of the StreamBase stream to which to enqueue incoming tuples. | Yes |
| converterClassName | Name of the message converter to use to translate messages | Yes |
| nameMap |
Name of the map to use to translate JMS field names to and from the field
names of a StreamBase stream. If present, this name should match
one specified in the name-maps section of the
configuration file.
|
No (The field must be present but may be null or empty. If a name map is not specified, the default name map will be used. If no default exists, no mapping will occur.) |
| headerMap |
Name of the map to use to translate JMS Header field names to and from the
field names of a StreamBase stream. If present, this name should
match one specified in the jms-header-maps
section of the configuration file.
|
No (The field must be present but may be null or empty. If a header map is not specified, the default header map will be used. If no default exists, no mapping will occur.) |
| propertyMap |
Name of the map to use to translate JMS Property field names to and from the
field names of a StreamBase stream. If present, this name should
match one specified in the jms-property-maps
section of the configuration file.
|
No (The field must be present but may be null or empty. If a property map is not specified, the default property map will be used. If no default exists, no mapping will occur.) |
| timestampFormat |
Specifies (by name) a timestamp format defined in the configuration file,
that is to be used to convert JMS fields that represent timestamps into
StreamBase Timestamp fields. In particular, a timestamp format
is used when the incoming JMS field is a java.lang.String and the StreamBase field is
a Timestamp. If present, this name should match
one specified in the timestamp-formats
section of the configuration file.
|
No (The field must be present but may be null or empty. If a timestamp format
name is not specified, the default timestamp format will be used. If no
default exists, the timestamp will be formatted according to java.text.SimpleDateFormat.)
|
| deliveryTimeField | Specifies the name of a field on the StreamBase stream that will be timestamped with the delivery time of the incoming JMS message | No (The field must be present but may be null or empty. If no delivery time field is specified, no delivery timestamp will be set on the tuple.) |
| truncateStrings |
Specifies whether incoming strings will be truncated to fit the corresponding
StreamBase field in cases where the string is longer than the
maximum size of the string field as declared in the schema. Valid values are
false (the default) or true. If set to false, the string value will not be set and
the corresponding field will be null. If set to true, the string will
truncated to fit and set in the field, and a warning message will be issued
(only once per field.)
|
No (The field may be present but null or empty, or it may be omitted from the
schema. If not specified a value of false will
be assumed.)
|
| customSettings | This represents an arbitrary string to send to the message converter. It can be used to send custom parameters not provided by the current configuration scheme. This string is passed verbatim to the message converter -- it is the sole responsibility of the converter to parse and interpret it. | No (The field may be present but null or empty, or it may be omitted from the schema.) |
The JMS dequeue adapter dequeues tuples from StreamBase streams,
translates them into JMS messages, and routes them to JMS destinations. The routing
function of the dequeue adapter can be specified in one of two ways, referred to here
as static routing and dynamic routing.
The dequeue adapter performs static routing based on the subscriptions specified in the configuration file. In addition to this, the dequeue adapter can perform routing dynamically, based on the contents of the tuple that it has dequeued. That is, a dequeued tuple can itself specify which JMS destination it should be routed to. In order to accomplish this, the tuple must specify two things, the JMS server and the JMS destination to which it should be routed.
These two pieces of information, the JMS server and the JMS destination, need to be
specified in fields held by the tuple. The names of these fields are specified with
the StreamBase Server settings in the configuration file. The two XML
attributes jms-server-field and jms-destination-field are used to specify the fields in the tuple
from which the JMS server and the JMS destination are retrieved.
Also, if only one JMS server is configured, then the field used to hold a value for this can be omitted; the dequeue adapter will default to the only possible JMS server.
Performance of the JMS adapter is discussed here primarily in terms of message translation. The configuration of either the enqueue or dequeue adapter can affect the number of times that messages need to be translated, thereby impacting the performance of the adapter.
Two factors may affect how an incoming JMS message is translated by the enqueue
adapter. These are the implementation of FromJMSMessageConverter that is used and the schema that is
targeted.
If a JMS message passes through two different FromJMSMessageConverters on its way to two streams, then it
will necessarily be translated twice. This will be the case if the two streams
expect the message in different formats. Keep in mind that the use of different
name maps, header maps, property maps, and timestamp formats require the use of a
different converter. The same is true if a different field is specified as the
delivery time field. In short, a JMS message needs to be translated once for each
different tuple that needs to be produced. When the same tuple can be enqueued to
two different streams then this tuple needs to be created only once.
When a MapMessage is converted to a tuple, a StreamBase schema is responsible for instantiating this tuple. If an incoming JMS message is targeted for two StreamBase streams that have different schemas, then two different tuples will be instantiated and two translations will be required.
If, however, an incoming JMS message is targeted for two or more streams that share
a common FromJMSMessageConverter and
StreamBase schema, then only one tuple is instantiated, and only one
translation is needed.
A tuple dequeued from a StreamBase Server will be translated into one or more JMS messages and then delivered to one or more JMS destinations. On the dequeue side, two factors control how many times a given tuple needs to be translated into a JMS message.
As on the enqueue side, if a given tuple will pass through two different
ToJMSMessageConverters on its way to two
destinations, then it will need to be translated twice.
Also, a JMS message is instantiated by the client runtime provided by the JMS vendor. That is, because the JMS message types are interfaces their implementations are provided by the vendor. JMS messages created by one server are not delivered to a destination that exists at a different JMS server.
Message translations, then, can be shared by destinations that share a common ToJMSMessageConverter implementation and a common JMS Server. In this case, a single JMS message can be instantiated and delivered to more than one destination.
The JMS enqueue adapter consumes messages from JMS destinations. The manner in which these messages are acknowledged is configurable within the JMS enqueue adapter. The setting for the acknowledge mode can impact the performance of the enqueue adapter.
The JMS enqueue adapter supports acknowledge modes of AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE. The enqueue adapter also supports the use
of TIBCO's proprietary acknowledge mode, NO_ACKNOWLEDGE. Of course, use of this acknowledge mode implies
that TIBCO's EMS message bus is being used. The acknowledge mode for a given JMS
server is specified in the jms-server section
of the configuration file, using the attribute acknowledge-mode.
Note also that the JMS enqueue adapter makes no effort to prevent or in any way handle the delivery of duplicate JMS messages. If a JMS server delivers the same message twice, the JMS enqueue adapter will simply consume it twice.
The JMS dequeue adapter will create JMS messages and deliver them to a JMS message
bus. JMS messages can be delivered with either of two delivery modes, DeliveryMode.PERSISTENT or DeliveryMode.NON_PERSISTENT. The delivery mode for a JMS
destination is specified in the jms-endpoints section
of the configuration file, using the delivery-mode
attribute.
The semantics of JMS delivery modes is beyond the scope of this discussion. In
general, a delivery mode of PERSISTENT implies a
higher quality of service for the JMS destination. However, some performance
penalty is also implied. In particular, if a delivery mode of PERSISTENT is specified, and there are durable subscribers for
the message, then the JMS message will need to be persisted before the JMS server
can acknowledge the message producer.
Note
for TIBCO EMS users: an additional EMS-specific delivery mode is supported,
RELIABLE_DELIVERY.
A StreamBase JMS adapter is configured using an XML configuration file. This configuration file consists of a series of subscriptions and information related to these subscriptions. Recall that a subscription relates one JMS destination available on a JMS server to one StreamBase stream available on a StreamBase Server.
In this context, a JMS destination available on a JMS server is referred to as a
JMS endpoint. Similarly, a StreamBase
stream available on a StreamBase Server is referred to as a
StreamBase endpoint. So, a subscription relates one JMS endpoint with
one StreamBase endpoint. Further, each of these endpoints is named.
Recall also that a message converter, either FromJMSMessageConverter or ToJMSMessageConverter, can be specified in the configuration
file's subscription definitions.
It may be that many subscriptions need to be specified in the configuration files for the JMS adapter. Where applicable, it is possible to specify a default value for a configuration attribute. The default can then be overridden by specifying a value for a particular subscription or endpoint.
A sample configuration file with additional comments can be displayed by either of
the JMS adapters by using the –s option. Configuration
files for the JMS enqueue and dequeue adapters are very similar. Some configuration
attributes are specific to either the enqueue or dequeue side. If these attributes
are specified for the adapter to which they do not apply, then they are simply
ignored.
<subscriptions>
<default-sb-endpoint-name>MSFTIN</default-sb-endpoint-name>
<default-message-to-tuple-converter>
com.streambase.sb.adapter.common.jms.enqueue.DefaultFromJMSMapMessageConverter
</default-message-to-tuple-converter>
<default-name-map>StandardConversion</default-name-map>
<default-jms-header-map>defaultHeaderMap</default-jms-header-map>
<default-jms-property-map>DefaultJMSPropertyMap</default-jms-property-map>
<default-timestamp-format>defaultTimestampFormat</default-timestamp-format>
<subscription name="Topic1OnStream1On10000" jms-endpoint-name="Topic1OnEMSServer"
sb-endpoint-name="Stream1On1000"
message-to-tuple-converter="com.streambase.sb.adapter.common.jms.common.DefaultMapMessageToTupleConverter"/>
<subscription name="Topic2OnStream1On10000" jms-endpoint-name="Topic2OnEMSServer"
sb-endpoint-name="Stream1On1000"
message-to-tuple-converter="com.streambase.sb.adapter.common.jms.common.DefaultMapMessageToTupleConverter"/>
<subscription name="Topic1OnStream2On10000" jms-endpoint-name="Topic1OnEMSServer"
sb-endpoint-name="Stream2On1000"
message-to-tuple-converter="com.streambase.sb.adapter.common.jms.common.DefaultMapMessageToTupleConverter"/>
</subscriptions>
<jms-header-maps>
<jms-header-map name="defaultHeaderMap">
<mapping jms-name="JMSTimestamp" sb-name="jms_timestamp"/>
</jms-header-map>
<jms-header-map name="secondHeaderMap">
<mapping jms-name="JMSTimestamp" sb-name="jms_timestamp"/>
</jms-header-map>
</jms-header-maps>
<jms-property-maps>
<jms-property-map name="DefaultJMSPropertyMap">
<mapping jms-name="StartTime" sb-name="start_time"/>
<mapping jms-name="EndTime" sb-name="end_time"/>
</jms-property-map>
</jms-property-maps>
<name-maps>
<name-map name="MSFTConversion">
<mapping jms-name="MSFTsymbol" sb-name="ticker_symbol"/>
</name-map>
<name-map name="StandardConversion">
<mapping jms-name="symbol" sb-name="ticker_symbol"/>
</name-map>
</name-maps>
A subscription relates a JMS endpoint to a StreamBase endpoint. Configuration of endpoints is broken out in a separate section in the XML configuration. This is because endpoint configuration may be repeated in more than one subscription. The configuration of a JMS endpoint includes the name of its JMS destination and the name of the JMS server that holds that topic. On the enqueue side, the acknowledge mode is specified for a JMS endpoint, and on the dequeue side the delivery mode is specified for a JMS endpoint. Similarly, the configuration of a StreamBase endpoint includes the name of the StreamBase stream and the name of the StreamBase Server that holds that stream:
<jms-endpoints>
<jms-endpoint name="Topic1OnEMSServer" jms-server-name="EMSServer"
topic-name="Topic1" delivery-mode="NON_PERSISTENT"/>
<jms-endpoint name="Topic2OnEMSServer" jms-server-name="EMSServer"
topic-name="Topic2" delivery-mode="NON_PERSISTENT"/>
</jms-endpoints>
<sb-endpoints>
<sb-endpoint name="Stream1On1000" sb-server-name="SBServerOn10000"
stream-name="InputStream1"/>
<sb-endpoint name="Stream2On1000" sb-server-name="SBServerOn10000"
stream-name="InputStream2"/>
<sb-endpoint name="StreamOn10001" sb-server-name="SBServerOn10001"
stream-name="InputStream"/>
</sb-endpoints>
A JMS endpoint configuration refers to an JMS server by name. As with JMS endpoints, a JMS server requires additional configuration. As with endpoints, the server configuration is broken out in a separate section of the XML configuration file:
<jms-servers>
<jms-server name="EMSServer"
provider-context-factory="com.naming.InitialContextFactory"
provider-url="naming://localhost:7222"
connection-factory-name="GenericConnectionFactory"
acknowledge-mode="NO_ACKNOWLEDGE"/>
</jms-servers>
Similarly with StreamBase Server configuration:
<sb-servers> <sb-server name="SBServerOn10000" uri="sb://localhost:10000"/> <sb-server name="SBServerOn10001" uri="sb://localhost:10001"/> </sb-servers>
Configuration for subscription management has the following form:
<subscription-management-stream sb-server-name="SBServerOn10000" stream-name="SubscriptionManagement"/>
