This topic explains how to use the StreamBase API to create your own custom Java operators, which you can then use in StreamBase applications.
Migration Note
The StreamBase Java Client API changed in Version 5. Prior releases of
the standalone Java Toolkit for Eclipse support 3.7 and earlier versions, but
StreamBase Studio now always generates code for the current version.
Starting in Version 3.7, clients have methods that take a StreamProperties object instead of a string stream-name.
This topic assumes that you used the StreamBase Java Operator wizard to generate the initial starter code for your operator, following the development process recommended in Developing Java Operators.
To understand how Java operators are used, we also recommend that you run the
installed Java Operator sample, view the source for StringCase.java, and read the sample's README.
Examples in this topic are based on the Java Operator sample. To load this sample
into StreamBase Studio, use →
and select streambase-install-dir/sample/javaoperator/*javaoperator from the [Extending
StreamBase] section.
The sections that follow describe the methods that are available in the Java Operator API, and the steps for defining a custom operator. We also walk you through the creation of a sample Java operator. For additional details, please refer to the Operator package's Javadoc. Also see the description of the Java Operator's Properties View in the Authoring Guide.
The StreamBase Java API for custom operators extends the following packages in the StreamBase Java API:
- com.streambase.sb.operator
- com.streambase.sb.operator.parameter
Like any other StreamBase operator, Java Operators may have properties that you can customize within StreamBase Studio. You will need to modify the initial properties that are generated by the StreamBase Java Operator Wizard.
Each Java Operator class has an object that implements
the com.streambase.sb.operator.Parameterizable
interface. This Parameterizable object has
getter/setter accessor methods that are used in the StreamBase Studio
Properties View. The Parameterizable object is actually
a JavaBean as defined in the java.beans package.
StreamBase finds the Parameterizable
Object by calling the Operator.getParameters method
on an instance of the Java Operator class.
The Java Operator class can either implement the Parameterizable interface directly, or delegate to another
object that implements the interface. The default Operator constructor assumes the former, that is, that the
operator class itself implements Parameterizable. Another Operator constructor takes a Parameterizable
argument, and the Operator.setParameters method can also be used to set
the Operator's Parameterizable object. In any case,
all Java Operators must have a public
default constructor.
Properties are declared simply by implementing get/set accessor methods. Here is a
code fragment of an Operator class with some
properties:
public class Example extends Operator implements Parameterizable {
    private int startCount = 1;
    private boolean clever = true;
    private String [] stockSymbols;
    private String name;

    public void setStartCount(int i) {startCount = i;}
    public int getStartCount() {return startCount;}
    public void setClever(boolean b) {clever = b;}
    public boolean isClever() {return clever;}
    public void setStockSymbols(String [] ss) {stockSymbols = ss;}
    public String [] getStockSymbols() {return stockSymbols;}
    public void setName(String s) {name=s;}
    public String getName() {return name;}

    public void typecheck() throws TypecheckException {}
    public void processTuple(int inputPort, Tuple t) throws StreamBaseException {
    }
}
The example Operator class above declares four
properties:
- startCount
- clever
- stockSymbols
- name
They will appear in StreamBase Studio's Properties View. Note that the
reader accessor method for a boolean property may be named either getPropertyName, isPropertyName,
or hasPropertyName.
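To illustrate how accessor signatures alone determine a property list, here is a minimal sketch that uses only the standard java.beans.Introspector. It is not StreamBase's own discovery code, which may behave differently (for example, the standard introspector recognizes the get and is prefixes but not has). The classes PropertyLister and ExampleParams are hypothetical stand-ins with no StreamBase dependencies.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Map;
import java.util.TreeMap;

class PropertyLister {
    /** Hypothetical stand-in for the Example operator's properties. */
    public static class ExampleParams {
        private int startCount = 1;
        private boolean clever = true;
        private String[] stockSymbols;
        private String name;

        public int getStartCount() { return startCount; }
        public void setStartCount(int i) { startCount = i; }
        public boolean isClever() { return clever; }
        public void setClever(boolean b) { clever = b; }
        public String[] getStockSymbols() { return stockSymbols; }
        public void setStockSymbols(String[] ss) { stockSymbols = ss; }
        public String getName() { return name; }
        public void setName(String s) { name = s; }
    }

    /** Returns property name -> simple type name, sorted by property name. */
    static Map<String, String> listProperties(Class<?> beanClass) {
        try {
            // Stop at Object so the inherited "class" property is not reported.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            Map<String, String> result = new TreeMap<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                result.put(pd.getName(), pd.getPropertyType().getSimpleName());
            }
            return result;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Introspection failed", e);
        }
    }

    public static void main(String[] args) {
        listProperties(ExampleParams.class)
            .forEach((name, type) -> System.out.println(name + " : " + type));
    }
}
```

Each get/set or is/set pair yields one property whose name is the accessor name with the prefix removed and the first letter lowercased.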
StreamBase currently supports the following property types:
- int
- long
- double
- boolean
- String
- String []
- Schema
- Enum
- ResourceFile
The only way to create an Enum property is to declare
it as a String property in the Java Operator class
and define the enumeration values in a BeanInfo
class, as in the example below.
All but the last two property types (Enum and ResourceFile) are simple property types that StreamBase derives
automatically by looking at the signatures of the getter/setter methods in the
Parameterizable class. These can also be specified in the
BeanInfo class with SBPropertyDescriptor objects, as in the example below.
Enum properties are string properties that can only take on a specified set of values. ResourceFile properties are similar to Enum properties, but their value must be the name of a resource. In StreamBase Studio, this is enforced by displaying a drop-down list of files available from the Resources folder of your project.
Because both Enum and ResourceFile properties are implemented as Strings, their getter
and setter methods must return and expect objects of type String, respectively.
Thus, these property types cannot be automatically derived by
StreamBase. To use these properties, an operator must have an
accompanying BeanInfo class that returns the
SBPropertyDescriptor subclasses EnumPropertyDescriptor or ResourceFilePropertyDescriptor in its list of PropertyDescriptors.
The BeanInfo class is described in the java.beans.BeanInfo API documentation. It can be used by the
Parameterizable object to control what
properties are exposed; add additional metadata about properties, such as which
properties are optional; and access special types of properties that can't be
automatically derived via reflection. If a BeanInfo
class is present, only the properties explicitly declared in this class will be
exposed by StreamBase.
The easiest way to make a BeanInfo class is to create a class that you name by
appending BeanInfo to the Parameterizable class name. Thus, if the Parameterizable class is
called OpParams, the corresponding BeanInfo class would be OpParamsBeanInfo, placed in the same package as
OpParams and extending java.beans.SimpleBeanInfo.
The following example BeanInfo class changes the
name property to an enum, makes the clever property
optional, and hides the stockSymbols property from
StreamBase:
public class ExampleBeanInfo extends SimpleBeanInfo {
    public PropertyDescriptor [] getPropertyDescriptors() {
        try {
            SBPropertyDescriptor [] p = {
                new SBPropertyDescriptor("StartCount", Example.class),
                new SBPropertyDescriptor("Clever", Example.class).optional(),
                new EnumPropertyDescriptor("Name", Example.class,
                    new String [] {
                        "Tom", "Dick", "Harry"
                    })
            };
            return p;
        }
        catch(Exception e) {
            System.out.println("Exception: " + e);
            e.printStackTrace();
        }
        return null;
    }
}
Note that the optional() method shown in this
example simply marks the descriptor as optional.
For an example of the ResourceFilePropertyDescriptor
subclass, see Stepping through InputExampleBeanInfo.java in the topic, Using the Embedded Adapter API.
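The way an explicit BeanInfo class limits which properties are exposed can be sketched with the standard java.beans classes alone. This is an illustration under stated assumptions, not the StreamBase implementation: it uses java.beans.PropertyDescriptor in place of SBPropertyDescriptor, and BeanInfoSketch, Params, and ParamsBeanInfo are hypothetical names.

```java
import java.beans.IntrospectionException;
import java.beans.PropertyDescriptor;
import java.beans.SimpleBeanInfo;
import java.util.ArrayList;
import java.util.List;

class BeanInfoSketch {
    /** Hypothetical parameter bean with four accessor pairs. */
    public static class Params {
        private int startCount = 1;
        private boolean clever = true;
        private String[] stockSymbols;
        private String name;

        public int getStartCount() { return startCount; }
        public void setStartCount(int i) { startCount = i; }
        public boolean isClever() { return clever; }
        public void setClever(boolean b) { clever = b; }
        public String[] getStockSymbols() { return stockSymbols; }
        public void setStockSymbols(String[] ss) { stockSymbols = ss; }
        public String getName() { return name; }
        public void setName(String s) { name = s; }
    }

    /** Explicit BeanInfo that declares only three of the four properties. */
    public static class ParamsBeanInfo extends SimpleBeanInfo {
        @Override
        public PropertyDescriptor[] getPropertyDescriptors() {
            try {
                return new PropertyDescriptor[] {
                    new PropertyDescriptor("startCount", Params.class),
                    new PropertyDescriptor("clever", Params.class),
                    new PropertyDescriptor("name", Params.class)
                    // stockSymbols is deliberately not declared, so it stays hidden.
                };
            } catch (IntrospectionException e) {
                throw new IllegalStateException("Accessor methods not found", e);
            }
        }
    }

    /** Names of the properties that the explicit BeanInfo declares, in order. */
    static List<String> exposedNames() {
        List<String> names = new ArrayList<>();
        for (PropertyDescriptor pd : new ParamsBeanInfo().getPropertyDescriptors()) {
            names.add(pd.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(exposedNames());
    }
}
```

Throwing an unchecked exception when a descriptor cannot be built is a choice made for this sketch; the examples in this topic print the exception and return null instead.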
Java Operators can also change their number of input and output ports. If this is
done by setting property values, the property's setter method should call
setPortHints with the new number of ports. Output
ports are automatically configured by calling setOutputSchema. Operators may ensure the correct number of
input ports in their typecheck method by calling
the requireInputPortCount method.
For example:
/**
 * This operator has two integer parameters, many input and many output ports
 */
public class RoundRobinOperator extends Operator implements Parameterizable {
    private int _numOutputPorts = 1;
    private int _nextOutputPort = 0;
    private int _numInputPorts = 1;

    public RoundRobinOperator() {
        setPortHints(_numInputPorts, _numOutputPorts);
    }

    public int getInputPortNumber() {return _numInputPorts;}
    public void setInputPortNumber(int i) {
        _numInputPorts = i;
        setPortHints(_numInputPorts, _numOutputPorts);
    }

    public int getOutputPortNumber() {return _numOutputPorts;}
    public void setOutputPortNumber(int i) {
        _numOutputPorts = i;
        setPortHints(_numInputPorts, _numOutputPorts);
    }

    public void init() throws StreamBaseException {
        System.out.println("RoundRobinOperator.init");
    }

    public void typecheck() throws TypecheckException {
        System.out.println("RoundRobinOperator.typecheck");
        Schema s = null;
        StringBuffer errorMessage = new StringBuffer();
        if(getInputPortCount() <= 0) {
            throw new TypecheckException("Invalid Number of Input Ports: "
                + getInputPortCount()
                + " must have at least 1");
        }
        System.out.println("RoundRobinOperator.typecheck: requiring inputs: " + _numInputPorts);
        requireInputPortCount(_numInputPorts);
        // every input port must carry the same field types as the first one
        for(int i = 0; i < getInputPortCount(); ++i) {
            Schema inputSchema = getInputSchema(i);
            if(s == null) {
                s = inputSchema;
            } else if(!s.sameFieldTypes(inputSchema)) {
                errorMessage.append("Invalid schema on port " + i + "\n");
            }
        }
        if(errorMessage.length() > 0) {
            throw new TypecheckException(errorMessage.toString());
        }
        System.out.println("RoundRobinOperator.typecheck: input schemas ok");
        System.out.println("input port num " + getInputPortNumber()
            + " output port num " + getOutputPortNumber());
        // every output port emits the schema of the first input port
        s = getInputSchema(0);
        for(int i=0; i < getOutputPortNumber(); ++i) {
            setOutputSchema(i, s);
        }
        System.out.println("RoundRobinOperator.typecheck: done");
    }

    public void processTuple(int inputPort, Tuple t) throws StreamBaseException {
        sendOutput(_nextOutputPort, t);
        _nextOutputPort = (_nextOutputPort + 1) % _numOutputPorts;
    }
}
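The port selection in processTuple above relies on modular arithmetic. The following stand-alone sketch isolates that logic without any StreamBase classes; RoundRobinSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {
    private final int numOutputPorts;
    private int nextOutputPort = 0;

    RoundRobinSketch(int numOutputPorts) {
        if (numOutputPorts <= 0) {
            throw new IllegalArgumentException("Need at least one output port");
        }
        this.numOutputPorts = numOutputPorts;
    }

    /** Returns the port for the current tuple, then advances and wraps to 0. */
    int nextPort() {
        int port = nextOutputPort;
        nextOutputPort = (nextOutputPort + 1) % numOutputPorts;
        return port;
    }

    /** Lists the ports chosen for tupleCount consecutive tuples. */
    static List<Integer> portsFor(int tupleCount, int numOutputPorts) {
        RoundRobinSketch rr = new RoundRobinSketch(numOutputPorts);
        List<Integer> ports = new ArrayList<>();
        for (int i = 0; i < tupleCount; i++) {
            ports.add(rr.nextPort());
        }
        return ports;
    }

    public static void main(String[] args) {
        System.out.println(portsFor(7, 3)); // prints [0, 1, 2, 0, 1, 2, 0]
    }
}
```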
The Java Operator class has the following life cycle. Note that this section describes the life cycle within the StreamBase Server (sbd) process:
- Constructor: All Java Operators must have a public default constructor. The constructor is called when the Operator instance is created, but before the Operator is connected to the StreamBase application. We recommend that you set the default input port and output port count in the constructor with the setPortHints(inPortCount, outPortCount) method. The default is 1 input port, 0 output ports.
- typecheck: The typecheck method is called after the Operator instance is connected in the StreamBase application, allowing the Operator to validate its properties. The Operator class may change the number of input or output ports by calling the requireInputPortCount(portCount) method or the setOutputSchema(portNum, schema) method. If the requireInputPortCount method is passed a different number of ports than the Operator currently has, a PortMismatchException (a subtype of TypecheckException) is thrown. Call the getResourceContents method during typecheck, instead of waiting until start or run to call it. This ensures that StreamBase Studio can indicate to the user during authoring whether it was able to find the resource, rather than waiting until sbd fails silently.
- init: If typecheck succeeds, the init method is called before the StreamBase application is started. Your Operator class is not required to define the init method unless it needs to initialize a resource, such as a JDBC pool for an operator that makes JDBC calls.
- processTuple: The processTuple method is called when a tuple is available for processing. This is the only time an operator should enqueue output tuples.
- shutdown: The shutdown method is called when the StreamBase Server is in the process of shutting down.
A Java operator runs in the same process as the StreamBase application that contains it. One advantage of this is that when the StreamBase application starts or stops, any Java operators or embedded adapters in it start and stop along with it. More generally, the application and these components undergo life cycle changes in a synchronized fashion.
Java operators undergo life cycle changes in a prescribed order. Typically, when a StreamBase application is represented in StreamBase Studio, data flows through the application from left to right. Here, the order in which operators undergo life cycle events is described as either “left to right” or “right to left”. “Left to right” means in the same direction as data flows through the application. “Right to left” means that Java operators undergo a life cycle change in the opposite order from that in which they see data.
Life cycle changes that a StreamBase application undergoes include starting, pausing, resuming and shutting down. During starting and resuming, operators are processed in right to left order. This means that operators start in an order that is opposite to that in which they will receive data. Conversely, operators are paused and stopped in left to right order, or in the same order as they will receive data.
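The ordering rule described above can be sketched in a few lines of plain Java. This is only a model of the rule as stated in this topic, not server code; LifecycleOrderSketch and the operator names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class LifecycleOrderSketch {
    /** Starting and resuming proceed right to left: the reverse of data flow. */
    static List<String> startOrder(List<String> dataFlowOrder) {
        List<String> order = new ArrayList<>(dataFlowOrder);
        Collections.reverse(order);
        return order;
    }

    /** Pausing and stopping proceed left to right: the same order as data flow. */
    static List<String> stopOrder(List<String> dataFlowOrder) {
        return new ArrayList<>(dataFlowOrder);
    }

    public static void main(String[] args) {
        // Operators listed in the order in which they receive data.
        List<String> flow = Arrays.asList("Reader", "Enricher", "Writer");
        System.out.println("start: " + startOrder(flow)); // prints start: [Writer, Enricher, Reader]
        System.out.println("stop:  " + stopOrder(flow));  // prints stop:  [Reader, Enricher, Writer]
    }
}
```

Starting downstream operators first means that each operator is ready before anything upstream can send it a tuple.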
Java operators start, pause, resume, and shut down along with the StreamBase application that contains them. It is also possible to suspend and resume Java operators independently of their StreamBase application.
This is accomplished with the same commands that are used to suspend and resume a StreamBase application as a whole. The command sbadmin suspend can be used to suspend an application; similarly, sbadmin resume is used to resume a StreamBase application.
To suspend or resume individual operators, you can append a list of Java operators to the sbadmin suspend or sbadmin resume command. If one or more strings are appended to sbadmin suspend, the StreamBase application as a whole is not suspended. Instead, only the individual Java operators named by the appended strings are suspended.
Note that an individual Java operator can be suspended or resumed only if the StreamBase application itself is running, because an operator can be running only if the application that contains it is also running. It is therefore not meaningful to suspend or resume an operator when the application that contains it is not running.
The sbc status command returns status information about the StreamBase Server.
The sbc status --operators command returns the status of any Java operators contained by the server's application. Note that sbc status --operators and sbc status are disjoint commands: sbc status returns information about the server only, not about Java operators contained in the server. Similarly, sbc status --operators returns information about contained Java operators only, and not any information about the server itself.
In this context, the status of a Java operator consists only of its current state.
For example, if a Java operator has been started and is currently running, its
state will be STARTED. If a Java operator has yet
to be started, its state will be NONE. If a Java
operator has been suspended, its state will be either SUSPENDED_AND_DROPPING_TUPLES or SUSPENDED_AND_PROCESSING_TUPLES. Lastly, if an operator has
been shut down, its state will be SHUTDOWN.
By default, a Java operator starts along with the StreamBase
application that contains it. In Studio, in the General tab for Java operators,
there is a checkbox labelled Start with application.
By default, this box is checked, meaning that the Java operator will start with the
application. Deselecting this box will cause the operator to be left in the
NONE state when the application starts.
A Java operator that does not start with its application will stay in the
NONE state until it is explicitly started with the
sbadmin resume operatorName command. Such an
operator will not start even if the application as a whole is resumed. So for
example, the application as a whole may suspend and then resume; this will have no
effect on an operator that has not started with its application.
If a Java operator is suspended separately from the application that contains it, tuples might still arrive at the suspended operator. You can configure the Java operator to handle these tuples in two different ways:
- A suspended Java operator can choose to drop tuples that are delivered to it.
- The operator can choose to process these tuples.
These two possibilities are represented by the static constants SUSPENDED_AND_DROPPING_TUPLES and SUSPENDED_AND_PROCESSING_TUPLES, respectively, on the class com.streambase.sb.operator.Operator. A Java operator
is configured to either drop or process tuples by calling the method setSuspendBehavior on its instance. setSuspendBehavior takes an int argument, the value of which must be either
SUSPENDED_AND_DROPPING_TUPLES or SUSPENDED_AND_PROCESSING_TUPLES.
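The difference between the two suspend behaviors can be seen in a toy model. The sketch below does not use or reproduce the StreamBase Operator class: SuspendSketch is a hypothetical class, the states are modeled as an enum for readability (whereas this topic describes an int argument), and the default behavior chosen here is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only; not the StreamBase Operator API. */
class SuspendSketch {
    enum State {
        NONE, STARTED, SUSPENDED_AND_DROPPING_TUPLES,
        SUSPENDED_AND_PROCESSING_TUPLES, SHUTDOWN
    }

    private State state = State.NONE;
    // Assumed default for this toy; the real default is not stated in this topic.
    private State suspendBehavior = State.SUSPENDED_AND_DROPPING_TUPLES;
    private final List<String> processed = new ArrayList<>();

    void setSuspendBehavior(State behavior) {
        if (behavior != State.SUSPENDED_AND_DROPPING_TUPLES
                && behavior != State.SUSPENDED_AND_PROCESSING_TUPLES) {
            throw new IllegalArgumentException("Not a suspend behavior: " + behavior);
        }
        suspendBehavior = behavior;
    }

    /** Starts an operator in the NONE state or resumes a suspended one. */
    void resume() { if (state != State.SHUTDOWN) state = State.STARTED; }

    /** Only a running operator can be suspended. */
    void suspend() { if (state == State.STARTED) state = suspendBehavior; }

    void shutdown() { state = State.SHUTDOWN; }

    /** A tuple is handled when running, or when suspended in processing mode. */
    void deliver(String tuple) {
        if (state == State.STARTED || state == State.SUSPENDED_AND_PROCESSING_TUPLES) {
            processed.add(tuple);
        }
    }

    State getState() { return state; }
    List<String> getProcessed() { return processed; }

    /** Delivers "a" while running, "b" while suspended, "c" after resuming. */
    static List<String> run(State behavior) {
        SuspendSketch op = new SuspendSketch();
        op.setSuspendBehavior(behavior);
        op.resume();
        op.deliver("a");
        op.suspend();
        op.deliver("b");
        op.resume();
        op.deliver("c");
        return op.getProcessed();
    }
}
```

With the dropping behavior, run returns the tuples a and c; with the processing behavior, it returns a, b, and c.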
This section steps you through the creation of a sample Java Operator class,
StringCase.java. This example is kept deliberately simple, to allow you to focus on
the API without having to read other lines of code. The Java Operator is used in a
StringCase.sbapp application to simply map string input
to lower-case or upper-case. The files that comprise the sample, including the
StringCase.java file discussed here, are installed with
StreamBase in the sample/javaoperator
directory.
Before beginning this exercise, open StreamBase Studio and load the
javaoperator sample, as described in Java Operator Sample. This creates the sample_javaoperator project in your Package Explorer. In this
project, double-click StringCase.java to open it in the
Java Editor.
In an editor, we started by adding these import statements in StringCase.java:
import com.streambase.sb.DataType;
import com.streambase.sb.Schema;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.Tuple;
import com.streambase.sb.operator.Operator;
import com.streambase.sb.operator.Parameterizable;
import com.streambase.sb.operator.TypecheckException;
import java.util.Arrays;
Next, we declared the class by extending the StreamBase Operator class. For example:
public class StringCase extends Operator implements Parameterizable {
In the class body, we defined two constants for an enum property, conversionType. In this example, we defined the metadata for the
enum property in a BeanInfo class, StringCaseBeanInfo. For example:
    //
    // Constants for enum property conversionType
    //
    public static String UPPER = "Upper Case";
    public static String LOWER = "Lower Case";

    /** Enum property, metadata defined in StringCaseBeanInfo.java */
    private String conversionType = LOWER;
Note
The source for StringCaseBeanInfo is described in Stepping through
StringCaseBeanInfo.java. You can find the file in the sample/javaoperator directory in the StreamBase
installation.
Next, we declared the Java Operator's input and output schemas, and an array for the fields in the schema. For example:
    private Schema inSchema; // input to this operator
    private Schema outSchema; // output from this operator
    private Schema.Field[] fields; // individual fields in the schema
Then we used the setPortHints method to specify a
fixed number of input and output ports:
    public StringCase() {
        // this operator has one input and one output
        setPortHints(1, 1);
    }
Note
If the Java Operator will take a variable number of output ports, use the
Parameterizable interface and the typecheck method. There must be a parameter for the number of
input and output ports. Within typecheck, you
must specify a setOutputSchema for each port, so
that StreamBase Studio knows how many square black dots to draw for
the output ports. If you do not use setOutputSchema for each port, the Java Operator on the
StreamBase Studio canvas will not display the correct number of
black dots.
Next, we set up the typecheck method for the class,
added exception handling, and declared that the schema of the output is the same as
for the input:
    public void typecheck() throws TypecheckException {
        // require exactly one input port
        requireInputPortCount(1);
        // the input must contain at least one string field
        inSchema = getInputSchema(0);
        fields = inSchema.getFields();
        int stringCount = 0;
        for(int i=0; i < fields.length; ++i) {
            DataType dt = fields[i].getDataType();
            if(DataType.STRING.equals(dt)) {
                ++stringCount;
            }
        }
        if (stringCount==0)
            throw new TypecheckException(
                "At least one string field is required");
        // the output schema is the same as the input
        setOutputSchema(0, inSchema);
        outSchema = inSchema;
    }
Finally we defined the work that the custom Java operator will perform, by using
the processTuple method. For example:
    public void processTuple(int inputPort, Tuple t)
            throws StreamBaseException {
        Tuple out = outSchema.createTuple();
        for(int i=0; i < fields.length; ++i) {
            Schema.Field f = fields[i];
            DataType dt = f.getDataType();
            String fname = f.getName();
            if(DataType.STRING.equals(dt)) {
                String str = t.getString(f);
                if(LOWER.equals(conversionType)) {
                    out.setString(fields[i], str.toLowerCase());
                } else if(UPPER.equals(conversionType)) {
                    out.setString(fields[i], str.toUpperCase());
                }
            } else {
                out.setField(fields[i], t.getField(fields[i]));
            }
        }
        sendOutput(0, out);
    }

    public String getConversionType() {return conversionType;}
    public void setConversionType(String s) {
        conversionType = s;
    }
}
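The per-field logic of processTuple can be tried without a StreamBase installation by modeling a tuple as a map from field name to value. This is a rough sketch under that assumption: StringCaseSketch and convert are hypothetical names, Locale.ROOT is used to keep the result independent of the default locale, and string values are copied unchanged when the conversion type is unrecognized (the sample instead leaves such fields unset).

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class StringCaseSketch {
    static final String UPPER = "Upper Case";
    static final String LOWER = "Lower Case";

    /** Converts String values to the requested case; other values are copied as-is. */
    static Map<String, Object> convert(Map<String, Object> in, String conversionType) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : in.entrySet()) {
            Object value = e.getValue();
            if (value instanceof String) {
                String s = (String) value;
                if (LOWER.equals(conversionType)) {
                    out.put(e.getKey(), s.toLowerCase(Locale.ROOT));
                } else if (UPPER.equals(conversionType)) {
                    out.put(e.getKey(), s.toUpperCase(Locale.ROOT));
                } else {
                    out.put(e.getKey(), s); // sketch-only choice: keep the value unchanged
                }
            } else {
                out.put(e.getKey(), value); // non-string fields pass through
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("symbol", "ibm");
        tuple.put("price", 1.5);
        System.out.println(convert(tuple, UPPER)); // prints {symbol=IBM, price=1.5}
    }
}
```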
StringCaseBeanInfo.java is the BeanInfo class for the StringCase operator. StringCaseBeanInfo defines the metadata for the properties that
will appear in the StreamBase Studio Properties view for this Java
operator. The StringCase operator has only one property, conversionType, which is an enum whose values are defined in the
StringCase class.
Now, open StringCaseBeanInfo.java in the Java Editor
and examine it. First, we imported these classes:
import java.beans.PropertyDescriptor;
import java.beans.SimpleBeanInfo;
import com.streambase.sb.operator.parameter.EnumPropertyDescriptor;
import com.streambase.sb.operator.parameter.SBPropertyDescriptor;
Then we defined the BeanInfo class, as follows:
public class StringCaseBeanInfo extends SimpleBeanInfo {
    public PropertyDescriptor [] getPropertyDescriptors() {
        try {
            SBPropertyDescriptor [] p = {
                new EnumPropertyDescriptor("ConversionType", StringCase.class,
                    new String [] {
                        StringCase.UPPER, StringCase.LOWER
                    })
            };
            return p;
        }
        catch(Exception e) {
            System.out.println("Exception: " + e);
            e.printStackTrace();
        }
        return null; // should never happen
    }
}
