Library Articles
Using the StreamBase Studio Custom Function Wizard
Authors: Eddie Galvez, Bingfang Song, Dr. John Lifter
StreamBase Systems
Date: 01-September-2007
Applicable To: 01-StreamBase 5.0
You can use StreamBase Studio to write code that implements a simple or aggregate custom Java function. A wizard will collect basic information about your function, for example, the number and types of the parameters and the return type, and then generate starting point code for your implementation. When developing a simple custom function your task will be to provide the implementation for a method you define; when developing an aggregate custom function, you will need to implement three predefined methods that together provide your desired functionality.
This article overviews the code generated by the wizard and describes the basic implementation tasks for both simple and aggregate custom functions. The description is specific to StreamBase 5 and later point releases. If you are using StreamBase 3.5 or 3.7, refer to the discussions in Using the StreamBase Java Toolkit to Write a Custom Java Simple Function and Using the StreamBase Java Toolkit to Write a Custom Java Aggregate Function.
Simple Custom Java Functions
You implement a simple custom function by writing a Java class that has one, or more, public static methods. A Java class used as the source of a simple custom function must be contained within a package. The parameters and return values must be a Java primitive or object type corresponding to a StreamBase type, as summarized in the following table.
|
StreamBase Type |
Java Primitive Type |
Java Object Type |
|
boolean
|
boolean
|
Boolean
|
|
int
|
int
|
Integer
|
|
long
|
long
|
Long
|
|
double
|
double
|
Double
|
|
string
|
byte[]
|
String
|
|
timestamp
|
com.streambase.sb. Timestamp
|
com.streambase.sb. Timestamp
|
To call a simple custom function, you use the calljava function, providing the full package and class name, the name of the target method, and method parameters as arguments to the invocation.
As an alternative to writing your own class, any public static method of a Java class that is already in the CLASSPATH of the application may be called. Using the java.lang.Math class as an example, the public static int abs (int value) method would be called as illustrated in the following code fragment.
calljava('java.lang.Math', 'abs', value)
When used in a StreamBase expression, value references an integer tuple field and the calljava function returns a value that is either used further within a StreamBase expression or becomes the value of a tuple field.
Note that you may also use the function alias syntax, instead of calljava.
See below for details.
Aggregate Custom Java Function
You implement an aggregate custom function by writing a class that extends com.streambase.sb.operator.AggregateWindow, providing implementations for the init, calculate, and accumulate methods.
The init method is called each time a new window is opened. The accumulate method is called each time a tuple enters a window. And the calculate method is called each time a window closes and/or emits an output tuple.
The accumulate method performs the logic of your custom function; its return type is void and its parameters, if any, are used by your processing. Most likely, logic within accumulate will save state in an instance variable that will then be used by the calculate method. The calculate method must return a StreamBase internal data type (int, Integer, long, Long, double, Double, Timestamp, byte[], String), which becomes the value of the output return field of the aggregate function.
Call an aggregate custom function as shown in the following statement, where packageName.className is the fully qualified class name of the class containing your aggregate custom function, and the value arguments are tuple fields that are passed to the accumulate method. The calljava function returns the value returned by the calculate method, which is either used further within a StreamBase expression or becomes the value of a tuple field. Return values must be a Java primitive or object type corresponding to a StreamBase type, as summarized in the table within the section Simple Custom Java Function.
calljava('packageName.className', value [, value , ...)
Create a StreamBase or standard Java project. If you choose to create a StreamBase project, the New StreamBase Project, StreamBase Project window will give you the opportunity to add the StreamBase Client API to the Java build path. If you choose to create a standard Java project, the New Java Project, Java Settings window will give you an opportunity to add the StreamBase Client API to the project.
A best practice would be to create your custom function(s) in a dedicated project, which may then be included on the build path of any StreamBase project that uses this function. Since the project will only include the .java files related to your function, there is no need to include .sbapp, .ssql, or a StreamBase configuration file in this project.
In the Package Explorer, highlight the project, right-click, and select New > Other... from the popup menu. Alternatively, select the File > New > Other... menu item. In the New, Select a Wizard window, expand the selections under the StreamBase Java Wizards listing and highlight the StreamBase Java Function entry. Click the Next command button to start the New StreamBase Java Function wizard.
After the wizard starts, the New StreamBase Java Function, StreamBase Custom Java Function window opens. Confirm that the Source folder: text box references the correct project directory, then select the Simple Function radio button in the Type: grouping, and then enter a package and class name into the appropriate text boxes. Click the Next command button to move to the New StreamBase Java Function, StreamBase Custom Java Simple Function window.

To add a method to your class, click the Add command button and in the Create StreamBase Simple Function window enter a name and return type for the function.
If the function requires parameters, select the parameter type from the Select Type: dropdown list and click the Add command button to add the entry to the Argument List: listing; repeat if additional parameters are required. You can highlight and edit the text in the Name column, once a parameter is listed in the Argument List: listing.

In this example, define two methods with the signatures:
byte[] toUpper (byte[] clause)
byte[] toLower (byte[] clause)
Once the method signatures have been defined, click the Next command button to return to the StreamBase Custom Java Simple Function window and then click the Finish command button.

The starting point code is generated.
Understanding the Generated Code
The starting point code includes the following class declaration and method implementations.
package com.training;
public class CaseConverter {
public static byte[] toUpper (byte[] clause) {
// TODO Implement function here
return null;
}
public static byte[] toLower (byte[] clause) {
// TODO Implement function here
return null;
}
}
You can optionally change the method signatures to use String objects rather than byte[], which makes the coding a little easier.
package com.training;
public class CaseConverter {
public static String toUpper (String clause) {
// TODO Implement function here
return null;
}
public static String toLower (String clause) {
// TODO Implement function here
return null;
}
}
Writing the Method Bodies
In each method, you need to replace the return null; statement with the desired processing logic. In this example, you can use the String class toUpperCase and toLowerCase methods.
- For the methods that use primitive types, you could use the following code in the
toUpper and toLower methods.
return new String(clause).toUpperCase().getBytes();
return new String(clause).toLowerCase().getBytes();
- For the methods that use object types, you could use the following code in the
toUpper and toLower methods.
return clause.toUpperCase();
return clause.toLowerCase();
After the wizard starts, the New StreamBase Java Function, StreamBase Custom Java Function window opens. Confirm that the Source folder: text box references the correct project directory, then select the Aggregate Function radio button in the Type: grouping, and then enter a package and class name into the appropriate text boxes. Click the Next command button to move to the New StreamBase Java Function, StreamBase Custom Java Aggregate Function window.

To define the parameters to the accumulate method, click the Add command button and in the New Arguments window select the argument type from the Select Type: dropdown list and click the Add command button to add the entry to the Argument List: listing. Once entered into the listing, an argument can be given a more meaningful name by selecting and changing the entry under the Name column.

Note that although parameters to the accumulate method (and the type of the return value from the calculate method) may be either Java primitive types or Java Objects, this wizard only offers the primitive options (with the exception of the Timestamp object type).
Click the OK command button to return to the New StreamBase Java Function, StreamBase Custom Java Aggregate Function window and in the dropdown under the calculate() return type: label, select the return type.

Click the Finish command button. The starting point code is generated.
Understanding the Generated Code
The starting point code includes the following code.
package com.training;
import com.streambase.sb.operator.AggregateWindow;
public class CountMatches extends AggregateWindow {
public static final long serialVersionUID = 1168628033171L;
public void init() {
}
public int calculate() {
// TODO Implement function here
return 0;
}
public void accumulate(byte[] val, byte[] pattern) {
// TODO Implement function here
}
public void release() {
}
}
You can optionally change the method signatures to use String and Integer objects rather than byte[] and int, which makes the coding a little easier.
package com.training;
import com.streambase.sb.operator.AggregateWindow;
public class CountMatches extends AggregateWindow {
public static final long serialVersionUID = 1168628033171L;
public void init() {
}
public Integer calculate() {
// TODO Implement function here
return 0;
}
public void accumulate(String val, String pattern) {
// TODO Implement function here
}
public void release() {
}
}
Writing the Method Bodies
Once the code has been generated, you must add processing logic to the init, accumulate, and calculate methods.
When a window using this aggregate function opens, the init method initializes a counter (matches) to zero. The accumulate method compares two incoming string fields (val and pattern) and if they are the same, increments the counter (matches). When a window using this aggregate function closes, the calculate method returns the current value of the counter (matches).
package com.training;
import com.streambase.sb.operator.AggregateWindow;
public class CountMatches extends AggregateWindow {
public static final long serialVersionUID = 1168628033171L;
private int matches;
public void init() {
matches = 0;
}
public int calculate() {
// TODO Implement function here
return matches;
}
public void accumulate(byte[] val, byte[] pattern) {
// TODO Implement function here
if (new String(val).equals(new String(pattern))) matches++;
}
public void release() {
}
}
An alternative implementation, in which the method signatures for accumulate and calculate use Java Object types, would contain the following code.
package com.training;
import com.streambase.sb.operator.AggregateWindow;
public class CountMatches extends AggregateWindow {
public static final long serialVersionUID = 1168628033171L;
private int matches;
public void init() {
matches = 0;
}
public Integer calculate() {
// TODO Implement function here
return new Integer(matches);
}
public void accumulate(String val, String pattern) {
// TODO Implement function here
if (val.equals(pattern)) matches++;
}
public void release() {
}
}
Follow these steps to incorporate a custom Java function into a StreamBase application.
- Create a new StreamBase project that includes an Application Diagram. (Custom functions may also be used within a StreamSQL document; refer to the discussion of the
SELECT statement for details.)
- Include the project in which you developed the custom Java function on the new project's build path.
- Use the
calljava function in an expression (or as a target list entry in the SELECT clause).
While working within StreamBase Studio, there is no requirement that the custom function code be packaged into a JAR file. To use custom functions in a deployed application it is, however, convenient to package the files into a JAR file. StreamBase Studio, through its built-in Eclipse functionality, will create the JAR file; no special content is required in the manifest file, so this can be transparently generated when the JAR file is created. A JAR file may include implementations for multiple custom functions.
You use the built-in Eclipse functionality to create and export a JAR file. In the Package Explorer, highlight the project containing your custom function code and either select the File > Export... menu item, or right-click and select Export... from the popup menu.
In the Export, Select window, expand the listing under the Java icon and select the JAR file entry, then click the Next command button. Your selections in the JAR Export, JAR File Specification window may initially seem a little strange, but it will be clearer once you have worked through the process.

Note how only the functions source code files are selected via the entries in the left-hand panel. A subsequent window will add the manifest file to the JAR; the .project, .classpath, and, if you wrote your custom functions from within a StreamBase project, the .sbapp files are not part of the custom function code and should not be included in the JAR file. Click the Browse command button and in the Save As window select the directory location and name for the JAR file; alternatively, enter the path and name of the JAR file directly into the JAR file: text box. Click the Next command button.
In the JAR Export, JAR Packaging Options window select the desired options. Note that the default entries will create a JAR file containing files with compilation warnings and errors. While you are developing your operator, it is useful to accept files with compilation warnings as these may simply be indications that you have not completed your coding and your code contains unused variables and/or class imports. Click the Next command button to move to the JAR Export, JAR Manifest Specification window.
In the JAR Manifest Specification window, select the Generate the manifest file radio button. Click the OK and Finish command buttons to complete the process. The JAR file will be written to the directory location specified earlier.

When you want to use your custom function(s) in other StreamBase applications, simply add this JAR file to the new project's build path as described in the article New Features in StreamBase Studio.
Using the Custom Function Alias
Newly introduced in StreamBase 5.0 is the concept of a custom function alias. This eliminates the need to use the calljava method to invoke a custom Java function. We encountered this earlier in this article, in the call to :
calljava('java.lang.Math', 'abs', value)
With a custom function alias, this could be simplied to a call such as
MyAbs(value)
This more concise syntax is enabled by adding a configuration entry to the <custom-functions>
section of the sbd.sbconf file:
<custom-function language="java" class="java.lang.Math" name="abs" alias="MyAbs" args="auto"/>
This line maps the Java class to a more user-friendly alias, and provides a single place in which to control the Java class used in operators.
New Features in StreamBase Studio
Using the StreamBase Studio Java Operator Wizard
Using the StreamBase Studio Client Wizard