The StreamBase C++ API supports custom C++ functions that you can call
directly in StreamBase expressions by using the callcpp function. There are two forms of the
callcpp function, used in StreamBase simple
and aggregate expressions.
The installed StreamBase kit provides the source files for two custom C++ function samples:
streambase-install-dir/sample/custom-aggregate-function
streambase-install-dir/sample/custom-simple-function
These custom functions were built by extending, respectively, the sb::PluginAggregate class and the sb::PluginFunction class. These classes are described in the
C++ API Documentation.
Let's begin with a sample. The Custom Aggregate Sample consists of the following:
-
The source code for the plug-in aggregate function,
NthValue.cpp
-
A sample configuration file, nth_value.sbconf, which tells the StreamBase Server to load the custom aggregate function.
-
A sample application, nth_value.sbapp, which uses the aggregate function.
Binaries are provided, as well as their source code. The sample code extends the StreamBase C++ API. For information about the API, see its C++ API Documentation.
On UNIX, a Makefile is provided in each custom C++ function sample's installation directory to (re)build the sample source code on a supported UNIX machine. On Windows, a Visual Studio solution file is provided to (re)build the sample source code on a supported Windows machine.
Note how the custom function is declared in the nth_value.sbconf file. For example:
<streambase-configuration>
  <global>
    <plugin file="./nth_value"/>
  </global>
  <server>
    <param name="tcp-port" value="10000"/>
  </server>
</streambase-configuration>
However, in StreamBase Studio, you can import the samples and the IDE will take care of sending the proper configuration settings to the server (see next section).
-
Launch StreamBase Studio on Windows or Linux.
-
From the top menu, click → , and select the
custom-aggregate-function and custom-simple-function samples from the Extending StreamBase section of the list. StreamBase Studio creates two projects for you.
Note
On Windows, the custom functions nth_value.dll and log.dll are imported into the aggregate and simple projects, respectively; on Linux, the files nth_value.so and log.so are imported instead.
-
In the SB Authoring perspective, open the nth_value.sbapp application for the Custom Aggregate Function sample.
-
Click the Run button. This opens the SB Test/Debug perspective and starts the application.
-
In the Application Output view, make sure All Streams is selected in the Output Stream drop-down list.
-
In the Manual Input view, select from the Input Stream list. For this example, uncheck the
Log to Application Input option, and minimize or close the Application Input view.
-
The Field Values field is initially filled with null. Enter 1 and click . No output is shown in the Application Output view.
-
Now enter 10 and click . This time, in the Application Output view, notice the output from output_window_size_2:
-
Enter 50 and click . Notice the output from output_window_size_3 as well as output_window_size_2. For example:
-
Enter the number 100 and click .
To see what happens when the aggregate function throws an exception, try enqueuing
a number to the input_that_will_cause_an_error stream.
To run this sample on a machine where StreamBase is installed:
-
Open three terminal windows on UNIX, or three StreamBase Command Prompt windows on Windows. In each window, navigate to the custom-aggregate-function directory in your StreamBase installation area. For example:
-
On UNIX:
cd /opt/streambase/sample/custom-aggregate-function
-
On Windows:
cd "C:\Program Files\StreamBase Systems\StreamBase.n.m\sample\custom-aggregate-function"
-
In the first window, launch the StreamBase Server for the sample application:
sbd -f nth_value.sbconf nth_value.sbapp
-
In the second window, run a dequeuer so that you can see the output that will be produced:
sbc dequeue
-
In the third window:
-
Run an enqueuer:
sbc enqueue input
-
Type numbers into the enqueuer, one number per line.
-
In the second (dequeuer) window, look for output from output_window_size_2.
-
When you enter the third number in the enqueuer window, look for output in the dequeuer window from output_window_size_3 as well as output_window_size_2. For example, if you enter the numbers 1, 10, 50, and 100, then look for output like the following from the dequeuer:
output_window_size_2,10
output_window_size_2,50
output_window_size_3,10
output_window_size_2,100
output_window_size_3,50
To see what happens when the aggregate function throws an exception, try enqueuing
a number to the input_that_will_cause_an_error
stream.
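The dequeuer output listed above for the inputs 1, 10, 50, and 100 is consistent with an aggregate that emits the second value of each full window, with the window advancing one tuple at a time. The following stand-alone sketch reproduces that behavior in plain C++ without the StreamBase API. The function name nthValuePerWindow, the zero-based index n, and the one-tuple advance are assumptions made for illustration; they are not taken from NthValue.cpp.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of the StreamBase API): for each full window
// of `windowSize` consecutive inputs, advancing one input at a time, return
// the element at zero-based position `n` inside that window.
std::vector<int> nthValuePerWindow(const std::vector<int>& inputs,
                                   std::size_t windowSize,
                                   std::size_t n)
{
    std::vector<int> outputs;
    if (windowSize == 0 || n >= windowSize)
        return outputs;  // no valid position to report
    for (std::size_t start = 0; start + windowSize <= inputs.size(); ++start)
        outputs.push_back(inputs[start + n]);
    return outputs;
}
```

Under these assumptions, calling the helper with a window size of 2 and n = 1 yields 10, 50, 100, and with a window size of 3 it yields 10, 50, which matches the values shown for output_window_size_2 and output_window_size_3.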
The Custom Simple Function Sample consists of the following:
-
The source code for the plug-in function,
LogFunction.cpp
-
A sample configuration file, log.sbconf, which tells the StreamBase Server to load the custom simple function.
-
A sample application, log.sbapp, which uses the function.
Binaries are provided, as well as their source code. A Makefile is provided to rebuild the sample source code.
To run this sample from StreamBase Studio, use steps that are similar to the ones outlined in the previous section.
To run this sample on a supported machine where StreamBase is installed:
-
Launch a StreamBase Server on the sample application:
sbd -f log.sbconf log.sbapp
-
Run a dequeuer so that you can see the output produced:
sbc dequeue output
-
Run an enqueuer:
sbc enqueue input
-
Type numbers into the enqueuer, one number per line. For example, if you enter the numbers 1, 10, 50, and 100, then you should see the following output from the dequeuer:
output,0
output,2.30258509299405
output,3.91202300542815
output,4.60517018598809
-
To see what happens when the function throws an exception, try enqueuing a negative number.
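The output values above match the natural logarithm of each input. As a rough stand-alone illustration in plain C++ (not the code in LogFunction.cpp), the core computation might look like the following. The function name naturalLogOrThrow and the use of std::domain_error are assumptions; a real plug-in would report errors in whatever way the StreamBase C++ API expects, and how the sample treats zero is not stated here.

```cpp
#include <cmath>
#include <stdexcept>

// Hypothetical stand-in for the sample's core computation: the natural
// logarithm of x, rejecting negative input with an exception.
double naturalLogOrThrow(double x)
{
    if (x < 0.0)
        throw std::domain_error("log: negative argument");
    return std::log(x);
}
```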
The following sections explain how to create custom C++ functions for your StreamBase applications, by extending the StreamBase C++ Custom Function API.
Notes:
-
A StreamBase custom C++ function is mapped to a class. See the sections below for details.
-
Multiple classes (multiple StreamBase custom C++ functions that you write) can be added to a single DLL on Windows, or to a single shared library on UNIX.
As the simple function sample shows, the first step in writing a simple function is to include the required headers:
#include "StreamBase.hpp"
#include "PluginFunction.hpp"
Next, bring the std and sb namespaces into scope:
using namespace std;
using namespace sb;
Then define the class name and have it inherit from PluginFunction. For example:
class MySimpleFunction : public PluginFunction
Two virtual methods are required in simple function classes. They are:
-
virtual void typecheck(const Schema &arg_types)
Validates the number and types of the arguments that the simple function accepts. StreamBase Studio calls the typecheck method during authoring, and the StreamBase Server calls it when the application is started.
Note
In Version 3.0, this replaced virtual DataType typecheck(const PluginTypeList &arg_types)
-
virtual void eval(Tuple &retval, ConstTuple &args)
Called when the simple function is executed in a StreamBase application.
Note
In Version 3.0, this replaced virtual void eval(sb::Value &retval, const PluginValueList &args)
Within the simple function class, you must declare the class to StreamBase:
STREAMBASE_DECLARE_PLUGIN_FUNCTION(MySimpleFunction);
This declaration provides some infrastructure that aids in registering the simple function class for use by StreamBase.
Finally, outside of the simple function class definition, you must define the class and what it will be known as in the StreamBase application:
STREAMBASE_DEFINE_PLUGIN_FUNCTION
(MySimpleFunction, "MyApplicationCalculation");
This definition provides a means to register your simple function and the mapping between the simple function class name and what you want to call this function in your StreamBase application.
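To make the idea of mapping a class name to an application-visible function name more concrete, here is a conceptual, stand-alone sketch of a name-to-factory registry in plain C++. It is only a model of the general technique. This topic does not describe what the STREAMBASE_DECLARE_PLUGIN_FUNCTION and STREAMBASE_DEFINE_PLUGIN_FUNCTION macros expand to, and the actual mechanism may differ. All names in the sketch (FunctionBase, Registrar, registry, Doubler) are hypothetical.

```cpp
#include <functional>
#include <map>
#include <memory>
#include <string>

// Hypothetical base class standing in for a plug-in function type.
struct FunctionBase {
    virtual ~FunctionBase() = default;
    virtual double eval(double x) const = 0;
};

using Factory = std::function<std::unique_ptr<FunctionBase>()>;

// A function-local static avoids static-initialization-order problems.
std::map<std::string, Factory>& registry()
{
    static std::map<std::string, Factory> instance;
    return instance;
}

// Constructing a Registrar<T> records a factory for T under `name`.
template <typename T>
struct Registrar {
    explicit Registrar(const std::string& name)
    {
        registry()[name] = [] { return std::unique_ptr<FunctionBase>(new T()); };
    }
};

// Example implementation class.
struct Doubler : FunctionBase {
    double eval(double x) const override { return 2.0 * x; }
};

// Analogous in spirit to a "define" step: the class plus its exposed name.
static Registrar<Doubler> registerDoubler("MyApplicationCalculation");
```

With a registry like this, a host program can look up "MyApplicationCalculation" by name, create an instance through the stored factory, and call it without knowing the concrete class.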
This section describes guidelines for writing custom C++ aggregate functions.
Creating an aggregate function is very similar to creating a simple function. Again, start by including the required headers:
#include "StreamBase.hpp"
#include "PluginAggregate.hpp"
And then define namespaces:
using namespace std;
using namespace sb;
Then define the class name and have it inherit from PluginAggregate (instead of PluginFunction):
class MyAggregateFunction : public PluginAggregate
There are four required methods in an aggregate function:
-
virtual void typecheck(const Schema &arg_types)
Validates the argument types.
New in 3.0, replaces: virtual DataType typecheck(const PluginTypeList &arg_types)
-
virtual void initialize()
Clears any window state that the aggregate function keeps.
-
virtual void increment(ConstTuple &args)
Adds values to the window state.
New in 3.0, replaces: virtual void increment(const PluginValueList &args)
-
virtual void calculate(Tuple &retval)
Calculates the value of the aggregate over the values currently in the window.
New in 3.0, replaces: virtual void calculate(Value &retval)
Within the aggregate function class, you must declare the class to StreamBase:
STREAMBASE_DECLARE_PLUGIN_AGGREGATE(MyAggregateFunction);
Finally, outside of the aggregate function class definition, you must "define" the class and what it will be known as in the StreamBase application:
STREAMBASE_DEFINE_PLUGIN_AGGREGATE
(MyAggregateFunction, "MyApplicationAggregate");
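The initialize, increment, and calculate methods form a simple lifecycle: reset the window state, fold in each value, then compute a result. The following stand-alone sketch illustrates that lifecycle with a running mean. It deliberately does not derive from sb::PluginAggregate and uses plain double values in place of the Tuple and ConstTuple arguments, so it can be compiled without the StreamBase headers. The class name MeanAggregateSketch and the choice to return 0 for an empty window are assumptions.

```cpp
#include <cstddef>

// Stand-alone illustration of the initialize / increment / calculate
// lifecycle; not a StreamBase plug-in class.
class MeanAggregateSketch {
public:
    // Clear any state kept for the current window.
    void initialize()
    {
        _sum = 0.0;
        _count = 0;
    }

    // Fold one more value into the window state.
    void increment(double value)
    {
        _sum += value;
        ++_count;
    }

    // Compute the aggregate over the values seen since initialize().
    // Returning 0 for an empty window is an assumption of this sketch.
    double calculate() const
    {
        return _count == 0 ? 0.0 : _sum / static_cast<double>(_count);
    }

private:
    double _sum = 0.0;
    std::size_t _count = 0;
};
```

In a real aggregate, the same three steps would read values from the ConstTuple passed to increment and write the result into the Tuple passed to calculate.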
When you write custom functions by extending the StreamBase C++ API classes, your function object may use internal fields to record the data types with which it was typechecked. Note that functions can be polymorphic (take different argument data types), and they can also take a variable number of arguments.
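To accept a variable number of arguments, have your typecheck method inspect how many arguments it was passed, for example with argSchema.getNumFields() as in the code example in this section.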
If your function is polymorphic, then it is probably necessary to record the type
information provided at typecheck time. This recorded information can be used in
the implementation of the eval() method, to vary
behavior based on the types of the arguments. The object is guaranteed to have been
typechecked before eval() is called, and to receive
arguments to eval() that match the types passed to
typecheck().
The following code example returns the sum of its integer or double arguments, as a double. If there are no arguments, it returns 0. The typecheck method records the number of arguments and requires each one to be an int or a double; the eval method then checks the data type of each argument to decide how to read it:
#include "StreamBase.hpp"
#include "PluginFunction.hpp"

using namespace sb;

class IntDoubleSumFunction : public PluginFunction {
private:
    // Number of arguments recorded at typecheck time.
    unsigned int _arg_count;
public:
    // Accept any number of int or double arguments; always return a double.
    virtual void typecheck(const Schema &argSchema)
    {
        _arg_count = argSchema.getNumFields();
        for (unsigned int i = 0; i < _arg_count; ++i)
            requireType(argSchema, i, DataType::INT, DataType::DOUBLE);
        setReturnType(DataType::DOUBLE);
    }

    // Sum the arguments, reading each one according to its field type.
    virtual void eval(Tuple &retval, ConstTuple &args)
    {
        if (_arg_count == 0)
            retval.setDouble(0, 0);
        else {
            double ret = 0.0;
            for (unsigned int i = 0; i < _arg_count; ++i) {
                DataType dt = args.getSchema().getField(i).getType();
                if (DataType::DOUBLE == dt) {
                    ret += args.getDouble(i);
                } else if (DataType::INT == dt) {
                    ret += args.getInt(i);
                }
            }
            retval.setDouble(0, ret);
        }
    }

    STREAMBASE_DECLARE_PLUGIN_FUNCTION(IntDoubleSumFunction);
};

STREAMBASE_DEFINE_PLUGIN_FUNCTION(IntDoubleSumFunction, "int_double_sum");
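The pattern of recording types at typecheck time and dispatching on them at eval time can also be tried outside the server. The sketch below mirrors the pattern in plain C++ so that it compiles without the StreamBase headers. ArgType, Arg, and SumSketch are hypothetical stand-ins for the schema, tuple field, and plug-in class; they are not part of the StreamBase API.

```cpp
#include <cstddef>
#include <stdexcept>
#include <vector>

enum class ArgType { Int, Double, String };

// Hypothetical stand-in for a tuple field: a tagged int-or-double value.
struct Arg {
    ArgType type;
    int intValue;
    double doubleValue;
};

class SumSketch {
public:
    // Validate the argument types once and remember them for eval().
    void typecheck(const std::vector<ArgType>& types)
    {
        for (ArgType t : types)
            if (t != ArgType::Int && t != ArgType::Double)
                throw std::invalid_argument("expected int or double");
        _types = types;
    }

    // Sum the arguments, reading each one according to its recorded type.
    double eval(const std::vector<Arg>& args) const
    {
        if (args.size() != _types.size())
            throw std::invalid_argument("argument count differs from typecheck");
        double sum = 0.0;
        for (std::size_t i = 0; i < args.size(); ++i)
            sum += (_types[i] == ArgType::Double) ? args[i].doubleValue
                                                  : args[i].intValue;
        return sum;
    }

private:
    std::vector<ArgType> _types;
};
```

Because the types are validated once up front, eval can stay free of per-call validation beyond the argument count check, which this sketch keeps only as a safety net.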
Strings passed between the runtime and C++ plug-ins are of fixed size. In your C++ custom function, you can use one of these two methods to control string length:
-
setReturnType(DataType retType)
Allows the maximum string length to be specified at run time in the sbconf file's runtime cpp-string-field-size parameter. That value is used when the runtime cannot determine the maximum size of a string passed between the runtime and a C++ plug-in.
Example
In this example from the simple C++ function sample, a DOUBLE data type is specified, but the string length will be determined at run time:
setReturnType(DataType::DOUBLE);
-
setReturnString(int size)
Explicitly specifies the maximum string size (as an integer) at compile time. The runtime cpp-string-field-size value in the sbconf file is ignored.
Example
In the example below, if a string longer than the declared size of 10000 is returned, an exception is thrown.
setReturnString(10000);
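The fixed-size rule described above amounts to a length check against a declared maximum. The following stand-alone sketch shows that check in plain C++. The function name checkedReturnString and the use of std::length_error are assumptions for illustration; the exception that the StreamBase runtime actually raises is not specified in this topic.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical check mirroring the documented behavior: a returned string
// longer than the declared maximum size is rejected with an exception.
std::string checkedReturnString(const std::string& value, std::size_t maxSize)
{
    if (value.size() > maxSize)
        throw std::length_error("returned string exceeds declared maximum size");
    return value;
}
```

A string of exactly the declared size passes the check in this sketch; only a longer one is rejected.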
On a supported UNIX machine where StreamBase is installed, use the sb-config utility to set up the environment and define the compiler to use when compiling your program. For example:
CXX=`sb-config --cxx`
$CXX MyFunction.cpp `sb-config --cflags` -c -o MyFunction.o
$CXX MyFunction.o `sb-config --libs` -o MyFunction
Substitute the name of your function for MyFunction.
Use the Makefiles in the following StreamBase samples as a guide to setting up your projects:
/opt/streambase/sample/custom-aggregate-function
/opt/streambase/sample/custom-simple-function
Also refer to the related section, Configuration Steps, in this topic.
To build your custom C++ function on Windows, you must configure Microsoft Visual Studio as described in Configuring Visual C++.
Also see the Configuration Steps for an important reminder.
-
Before running your application in StreamBase Studio:
Register the plug-in function name in a StreamBase Server configuration file. If a configuration file does not already exist in your project, you can click → to create one.
Edit the custom-functions section in your configuration file, adding a <custom-function> element for each C++ custom function that you built. For example:
<custom-functions>
  <custom-function name="func1" type="simple" >
    <args>
      <arg type="int" />
      <arg type="string" />
    </args>
    <return type="string" size="16" />
  </custom-function>
</custom-functions>
-
When you deploy your application:
Edit the StreamBase Server configuration file that will be used to run your application. In addition to declaring the function names in the custom-functions section, identify the location of your custom functions in the global section. For example:
<streambase-configuration>
  <global>
    <plugin directory="${STREAMBASE_HOME}/plugin"/>
  </global>
  <server>
    <param name="tcp-port" value="10000"/>
  </server>
</streambase-configuration>
This example assumes that the environment variable STREAMBASE_HOME is defined and that its plugin subdirectory contains your plug-in shared libraries. The StreamBase Server automatically loads all the files in that directory.
Finally, identify your customized configuration file when you start the server. For example:
sbd -f MyAppsConfig.sbconf MyApp.sbapp
See the StreamBase Server Configuration XML topic for details.
