This topic explains how to modify a custom embedded adapter's Java file for use in StreamBase applications. We
describe the methods that are available in the Adapter API, the embedded adapter's life
cycle events, and the configuration parameters that must be set for deployment.
Finally, we will step through the creation of a sample embedded adapter.
The topic assumes that the initial file has been generated by the StreamBase Embedded Adapter wizard, following the development process recommended in Developing Embedded Adapters.
If you are porting custom adapter code written for StreamBase 3.7 or earlier, see the Migration Note.
Embedded adapters extend one of the following classes in the StreamBase Java API:
- com.streambase.sb.adapter.InputAdapter
- com.streambase.sb.adapter.OutputAdapter
Adapters inherit some behavior from operators, and both InputAdapter and OutputAdapter are
subclasses of com.streambase.sb.operator.Operator.
Like any other StreamBase operator, adapters can have properties that you can customize within StreamBase Studio. You will need to modify the initial properties that are generated by the StreamBase Embedded Adapter Wizard.
Each adapter must provide some object that implements the com.streambase.sb.operator.Parameterizable interface. The
adapter class may either implement this interface itself or delegate to an
accompanying class that implements this interface. The class that implements
Parameterizable must provide the getter and setter
methods that reflect the parameters that appear in the StreamBase Studio
Properties View for that adapter. StreamBase finds the Parameterizable object associated with an adapter by calling
the getParameters method on an instance of an
InputAdapter or an OutputAdapter.
The default implementation of getParameters returns
the last Parameterizable set by a call to
setParameters. If setParameters has never been called, but the adapter class itself
implements Parameterizable, getParameters simply returns the adapter object. Alternatively,
an adapter class may override getParameters to
implement any other desired behavior.
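The default lookup rules described above can be sketched with plain Java stand-ins. This is only an illustration of the decision order: the nested Parameterizable interface and the AdapterBase class here are hypothetical substitutes, not the StreamBase types, and the final null return is an assumption, because the text does not say what happens when neither rule applies.

```java
final class GetParametersSketch {
    /** Stand-in for the Parameterizable marker interface; not the StreamBase type. */
    interface Parameterizable {}

    /** Stand-in for an adapter base class that applies the default lookup rules. */
    static class AdapterBase {
        private Parameterizable explicitParameters; // last object passed to setParameters

        public void setParameters(Parameterizable parameters) {
            explicitParameters = parameters;
        }

        public Parameterizable getParameters() {
            if (explicitParameters != null) {
                return explicitParameters;      // rule 1: the last value set wins
            }
            if (this instanceof Parameterizable) {
                return (Parameterizable) this;  // rule 2: the adapter describes itself
            }
            return null; // assumption: the text does not cover this case
        }
    }

    static class SelfDescribingAdapter extends AdapterBase implements Parameterizable {}

    static class SeparateParameters implements Parameterizable {}

    public static void main(String[] args) {
        SelfDescribingAdapter adapter = new SelfDescribingAdapter();
        System.out.println(adapter.getParameters() == adapter);

        SeparateParameters delegate = new SeparateParameters();
        adapter.setParameters(delegate);
        System.out.println(adapter.getParameters() == delegate);
    }
}
```

In this sketch, an adapter that implements the interface itself is returned until a separate object is set, after which the separate object takes precedence.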
By default, StreamBase uses reflection (via JavaBeans) to derive the
available parameters and their types from the getter and setter methods provided by
the Parameterizable object. For example, here is a
code fragment of an OutputAdapter class with some
properties:
public class Example extends OutputAdapter implements Parameterizable {
private int startCount = 1;
private boolean clever = true;
private String [] stockSymbols;
private String name;
public void setStartCount(int i) {startCount = i;}
public int getStartCount() {return startCount;}
public void setClever(boolean b) {clever = b;}
public boolean isClever() {return clever;}
public void setStockSymbols(String [] ss) {stockSymbols = ss;}
public String [] getStockSymbols() {return stockSymbols;}
public void setName(String s) {name=s;}
public String getName() {return name;}
public void typecheck() throws TypecheckException {}
public void processTuple(int inputPort, Tuple t) throws StreamBaseException {}
}
The example OutputAdapter class above declares four
properties:
- startCount
- clever
- stockSymbols
- name
These properties appear in StreamBase Studio's Properties View. Note that the
getter method for a boolean property may be named
getPropertyName, isPropertyName, or hasPropertyName.
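To see what reflection-based property discovery looks like in practice, the following sketch uses only the standard java.beans.Introspector on a plain bean that has the same accessors as the Example class. It is an approximation, not StreamBase's own discovery code: the class names PropertyIntrospectionSketch and ExampleParams are made up for this illustration, and the standard introspector recognizes only the get and is prefixes.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Map;
import java.util.TreeMap;

final class PropertyIntrospectionSketch {
    /** Plain bean with the same accessors as the Example class; hypothetical name. */
    public static class ExampleParams {
        private int startCount = 1;
        private boolean clever = true;
        private String[] stockSymbols;
        private String name;

        public void setStartCount(int i) { startCount = i; }
        public int getStartCount() { return startCount; }
        public void setClever(boolean b) { clever = b; }
        public boolean isClever() { return clever; }
        public void setStockSymbols(String[] ss) { stockSymbols = ss; }
        public String[] getStockSymbols() { return stockSymbols; }
        public void setName(String s) { name = s; }
        public String getName() { return name; }
    }

    /** Maps property name to type for every property with both a getter and a setter. */
    static Map<String, Class<?>> describe(Class<?> beanClass) {
        try {
            // Stop at Object so the inherited "class" pseudo-property is skipped.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            Map<String, Class<?>> properties = new TreeMap<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    properties.put(pd.getName(), pd.getPropertyType());
                }
            }
            return properties;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        describe(ExampleParams.class)
            .forEach((name, type) -> System.out.println(name + " : " + type.getSimpleName()));
    }
}
```

Running the sketch lists the four properties with their Java types, which is the information a host needs to render a property sheet.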
StreamBase currently supports the following property types:
- int
- long
- double
- boolean
- String
- String []
- Schema
- Enum
- ResourceFile
All but the last two are simple property types that StreamBase derives
automatically by looking at the signatures of the getter/setter methods in the
Parameterizable class. These can also be specified explicitly in the
BeanInfo class with SBPropertyDescriptor objects, as in the ExampleBeanInfo class shown later in this topic.
Enum properties are string properties that can only take on a specified set of values. ResourceFile properties are similar to Enum properties, but their value must be the name of a resource. In StreamBase Studio, this is enforced by displaying a drop-down list of files available from the Resources folder of your project.
Because both Enum and ResourceFile properties are implemented as Strings, their getter
and setter methods must return and expect objects of type String, respectively.
Thus, these property types cannot be automatically derived by
StreamBase. To use these properties, an adapter must have an
accompanying BeanInfo class that returns the
SBPropertyDescriptor subclasses EnumPropertyDescriptor or ResourceFilePropertyDescriptor in its list of PropertyDescriptors.
The BeanInfo interface is described in the java.beans.BeanInfo API documentation. A BeanInfo class can be used by the
Parameterizable object to control what
properties are exposed; add additional metadata about properties, such as which
properties are optional; and access special types of properties that can't be
automatically derived via reflection. If a BeanInfo
class is present, only the properties explicitly declared in this class will be
exposed by StreamBase.
The easiest way to make a BeanInfo class is to
extend java.beans.SimpleBeanInfo in a class whose
name consists of the name of the Parameterizable
class, with BeanInfo appended, residing in the same
package as the Parameterizable class. For
example, if the Parameterizable class is called
MyAdapter, the BeanInfo class should be called MyAdapterBeanInfo.
The following example BeanInfo class makes the
clever property optional, and hides the stockSymbols property from StreamBase:
public class ExampleBeanInfo extends SimpleBeanInfo {
public PropertyDescriptor [] getPropertyDescriptors() {
try {
SBPropertyDescriptor [] p = {
new SBPropertyDescriptor("StartCount", Example.class),
new SBPropertyDescriptor("Clever", Example.class).optional(),
new SBPropertyDescriptor("Name", Example.class)
};
return p;
}
catch(IntrospectionException e) {
System.out.println("Exception: " + e);
e.printStackTrace();
}
return null;
}
}
Note that the optional() method shown in this
example simply marks the descriptor as optional.
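For readers who want to experiment without StreamBase on the classpath, the same pattern can be sketched with the standard java.beans.PropertyDescriptor class. This is an analogy only: the Settings and SettingsBeanInfo names are hypothetical, the standard descriptor has no optional() method, and the display name is set here purely to show that descriptors carry metadata.

```java
import java.beans.IntrospectionException;
import java.beans.PropertyDescriptor;
import java.beans.SimpleBeanInfo;

final class BeanInfoSketch {
    /** Hypothetical bean with three properties. */
    public static class Settings {
        private int startCount = 1;
        private boolean clever = true;
        private String[] stockSymbols;

        public int getStartCount() { return startCount; }
        public void setStartCount(int i) { startCount = i; }
        public boolean isClever() { return clever; }
        public void setClever(boolean b) { clever = b; }
        public String[] getStockSymbols() { return stockSymbols; }
        public void setStockSymbols(String[] ss) { stockSymbols = ss; }
    }

    /** Declares only two of the three properties, so stockSymbols is not listed. */
    public static class SettingsBeanInfo extends SimpleBeanInfo {
        @Override
        public PropertyDescriptor[] getPropertyDescriptors() {
            try {
                PropertyDescriptor startCount =
                    new PropertyDescriptor("startCount", Settings.class);
                PropertyDescriptor clever =
                    new PropertyDescriptor("clever", Settings.class);
                clever.setDisplayName("Clever Mode"); // extra metadata on a descriptor
                return new PropertyDescriptor[] { startCount, clever };
            } catch (IntrospectionException e) {
                throw new IllegalStateException("Could not build descriptors", e);
            }
        }
    }

    public static void main(String[] args) {
        for (PropertyDescriptor pd : new SettingsBeanInfo().getPropertyDescriptors()) {
            System.out.println(pd.getName() + " (" + pd.getDisplayName() + ")");
        }
    }
}
```

The sketch throws an unchecked exception when a descriptor cannot be built, instead of returning null, so that a misspelled property name fails loudly during development.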
Note
To enable a hidden password, you can use the mask property of SBPropertyDescriptor; if the property is a String type and is set, the adapter Properties view displays asterisks instead of the characters typed.
The adapter has the following life cycle when running within the StreamBase Server (sbd) process:
- Constructor
  All adapters must have a public default constructor. The constructor is called when the InputAdapter or OutputAdapter instance is created, but before the adapter is connected to the StreamBase application. We recommend that you set the initial input port and output port count in the constructor by calling setPortHints(inPortCount, outPortCount). The default is no input ports or output ports. The constructor may also set default values for adapter parameters. These values will be displayed in StreamBase Studio when a new adapter is dragged to the canvas, and serve as the default values for omitted optional parameters.
- Parameters set
  StreamBase will call the adapter's setter methods to configure it according to the application.
- typecheck
  The typecheck method is called after the adapter instance is connected in the StreamBase application. The adapter should validate its parameters and throw TypecheckException if any problems are found. The message associated with the thrown TypecheckException will be displayed in StreamBase Studio during authoring, or printed on the console by sbd. Input adapters should set the schema of each output port by calling the setOutputSchema(portNum, schema) method for each output port. If the adapter needs to change the number of input ports based on parameter values, it should call requireInputPortCount(portCount) at this point.
  Call the getResourceContents method during typecheck, instead of waiting until start or run to call it. This ensures that StreamBase Studio can indicate to the user during authoring whether it was able to find the resource, rather than leaving the problem to surface only when sbd fails silently.
- init
  If typecheck succeeds, the init method is called before the StreamBase application is started. Note that your adapter is not required to define the init method, unless you need to register a runnable or perform initialization of a resource such as a JDBC pool.
- run
  At this point, the application begins to run. StreamBase will start threads for any managed runnables registered by earlier calls to registerRunnable. The behavior of the adapter at this point depends on whether it is an InputAdapter or an OutputAdapter.
  - processTuple (output adapters)
    The processTuple method is called when a tuple is received on an input port.
  - Call sendOutput (input adapters)
    Input adapters can call sendOutput(port, tuple) at any time to output a tuple to the specified port.
- shutdown
  The shutdown method is called when the StreamBase Server is in the process of shutting down.
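The order of these calls can be made concrete with a small mock. The sketch below does not use the StreamBase API: RecordingAdapter and the drive method are hypothetical stand-ins that record each call, with a driver that invokes them in the sequence described above and lets a failed typecheck stop the sequence before init.

```java
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {
    /** Hypothetical stand-in for an adapter; records each life cycle call. */
    static final class RecordingAdapter {
        final List<String> calls = new ArrayList<>();
        private int period;

        RecordingAdapter() {
            period = 1000; // default value for an optional parameter
            calls.add("constructor");
        }

        void setPeriod(int period) {
            this.period = period;
            calls.add("setPeriod");
        }

        void typecheck() {
            if (period <= 0) {
                throw new IllegalArgumentException("Period must be greater than 0");
            }
            calls.add("typecheck");
        }

        void init() { calls.add("init"); }
        void run() { calls.add("run"); }
        void shutdown() { calls.add("shutdown"); }
    }

    /** Hypothetical driver that mimics the order in which a host calls the adapter. */
    static List<String> drive(int configuredPeriod) {
        RecordingAdapter adapter = new RecordingAdapter();
        adapter.setPeriod(configuredPeriod); // parameters are set after construction
        adapter.typecheck();                 // a failure here prevents init and run
        adapter.init();
        adapter.run();
        adapter.shutdown();
        return adapter.calls;
    }

    public static void main(String[] args) {
        System.out.println(drive(500));
    }
}
```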
Input adapters often need to perform operations asynchronously with the StreamBase application they are running in. For example, an input adapter may need to listen for information on a socket, or read input from a file, without interrupting the rest of the application. Though output adapters are called upon by StreamBase whenever a tuple arrives on an input port, there may be other asynchronous tasks an output adapter may wish to perform, such as periodically flushing a buffer.
To facilitate such requirements, the adapter API provides a mechanism for creating
threads that are managed along with the rest of the StreamBase
application. During the init method, an adapter may
call registerRunnable(runnable) to register
Runnable objects that will be managed in separate
threads by StreamBase. When the application starts running,
StreamBase will start all registered Runnables in new threads.
StreamBase-managed threads should have the following general structure:
...
public void run() {
// Perform thread startup tasks
while (shouldRun()) {
// Perform tasks such as calling sendOutput
}
// Perform thread shutdown tasks
}
...
The call to shouldRun ties the thread into
StreamBase's thread management in the following ways:
- Startup - The adapter is not considered started until all threads have made their first call to shouldRun.
- Shutdown - When the application is shutting down, the adapter's shutdown method will be called first. Once this has returned, shouldRun will return false, and all threads should exit. Once all threads have exited, the adapter is considered shut down.
- Suspend - If the adapter is being suspended, the adapter's suspend method will be called first. Once this has returned, the shouldRun call will block until the adapter is resumed. Once all threads are blocked in shouldRun, the adapter's suspended method will be called. Finally, the adapter will be considered successfully suspended.
- Resume - When the adapter is resumed following suspension, the adapter's resume method will be called. Once this has returned, shouldRun will return in all threads (returning true if this is an ordinary resume, or false if a suspended adapter is being shut down). After all threads have successfully unblocked, the adapter's resumed method will be called.
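The blocking and return-value behavior of shouldRun described above can be approximated with a small monitor-based gate. This is a minimal sketch under stated assumptions, not how StreamBase implements it: the RunGate class and its suspend, resume, and shutdown methods are hypothetical, and the worker triggers shutdown itself only to keep the demonstration deterministic.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ShouldRunSketch {
    /** Hypothetical gate that mimics the shouldRun contract; not the StreamBase class. */
    static final class RunGate {
        private enum State { RUNNING, SUSPENDED, SHUT_DOWN }
        private State state = State.RUNNING;

        /** Blocks while suspended; returns false once shut down. */
        synchronized boolean shouldRun() {
            boolean interrupted = false;
            while (state == State.SUSPENDED) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true; // keep waiting, restore the flag afterwards
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            return state == State.RUNNING;
        }

        synchronized void suspend() {
            if (state == State.RUNNING) {
                state = State.SUSPENDED;
            }
        }

        synchronized void resume() {
            if (state == State.SUSPENDED) {
                state = State.RUNNING;
                notifyAll();
            }
        }

        synchronized void shutdown() {
            state = State.SHUT_DOWN;
            notifyAll(); // wake any suspended threads so they can exit
        }
    }

    /** Runs one worker until the gate is shut down after limit iterations (limit >= 1). */
    static int runUntilShutdown(int limit) {
        RunGate gate = new RunGate();
        AtomicInteger iterations = new AtomicInteger();
        Thread worker = new Thread(() -> {
            // Thread startup tasks would go here.
            while (gate.shouldRun()) {
                if (iterations.incrementAndGet() == limit) {
                    gate.shutdown(); // stand-in for a host-initiated shutdown
                }
            }
            // Thread shutdown tasks would go here.
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for worker", e);
        }
        return iterations.get();
    }

    public static void main(String[] args) {
        System.out.println("Iterations before shutdown: " + runUntilShutdown(5));
    }
}
```

Because the loop condition is the only place the worker checks in, a thread that blocks for a long time inside the loop body delays both suspension and shutdown; keeping each iteration short is what makes this pattern responsive.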
A Java operator runs in the same process as the StreamBase application that contains it. The same is true of embedded adapters, which are a specific type of Java operator. One advantage of this is that when the StreamBase application starts or stops, any Java operators or embedded adapters in it start and stop along with it. More generally, the application and these components undergo life cycle changes in a synchronized fashion.
With respect to life cycle changes, there may be dependencies among embedded adapters. For example, an input adapter may input data into a StreamBase application. Suppose this data eventually reaches an embedded output adapter. In this case, it would be appropriate for the output adapter to start before the input adapter. That is, the input adapter shouldn't feed data to an output adapter that may have not yet started.
Embedded adapters start, pause, resume, and shut down along with the StreamBase application that contains them. It is also possible to suspend and resume embedded adapters independently of their StreamBase application.
This is accomplished with the same commands that are used to suspend and resume a StreamBase application as a whole. The command sbadmin suspend can be used to suspend an application; similarly, sbadmin resume is used to resume a StreamBase application.
To suspend or resume individual adapters, you can append a list of embedded adapters to the sbadmin suspend or sbadmin resume command. If one or more strings are appended to sbadmin suspend, the StreamBase application as a whole is not suspended; instead, only the individual embedded adapters named by the appended strings are suspended.
Note that an individual embedded adapter can be suspended or resumed only if the StreamBase application itself is running. An individual embedded adapter can be running only if the application that contains it is also running. Therefore, it is not meaningful to suspend or resume an individual adapter when the application that contains it is not running.
The sbc status command returns status information about the StreamBase Server.
The sbc status --operators command returns the status of any embedded adapters contained by the server's application. Note that sbc status --operators and sbc status are disjoint commands: sbc status returns information about the server only, not about embedded adapters contained in the server. Similarly, sbc status --operators returns information about contained embedded adapters only, and not any information about the server itself.
In this context, the status of an embedded adapter consists only of its current
state. For example, if an embedded adapter has been started and is currently
running, its state will be STARTED. If an embedded
adapter has yet to be started, its state will be NONE. If an embedded adapter has been suspended, its state will
be either SUSPENDED_AND_DROPPING_TUPLES or
SUSPENDED_AND_PROCESSING_TUPLES. Lastly, if an
adapter has been shut down, its state will be SHUTDOWN.
By default, an embedded adapter starts along with the StreamBase
application that contains it. In Studio, in the General tab for embedded adapters,
there is a checkbox labelled Start with application.
By default, this box is checked, meaning that the embedded adapter will start with
the application. Deselecting this box will cause the adapter to be left in the
NONE state when the application starts.
An embedded adapter that does not start with its application will stay in the
NONE state until it is explicitly started with the
sbadmin resume adapterName command. Such an
adapter will not start even if the application as a whole is resumed. So for
example, the application as a whole may suspend and then resume; this will have no
effect on an adapter that has not started with its application.
If an embedded adapter is suspended separately from the application that contains it, tuples might still arrive at the suspended adapter. You can configure the embedded adapter to handle these tuples in two different ways:
- A suspended adapter can choose to drop tuples that are delivered to it.
- The adapter can choose to process these tuples.
These two possibilities are represented by static constants on the class com.streambase.sb.operator.Operator, SUSPENDED_AND_DROPPING_TUPLES and SUSPENDED_AND_PROCESSING_TUPLES, respectively. An embedded
adapter is configured to either drop or process tuples by calling the method
setSuspendBehavior on its instance. setSuspendBehavior takes an int argument, the value of which must be either
SUSPENDED_AND_DROPPING_TUPLES or SUSPENDED_AND_PROCESSING_TUPLES.
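The difference between the two behaviors can be illustrated with a self-contained mock. Nothing here is StreamBase API: the SuspendBehavior enum, the Receiver class, and its deliver method are hypothetical stand-ins chosen for this sketch, and tuples are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

final class SuspendBehaviorSketch {
    /** Hypothetical stand-in for the two suspend behaviors. */
    enum SuspendBehavior { DROP_TUPLES, PROCESS_TUPLES }

    /** Hypothetical receiver that applies the configured behavior while suspended. */
    static final class Receiver {
        private final SuspendBehavior behavior;
        private final List<String> processed = new ArrayList<>();
        private boolean suspended;

        Receiver(SuspendBehavior behavior) {
            this.behavior = behavior;
        }

        void suspend() { suspended = true; }
        void resume() { suspended = false; }

        void deliver(String tuple) {
            if (suspended && behavior == SuspendBehavior.DROP_TUPLES) {
                return; // the tuple is discarded while suspended
            }
            processed.add(tuple);
        }

        List<String> getProcessed() { return processed; }
    }

    /** Delivers one tuple before, during, and after a suspension. */
    static List<String> demo(SuspendBehavior behavior) {
        Receiver receiver = new Receiver(behavior);
        receiver.deliver("a");
        receiver.suspend();
        receiver.deliver("b");
        receiver.resume();
        receiver.deliver("c");
        return receiver.getProcessed();
    }

    public static void main(String[] args) {
        System.out.println("drop:    " + demo(SuspendBehavior.DROP_TUPLES));
        System.out.println("process: " + demo(SuspendBehavior.PROCESS_TUPLES));
    }
}
```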
This section steps you through the creation of a sample Java adapter class,
InputExample.java. This example is kept deliberately simple, to allow you to focus on
the API without having to read other lines of code. The adapter is used in the
custom-adapter sample application to read input lines from a file. The files that
comprise this sample, including the adapter source code discussed here, are installed
with StreamBase in streambase-directory/sample/adapter/embedded/custom-adapter.
Before beginning this exercise, please open StreamBase Studio and load
the custom-adapter sample, as described in Custom Embedded Adapter
Sample. This creates the sample_adapter_embedded_custom-adapter project in your Package
Explorer. In this project, double-click InputExample.java to open it in the Java Editor.
We started by adding these import statements in InputExample.java:
import java.util.*;
import java.io.*;
import com.streambase.sb.DataType;
import com.streambase.sb.Schema;
import com.streambase.sb.Schema.Field;
import com.streambase.sb.Tuple;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.TupleException;
import com.streambase.sb.adapter.InputAdapter;
import com.streambase.sb.operator.Parameterizable;
import com.streambase.sb.operator.TypecheckException;
import com.streambase.sb.operator.ResourceNotFoundException;
Next, we declared the class by extending the StreamBase InputAdapter class and implementing Parameterizable (to mark that this class contains getter and
setter methods for this adapter's properties), and Runnable (because this adapter will be using a thread to read
the input file). For example:
/**
* This input adapter reads an input file one line at a time.
*/
public class InputExample
extends InputAdapter implements Parameterizable, Runnable
{
In the class, we declared variables to hold the values of the adapter's properties.
// Properties
private String _resourceName;
private int _period;
In the constructor, we set default values for optional parameters, and used the
setPortHints method to specify a fixed number of
input and output ports:
public InputExample()
{
// Set defaults
setPeriod(1000);
setPortHints(0, 1);
}
Note
If the adapter needs a variable number of output ports, use setOutputSchema in the typecheck method to set schemas for all of the output ports.
StreamBase Studio will update the number of ports visible on the
canvas at typecheck time.
Next, we created the typecheck method for the
class, which validates the period parameter, verifies that the input resource is
available, and sets the output schema for the one output port.
/**
* Typecheck this adapter.
*/
public void typecheck() throws TypecheckException
{
if (getPeriod() <= 0) {
throw new TypecheckException("Period must be greater than 0");
}
// Verify our resource file is available
try {
InputStream resourceStream = getResourceContents(getResourceName());
try {
resourceStream.close();
} catch (IOException e) {
// ignore
}
} catch (ResourceNotFoundException e) {
throw new TypecheckException("Input file not found", e);
} catch (StreamBaseException e) {
throw new TypecheckException("Could not open input file", e);
}
// Create an output schema for our port
List fields = new ArrayList();
fields.add(new Field("line", DataType.STRING, 128));
Schema schema = new Schema("schema", fields, true);
setOutputSchema(0, schema);
}
Note
Be sure to call the getResourceContents method
during typecheck, as described in The
Adapter Life Cycle.
Next, we created the init method. This registers
the class as a runnable, so StreamBase will start a managed thread
running this class's run method.
/**
* Initialize the adapter.
*/
public void init() throws StreamBaseException
{
super.init();
// Register the object so it will be run as a thread managed
// by StreamBase
registerRunnable(this, true);
}
Finally, we defined the main body of the adapter in its run method. Because init called
registerRunnable with this, StreamBase
will invoke this method in its own thread when the application starts.
This method takes care of a number of jobs. First, it creates a tuple to use for
output. For efficiency, this tuple will be reused each time. Next, it opens the
named resource for input and creates a BufferedReader
object to aid in reading the file. Then it enters the main loop, whose condition is
a call to shouldRun. This will keep
the adapter running and checking in with StreamBase as long as the
application is running, and will make the adapter capable of suspending and
resuming in sync with the application. Within this loop, it reads a line from the
input file, puts it in the tuple, outputs that tuple, and pauses for the configured
time.
/**
* Main thread of the adapter. This reads and outputs a line from
* the specified file at the specified rate.
*/
public void run()
{
Tuple tuple = getOutputSchema(0).createTuple();
InputStream resource;
BufferedReader reader;
System.err.println("Opening input resource " + getResourceName());
try {
resource = getResourceContents(getResourceName());
} catch (ResourceNotFoundException e) {
System.err.println("Input file not found");
return;
} catch (StreamBaseException e) {
System.err.println("Could not open input file");
return;
}
reader = new BufferedReader(new InputStreamReader(resource));
try {
while (shouldRun()) {
try {
// Clear the tuple so it can be reused
tuple.clear();
// Read the next line
String line;
try {
line = reader.readLine();
} catch (IOException e) {
System.err.println("Failed to read line from input");
return;
}
if (line == null)
break;
// Set the tuple
tuple.setString(0, line);
} catch (TupleException e) {
System.err.println("Exception creating tuple");
System.err.println(e);
continue;
}
try {
// Output the tuple
sendOutput(0, tuple);
} catch (StreamBaseException e) {
System.err.println("Exception sending output");
System.err.println(e);
return;
}
// Delay before the next tuple
try {
Thread.sleep(getPeriod());
} catch (InterruptedException e) {
// Ignore; shouldRun() is checked again at the top of the loop
}
}
} finally {
// Be sure to close the resource
try {
resource.close();
} catch (IOException e) {
// Ignore
}
}
}
To finish up the class, we defined getter and setter methods for the configurable parameters of the adapter.
//
// Properties
//
public String getResourceName()
{
return _resourceName;
}
public void setResourceName(String resourceName)
{
_resourceName = resourceName;
}
public int getPeriod()
{
return _period;
}
public void setPeriod(int period)
{
_period = period;
}
}
InputExampleBeanInfo.java is the BeanInfo class for
the InputExample adapter.
Note
StreamBase Systems strongly recommends using the New
StreamBase Embedded Adapter wizard to generate the base code for
your adapter, including the BeanInfo file. If you create your own files, you must
name your BeanInfo file with the basename of the primary adapter Java file. That
is, for an adapter defined in Foo.java, you must
name the BeanInfo file FooBeanInfo.java.
StreamBase Studio relies on this naming convention.
If you use the wizard, the Java file names already use the right convention.
InputExampleBeanInfo defines the metadata for the
properties that will appear in the StreamBase Studio Properties View
for this adapter. The InputExample adapter has two
properties, "Input File" and "Period".
Now, open InputExampleBeanInfo.java in the Java
Editor and examine it. First we imported these classes:
import java.beans.SimpleBeanInfo;
import java.beans.PropertyDescriptor;
import java.beans.IntrospectionException;
import com.streambase.sb.operator.parameter.SBPropertyDescriptor;
import com.streambase.sb.operator.parameter.ResourceFilePropertyDescriptor;
Then we defined the BeanInfo class, as follows:
public class InputExampleBeanInfo extends SimpleBeanInfo
{
public PropertyDescriptor[] getPropertyDescriptors()
{
try {
SBPropertyDescriptor[] p = {
// This resource property will get the string name of
// the input resource. This is given a friendlier
// display name.
new ResourceFilePropertyDescriptor("Input File", InputExample.class,
"getResourceName",
"setResourceName"),
// The period value will be an integer property
// because getPeriod returns an int.
new SBPropertyDescriptor("Period", InputExample.class),
};
return p;
} catch(IntrospectionException e) {
System.err.println("Failed to create property descriptors");
return null;
}
}
}
This section steps you through the creation of a sample Java adapter class, OutputExample.java. This exercise uses the same sample application that you loaded in the previous exercise, and is similarly kept as simple as possible.
If you have not already loaded the custom-adapter sample, please follow the
instructions in the preceding section to do so. Then, in the sample_adapter_embedded_custom-adapter project in your Package
Explorer, open OutputExample.java in the Java Editor.
We started by adding these import statements in OutputExample.java:
import java.io.*;
import com.streambase.sb.Tuple;
import com.streambase.sb.StreamBaseException;
import com.streambase.sb.adapter.OutputAdapter;
import com.streambase.sb.operator.Parameterizable;
import com.streambase.sb.operator.TypecheckException;
Next, we declared the class by extending the StreamBase OutputAdapter class and implementing Parameterizable (to mark that this class contains getter and
setter methods for this adapter's properties). For example:
/**
* This output adapter outputs all tuples sent to it to a file.
*/
public class OutputExample
extends OutputAdapter implements Parameterizable
{
In the class, we declared the variable to hold the value of the adapter's OutputFile property.
// Properties
private String _outputFile;
We also stored the opened file in the class so that we do not have to reopen it on
every call to processTuple.
// The opened output file
private FileOutputStream _output = null;
private DataOutputStream _writer;
In the constructor we used the setPortHints method
to specify a fixed number of input and output ports. Because this adapter does not
have any parameters with reasonable default values, we do not set any parameters
here.
public OutputExample()
{
setPortHints(1, 0);
}
Note
If the adapter needs a variable number of input ports, use requireInputPortCount in the typecheck method. StreamBase Studio will update
the number of ports visible on the canvas at typecheck time.
Next, we created the typecheck method for the
class, which ensures there is exactly one input port and checks that the output
file parameter has not been left blank.
/**
* Typecheck this adapter.
*/
public void typecheck() throws TypecheckException
{
requireInputPortCount(1);
if (getOutputFile() == null || getOutputFile().length() == 0) {
throw new TypecheckException("Required parameter OutputFile must be specified");
}
}
Next, we created the init method. This opens the
output file.
/**
* Initialize the adapter. This opens the output file for
* writing.
*/
public void init() throws StreamBaseException
{
super.init();
File file = new File(getOutputFile());
System.out.println("Opening output file: " + file.getAbsolutePath());
try {
_output = new FileOutputStream(file);
} catch (FileNotFoundException e) {
throw new StreamBaseException("File not found: " + file, e);
}
_writer = new DataOutputStream(_output);
}
Next, we defined the main body of the adapter in its processTuple method. This method will be called by
StreamBase with every tuple that arrives at the input port of this
adapter. The adapter writes the string representation of the tuple to a line in the
output file.
/**
* Process an incoming tuple by writing it to the output file.
*/
public void processTuple(int port, Tuple tuple) throws StreamBaseException
{
try {
_writer.writeBytes(tuple.toString() + "\n");
} catch (IOException e) {
throw new StreamBaseException("Error writing", e);
}
}
Finally, we defined the shutdown method, which
cleans up the resources of the adapter by closing the output file if it has been
opened.
/**
* Shutdown the output adapter by closing the output file.
*/
public void shutdown()
{
try {
if (_output != null)
_output.close();
} catch (IOException e) {
// Ignore
}
}
To finish up the class, we defined a getter and setter method for the configurable parameter of the adapter. Unlike the input adapter example in the previous section, this adapter does not include a BeanInfo class, so StreamBase will derive the parameters of this adapter from names and types of the public methods of the class. Thus, defining these two methods will cause the adapter to have an "outputFile" parameter.
//
// Properties
//
public String getOutputFile()
{
return _outputFile;
}
public void setOutputFile(String outputFile)
{
_outputFile = outputFile;
}
}
The StreamBase Java Client API changed in Version 3.7. The older API was still supported in 3.7, but it is not supported in Version 5.0 and later. The Embedded Adapter wizard uses the newer API exclusively. If you open an application that contains an older embedded adapter, an error will be reported.
If you have any embedded adapter source created in 3.7 or earlier, note that the
com.streambase.sb.adapter.Adapter class is replaced by
two classes:
- com.streambase.sb.adapter.InputAdapter
- com.streambase.sb.adapter.OutputAdapter
Therefore, you must change the class statements as follows:
| Change statement | To |
|---|---|
| extends Adapter implements InputAdapter | extends InputAdapter |
| extends Adapter implements OutputAdapter | extends OutputAdapter |
After making these changes, in StreamBase Version 5.0 and later, you no longer need to compile and import JARs to use your adapter in StreamBase Studio. However, you will need to recompile and redeploy any JARs that you deploy.
Also, in prior versions, all tuple setter methods that take objects (for example, Tuple.setString(...)) threw IllegalArgumentException if they were given a null object. As of this version, they will instead simply delegate to the API call setNull(...), without throwing any exception.
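The change in null handling can be pictured with a toy tuple. The MiniTuple class below is a hypothetical stand-in written for this illustration, not the StreamBase Tuple class. It shows only the delegation described above, where a setter that receives a null object marks the field as null instead of throwing.

```java
import java.util.Arrays;

final class NullSetterSketch {
    /** Hypothetical miniature tuple holding string fields only. */
    static final class MiniTuple {
        private final String[] values;
        private final boolean[] nullFlags;

        MiniTuple(int fieldCount) {
            values = new String[fieldCount];
            nullFlags = new boolean[fieldCount];
            Arrays.fill(nullFlags, true); // every field starts out null
        }

        void setNull(int index) {
            values[index] = null;
            nullFlags[index] = true;
        }

        void setString(int index, String value) {
            if (value == null) {
                setNull(index); // delegate instead of throwing IllegalArgumentException
                return;
            }
            values[index] = value;
            nullFlags[index] = false;
        }

        boolean isNull(int index) { return nullFlags[index]; }
        String getString(int index) { return values[index]; }
    }

    public static void main(String[] args) {
        MiniTuple tuple = new MiniTuple(1);
        tuple.setString(0, "IBM");
        System.out.println(tuple.isNull(0));
        tuple.setString(0, null);
        System.out.println(tuple.isNull(0));
    }
}
```

Code ported from an older version that relied on catching IllegalArgumentException to detect null inputs would need an explicit null check under this behavior.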
