Creating Custom Java Embedded Adapters

This topic explains how to modify a custom embedded adapter's Java file for use in StreamBase applications.

Note

StreamBase Studio ships with new projects configured by default to build with Java 6. If you have a requirement to use Java 5, configure the Studio project that contains your custom code using the steps in Using Java 5 for Custom Extensions.

Introduction

This topic assumes that you used the StreamBase Java Adapter wizard to generate the initial starter code for your adapter, following the development process recommended in Developing Embedded Adapters.

To understand how embedded adapters are used, we also recommend that you run the Custom Embedded Adapter sample and read the sample's documentation. To load this sample into StreamBase Studio, use File > Load StreamBase Sample and select custom-adapter from the [Extending StreamBase] category.

The following sections describe the methods available in the Java Adapter and Operator APIs. For additional details, refer to the Javadoc for the InputAdapter, OutputAdapter, and Operator classes.

Methods for Studio Presentation

Use the following methods of the Operator class to manage how an instance of your adapter behaves on the EventFlow Editor canvas:

  • Call Operator.setPortHints(int, int) in your adapter's constructor to set the initial number of ports when your adapter is first placed on the canvas.

  • Call Operator.setDisplayName(String) in your constructor to set a user-friendly name for your adapter. This name is used in the Adapter Name field in the Properties view, and as the details text in the Palette view's Details mode.

  • Call Operator.setShortDisplayName(String) in your constructor to set a short name for your adapter. This name is used as the adapter's name in the Palette view and in the Insert Input/Output Adapter dialogs, and shows above the adapter on the EventFlow canvas.

  • Override Operator.getPortCounts() if the number of ports changes based on properties set by the end user.

  • Override Operator.getIconResource(Operator.IconKind) if you want your adapter to appear on the canvas with a custom icon.

  • During Operator.typecheck(), throw Operator.PropertyTypecheckException instead of TypecheckException to allow Studio to decorate your adapter icon with an overlay to indicate its warning or error state. See the PropertyTypecheckException class and the Operator.getLocation() method for details.

  • See the UIHints class and Parameterizable interface for information on additional control over the display of properties in Studio's Properties view.

For the latest version of this list, see the Special Studio Considerations section of the Javadoc for the Operator class.

Embedded Adapter Properties

Embedded adapters extend one of the following classes in the StreamBase Java Client library:

  • com.streambase.sb.adapter.InputAdapter

  • com.streambase.sb.adapter.OutputAdapter

Adapters inherit some behavior from operators, and both InputAdapter and OutputAdapter are subclasses of com.streambase.sb.operator.Operator.

Like any other StreamBase operator, adapters can have properties that you can customize in StreamBase Studio. You must modify the initial properties generated by the StreamBase Embedded Adapter Wizard.

Each adapter must provide an object that implements the com.streambase.sb.operator.Parameterizable interface. The adapter class may either implement this interface itself or delegate to an accompanying class that implements this interface. The class that implements Parameterizable must provide the getter and setter methods that reflect the parameters that appear in the StreamBase Studio Properties View for that adapter. StreamBase finds the Parameterizable object associated with an adapter by calling the getParameters method on an instance of an InputAdapter or an OutputAdapter.

The default implementation of getParameters returns the last Parameterizable set by a call to setParameters. If setParameters has never been called, but the adapter class itself implements Parameterizable, then getParameters simply returns the adapter object. An adapter class can also override getParameters to implement any other desired behavior.
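The lookup rule described above can be sketched in plain Java. This is only an illustration of the fallback logic, not the StreamBase implementation: SketchParameterizable, SketchAdapterBase, SelfDescribingAdapter, and ExternalParameters are hypothetical stand-ins, and the null result in the final branch is an assumption, because the text does not say what happens when neither condition holds.

```java
/** Hypothetical stand-in for the Parameterizable interface; not the StreamBase type. */
interface SketchParameterizable {}

/** Hypothetical stand-in for an adapter base class, showing only the lookup rule. */
class SketchAdapterBase {
    // Last object passed to setParameters, or null if it was never called.
    private SketchParameterizable explicitParameters;

    void setParameters(SketchParameterizable p) {
        explicitParameters = p;
    }

    /**
     * Returns the last object passed to setParameters. If there is none,
     * falls back to the adapter itself when it implements the interface.
     */
    SketchParameterizable getParameters() {
        if (explicitParameters != null) {
            return explicitParameters;
        }
        if (this instanceof SketchParameterizable) {
            return (SketchParameterizable) this;
        }
        return null; // assumption: this case is not described in the text
    }
}

/** Adapter that carries its own properties. */
class SelfDescribingAdapter extends SketchAdapterBase implements SketchParameterizable {}

/** Separate properties object that an adapter can delegate to. */
class ExternalParameters implements SketchParameterizable {}
```

In this sketch, a SelfDescribingAdapter returns itself from getParameters until setParameters is called with an ExternalParameters instance, after which the external object is returned instead.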

By default, StreamBase uses reflection (via JavaBeans) to derive the available parameters and their types from the getter and setter methods provided by the Parameterizable object. For example, here is a code fragment of an OutputAdapter class with some properties:

public class Example extends OutputAdapter implements Parameterizable {
    private int         startCount = 1;
    private boolean     clever = true;
    private String []   stockSymbols;
    private String      name;

    public void setStartCount(int i) {startCount = i;}
    public int getStartCount() {return startCount;}
    public void setClever(boolean b) {clever = b;}
    public boolean isClever() {return clever;}
    public void setStockSymbols(String [] ss) {stockSymbols = ss;}
    public String [] getStockSymbols() {return stockSymbols;}
    public void setName(String s) {name=s;}
    public String getName() {return name;}
    public void typecheck() throws TypecheckException {}
    public void processTuple(int inputPort, Tuple t) throws StreamBaseException {}
}

The example OutputAdapter class above declares four properties:

  • startCount

  • clever

  • stockSymbols

  • name

These properties appear in StreamBase Studio's Properties view. Notice that the getter method for a boolean property can be named getPropertyName, isPropertyName, or hasPropertyName.
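To see what reflection via JavaBeans yields for accessors like those in the Example class, the following sketch runs the standard java.beans.Introspector over a plain bean. It uses only the JDK: ExampleProperties is a hypothetical copy of the accessors with no StreamBase types, and the way StreamBase itself consumes the resulting descriptors is not shown here.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class PropertyIntrospectionSketch {

    /** Hypothetical plain bean with the same accessors as the Example class. */
    public static class ExampleProperties {
        private int startCount = 1;
        private boolean clever = true;
        private String[] stockSymbols;
        private String name;

        public int getStartCount() { return startCount; }
        public void setStartCount(int i) { startCount = i; }
        public boolean isClever() { return clever; }
        public void setClever(boolean b) { clever = b; }
        public String[] getStockSymbols() { return stockSymbols; }
        public void setStockSymbols(String[] ss) { stockSymbols = ss; }
        public String getName() { return name; }
        public void setName(String s) { name = s; }
    }

    /** Returns "name:type" for each property found by introspection, sorted by name. */
    static List<String> describe(Class<?> beanClass) {
        List<String> result = new ArrayList<String>();
        try {
            // Stop at Object so that the inherited "class" property is not listed.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                result.add(pd.getName() + ":" + pd.getPropertyType().getSimpleName());
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
        Collections.sort(result);
        return result;
    }
}
```

Calling describe(ExampleProperties.class) lists the four properties with their types. The property names are derived from the accessor names with the first letter lowercased, which is why setStartCount and getStartCount together define a property named startCount.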

Property Types

StreamBase currently supports the property types shown in the table in Adding Adapter Properties. Most are simple property types that StreamBase derives automatically by looking at the signatures of the getter and setter methods in the Parameterizable class. These can also be specified in the BeanInfo class with SBPropertyDescriptor objects, as in the example below.

Enum properties are string properties that can only take on a specified set of values. ResourceFile properties are similar to Enum properties, but their value must be the name of a resource. In StreamBase Studio, this is enforced by displaying a drop-down list of files available from your project's folder.

Because both Enum and ResourceFile properties are implemented as Strings, their getter and setter methods must return and expect objects of type String, respectively. Thus, these property types cannot be automatically derived by StreamBase. To use these properties, an adapter must have an accompanying BeanInfo class that returns the SBPropertyDescriptor subclasses EnumPropertyDescriptor or ResourceFilePropertyDescriptor in its list of PropertyDescriptors.

The BeanInfo class is defined in the java.beans.BeanInfo API documentation. It can be used by the Parameterizable object to:

  • Control what properties are exposed.

  • Add additional metadata about properties, such as which properties are optional.

  • Access special types of properties that cannot be automatically derived via reflection.

If a BeanInfo class is present, only the properties explicitly declared in this class are exposed by StreamBase.

The easiest way to make a BeanInfo class is to extend java.beans.SimpleBeanInfo in a class whose name consists of the name of the Parameterizable class, with BeanInfo appended, residing in the same package as the Parameterizable class. For example, if the Parameterizable class is called MyAdapter, the BeanInfo class should be called MyAdapterBeanInfo.

The following example BeanInfo class makes the Clever property optional:

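import java.beans.IntrospectionException;
import java.beans.PropertyDescriptor;
import java.beans.SimpleBeanInfo;
// SBPropertyDescriptor is provided by the StreamBase client library;
// import it from the package given in its Javadoc.

public class ExampleBeanInfo extends SimpleBeanInfo {
  public PropertyDescriptor [] getPropertyDescriptors() {
    try {
        // Only the properties listed here are exposed; stockSymbols is omitted.
        SBPropertyDescriptor [] p = {
            new SBPropertyDescriptor("StartCount", Example.class),
            new SBPropertyDescriptor("Clever", Example.class).optional(),
            new SBPropertyDescriptor("Name", Example.class)
        };

        return p;
    }
    catch(IntrospectionException e) {
        // A descriptor names a property with no matching accessor methods.
        System.out.println("Exception: " + e);
        e.printStackTrace();
    }
    return null;
  }
}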

Note that the optional() method shown in this example simply marks the descriptor as optional.

Note

To enable a hidden password, use the mask property of SBPropertyDescriptor. If the property is a String type and is set, the adapter's Properties view displays asterisks instead of the characters typed.

The Adapter Life Cycle

The adapter has the following life cycle when running within the StreamBase Server (sbd) process:

  1. Constructor

    All adapters must have a public default constructor. The constructor is called when the InputAdapter or OutputAdapter instance is created, but before the adapter is connected to the StreamBase application. We recommend that you set the initial input port and output port count in the constructor by calling setPortHints(inPortCount, outPortCount). The default is no input ports or output ports. The constructor may also set default values for adapter parameters. These values are displayed in StreamBase Studio when a new adapter is dragged to the canvas, and serve as the default values for omitted optional parameters.

  2. Parameters set

    StreamBase will call the adapter's setter methods to configure it according to the application.

  3. typecheck

    The typecheck method is called after the adapter instance is connected in the StreamBase application. The adapter should validate its parameters and throw TypecheckException if any problems are found. The message associated with the thrown TypecheckException is displayed in Studio during authoring, or printed on the console by sbd. Input adapters should set the schema of each output port by calling the setOutputSchema(portNum, schema) method for each output port. If the adapter needs to change the number of input ports based on parameter values, it should call requireInputPortCount(portCount) at this point.

    Call the getResourceContents method during typecheck, instead of waiting until start or run to call it. This is to ensure that StreamBase Studio can indicate to the user whether it was able to find the resource during authoring, and avoid waiting until sbd fails silently.

  4. init

    If typecheck succeeds, the init method is called before the StreamBase application is started. Your adapter is not required to define the init method unless you need to register a runnable or initialize a resource such as a JDBC pool.

  5. run

    At this point, the application begins to run. StreamBase starts threads for any managed runnables registered by earlier calls to registerRunnable. The behavior of the adapter at this point depends on whether it is an InputAdapter or an OutputAdapter.

    • processTuple (output adapters)

      The processTuple method is called when a tuple is received on an input port.

    • Call sendOutput (input adapters)

      Input adapters can call sendOutput(port, tuple) at any time to output a tuple to the specified port.

  6. shutdown

    The shutdown method is called when the StreamBase Server is in the process of shutting down.
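The order of calls in the life cycle above can be sketched with a small stand-in. Everything here is hypothetical and uses only the JDK: RecordingAdapter is not a StreamBase class, drive plays the role of the server, batchSize is an invented parameter, and IllegalArgumentException stands in for TypecheckException. The point is the sequence and the fact that a typecheck failure prevents init from being reached.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {

    /** Hypothetical stand-in for an adapter; records each life cycle call it receives. */
    static class RecordingAdapter {
        final List<String> calls = new ArrayList<String>();
        private int batchSize = 10; // default value set at construction time

        RecordingAdapter() { calls.add("constructor"); }

        void setBatchSize(int n) { batchSize = n; calls.add("setBatchSize"); }

        /** Validates parameters before the application is allowed to start. */
        void typecheck() {
            calls.add("typecheck");
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be positive");
            }
        }

        void init() { calls.add("init"); }
        void processTuple(String tuple) { calls.add("processTuple"); }
        void shutdown() { calls.add("shutdown"); }
    }

    /** Hypothetical host that drives the adapter in the documented order. */
    static List<String> drive(int batchSize) {
        RecordingAdapter adapter = new RecordingAdapter(); // 1. constructor
        adapter.setBatchSize(batchSize);                   // 2. parameters set
        try {
            adapter.typecheck();                           // 3. typecheck
        } catch (IllegalArgumentException e) {
            adapter.calls.add("rejected: " + e.getMessage());
            return adapter.calls;                          // init is never reached
        }
        adapter.init();                                    // 4. init
        adapter.processTuple("tuple-1");                   // 5. run
        adapter.shutdown();                                // 6. shutdown
        return adapter.calls;
    }
}
```

With a valid parameter, drive records all six stages in order. With an invalid one, the recorded calls end at typecheck followed by the rejection message.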

Using Threads

Input adapters often need to perform operations asynchronously with the StreamBase application they are running in. For example, an input adapter may need to listen for information on a socket, or read input from a file, without interrupting the rest of the application. Though output adapters are called upon by StreamBase whenever a tuple arrives on an input port, there may be other asynchronous tasks an output adapter may wish to perform, such as periodically flushing a buffer.

To facilitate such requirements, the adapter API provides a mechanism for creating threads that are managed along with the rest of the StreamBase application. During the init method, an adapter may call registerRunnable(runnable) to register Runnable objects that will be managed in separate threads by StreamBase. When the application starts running, StreamBase will start all registered Runnables in new threads.

StreamBase-managed threads have the following general structure:

...
  public void run() {
    // Perform thread startup tasks
    while (shouldRun()) {
      // Perform tasks such as calling sendOutput
    }
    // Perform thread shutdown tasks
  }
...

The call to shouldRun ties the thread into StreamBase's thread management in the following ways:

  • Startup - The adapter is not considered started until all threads have made their first call to shouldRun.

  • Shutdown - When the application is shutting down, the adapter's shutdown method will be called first. Once this has returned, shouldRun will return false, and all threads should exit. Once all threads have exited, the adapter is considered shut down.

  • Suspend - If the adapter is being suspended, the adapter's suspend method will be called first. Once this has returned, the shouldRun call will block until the adapter is resumed. Once all threads are blocked in shouldRun, the adapter's suspended method will be called. Finally, the adapter will be considered successfully suspended.

  • Resume - When the adapter is resumed following suspension, the adapter's resume method will be called. Once this has returned, shouldRun will return in all threads (returning true if this is an ordinary resume, or false if a suspended adapter is being shut down). After all threads have successfully unblocked, the adapter's resumed method will be called.
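The blocking behavior of shouldRun described in this list can be approximated with a monitor, as in the sketch below. This is an assumption-laden model written against the JDK only: ShouldRunSketch and its suspend, resume, and shutdown methods are hypothetical stand-ins for the server's thread management, and the suspended and resumed callbacks are left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for host-managed thread control; not the StreamBase implementation. */
class ShouldRunSketch {
    private boolean suspended;
    private boolean shuttingDown;
    final AtomicInteger iterations = new AtomicInteger();

    /** Blocks while suspended; returns false once shutdown has been requested. */
    synchronized boolean shouldRun() {
        while (suspended && !shuttingDown) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // treat interruption as a request to stop
            }
        }
        return !shuttingDown;
    }

    synchronized void suspend() { suspended = true; }
    synchronized void resume() { suspended = false; notifyAll(); }
    synchronized void shutdown() { shuttingDown = true; notifyAll(); }

    /** Worker body with the same loop shape as a managed thread. */
    void run() {
        while (shouldRun()) {
            iterations.incrementAndGet(); // stands in for work such as sendOutput
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Starts a worker, suspends it, then requests shutdown while it is suspended. */
    static boolean demo() {
        ShouldRunSketch control = new ShouldRunSketch();
        Thread worker = new Thread(control::run);
        worker.start();
        try {
            while (control.iterations.get() == 0) {
                Thread.sleep(1); // wait until the worker has done some work
            }
            control.suspend();   // later shouldRun calls would now block
            control.shutdown();  // releases any blocked caller with a false result
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive() && control.iterations.get() > 0;
    }
}
```

The demo exercises the case noted under Resume: a worker that is suspended when shutdown is requested sees shouldRun return false and exits its loop.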

Embedded Adapters and Life Cycle Events

A Java operator runs in the same process as the StreamBase application that contains it. The same is true of embedded adapters, which are a specific type of Java operator. One advantage of this is that when the StreamBase application starts or stops, any Java operators or embedded adapters in it start and stop along with it. More generally, the application and these components undergo life cycle changes in a synchronized fashion.

With respect to life cycle changes, there may be dependencies among embedded adapters. For example, an input adapter may input data into a StreamBase application. Suppose this data eventually reaches an embedded output adapter. In this case, it would be appropriate for the output adapter to start before the input adapter. That is, the input adapter shouldn't feed data to an output adapter that may have not yet started.

Managing Embedded Adapters

Embedded adapters start, pause, resume, and shut down along with the StreamBase application that contains them. It is also possible to suspend and resume embedded adapters independently of their StreamBase application.

This is accomplished with the same commands that are used to suspend and resume a StreamBase application as a whole. The command sbadmin suspend can be used to suspend an application; similarly, sbadmin resume is used to resume a StreamBase application.

To suspend or resume individual adapters, you can append a list of embedded adapter names to the sbadmin suspend or sbadmin resume command. If one or more names are appended to sbadmin suspend, the StreamBase application as a whole is not suspended; instead, only the individual embedded adapters named on the command line are suspended.

Note that an individual embedded adapter can be suspended or resumed only if the StreamBase application itself is running, because an embedded adapter can be running only while the application that contains it is running. Suspending or resuming an individual adapter therefore has no meaning when its containing application is not running.

Getting the Status of Embedded Adapters

The sbc status command returns status information about the StreamBase Server.

The sbc status --operators command returns the status of any embedded adapters contained by the server's application. Note that sbc status --operators and sbc status are disjoint commands: sbc status returns information about the server only, not about embedded adapters contained in the server. Similarly, sbc status --operators returns information about contained embedded adapters only, and not any information about the server itself.

In this context, the status of an embedded adapter consists only of its current state. For example, if an embedded adapter has been started and is currently running, its state is STARTED. If an embedded adapter has yet to be started, its state is NONE. If an embedded adapter has been suspended, its state is either SUSPENDED_AND_DROPPING_TUPLES or SUSPENDED_AND_PROCESSING_TUPLES. Lastly, if an adapter has been shut down, its state is SHUTDOWN.

Starting an Embedded Adapter Independently of Its StreamBase Application

By default, an embedded adapter starts along with the StreamBase application that contains it. In Studio, in the General tab for embedded adapters, there is a check box labeled Start with application. By default, this box is selected, meaning that the embedded adapter will start with the application. Clearing this check box causes the adapter to be left in the NONE state when the application starts.

An embedded adapter that does not start with its application stays in the NONE state until it is explicitly started with the sbadmin resume adapterName command. Such an adapter will not start even if the application as a whole is resumed. For example, the application as a whole may suspend and then resume; this has no effect on an adapter that has not started with its application.

Processing of Tuples During the Suspension of an Adapter

If an embedded adapter is suspended separately from the application that contains it, tuples might still arrive at the suspended adapter. You can configure the embedded adapter to handle these tuples in two different ways:

  • A suspended adapter can choose to drop tuples that are delivered to it.

  • The adapter can choose to process these tuples.

These two possibilities are represented by the constants SUSPENDED_AND_DROPPING_TUPLES and SUSPENDED_AND_PROCESSING_TUPLES, respectively, which are defined on the class com.streambase.sb.operator.Operator. To configure an embedded adapter to either drop or process tuples, call the method setSuspendBehavior on its instance, passing one of these two constants as the argument. See the Javadoc for the Operator class for the exact types of the constants and of the setSuspendBehavior argument.
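The effect of the two settings can be illustrated with a stand-in that uses only the JDK. The sketch below is hypothetical throughout: SuspendBehaviorSketch is not a StreamBase class, the enum merely borrows the constant names for readability and says nothing about how the real constants are typed, and tuples are modeled as strings.

```java
import java.util.ArrayList;
import java.util.List;

class SuspendBehaviorSketch {

    /** Hypothetical stand-in for the two suspend behaviors; the real constants live on Operator. */
    enum SuspendBehavior { SUSPENDED_AND_DROPPING_TUPLES, SUSPENDED_AND_PROCESSING_TUPLES }

    private final SuspendBehavior behavior;
    private boolean suspended;
    final List<String> processed = new ArrayList<String>();

    /** The behavior is passed in explicitly so that the sketch implies no default. */
    SuspendBehaviorSketch(SuspendBehavior behavior) {
        this.behavior = behavior;
    }

    void suspend() { suspended = true; }
    void resume() { suspended = false; }

    /** Delivers a tuple; while suspended, the configured behavior decides its fate. */
    void deliver(String tuple) {
        if (suspended && behavior == SuspendBehavior.SUSPENDED_AND_DROPPING_TUPLES) {
            return; // tuple is dropped
        }
        processed.add(tuple);
    }
}
```

If tuples "a", "b", and "c" are delivered with a suspension around "b", the dropping configuration processes only "a" and "c", while the processing configuration handles all three.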