Creating Custom C++ Functions

The StreamBase C++ API supports custom C++ functions that you can call directly in StreamBase expressions by using the callcpp function. There are two forms of the callcpp function, used in StreamBase simple and aggregate expressions.

The installed StreamBase kit provides the source files for two custom C++ function samples:

streambase-install-dir/sample/custom-aggregate-function

streambase-install-dir/sample/custom-simple-function

These custom functions were built by extending, respectively, the sb::PluginAggregate class and the sb::PluginFunction class. Both classes are described in the C++ API Documentation.

Custom Aggregate Function Sample

Let's begin with a sample. The Custom Aggregate Sample consists of the following:

  • The source code for the plug-in aggregate function, NthValue.cpp

  • A sample configuration file, nth_value.sbconf, which tells the StreamBase Server to load the custom aggregate function.

  • A sample application, nth_value.sbapp, which uses the aggregate function.

Binaries are provided, as well as their source code. The sample code extends the StreamBase C++ API. For information about the API, see its C++ API Documentation.

On UNIX, a Makefile is provided in each custom C++ function sample's installation directory to (re)build the sample source code on a supported UNIX machine. On Windows, a Visual Studio solution file is provided to (re)build the sample source code on a supported Windows machine.

Note how the custom function is declared in the nth_value.sbconf file. For example:

<streambase-configuration>
    <global>
        <plugin file="./nth_value"/>
    </global>
    <server>
        <param name="tcp-port" value="10000"/>
    </server>
</streambase-configuration>

However, in StreamBase Studio, you can import the samples and the IDE will take care of sending the proper configuration settings to the server (see next section).

Running the Sample from StreamBase Studio

  1. Launch StreamBase Studio on Windows or Linux.

  2. From the top menu, click File > Load StreamBase Sample, and select the custom-aggregate-function and custom-simple-function samples from the Extending StreamBase section of the list. StreamBase Studio creates two projects for you.

    Note

    On Windows, the custom functions nth_value.dll and log.dll are imported into the aggregate and simple projects, respectively; on Linux, the files nth_value.so and log.so are imported instead.

  3. In the SB Authoring perspective, open the nth_value.sbapp application for the Custom Aggregate Function sample.

  4. Click the Run button. This opens the SB Test/Debug perspective and starts the application.

  5. In the Application Output view, make sure All Streams is selected in the Output Stream drop-down list.

  6. In the Manual Input view, select input from the Input Stream list. For this example, uncheck the Log to Application Input option, and minimize or close the Application Input view.

  7. The Field Values field is initially filled with null. Enter 1 and click Send Data. No output is shown in the Application Output view.

  8. Now enter 10 and click Send Data. This time, in the Application Output view, notice the output from output_window_size_2.

  9. Enter 50 and click Send Data. Notice the output from output_window_size_3 as well as output_window_size_2.

  10. Enter the number 100 and click Send Data.

To see what happens when the aggregate function throws an exception, try enqueuing a number to the input_that_will_cause_an_error stream.

Running the Sample from Terminal Windows and Using StreamBase Commands

To run this sample on a machine where StreamBase is installed:

  1. Open three terminal windows on UNIX, or three StreamBase Command Prompt windows on Windows. In each window, navigate to the custom-aggregate-function directory in your StreamBase installation area. For example:

    • On UNIX:

      cd /opt/streambase/sample/custom-aggregate-function
      
    • On Windows:

      cd "C:\Program Files\StreamBase Systems\StreamBase.n.m\sample\custom-aggregate-function"
      
  2. In the first window, launch the StreamBase Server for the sample application:

    sbd -f nth_value.sbconf nth_value.sbapp

  3. In the second window, run a dequeuer so that you can see the output that will be produced:

    sbc dequeue

  4. In the third window:

    1. Run an enqueuer:

      sbc enqueue input

    2. Type numbers into the enqueuer, one number per line.

  5. In the second (dequeuer) window, look for output from output_window_size_2.

  6. When you enter the third number in the enqueuer window, look for output in the dequeuer window from output_window_size_3 as well as output_window_size_2. For example, if you enter the numbers 1, 10, 50, and 100, then look for output like the following from the dequeuer:

    output_window_size_2,10
    output_window_size_2,50
    output_window_size_3,10
    output_window_size_2,100
    output_window_size_3,50

To see what happens when the aggregate function throws an exception, try enqueuing a number to the input_that_will_cause_an_error stream.
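The dequeuer output listed above can be reproduced with a small standalone sketch. It does not use the StreamBase API: NthValueSketch and slide are hypothetical names, and the sketch assumes (judging only from the sample output) that the aggregate returns the second value of each window and that each window advances by one tuple.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for an aggregate; this is NOT sb::PluginAggregate.
class NthValueSketch {
public:
    // n is the zero-based position of the value to return from each window.
    explicit NthValueSketch(std::size_t n) : n_(n) {}

    void initialize() { values_.clear(); }         // forget the previous window
    void increment(int v) { values_.push_back(v); } // add one value to the window
    int calculate() const {
        assert(n_ < values_.size());                // window must hold n+1 values
        return values_[n_];
    }

private:
    std::size_t n_;
    std::vector<int> values_;
};

// Emits one result per full window of `size` values, advancing by one value.
std::vector<int> slide(const std::vector<int>& input, std::size_t size, std::size_t n) {
    std::vector<int> out;
    NthValueSketch agg(n);
    for (std::size_t start = 0; start + size <= input.size(); ++start) {
        agg.initialize();
        for (std::size_t i = start; i < start + size; ++i)
            agg.increment(input[i]);
        out.push_back(agg.calculate());
    }
    return out;
}
```

With the inputs 1, 10, 50, and 100, a window of size 2 yields 10, 50, 100 and a window of size 3 yields 10, 50, which matches the values in the listing above.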

Custom Simple Function Sample

The Custom Simple Function Sample consists of the following:

  • The source code for the plug-in function, LogFunction.cpp

  • A sample configuration file, log.sbconf, which tells the StreamBase Server to load the custom simple function.

  • A sample application, log.sbapp, which uses the function.

Binaries are provided, as well as their source code. A Makefile is provided to rebuild the sample source code.

To run this sample from StreamBase Studio, use steps that are similar to the ones outlined in the previous section.

To run this sample on a supported machine where StreamBase is installed:

  1. Launch a StreamBase Server on the sample application:

    sbd -f log.sbconf log.sbapp

  2. Run a dequeuer so that you can see the output produced:

    sbc dequeue output

  3. Run an enqueuer:

    sbc enqueue input

  4. Type numbers into the enqueuer, one number per line. For example, if you enter the numbers 1, 10, 50, and 100, then you should see the following output from the dequeuer:

    output,0
    output,2.30258509299405
    output,3.91202300542815
    output,4.60517018598809

  5. To see what happens when the function throws an exception, try enqueuing a negative number.
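The output values above are the natural logarithms of the inputs. A minimal standalone sketch of that behavior follows; it does not use the StreamBase API, checked_log is a hypothetical name, and the choice of std::invalid_argument for negative input is an assumption made for illustration (the sample's own exception type is not described here).

```cpp
#include <cassert>
#include <cmath>
#include <stdexcept>

// Hypothetical helper: natural log that rejects negative input,
// mirroring the behavior described for the sample function.
double checked_log(double x) {
    if (x < 0.0)
        throw std::invalid_argument("log: negative argument");
    return std::log(x);
}
```

Calling checked_log(10.0) gives approximately 2.30258509299405, and checked_log(-1.0) throws instead of returning a value.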

Steps for Creating Custom C++ Functions

The following sections explain how to create custom C++ functions for your StreamBase applications, by extending the StreamBase C++ Custom Function API.

Notes:

  • A StreamBase custom C++ function is mapped to a class. See the sections below for details.

  • Multiple classes (multiple StreamBase custom C++ functions that you write) can be added to a single DLL on Windows, or to a single shared library on UNIX.

Creating Custom Simple Functions

As the simple function sample shows, the first step in writing a simple function is to include the required headers:

#include "StreamBase.hpp"
#include "PluginFunction.hpp"

Next, bring the required namespaces into scope:

using namespace std;
using namespace sb;

Then define the class name and have it inherit from PluginFunction. For example:

class MySimpleFunction : public PluginFunction

Two virtual methods are required in simple function classes. They are:

  • virtual void typecheck(const Schema &arg_types)

    Validates the number and types of the arguments that the simple function accepts. StreamBase Studio calls the typecheck method during authoring, and the StreamBase Server calls it when the application is started.

    Note

    In Version 3.0, this replaced virtual DataType typecheck(const PluginTypeList &arg_types)

  • virtual void eval(Tuple &retval, ConstTuple &args)

    The eval method is what is called when the simple function is executed in a StreamBase application.

    Note

    In Version 3.0, this replaced virtual void eval(sb::Value &retval, const PluginValueList &args)

Within the simple function class, you must declare the class to StreamBase:

STREAMBASE_DECLARE_PLUGIN_FUNCTION(MySimpleFunction);

This declaration provides some infrastructure that aids in registering the simple function class for use by StreamBase.

Finally, outside of the simple function class definition, you must define the class and what it will be known as in the StreamBase application:

STREAMBASE_DEFINE_PLUGIN_FUNCTION 
(MySimpleFunction, "MyApplicationCalculation");

This definition provides a means to register your simple function and the mapping between the simple function class name and what you want to call this function in your StreamBase application.
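To illustrate the general idea of mapping a class to an application-visible name, here is a standalone sketch of a self-registering factory. It is not how the STREAMBASE_DECLARE_PLUGIN_FUNCTION and STREAMBASE_DEFINE_PLUGIN_FUNCTION macros are implemented; FunctionBase, Registrar, SKETCH_DEFINE_FUNCTION, registry, and create are all hypothetical names invented for this example.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical base class; NOT sb::PluginFunction.
struct FunctionBase {
    virtual ~FunctionBase() = default;
    virtual double eval(double x) = 0;
};

using Factory = std::function<std::unique_ptr<FunctionBase>()>;

// Function-local static avoids static initialization order problems.
std::map<std::string, Factory>& registry() {
    static std::map<std::string, Factory> r;
    return r;
}

// Constructing a Registrar at namespace scope registers a factory by name.
struct Registrar {
    Registrar(const std::string& name, Factory f) { registry()[name] = std::move(f); }
};

// Hypothetical macro: binds a class to the name used to look it up.
#define SKETCH_DEFINE_FUNCTION(cls, name) \
    static Registrar cls##_registrar(name, [] { return std::unique_ptr<FunctionBase>(new cls()); })

class MySimpleFunction : public FunctionBase {
public:
    double eval(double x) override { return x * 2.0; }
};

SKETCH_DEFINE_FUNCTION(MySimpleFunction, "MyApplicationCalculation");

// Looks up a function by its registered name; returns nullptr if unknown.
std::unique_ptr<FunctionBase> create(const std::string& name) {
    auto it = registry().find(name);
    if (it == registry().end()) return nullptr;
    return it->second();
}
```

In this sketch, create("MyApplicationCalculation") returns a MySimpleFunction instance, while an unregistered name returns nullptr.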

Creating Custom Aggregate Functions

This section describes guidelines for writing custom C++ aggregate functions.

Creating aggregate functions is very similar to creating simple functions. Again, you need to include some headers:

#include "StreamBase.hpp"
#include "PluginAggregate.hpp"

And then define namespaces:

using namespace std;
using namespace sb;

Then define the class name and have it inherit from PluginAggregate (instead of PluginFunction):

class MyAggregateFunction : public PluginAggregate

There are four required methods in an aggregate function:

  • virtual void typecheck(const Schema &arg_types)

    Validates the argument types.

    New in 3.0, replaces: virtual DataType typecheck(const PluginTypeList &arg_types)

  • virtual void initialize()

    Clear any window state an aggregate function may keep.

  • virtual void increment(ConstTuple &args)

    Use to add values to the window state.

    New in 3.0, replaces: virtual void increment(const PluginValueList &args)

  • virtual void calculate(Tuple &retval)

    Use to calculate the value of the aggregate over the values currently in the window.

    New in 3.0, replaces: virtual void calculate(Value &retval)

Within the aggregate function class, you must declare the class to StreamBase:

STREAMBASE_DECLARE_PLUGIN_AGGREGATE(MyAggregateFunction);

Finally, outside of the aggregate function class definition, you must "define" the class and what it will be known as in the StreamBase application:

STREAMBASE_DEFINE_PLUGIN_AGGREGATE
(MyAggregateFunction, "MyApplicationAggregate");
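The division of work among the four methods can be sketched without the StreamBase headers. The class below is a hypothetical stand-in (FieldType and MeanAggregateSketch are invented names, and the signatures are simplified), intended only to show which method owns which responsibility.

```cpp
#include <cassert>
#include <stdexcept>
#include <vector>

// Hypothetical type tags; NOT sb::DataType.
enum class FieldType { Int, Double, String };

// Hypothetical mean aggregate with the same four-method shape.
class MeanAggregateSketch {
public:
    // Rejects anything other than exactly one double argument.
    void typecheck(const std::vector<FieldType>& arg_types) {
        if (arg_types.size() != 1 || arg_types[0] != FieldType::Double)
            throw std::invalid_argument("mean expects exactly one double argument");
    }

    // Clears state left over from the previous window.
    void initialize() { sum_ = 0.0; count_ = 0; }

    // Adds one value to the window state.
    void increment(double value) { sum_ += value; ++count_; }

    // Computes the result over the values currently in the window.
    double calculate() const { return count_ == 0 ? 0.0 : sum_ / count_; }

private:
    double sum_ = 0.0;
    unsigned count_ = 0;
};
```

Note that initialize must reset every piece of state that increment touches; otherwise values from one window leak into the next.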

Typechecking Arguments Passed to Functions

When you write custom functions by extending the StreamBase C++ API classes, your function object may use internal fields to record the data types with which it was typechecked. Note that functions can be polymorphic (take different argument data types), and they can also take a variable number of arguments.

To accept a variable number of arguments, check the number of fields in the argument schema in your typecheck method, as the example below does with getNumFields().

If your function is polymorphic, then it is probably necessary to record the type information provided at typecheck time. This recorded information can be used in the implementation of the eval() method, to vary behavior based on the types of the arguments. The object is guaranteed to have been typechecked before eval() is called, and to receive arguments to eval() that match the types passed to typecheck().

The following code example returns the sum of its integer or double arguments, as a double. If there are no arguments, it returns 0. The typecheck method verifies the data types of the inputs, and the eval method reads each argument according to its type:

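#include "StreamBase.hpp"
#include "PluginFunction.hpp"

using namespace std;
using namespace sb;

class IntDoubleSumFunction : public PluginFunction {
  private:
    // Number of arguments seen at typecheck time; reused by eval().
    unsigned int _arg_count;
  public:
    // Accepts any number of int or double arguments and returns a double.
    virtual void typecheck(const Schema &argSchema)
    {
        _arg_count = argSchema.getNumFields();
        for (unsigned int i = 0; i < _arg_count; ++i)
            requireType(argSchema, i, DataType::INT, DataType::DOUBLE);
        setReturnType(DataType::DOUBLE);
    }
    // Sums the arguments, reading each one according to its field type.
    virtual void eval(Tuple &retval, ConstTuple &args)
    {
        if (_arg_count == 0)
            retval.setDouble(0, 0.0);
        else {
            double ret = 0.0;
            for (unsigned int i = 0; i < _arg_count; ++i) {
                DataType dt = args.getSchema().getField(i).getType();

                if(DataType::DOUBLE == dt) {
                    ret += args.getDouble(i);
                } else if(DataType::INT == dt) {
                    ret += args.getInt(i);
                }
            }
            retval.setDouble(0, ret);
        }
    }
    // Declares the class to StreamBase (inside the class body).
    STREAMBASE_DECLARE_PLUGIN_FUNCTION(IntDoubleSumFunction);
};
// Registers the class under the name used in StreamBase expressions.
STREAMBASE_DEFINE_PLUGIN_FUNCTION(IntDoubleSumFunction, "int_double_sum");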
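The example above re-reads each field's type inside eval. An alternative that follows the advice to record type information at typecheck time is sketched below. It is standalone and does not use the StreamBase API: ArgType, ArgValue, and RecordedTypeSum are hypothetical names, and ArgValue deliberately carries no type tag so that eval has to rely on what typecheck recorded.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical type tags; NOT sb::DataType.
enum class ArgType { Int, Double, String };

// Hypothetical untagged value: eval must know the type from elsewhere.
struct ArgValue {
    int i;
    double d;
    static ArgValue of_int(int v) { ArgValue a; a.i = v; a.d = 0.0; return a; }
    static ArgValue of_double(double v) { ArgValue a; a.i = 0; a.d = v; return a; }
};

class RecordedTypeSum {
public:
    // Validates the argument types and records them for later use by eval().
    void typecheck(const std::vector<ArgType>& arg_types) {
        for (ArgType t : arg_types)
            if (t != ArgType::Int && t != ArgType::Double)
                throw std::invalid_argument("arguments must be int or double");
        types_ = arg_types;
    }

    // Sums the arguments, choosing the accessor from the recorded types.
    double eval(const std::vector<ArgValue>& args) const {
        assert(args.size() == types_.size()); // same shape as at typecheck time
        double sum = 0.0;
        for (std::size_t i = 0; i < args.size(); ++i)
            sum += (types_[i] == ArgType::Int) ? args[i].i : args[i].d;
        return sum;
    }

private:
    std::vector<ArgType> types_;
};
```

Recording the types once keeps the per-call work in eval to a simple lookup, at the cost of holding a little extra state in the function object.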

Controlling String Length

Strings passed between the runtime and C++ plug-ins are of fixed size. In your C++ custom function, you can use one of these two methods to control string length:

  • setReturnType(DataType retType)

    Allows the maximum string length to be specified at run time in the sbconf file's runtime cpp-string-field-size parameter. That value is used when the runtime cannot determine the maximum size of a string passed between the runtime and a C++ plug-in.

    Example

    This example, from the simple C++ function sample, sets a DOUBLE return type. When a string type is passed instead, its maximum length is determined at run time:

    setReturnType(DataType::DOUBLE);
    
  • setReturnString(int size)

    Explicitly specifies the maximum string size (as an integer) at compile time. The runtime cpp-string-field-size value in the sbconf file is ignored.

    Example

    In the example below, if a string longer than 10K is returned, an exception is thrown.

    setReturnString(10000);
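The effect of a fixed maximum size can be pictured with a standalone sketch. This is not the StreamBase runtime's implementation: checked_return is a hypothetical helper, and std::length_error is an assumed exception type chosen for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical helper: passes a string through only if it fits the limit.
std::string checked_return(const std::string& s, std::size_t max_size) {
    if (s.size() > max_size)
        throw std::length_error("returned string exceeds the declared maximum size");
    return s;
}
```

With a limit of 10000, a short string is returned unchanged, while a string of 10001 characters causes an exception.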
    

Building Custom C++ Functions on UNIX

On a supported UNIX machine where StreamBase is installed, use the sb-config utility to set up the environment and define the compiler to use when compiling your program. For example:

CXX=`sb-config --cxx`
$CXX MyFunction.cpp `sb-config --cflags` -c -o MyFunction.o
$CXX MyFunction.o `sb-config --libs` -o MyFunction

Substitute the name of your custom function source file for MyFunction.

Use the Makefiles in the following StreamBase samples as a guide to setting up your projects:

/opt/streambase/sample/custom-aggregate-function

/opt/streambase/sample/custom-simple-function

Also refer to the related section, Configuration Steps, in this topic.

Building Custom C++ Functions on Windows

To build your custom C++ function on Windows, you must configure Microsoft Visual Studio as described in Configuring Visual C++.

Also see the Configuration Steps for an important reminder.

Configuration Steps

  • Before running your application in StreamBase Studio:

    Register the plug-in function name in a StreamBase Server configuration file. If a configuration file does not already exist in your project, you can click New > Server Configuration to create one.

    Edit the custom-functions section in your configuration file, adding a <custom-function> element for each C++ custom function that you built. For example:

    <custom-functions>
        <custom-function name="func1" type="simple" >
          <args>
            <arg type="int" />   
            <arg type="string" />    
          </args>
          <return type="string" size="16" />
        </custom-function>
    </custom-functions>
    
  • When you deploy your application:

    Edit the StreamBase Server configuration file that will be used to run your application. In addition to declaring the function names in the custom-functions section, identify the location of your custom functions in the global section. For example:

    <streambase-configuration>
        <global>
            <plugin directory="${STREAMBASE_HOME}/plugin"/>
        </global>
        <server>
            <param name="tcp-port" value="10000"/>
        </server>
    </streambase-configuration>
    

    This example assumes that the environment variable STREAMBASE_HOME is defined and that its plugin subdirectory contains the shared libraries for your plug-ins. The StreamBase Server automatically loads all the files in that directory.

    Finally, identify your customized configuration file when you start the server. For example:

    sbd -f MyAppsConfig.sbconf MyApp.sbapp

See the StreamBase Server Configuration XML topic for details.