Library Articles
Using the StreamBase Java Toolkit to Write an Enqueuer Client
Author: Dr. John Lifter
Contributor: Bingfang Song
StreamBase Systems
10-January-2007
Revised: 10-May-2007
Applicable To: StreamBase 3.7
An enqueuer client is an application that submits tuples to one or more input streams in a StreamBase application. The StreamBase Java Toolkit for Eclipse generates starting-point code for an enqueuer client.
The discussion that follows develops an enqueuer client for a very simple StreamBase application. The schema defined on the input stream contains a single string field of size 10. The application is illustrated in the following figure.

The equivalent application may be written in StreamSQL.
CREATE INPUT STREAM InputStream1 (symbol string(10));
CREATE OUTPUT STREAM OutputStream1 AS
SELECT symbol FROM InputStream1;
CREATE OUTPUT STREAM OutputStream2 AS
SELECT symbol FROM InputStream1;
This article was developed using the StreamBase Java Toolkit for Eclipse, v3.7.2, which supports development of enqueuer and dequeuer clients, custom Java operators, Java functions, and embedded adapters that are compatible with StreamBase 3.5.x or 3.7.x.
Installation of the StreamBase Java Toolkit for Eclipse is detailed in Getting Started with the StreamBase Java Toolkit for Eclipse.
Follow these steps to create an Eclipse project in which to build a StreamBase enqueuer client.
- In Eclipse, create a new Java project by selecting the File > New > Project... menu item, or right-click in the Package Explorer view and select New > Project... from the popup menu.
In the New Project, Select a Wizard window, select Java Project and click the Next command button.
In the New Project, Create a Java Project window, enter a Project name and click the Next command button.
- Use the New Java Project, Java Settings window to add the StreamBase client libraries to the project.
Select the Libraries tab and click the Add Library... command button, which opens the Add Library window.
Highlight the StreamBase Client API entry and click the Next command button.
In the Add Library, StreamBase Client API Library window, select the version of StreamBase against which this enqueuer client will run and click the Finish command button.

Complete the process by clicking the Finish command button.

If you already have a Java project to which you want to add StreamBase Java client coding, you can add the StreamBase Client Library to the project by right-clicking on the project icon and selecting Build Path > Add Libraries... from the popup menu, which opens the Add Library window. Follow the preceding instructions to add the StreamBase Client Library to your existing project.
Alternatively, you could right-click on the project icon and select Properties from the popup menu. In the Properties window, select the Java Build Path entry and then the Libraries tab. Click the Add Library... command button and continue as described previously.
Follow these steps to generate starting point code using the StreamBase Java Toolkit.
- Highlight the icon corresponding to your new Java project, right-click and select New > Other... from the popup menu. Alternatively, highlight the project and select the File > New > Other... menu entry.
The New, Select a Wizard window opens; expand the options under the StreamBase entry, highlight the StreamBase Client selection and click the Next command button.

- In the StreamBase Client, StreamBase Client window, select the Enqueuer radio button as the Type: entry and confirm that the Source folder: text box references your project directory. Enter the package name for your enqueuer client class in the Package: text box and the class name in the Name: text box. (You can enter any package and class name you want.)

Note that the Target: radio button corresponding to your target version is preselected. If you want to change the target version, select the other radio button and the window will display a link to the Build Path dialog window.

- Click the Next command button to move to the New StreamBase Client, New StreamBase Client Enqueuer window.

Select either the All Streams or Specified Streams radio button from the Enqueue To: group. If you choose All Streams, the generated code will retrieve a listing of all available input streams from the StreamBaseClient proxy instance once it has connected to the StreamBase application. If you choose Specified Streams, you will enter the names of the input streams to which the client will submit data.
Use the Host: and Port: text boxes to specify the computer hosting the StreamBase application and the TCP/IP port used by the application to receive input messages.
The optional checkboxes allow finer control over the content of the generated code. If you check the Enable buffering checkbox you must make entries in the Buffer Size(tuples): and Flush Interval(ms): text boxes. The generated code will use these entries as the parameters to the enableBuffering method, which is called on the StreamBaseClient instance.
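The interplay of the two buffering settings can be pictured with a small plain-Java model. This is only a sketch: the BufferingSketch class and its methods are hypothetical stand-ins and are not part of the StreamBase client API. The model also assumes that the elapsed time is checked whenever a value is enqueued, with the clock passed in explicitly so the result is deterministic; a real client may well use a timer instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a buffered enqueuer; not the StreamBase client API. */
public class BufferingSketch {
    private final int bufferSize;        // models the Buffer Size(tuples) entry
    private final long flushIntervalMs;  // models the Flush Interval(ms) entry
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> sent = new ArrayList<>();
    private long lastFlushMs;

    public BufferingSketch(int bufferSize, long flushIntervalMs, long startMs) {
        this.bufferSize = bufferSize;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = startMs;
    }

    /** Buffers one value and sends the buffer if it is full or the interval has elapsed. */
    public void enqueue(String value, long nowMs) {
        buffer.add(value);
        if (buffer.size() >= bufferSize || nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    /** Sends whatever is buffered as one group and restarts the interval. */
    public void flush(long nowMs) {
        if (!buffer.isEmpty()) {
            sent.add(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlushMs = nowMs;
    }

    /** Returns the groups sent so far, in order. */
    public List<List<String>> sent() {
        return sent;
    }

    public static void main(String[] args) {
        BufferingSketch client = new BufferingSketch(3, 1000L, 0L);
        for (int i = 0; i < 7; i++) {
            client.enqueue("T" + i, i * 100L); // one value every 100 ms
        }
        client.flush(700L); // send the remainder before exiting
        System.out.println(client.sent()); // prints [[T0, T1, T2], [T3, T4, T5], [T6]]
    }
}
```

In this model the receiving side sees values arrive in groups of at most the buffer size, even though the sender produces them one at a time.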
If you check the Enable batching checkbox, the generated code will allow you to populate and enqueue a collection of tuples rather than populating and enqueuing tuples one at a time. The size of the collection is the value you enter into the Batch Size(tuples): text box.
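As a rough illustration of batching, the sketch below fills a collection with a fixed number of values and hands the whole collection over in a single call. The BatchingSketch and RecordingSink names are hypothetical and merely stand in for the generated client and its server connection; plain strings stand in for tuples.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of batching; not the StreamBase client API. */
public class BatchingSketch {
    /** Records each collection it receives, standing in for a server connection. */
    public static final class RecordingSink {
        public final List<List<String>> calls = new ArrayList<>();

        public void enqueue(List<String> tuples) {
            calls.add(new ArrayList<>(tuples)); // copy, because the caller reuses the list
        }
    }

    /** Clears the collection and refills it with batchSize values. */
    public static void fillBatch(List<String> tuples, int batchSize, int round) {
        tuples.clear();
        for (int i = 0; i < batchSize; i++) {
            tuples.add("SYM" + (round * batchSize + i));
        }
    }

    /** Runs the fill-then-enqueue cycle a fixed number of times. */
    public static RecordingSink run(int batchSize, int rounds) {
        RecordingSink sink = new RecordingSink();
        List<String> tuples = new ArrayList<>();
        for (int round = 0; round < rounds; round++) {
            fillBatch(tuples, batchSize, round);
            sink.enqueue(tuples); // one call per batch, however many values it holds
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2).calls); // prints [[SYM0, SYM1, SYM2], [SYM3, SYM4, SYM5]]
        System.out.println(run(1, 2).calls); // prints [[SYM0], [SYM1]]
    }
}
```

A batch size of 1 reduces to the unbatched case, which is why a single code path can serve both.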
Finally, the Use a background thread checkbox will lead to code that enqueues the tuples in a thread separate from the application's main thread.
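The general shape of enqueuing on a separate thread might look like the following plain-Java sketch. It does not reproduce the generated code: BackgroundEnqueueSketch is a hypothetical name, and adding a string to a list stands in for the call that enqueues one tuple.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical illustration of enqueuing on a worker thread; not generated code. */
public class BackgroundEnqueueSketch {
    /** Enqueues count values on a worker thread and returns what was sent. */
    public static List<String> enqueueInBackground(int count) {
        List<String> sent = new CopyOnWriteArrayList<>(); // safe to share between threads
        Thread worker = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                sent.add("SYM" + i); // stands in for enqueuing one tuple
            }
        }, "enqueuer");
        worker.start();
        // The main thread is free to do other work here.
        try {
            worker.join(); // wait for the worker before reading the results
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(enqueueInBackground(3)); // prints [SYM0, SYM1, SYM2]
    }
}
```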
In this first effort, leave all of the checkboxes unchecked.
The generated code differs depending on which, if any, of the checkboxes you select. However, the only portion of the code that you need to modify is the body of the fillTupleValues method. As shown in the following fragment for an enqueuer client that does not employ batching, the generated method does not assign any meaningful values to the tuple fields. The generated code will run, but every field of every enqueued tuple will be null.
protected void fillTupleValues
    (Collection tuples, Schema schema, String streamTarget)
    throws TupleException
{
    tuples.clear();
    // the schema is supplied as a parameter, so it is not looked up again here
    Schema.Field[] fields = schema.getFields();
    // when batching, the FOR loop with index i is operational
    //for (int i = 0; i < batchSize; ++i) {
    Tuple t = schema.createTuple();
    for (int j = 0; j < fields.length; j++) {
        // this loop explicitly sets every field in the tuple to null,
        // although unset fields are null by default;
        // alternative methods on the tuple object can set fields
        // by index or by name
        Schema.Field field = fields[j];
        t.setField(field, null);
    }
    // end of for loop with index j
    tuples.add(t);
    //}
    // end of for loop with index i
}
In your application, you have a choice of methods to set each field value, as shown in the following code extract.
protected void fillTupleValues
    (Collection tuples, Schema schema, String streamTarget)
    throws TupleException
{
    tuples.clear();
    // create the tuple that the loop below populates
    Tuple t = schema.createTuple();
    Schema.Field[] fields = schema.getFields();
    for (int j = 0; j < fields.length; j++) {
        Schema.Field field = fields[j];
        String fieldName = field.getName();
        // each line below is an alternative way to set the same field:
        // by index, by Schema.Field, or by name
        //t.setString(j, "GM");
        //t.setString(fields[j], "HP");
        //t.setString(fieldName, "APPL");
        //t.setField(j, "IBM");
        //t.setField(fields[j], "MSFT");
        t.setField(fieldName, "FORD");
    }
    tuples.add(t);
}
Take a moment to review the code in the run method. The code uses a java.util.Collection instance to hold multiple tuples and enqueues them to the StreamBase application with the version of the enqueue method that takes a collection, rather than a single tuple instance, as a parameter. With this approach, the same code supports clients whether or not they use batching. Also note that the run method uses a while loop, with a one-second delay, to repeatedly call the fillTupleValues and enqueue methods. In an actual application, you will want to remove the one-second delay and probably change the behavior of the while loop.
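One possible way to change the loop, sketched here in plain Java, is to drive it from a data source and stop when the source is exhausted instead of sleeping between iterations. The RunLoopSketch class and its Sink interface are hypothetical stand-ins for the generated client and its connection, and strings stand in for tuples; nothing here is the generated run method itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical run loop driven by a data source; not the generated run method. */
public class RunLoopSketch {
    /** Stand-in for a connection that accepts a collection of tuples. */
    public interface Sink {
        void enqueue(List<String> tuples);
    }

    /** Loops until the source is exhausted, with no fixed delay; returns the number of enqueue calls. */
    public static int run(Iterator<String> source, int batchSize, Sink sink) {
        List<String> tuples = new ArrayList<>();
        int calls = 0;
        while (source.hasNext()) {
            tuples.clear();
            // fill the collection with up to batchSize values
            while (tuples.size() < batchSize && source.hasNext()) {
                tuples.add(source.next());
            }
            sink.enqueue(tuples);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> symbols = Arrays.asList("GM", "HP", "IBM", "MSFT", "FORD");
        int calls = run(symbols.iterator(), 2, batch -> System.out.println(batch));
        System.out.println(calls + " enqueue calls"); // prints 3 enqueue calls
    }
}
```

Because the loop ends when the input runs out, the last collection may hold fewer values than the batch size.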
Follow these steps to run the StreamBase application and your enqueuer client.
- From within StreamBase Studio, start the application described in the Introduction. Using the Manual Input view, confirm that text entered as input is emitted from both output streams.
- From within Eclipse, restart the dequeuer application (see Using the StreamBase Java Toolkit to Write a Dequeuer Client) by selecting the dequeuer launch configuration.
- Again from within Eclipse, in the Package Explorer view, highlight the icon corresponding to your enqueuer application. Then right-click and select Run As > Run... from the popup menu. Alternatively, select the Run > Run... menu item, or click the Run Eclipse Application toolbar button and select Run... from the dropdown menu.
In the run configuration window, in the left-hand panel, highlight the Java Application entry and click on the New launch configuration toolbar icon.
Now, in the right-hand panel, enter a name (for example, enqueuer) for this launch configuration into the Name: text box and confirm that the enqueuer project is identified in the Project: text box and that your application class, com.training.MyEnqueuer, is identified in the Main class: text box. Save your entries by clicking the Apply command button.
Start the enqueuer application by clicking the Run command button. Another console window opens in the bottom right-hand corner of the Eclipse window. Switch back and forth between the console windows to confirm that the StreamBase application is processing each input.
In the same Eclipse project, create a second enqueuer. This time, check the Enable buffering checkbox. As before, add code to the fillTupleValues method and then rerun the application. You should observe that the enqueuer application enqueues tuples at an even, constant pace, whereas the dequeuer application retrieves tuples in batches. The number of tuples retrieved in each batch is equal to the value you entered into the Buffer Size(tuples): text box.