Dequeuing Data from Intermediate Streams

Introduction

You can dequeue data from an intermediate stream. This feature is for debugging only, to examine the value of data at an intermediate point in the application, before it has been fully processed and before it has arrived on an output stream. This feature is not for inserting new data into the middle of an application. Use an input stream to declare an entry point for data.

Tip

The recommended way to generate a new StreamBase dequeuer client is to use the StreamBase Java Client wizard, as described in Creating Java Clients.

Note: To enable this feature, start StreamBase Server in its standard mode, and set the following argument to true in the jvm-args or sysproperty element of the server configuration file:

-Dstreambase.codegen.intermediate-stream-dequeue=true

The default for this JVM argument is false.

In a large application, you can limit the number of intermediate streams that the application exposes in debug mode by specifying a regular expression that matches the names of the streams you want to expose. Make this specification with the following entry in the jvm-args or sysproperty element of the server configuration file:

-Dstreambase.codegen.intermediate-stream-dequeue-regex=pattern

For example, the regular expression pattern Map\\d allows any intermediate stream whose name contains the string "Map" followed by a digit to be exposed as a dequeueable stream. All other intermediate streams normally available in debug mode are not available.

-Dstreambase.codegen.intermediate-stream-dequeue-regex=Map\\d

Intermediate Streams in StreamBase Studio

To expose intermediate streams when running applications in StreamBase Studio,

Naming Convention

You can dequeue data from any output port of any operator in an application using a string of the form:

out:operatorName_N

where operatorName is the name of the operator and N is the number of the output port for that operator.

With intermediate stream dequeuing enable, your client can now dequeue from intermediate streams in the same way that you dequeue from output streams. This is a feature that you will use in a client with the StreamBase API.

Consider the Split.sbapp application, which is one of the operator samples installed with the StreamBase kit:

To verify the intermediate stream names, use an sbc list -a command when the application is running. For example:

stream  INTRUSION_TooManyIPsForUser
stream  INTRUSION_TooManyUsersForIP
stream  IPandUserLogin
stream  out:CheckIPsForAUser_1
stream  out:CheckUsersInAnIP_1
stream  out:ProcessIPFirst_1
stream  out:ProcessIPFirst_2
schema  schema:IPandUserLogin
operator        CheckIPsForAUser
operator        CheckUsersInAnIP
operator        IPCountExceeded
operator        ProcessIPFirst
operator        UserCountExceeded
container       default
container       system

Compare the EventFlow diagram and the sbc list output. Notice the following:

  • Because we are running the server in debug mode (or with JVM arguments set as described above), intermediate streams are known.

  • Examples of the default naming convention for intermediate streams are seen in the list. An example: out:ProcessIPFirst_2.

  • The output ports for the Filter operators, UserCountExceeded and IPCountExceeded, are connected to actual output streams, and thus do not have intermediate streams. Of course, your client can dequeue from intermediate streams or actual output streams.

  • In StreamBase Studio, you can view the XML source of your EventFlow application to verify the names of intermediate and output streams. You can also use the sbc list -a command in a StreamBase Command Prompt or UNIX terminal. If you view the XML source, look for the stream values associated with output ports. For example:

    <box name="ProcessIPFirst" type="split">
      <input port="1" stream="IPandUserLogin"/>
      <output port="1" stream="out:ProcessIPFirst_1"/>
      <output port="2" stream="out:ProcessIPFirst_2"/>
      <param name="output-count" value="2"/>
    

We mention the discovery of stream names for a reason. In some cases, the name of an intermediate stream can vary from the default convention, depending on the history of edits to the application. For example, consider this next installed sample, AggregateByDim.sbapp:

When you run this sample in debug mode (or with JVM argument set, as above), an sbc list -a command returns:

stream  AvgPricePSOut
stream  OutputStream1
stream  TradesIn
schema  schema:TradesIn
operator        Aggregate2Dimensions
operator        ConvertTimeToSeconds
container       default
container       system

By default, the intermediate stream between the Aggregate2Dimensions and ConvertTimeToSeconds operators would have been named out:Aggregate2Dimensions_1. But at some point in this application's history, the Aggregate2Dimensions operator was connected to an output stream named OutputStream1, and was subsequently disconnected from that output stream (which was either removed or renamed to AvgPricePSOut).

Despite the edits, the original output stream name (OutputStream1) defined for Aggregate2Dimensions is still in use. In other words, don't assume that each operator's output stream name follows the default convention. Check the value with an sbc list -a command or view the XML source.

Related Topics

For related information, see: