Library Articles
Using the StreamBase Java Toolkit to Write a Dequeuer Client
Author: Dr. John Lifter
Contributor: Bingfang Song
StreamBase Systems
10-January-2007
Revised: 10-May-2007
Applicable To: StreamBase 3.7
Topics:
A dequeuer client is an application that retrieves tuples from one, or more, output streams in a StreamBase application. The StreamBase Java Toolkit for Eclipse will generate starting point code for a dequeuer client
In the discussion that follows, a dequeuer client against a very simple StreamBase application will be developed. The schema defined on the input stream includes a single string field, size 10. The application is illustrated in the following figure.

The equivalent application may be written in StreamSQL.
CREATE INPUT STREAM InputStream1 (symbol string(10));
CREATE OUTPUT STREAM OutputStream1 AS
SELECT symbol FROM InputStream1;
CREATE OUTPUT STREAM OutputStream2 AS
SELECT symbol FROM InputStream1;
This article was developed using the StreamBase Java Toolkit for Eclipse, v3.7.2, which supports development of enqueuer and dequeuer clients, custom Java operators, Java functions, and embedded adapters that are compatible with StreamBase 3.5.x or 3.7.x.
Installation of the StreamBase Java Toolkit for Eclipse is detailed in Getting Started with the StreamBase Java Toolkit for Eclipse.
Follow these steps to create an Eclipse project in which to build a StreamBase dequeuer client.
- In Eclipse, create a new Java project by selecting the File > New > Project... menu item, or right-click in the Package Explorer view and select New > Project... from the popup menu.
In the New Project, Select a Wizard window, select Java Project and click the Next command button.
In the New Project, Create a Java Project window, enter a Project name and click the Next command button.
- Use the New Java Project, Java Settings window to add the StreamBase client libraries to the project.
Select the Libraries tab and click the Add Library... command button, which opens the Add Library window.
Highlight the StreamBase Client API entry and click the Next command button.
In the Add Library, StreamBase Client API Library window, select the version of StreamBase against which this dequeuer client will run and click the Finish command button.

Complete the process by clicking the Finish command button.

If you already have a Java project to which you want to add StreamBase Java client coding, you can add the StreamBase Client Library to the project by right-clicking on the project icon and selecting Build Path > Add Libraries... from the popup menu, which opens the Add Library window. Follow the preceding instructions to add the StreamBase Client Library to your existing project.
Alternatively, you could right-click on the project icon and select Properties from the popup menu. In the Properties window, select the Java Build Path entry and then the Libraries tab. Click the Add Library... command button and continue as described previously.
Follow these steps to generate starting point code using the StreamBase Java Toolkit.
- Highlight the icon corresponding to your new Java project, right-click and select New > Other... from the popup menu. Alternatively, highlight the project and select the File > New > Other... menu entry.
The New, Select a Wizard window opens; expand the options under the StreamBase entry, highlight the StreamBase Client selection and click the Next command button.

- In the StreamBase Client, StreamBase Client window, as the Type: entry select the Dequeuer radio button, confirm that the Source folder: text box references your project directory, enter the package name for your dequeuer client class into the Package: text box and the class name into the Name: text box, as shown in the following figure. (You can enter any package and class name you want.)

Note that the Target: radio button corresponding to your target version is preselected. If you want to change the target version, select the other radio button and the window will display a link to the Build Path dialog window.

- Click the Next command button to move to the New StreamBase Client, New StreamBase Client Dequeuer window. If you want this client to retrieve tuples from all of the output streams of a StreamBase application, select the "All Streams" radio button in the Dequeue From: group. To explicitly specify which output streams the client should use, select the "Specified Streams" radio button, which enables the Add a Stream: and Streams: controls.
Enter the name of an output stream into the Add a Stream: text box, then click the Add command button to add this entry to the listing in the Streams: listing. If desired, specify multiple output streams.
After adding an entry to the Streams: listing, you can specify a predicate, which will limit the tuples retrieved from an output stream. When entering a string value into a predicate, use single quotation marks ('...') rather than double quotation marks ("...").

