Developers: Using the StreamBase Java Toolkit to Write a Custom Java Aggregate Function

Using the StreamBase Java Toolkit to Write a Custom Java Aggregate Function

Author: Dr. John Lifter
Contributor: Bingfang Song
StreamBase Systems
12-January-2007
Revised: 10-May-2007

Applicable To: StreamBase 3.5, 3.7

Introduction 

You implement an aggregate custom function by writing a class that extends com.streambase.sb.operator.AggregateWindow, providing implementations for the init, calculate, and accumulate methods.

The init method is called each time a new window is opened. The accumulate method is called each time a tuple enters a window. The calculate method is called each time a window closes and/or emits an output tuple.

The accumulate method performs the logic of your custom function; its return type is void and its parameters, if any, supply the tuple field values that your processing needs. Typically, accumulate saves state in an instance variable that the calculate method then reads. The calculate method must return a StreamBase internal data type (int, Integer, double, Double, Timestamp, byte[], String), and the returned value becomes the value of the aggregate function's output field.
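
The call sequence described above can be sketched in plain Java. This is only an illustration under stated assumptions: the RunningSum class and the LifecycleDemo driver are hypothetical names, and RunningSum deliberately does not extend com.streambase.sb.operator.AggregateWindow so that the sketch runs without the StreamBase libraries. The driver reuses one instance for two windows purely to show init resetting the state; how StreamBase itself manages instances is not covered here.

```java
// Stand-in for an aggregate function class. In StreamBase it would extend
// com.streambase.sb.operator.AggregateWindow; the superclass is omitted here
// (an assumption for illustration) so the sketch runs on its own.
class RunningSum {
    private double sum; // state saved by accumulate and read by calculate

    public void init() {
        sum = 0.0; // a new window starts with clean state
    }

    public void accumulate(double value) {
        sum += value; // called once per tuple entering the window
    }

    public double calculate() {
        return sum; // called when the window closes or emits
    }
}

// Hypothetical driver that imitates the init / accumulate / calculate order.
class LifecycleDemo {
    static double runWindow(RunningSum fn, double[] window) {
        fn.init();
        for (double v : window) {
            fn.accumulate(v);
        }
        return fn.calculate();
    }

    public static void main(String[] args) {
        RunningSum fn = new RunningSum();
        System.out.println(runWindow(fn, new double[] {1.0, 2.0, 3.0})); // 6.0
        System.out.println(runWindow(fn, new double[] {10.0}));          // 10.0
    }
}
```

The second window returns 10.0 rather than 16.0 because init clears the instance variable before any tuple of that window is accumulated.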

This article was developed using the StreamBase Java Toolkit for Eclipse, v3.7.2, which supports development of enqueuer and dequeuer clients, custom Java operators, Java functions, and embedded adapters that are compatible with StreamBase 3.5.x or 3.7.x.

Starting an Eclipse StreamBase Project 

Follow these steps to create an Eclipse project in which to build a StreamBase Custom Java Aggregate Function.

  1. In Eclipse, create a new Java project by selecting the File > New > Project... menu item, or right-click in the Package Explorer view and select New > Project... from the popup menu.

    In the New Project, Select a Wizard window, select Java Project and click the Next command button.

    In the New Project, Create a Java Project window, enter a Project name and click the Next command button.

  2. Use the New Java Project, Java Settings window to add the StreamBase client libraries to the project.

    Select the Libraries tab and click the Add Library... command button, which opens the Add Library window.

    Highlight the StreamBase Client API entry and click the Next command button.

    In the Add Library, StreamBase Client API Library window, select the version of StreamBase against which this custom function will run and click the Finish command button.

    Figure: Add StreamBase Client Library

    Complete the process by clicking the Finish command button.

    Figure: New Java Project, Java Settings Window

    If you already have a Java project to which you want to add StreamBase Java client coding, you can add the StreamBase Client Library to the project by right-clicking on the project icon and selecting Build Path > Add Libraries... from the popup menu, which opens the Add Library window. Follow the preceding instructions to add the StreamBase Client Library to your existing project.

    Alternatively, you could right-click on the project icon and select Properties from the popup menu. In the Properties window, select the Java Build Path entry and then the Libraries tab. Click the Add Library... command button and continue as described previously.

Using the StreamBase Java Toolkit to Create a Custom Java Aggregate Function 

Follow these steps to generate starting point code using the StreamBase Java Toolkit.

  1. Highlight the icon corresponding to the Java project, right-click and select New > Other... from the popup menu. Alternatively, highlight the project and select the File > New > Other... menu entry.

    The New, Select a Wizard window opens; expand the options under the StreamBase entry, highlight the StreamBase Java Function selection and click the Next command button.

  2. In the New StreamBase Java Function, StreamBase Custom Java Function window, select the Aggregate Function and New Class: controls, then enter a package name into the Package: text box and the name of your class into the Name: text box.

    Figure: New StreamBase Java Function Window

  3. Click the Next command button to move to the New StreamBase Java Function, StreamBase Custom Java Aggregate Function window.

    To define the parameters to the accumulate method, click the Add command button. In the New Arguments window, select the argument type from the Select Type: dropdown list and click the Add command button to add the entry to the Argument List: listing. Once an argument is in the listing, you can give it a more meaningful name by selecting and changing the entry under the Name column.

    Figure: New Arguments Window

    Note that although parameters to the accumulate method (and the type of the return value from the calculate method) may be either Java primitive types or Java Objects, as summarized in the following table, this Java toolkit wizard only offers the primitive options (with the exception of the Timestamp object type).

    StreamBase Type   Java Primitive Type           Java Object Type
    ---------------   ---------------------------   ---------------------------
    boolean           boolean                       Boolean
    int               int                           Integer
    double            double                        Double
    string            byte[]                        String
    timestamp         com.streambase.sb.Timestamp   com.streambase.sb.Timestamp

  4. Click the OK command button to return to the New StreamBase Java Function, StreamBase Custom Java Aggregate Function window and in the dropdown under the calculate() return type: label, select the return type.

    Figure: New StreamBase Java Function Window

    Click the Finish command button. The starting point code is generated.

Understanding the Generated Code 

The wizard generates the following starting point code.

   package com.training;

   import com.streambase.sb.operator.AggregateWindow;

   public class CountMatches extends AggregateWindow {

     public static final long serialVersionUID = 1168628033171L;

     public void init() {
     }

     public int calculate() {
       // TODO Implement function here
       return 0;
     }

     public void accumulate(byte[] val, byte[] pattern) {
       // TODO Implement function here
     }

     public void release() {
     }
   }
   

You can optionally change the method signatures to use String and Integer objects rather than byte[] and int, which makes the coding a little easier.

   package com.training;

   import com.streambase.sb.operator.AggregateWindow;

   public class CountMatches extends AggregateWindow {

     public static final long serialVersionUID = 1168628033171L;

     public void init() {
     }

     public Integer calculate() {
       // TODO Implement function here
       return 0;
     }

     public void accumulate(String val, String pattern) {
       // TODO Implement function here
     }

     public void release() {
     }
   }
   
Writing the Method Bodies 

Once the code has been generated, you must add processing logic to the init, accumulate, and calculate methods.

When a window using this aggregate function opens, the init method initializes a counter (matches) to zero. The accumulate method compares two incoming string fields (val and pattern) and, if they are equal, increments the counter. When a window using this aggregate function closes, the calculate method returns the current value of the counter.

   package com.training;

   import com.streambase.sb.operator.AggregateWindow;

   public class CountMatches extends AggregateWindow {

     public static final long serialVersionUID = 1168628033171L;
     private int matches;

     public void init() {
       matches = 0;
     }

     public int calculate() {
       // Return the number of matches counted in this window
       return matches;
     }

     public void accumulate(byte[] val, byte[] pattern) {
       // Count the tuple when both string fields hold the same text
       if (new String(val).equals(new String(pattern))) matches++;
     }

     public void release() {
     }
   }
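
In the byte[] version above, each call to accumulate decodes both arrays into new String objects using the platform default character set. One possible alternative, offered only as a sketch, is to compare the bytes directly with java.util.Arrays.equals. The class name ByteMatch and the helper sameBytes are hypothetical. This comparison is byte-exact, so it can differ from the decoded String comparison for malformed input, and Arrays.equals treats two null arrays as equal.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of the StreamBase API.
class ByteMatch {
    // Byte-exact comparison; no String objects are created.
    static boolean sameBytes(byte[] val, byte[] pattern) {
        return Arrays.equals(val, pattern);
    }

    public static void main(String[] args) {
        byte[] a = "IBM".getBytes(StandardCharsets.UTF_8);
        byte[] b = "IBM".getBytes(StandardCharsets.UTF_8);
        byte[] c = "MSFT".getBytes(StandardCharsets.UTF_8);
        System.out.println(sameBytes(a, b)); // true
        System.out.println(sameBytes(a, c)); // false
    }
}
```

Inside accumulate, the test would then read if (ByteMatch.sameBytes(val, pattern)) matches++;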
   

An alternative implementation, in which the method signatures for accumulate and calculate use Java Object types, would contain the following code.

   package com.training;

   import com.streambase.sb.operator.AggregateWindow;

   public class CountMatches extends AggregateWindow {

     public static final long serialVersionUID = 1168628033171L;
     private int matches;

     public void init() {
       matches = 0;
     }

     public Integer calculate() {
       // Return the number of matches counted in this window
       return Integer.valueOf(matches);
     }

     public void accumulate(String val, String pattern) {
       // Count the tuple when both string fields hold the same text
       if (val.equals(pattern)) matches++;
     }

     public void release() {
     }
   }
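
The String-based accumulate above calls val.equals(pattern), which throws a NullPointerException if val is null. Assuming a null string field can reach accumulate as a Java null (an assumption to confirm against the StreamBase documentation for your version), a guarded variant might look like the following sketch. The class name NullSafeCountMatches and the countMatches helper are hypothetical, and the class omits the AggregateWindow superclass so that it runs standalone. Here a null val never counts as a match, which is a design choice rather than a StreamBase rule.

```java
// Stand-in sketch; the real class would extend
// com.streambase.sb.operator.AggregateWindow (omitted here by assumption).
class NullSafeCountMatches {
    private int matches;

    public void init() {
        matches = 0;
    }

    public void accumulate(String val, String pattern) {
        // Skip null values instead of throwing; null never matches anything.
        if (val != null && val.equals(pattern)) {
            matches++;
        }
    }

    public Integer calculate() {
        return Integer.valueOf(matches);
    }

    // Hypothetical helper: runs one window over {val, pattern} pairs.
    static int countMatches(String[][] pairs) {
        NullSafeCountMatches fn = new NullSafeCountMatches();
        fn.init();
        for (String[] p : pairs) {
            fn.accumulate(p[0], p[1]);
        }
        return fn.calculate();
    }

    public static void main(String[] args) {
        String[][] pairs = { {"a", "a"}, {null, "a"}, {null, null}, {"a", "b"} };
        System.out.println(countMatches(pairs)); // 1
    }
}
```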
   
Creating a JAR File 

Before you can use your custom function in a StreamBase application, you need to prepare a JAR file that contains the .class file and a generic manifest.

  1. In the Package Explorer, highlight the icon that represents your project and select the File > Export... menu item. Alternatively, you can right-click and select Export... from the popup menu. In the Export window, select the JAR file entry under the Java category and click the Next command button.

  2. In the JAR Export, JAR File Specification window, check the checkbox corresponding to the package(s) you want to include in the JAR file and in the JAR file: text box, provide the path and name of the JAR file that will be created by this process.

  3. Then click the Next command button twice to move to the JAR Export, JAR Manifest Specification window. Select the Generate the manifest radio button.

  4. Complete the process by clicking the Finish command button. The JAR file will be given the specified name and written to the location provided in the JAR Export, JAR File Specification window.
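
After the export, it can be useful to confirm that the class file sits at the path its package name implies. The following is a minimal sketch using the standard java.util.jar.JarFile class; JarCheck, entryName, and containsClass are hypothetical names, and the JAR path is whatever you entered in the JAR file: text box.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical utility for checking an exported JAR; not part of StreamBase.
class JarCheck {
    // Maps a fully qualified class name to its entry path inside a JAR.
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the JAR at jarPath contains the named class.
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: java JarCheck <path-to-jar>");
            return;
        }
        // Class name taken from the example in this article.
        System.out.println(containsClass(args[0], "com.training.CountMatches"));
    }
}
```

For the class in this article, the expected entry is com/training/CountMatches.class.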

Using the Java Function in a StreamBase Application 

As with a custom Java simple function, you must import the JAR file into a StreamBase project before you can use your custom aggregate function. When working within StreamBase Studio, import the JAR file into the Custom Libraries directory under your project. In a deployed application, the location and name of the JAR file are included as an entry in the application's configuration file.

As with a simple function, you use the calljava function to call this function from a StreamBase application. However, in the case of an aggregate function, StreamBase knows the names of the methods and the sequence in which they should be invoked. Consequently, the call to calljava does not need to identify the target method. The only arguments are the full package and class name (com.training.CountMatches) and the parameters to the accumulate method, in the order that accumulate declares them.

   calljava('com.training.CountMatches', val, pattern)
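
To illustrate why a class name alone can be enough, the sketch below looks up the conventional method names by reflection. It is not StreamBase's actual implementation, which this article does not describe; CountMatchesStandIn and ConventionDemo are hypothetical classes, and the stand-in omits the AggregateWindow superclass so that the example runs without the StreamBase libraries.

```java
import java.lang.reflect.Method;

// Stand-in for com.training.CountMatches (assumption: no StreamBase superclass).
class CountMatchesStandIn {
    private int matches;

    public void init() { matches = 0; }

    public void accumulate(String val, String pattern) {
        if (val.equals(pattern)) matches++;
    }

    public Integer calculate() { return Integer.valueOf(matches); }
}

// Hypothetical driver: given only a class name, it finds init, accumulate,
// and calculate by their fixed names and calls them in the expected order.
class ConventionDemo {
    static Object runWindow(String className, String[][] tuples) {
        try {
            Class<?> cls = Class.forName(className);
            Object fn = cls.getDeclaredConstructor().newInstance();
            Method accumulate = cls.getMethod("accumulate", String.class, String.class);
            cls.getMethod("init").invoke(fn);
            for (String[] t : tuples) {
                accumulate.invoke(fn, t[0], t[1]);
            }
            return cls.getMethod("calculate").invoke(fn);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot run " + className, e);
        }
    }

    public static void main(String[] args) {
        String[][] tuples = { {"a", "a"}, {"a", "b"}, {"c", "c"} };
        System.out.println(runWindow(CountMatchesStandIn.class.getName(), tuples)); // 2
    }
}
```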