Custom Java Aggregate Function Sample

Introduction

The nth.sbapp sample application demonstrates a custom Java aggregate function, com.streambase.sb.sample.Nth, which is invoked through calljava in aggregate expressions and returns the value of a specified tuple in a three-tuple window. The function was built by extending the AggregateWindow class, which is documented in the StreamBase Java API.

In the application, each input tuple contains a single field, val, of type double. The Aggregate operator has a window size of 3, so it waits until three tuples are input before emitting a tuple. The output tuple contains four fields, each generated by an expression in the Aggregate operator:

  • firstval(val)

  • calljava('com.streambase.sb.sample.Nth', 2, val)

  • calljava('com.streambase.sb.sample.Nth', 3, val)

  • calljava('com.streambase.sb.sample.Nth', 4, val)

In the first expression, the built-in function firstval is used to return the value in the first tuple.

In the remaining expressions, calljava is used to do the same kind of thing: it returns the value of the tuple at the specified position in the window (the second, third, or fourth tuple).

Notice that the last expression references a fourth tuple. However, only three tuples should be in the window when it closes, because the aggregate window size is 3. This reference to a non-existent tuple is a deliberate mistake. Read on and run the sample to see the result.
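The behavior described above can be illustrated with a minimal stand-alone sketch. It is not the Nth.java shipped with the sample, and it deliberately does not extend the real AggregateWindow class, so that it runs without the StreamBase libraries. The class name NthSketch, the init/accumulate/calculate method names, and the nthOf helper are assumptions made for illustration.

```java
// Stand-alone sketch only: mimics an init/accumulate/calculate lifecycle
// but does NOT extend the real StreamBase AggregateWindow class.
class NthSketch {
    private int seen;       // number of tuples accumulated in the current window
    private Double result;  // value of the nth tuple, or null if never reached

    // Called when a window opens: discard state from the previous window.
    void init() {
        seen = 0;
        result = null;
    }

    // Called once per tuple in the window; n is the 1-based position wanted.
    void accumulate(int n, double value) {
        seen++;
        if (seen == n) {
            result = value;
        }
    }

    // Called when the window closes; null means the window held fewer than n tuples.
    Double calculate() {
        return result;
    }

    // Hypothetical helper: push one window of values through the lifecycle.
    static Double nthOf(int n, double... window) {
        NthSketch agg = new NthSketch();
        agg.init();
        for (double v : window) {
            agg.accumulate(n, v);
        }
        return agg.calculate();
    }

    public static void main(String[] args) {
        System.out.println(nthOf(2, 1.0, 2.0, 3.0)); // prints 2.0
        System.out.println(nthOf(4, 1.0, 2.0, 3.0)); // prints null
    }
}
```

In this sketch, asking for the fourth tuple of a three-tuple window simply leaves the result unset, which is one plausible way for such a function to yield null rather than an error.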

This topic describes how to load and run the nth.sbapp sample. For more information about custom Java functions, see the topic, Using the StreamBase Java Function Wizard, in the API Guide.

Sample Location

By default, the sample files are installed in:

  • On Windows: C:\Program Files\StreamBase Systems\StreamBase.n.m\sample\custom-java-aggregate

  • On UNIX: /opt/streambase/sample/custom-java-aggregate

When you load the sample into StreamBase Studio, Studio copies the sample project's files to your Studio workspace. StreamBase Systems recommends that you use the workspace copy of the sample, especially on UNIX, where you may not have write access to /opt/streambase. In the default installation, the path to this sample in your Studio workspace is:

UNIX:       
  ~/streambase-studio-n.m-workspace/sample_custom-java-aggregate
Windows XP:
  C:\Documents and Settings\username\My Documents\StreamBase Studio n.m Workspace\
      sample_custom-java-aggregate
Windows Vista:
  C:\Users\username\Documents\StreamBase Studio n.m Workspace\
      sample_custom-java-aggregate

This Sample's Files

The sample has the following files:

  • The source code for the function, Nth.java

  • A sample configuration file, sbd.sbconf, which tells StreamBase Server to load the custom aggregate function.

  • A sample application, nth.sbapp, which uses the function.

  • An ant build.xml file that can be used to create the function's JAR file.

Running the Custom Java Aggregate Function Application

Running nth.sbapp in StreamBase Studio

  1. In the Package Explorer, double-click to open the nth.sbapp application. Make sure the application is the currently active tab in the EventFlow Editor.

  2. Click the Run button to start the application.

  3. When the server starts, StreamBase Studio switches to the SB Test/Debug perspective.

  4. On the Manual Input view, enter the following values in the val field, pressing Send Data after each entry:

    1
    2
    3

  5. Observe the tuples in the Application Output view. No tuples are output until three input tuples are sent, because the aggregate's window size is set to 3. The output you should observe is:

    first=1.0, second=2.0, third=3.0, fourth=null

    The first value is returned by firstval, and the other three by the calljava expressions. The fourth field is null because the window contains only three tuples, so there is no fourth tuple to return.

  6. When done, press F9 or click the Stop Running Application button.
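The windowing behavior observed in step 5 can be sketched in plain Java. This is an illustration only, not StreamBase code: the class name TupleWindowSketch and its send method are hypothetical, and the sketch assumes the window starts empty again after it emits, which the sample does not state.

```java
// Illustration only: a tuple-count window of size 3 that emits one output
// line when the third value arrives. Not part of the StreamBase API.
class TupleWindowSketch {
    private static final int WINDOW_SIZE = 3;
    private final double[] window = new double[WINDOW_SIZE];
    private int count;  // values currently held in the window

    // Returns the nth (1-based) value in the window, or null if there is none.
    private Double nth(int n) {
        return n <= count ? Double.valueOf(window[n - 1]) : null;
    }

    // Adds one value; returns an output line when the window closes, else null.
    String send(double val) {
        window[count++] = val;
        if (count < WINDOW_SIZE) {
            return null;  // window still open, nothing is emitted yet
        }
        String out = "first=" + nth(1) + ", second=" + nth(2)
                + ", third=" + nth(3) + ", fourth=" + nth(4);
        count = 0;  // assumption: the next window starts empty
        return out;
    }

    public static void main(String[] args) {
        TupleWindowSketch w = new TupleWindowSketch();
        System.out.println(w.send(1.0)); // prints null (no output yet)
        System.out.println(w.send(2.0)); // prints null (no output yet)
        System.out.println(w.send(3.0)); // prints first=1.0, second=2.0, third=3.0, fourth=null
    }
}
```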

Running nth.sbapp in Terminal Windows

This section describes how to run the sample in UNIX terminal windows or Windows command prompt windows. On Windows, be sure to use the StreamBase Command Prompt from the Start menu as described in the Test/Debug Guide, not the default command prompt.

  1. Open three terminal windows on UNIX, or three StreamBase Command Prompts on Windows. In each window, navigate to the directory where the sample is installed, or to your workspace copy of the sample, as described above.

  2. In window 1, launch the StreamBase Server for the sample application:

    sbd -f sbd.sbconf nth.sbapp -p 10000

  3. In window 2, run a dequeuer so that you can see the output that will be produced:

    sbc dequeue

  4. In window 3:

    1. Run an enqueuer:

      sbc enqueue InputStream1

    2. Type these numbers into the enqueuer, one number per line:

      1
      2
      3

  5. In window 2, you should see this output from the application:

    first=1.0, second=2.0, third=3.0, fourth=null

    The first value is returned by firstval, and the other three by the calljava expressions. The fourth field is null because the window contains only three tuples, so there is no fourth tuple to return.

  6. In window 3, type: Ctrl-Z (Windows) or Ctrl-D (UNIX). The sbc enqueue command exits.

  7. In window 3, type: sbadmin shutdown. This terminates the server and the dequeuer.

How We Created the Custom Java Aggregate Function Sample

  1. Created the custom Java application:

    1. Edited and saved Nth.java in a text editor.

    2. Built Nth.java using our favorite Java development tools, to generate Nth.jar.

      Note

      The installation includes a build file, build.xml, which you can use to build Nth.java if ant is installed on your system.

  2. Launched StreamBase Studio

  3. Created the custom-java-aggregate project. Selected the option to Create template server configuration file.

  4. From the top menu, in the SB Authoring perspective, selected File > New > EventFlow Application. Selected the custom-java-aggregate project, and entered nth.sbapp for the diagram name.

  5. Created an input stream:

    1. Dragged an input stream from the palette to the canvas.

    2. Clicked it on the canvas, which invoked the Input Stream Properties view.

    3. On the Edit Schema tab, added:

      • Field Name: val

      • Type: double

      • Size: 8

  6. Set up the Aggregate operator:

    1. Dragged an Aggregate operator from the palette to the canvas and opened its Properties view.

    2. Dragged a connector from the Input Stream to the input of the Aggregate1 operator.

    3. On the Dimensions tab, clicked the Add button.

      In the Edit Dimension dialog, entered the following:

      • Name: val

      • Window size: Close and emit after 3 tuples

      Clicked OK.

    4. On the Aggregate Functions tab, unchecked the delta option, Output all input fields. Then used the Add button to add these functions:

      Aggregate Functions

      Output Field Name   Expression
      first               firstval(val)
      second              calljava('com.streambase.sb.sample.Nth', 2, val)
      third               calljava('com.streambase.sb.sample.Nth', 3, val)
      fourth              calljava('com.streambase.sb.sample.Nth', 4, val)


  7. Created an output stream:

    1. Dragged an output stream from the palette to the canvas.

    2. Connected the Aggregate1 operator to the output stream.

  8. Double-clicked sbd.sbconf in the Resources folder to open it in the StreamBase editor. In the file, removed unnecessary template code and added a java-vm element referring to Nth.jar, the JAR file that contains the custom Java function.