If desired, use the Up and Down command button to reorder the entries in the Streams: listing. Into the Host: and Port: text boxes enter the name of the computer running the StreamBase application and the TCP/IP port on which to access the output stream(s). Click the Finish command button to generate the starting point code
The starting point code generated by the StreamBase Java Toolkit is a complete application that will compile and run. The application will simply print out the contents of each retrieved tuple identifying the output stream from which the tuple was retrieved. To extend this application, you will need to make small edits in several places and modify the content of the processTuple method to carry out your intended processing.
The starting point code includes four instance variables
private final static String SB_URI = "sb://localhost:10000";
private static StreamBaseClient client = null;
private String[] streamNames = { "OutputStream1", "OutputStream2" };
private String[] predicate = { "symbol='IBM'", "" };
And five methods: main, a constructor, run, startDequeuing, and processTuple.
If you did not specify the output streams by name, the streamNames array will be set to null and the predicate array would not be included in the generated code.
The main method checks whether you have provided a string URI as a command line argument when starting the application. If so, this string is used to create an instance of the StreamBaseURI class. If you do not provide a URI, a string URI derived from your entries on the New StreamBase Client, New StreamBase Client Dequeuer window, which have been incorporated into the SB_URI instance variable, is used to create an instance of the StreamBaseURI class. The main method then calls your dequeuer client's constructor method. It is in the client's constructor that the proxy to the StreamBase application is initialized using the URI class. You should not need to make any changes to the code in the main method.
After creating and initializing an instance of the proxy in the class' constructor, the main method calls the run method.
public static void main(String[] args) {
try {
StreamBaseURI uri = null;
if (args.length > 0) {
uri = new StreamBaseURI(new String(args[0]));
} else
uri = new StreamBaseURI(SB_URI);
MyDequeuer cMyDequeuer= new MyDequeuer(uri);
cMyDequeuer.run();
} catch (StreamBaseException e) {
System.out.println("StreamBaseException: +" + e.getMessage());
e.printStackTrace();
} finally {
if (client != null) {
try {
client.close();
} catch (StreamBaseException e) {
System.out.println
("StreamBaseException while closing client: "
+ e.getMessage());
e.printStackTrace();
}
}
}
}
public MyDequeuer(StreamBaseURI uri) throws StreamBaseException {
System.out.println("Connecting to " + uri.toString() + "...");
client = new StreamBaseClient(uri);
System.out.println("Connected to " + uri.toString());
}
In the run method, the code iterates through the collection of output streams and the proxy subscribes to each stream. If a predicate has been specified for a stream, it is used to filter the number of tuples that will be passed to this client. Note that if a predicate has been defined, the code will use the three argument version of the subscribe method that sets up a filter subscribe. In the call to subscribe, the second argument is a unique name through which the StreamBase application identifies this dequeuer client. For convenience, this argument is the same as the name of the output stream. If you want to run multiple filtered dequeuer clients against this StreamBase server, you will need to change this argument to a unique value. If you do not specify a predicate, the code uses the one argument version of subscribe, which does not provide filtering.
After subscribing to the output streams, the code calls the startDequeuing method.
public void run() {
try {
// Subscribe to all specified streams
for (int i = 0; i < streamNames.length; i++) {
if (predicate[i] != null && predicate[i].length() > 0)
client.subscribe(streamNames[i], streamNames[i],
predicate[i]);
else
client.subscribe(streamNames[i]);
}
// Start dequeue
startDequeuing();
} catch (StreamBaseException e) {
System.out.println("StreamBaseException: +" + e.getMessage());
e.printStackTrace();
} finally {
if (client != null) {
try {
client.close();
} catch (StreamBaseException e) {
System.out.println
("StreamBaseException while closing client: "
+ e.getMessage());
e.printStackTrace();
}
}
}
}
The startDequeuing method enters a continuous loop and retrieves tuples from the output stream(s). The application dequeues one or more tuples on each iteration of the loop. The code then creates an iterator and extracts each tuple from the list of tuples. Each tuple is then passed to the processTuple method. Note that the list of tuples may include tuples emitted on any of the output streams to which the dequeuer client has subscribed. Therefore, both the output stream name and the tuple are passed as arguments to the processTuple method.
private void startDequeuing() throws StreamBaseException {
DequeueResult dr = null;
try {
//client.dequeue() blocks until either data
//has arrived for a single stream,
//or the client connection has been broken (returning null)
while ((dr = client.dequeue()) != null) {
dr.reuseTuple();
Iterator tuples = dr.iterator();
while (tuples.hasNext()) {
processTuple(dr.getStreamName(), (Tuple) tuples.next());
}
}
} catch (StreamBaseException e) {
System.out.println("StreamBaseException during dequeue: "
+ e.getMessage());
e.printStackTrace();
}
System.out.println
("Client connection ended: dequeuing thread exiting.");
return;
}
As generated, the processTuple method simply calls the toString method on each tuple so that the value in each field is displayed.
private void processTuple(String stream, Tuple tuple) {
// Note that this runs synchronous to the dequeuing process,
// so this method should return promptly.
System.out.println("Tuple dequeued: " + tuple.toString()
+ " from streamProperties: "
+ streamProperties.getQualifiedName());
}
You will need to modify the code within this method, using the tuple field values to accomplish your business objective.
Follow these steps to run the StreamBase application and your dequeuer client.
- From within StreamBase Studio, start the application described in the Introduction. Using the Manual Input view, confirm that text entered as input is emitted from both output streams
- From within Eclipse, in the Package Explorer view, highlight the icon corresponding to your dequeuer application. Then right-click and select Run As > Run... from the popup menu; alternatively, select Run > Run... menu item, or click on the Run Eclipse Application toolbar button and select Run... from the dropdown menu.
All three options open the Run, Create, manage, and run configurations window. In the left-hand panel, highlight the Java Application entry and click on the New launch configuration toolbar icon.

- Now, in the right-hand panel, enter a name (for example, dequeuer) for this launch configuration into the Name: text box, confirm that the dequeuer project is identified in the Project: text box and that your application class,
com.training.MyDequeuer, is identified in the Main class: text box. Save your entries by clicking the Apply command button.
- Start the dequeuer application by clicking the Run command button. A console window opens in the bottom right-hand corner of the Eclipse window and the dequeuer application displays confirmation that it has subscribed to the StreamBase application.
- Return to StreamBase Studio and use the Manual Input view to submit data to the application. Notice that all data is emitted by both output streams but that the tuples retrieved by OutputStream1 are those with a symbol of IBM, whereas OutputStream2 retrieves all of the submitted tuples. This difference is due to the fact that your client used the version of the subscribe method that filters retrieved by the dequeuer client; OutputStream1 will only deliver tuples containing the symbol IBM to the dequeuer client while OutputStream2 will deliver all tuples to the dequeuer client.
