StreamBase Documentation


APPLY Statement

Syntax

Modules

APPLY [PARALLEL [number_of_instances BY field_identifier]] MODULE {"file_name.ssql" | "file_name.sbapp"}
    ['('parameter_list')']
  FROM input_stream_identifier_1=stream_identifier_input_1[, ...]
  INTO output_stream_identifier_1=stream_identifier_output_1[, ...];

Java Operator

APPLY [PARALLEL] JAVA "java_class_name" ['('parameter_list')']
  FROM input_stream_identifier_1[, ...]
  INTO output_stream_identifier_1[, ...];

Java Embedded Adapter

APPLY [PARALLEL] JAVA "java_class_name" '('parameter_list')'
  FROM stream_identifier | INTO stream_identifier;

JDBC Table

APPLY [PARALLEL] JDBC jdbc_datasource_name "sql_statement"
  FROM input_stream_identifier
  INTO output_stream_identifier;

Substitutable Fields

number_of_instances

The number of parallel instances of the module to instantiate.

field_identifier

An integer type tuple field in an input stream to the module that is used to distribute incoming tuples across multiple instances of the module. Each distinct value of field_identifier is guaranteed to be delivered to the same instance of the module.

file_name.ssql:
file_name.sbapp:

The StreamSQL (.ssql) or EventFlow (.sbapp) file that describes the application to be used as a module. The file name must be enclosed within double quotation marks.

java_class_name:

The full package and class name of the Java class that implements either the operator or the embedded adapter functionality. The class name must be enclosed within double quotation marks.

parameter_list:

Entries of the format parameter_name=value. Multiple entries in the list are separated by commas.

parameter_name:

The name of a parameter used by a parameterized module, Java operator or an embedded Java adapter.

value:

The value of the corresponding parameter being passed to a parameterized module, Java operator or a Java embedded adapter. When passing parameters to a parameterized module, value must always be enclosed in quotation marks ("value") regardless of its type; if value is a string type, then it must also be enclosed within escaped quotation marks ("\"value\""). When passing parameters to a Java operator or Java embedded adapter, only string parameter types must be enclosed within unescaped quotation marks (for example, "value" where value is a string type).

jdbc_datasource_name:

The value of the name attribute in a <data-source> entry within the sbd.sbconf configuration file used by this StreamSQL application. It is through this link that the database connection details are obtained by the application. The sbd.sbconf file is described in the StreamBase Server Configuration XML topic.

stream_identifier:

The unique identifier (name) of a named stream within the application. When used within the INTO clause, the named stream receives tuples from an embedded adapter; when used within the FROM clause, the named stream provides tuples to an embedded adapter.

input_stream_identifier:
input_stream_identifier_n:

The unique identifier (name) of an input stream in a module or custom Java operator. There may be multiple input streams in a module or custom Java operator.

output_stream_identifier:
output_stream_identifier_n:

The unique identifier (name) of an output stream in a module or custom Java operator. There may be multiple output streams in a module or custom Java operator.

stream_identifier_input_n:

The unique identifier (name) of a stream in the main application that submits tuples to an input stream of a module or Java operator.

stream_identifier_output_n:

The unique identifier (name) of a stream in the main application that receives tuples emitted by an output stream in a module or Java operator.

... in the FROM clause of the APPLY MODULE statement:

Additional module input stream to application input stream mappings of the form: input_stream_identifier_n=stream_identifier_input_n.

... in the INTO clause of the APPLY MODULE statement:

Additional module output stream to application output stream mappings of the form: output_stream_identifier_n=stream_identifier_output_n.

... in the FROM clause of the APPLY JAVA operator statement:

Additional input streams to the custom Java operator of the form: input_stream_identifier_n.

... in the INTO clause of the APPLY JAVA operator statement:

Additional output streams from the custom Java operator of the form: output_stream_identifier_n.

Discussion

The APPLY statement can be used to include an existing EventFlow or StreamSQL module, a Java operator, or an embedded adapter in a StreamSQL application. It can also be used to query tables in an external JDBC database. The optional PARALLEL keyword causes the statement to run in a thread separate from the thread running the main application.

Modules

To use a StreamSQL application as a module within an EventFlow application, drag the icon representing the .ssql file onto the canvas for the .sbapp file. StreamBase Studio adds a Module Reference to the canvas. Then connect the module's input and output ports to other application operators.

To use a StreamSQL or EventFlow application as a module within another StreamSQL application, use the APPLY statement. Since the module application may have multiple input and output streams, a mapping must be specified between the streams in the StreamSQL application and the corresponding streams of the module.

The name of the file that identifies the module must be within quotation marks. The entries following the FROM keyword map each of the module's input streams to streams in the referencing StreamSQL application. The entries following the INTO keyword map each of the module's output streams to streams in the referencing StreamSQL application. If the module only has one input or output stream, simply list the name of the corresponding stream in the referencing StreamSQL application; that is:

     stream_identifier_input_n

Rather than:

     module_input_stream_identifier_n=stream_identifier_input_n
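
As a minimal sketch of the mapping form for a module with more than one input stream: the module file name Matcher.ssql, its stream names (TradesIn, QuotesIn, AlertsOut), and the application streams and schemas are hypothetical and chosen only for illustration.

    -- Application streams (hypothetical names and schemas).
    CREATE INPUT STREAM Trades (symbol string(4), price double);
    CREATE INPUT STREAM Quotes (symbol string(4), bid double);
    CREATE OUTPUT STREAM Alerts;

    -- Each entry maps a module stream (left) to an application stream (right).
    APPLY MODULE "Matcher.ssql"
      FROM TradesIn=Trades, QuotesIn=Quotes
      INTO AlertsOut=Alerts;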

The optional number_of_instances BY field_identifier clause may only be used if the optional PARALLEL keyword is specified. This clause allows multiple instances of the module to be instantiated. The type of field_identifier must be an integer or be convertible to an integer. If the type of field_identifier is a string, use the hash(field_identifier) function to generate an integer from the value of the string. Tuples with the same field_identifier value are guaranteed to be processed by the same module instance. Using the PARALLEL keyword without the optional clause creates only a single instance of the module, which processes all incoming tuples.
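
The following sketch shows one way the PARALLEL clause might be written, assuming a hypothetical module OrderProcessor.sbapp with a single input and a single output stream, and a hypothetical integer field accountId used as the distribution key.

    -- Hypothetical application streams; accountId is an int, so it can be used directly after BY.
    CREATE INPUT STREAM Orders (accountId int, symbol string(4), quantity int);
    CREATE OUTPUT STREAM Fills;

    -- Four instances of the module; tuples with the same accountId go to the same instance.
    APPLY PARALLEL 4 BY accountId MODULE "OrderProcessor.sbapp"
      FROM Orders
      INTO Fills;

Because the module in this sketch has one input and one output stream, the application stream names are listed on their own rather than as name=name mappings.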

The optional parameter_list clause, used to pass parameter values to the module, consists of one or more parameter_name=value entries, separated by commas. When passing parameters to a parameterized module, value must always be enclosed in quotation marks ("value") regardless of its type; if value is a string type, then it must also be enclosed within escaped quotation marks ("\"value\""). Modules that use parameters have default values for the parameters set at design time. The parameter_list entries are only required for parameters that need to be reset to a different value at run time.
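
To illustrate the quoting rules for module parameters, here is a sketch that assumes a hypothetical module PriceFilter.ssql with a numeric parameter threshold and a string parameter region; the stream names and schema are likewise invented for the example.

    CREATE INPUT STREAM Ticks (symbol string(4), price double);
    CREATE OUTPUT STREAM Filtered;

    -- threshold is numeric: enclosed once in quotation marks.
    -- region is a string: enclosed in quotation marks plus escaped inner quotation marks.
    APPLY MODULE "PriceFilter.ssql" (threshold="100", region="\"EMEA\"")
      FROM Ticks
      INTO Filtered;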

Java Operators

To use a Java operator within a StreamSQL application, provide, within quotation marks, the full package and class name of the implementation class, and enclose within parentheses the parameters used by this class; these are the same parameters that are set on the Parameters tab of the EventFlow Java operator. The source of tuples submitted to the Java operator is the stream referenced within the FROM clause, and the tuples emitted by the operator are written to the stream referenced in the INTO clause.
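
A minimal sketch of a Java operator reference follows. The class com.example.operators.Enricher and its parameters Currency and Precision are hypothetical; a real operator exposes whatever parameters its implementation defines. Only the string parameter is enclosed in quotation marks, as described for Java operators under value.

    CREATE INPUT STREAM Raw (symbol string(4), price double);
    CREATE OUTPUT STREAM Enriched;

    -- Currency is a string parameter (quoted); Precision is an integer parameter (unquoted).
    APPLY JAVA "com.example.operators.Enricher" (Currency="USD", Precision=2)
      FROM Raw
      INTO Enriched;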

JDBC Tables

The APPLY JDBC statement allows you to run, from within your StreamSQL application, SQL queries against tables in an external relational database.

The first argument of an APPLY JDBC statement references the name of a JDBC datasource. As soon as the statement is entered, StreamBase Studio connects to the datasource. If the connection fails (for example, an "unknown data-source" message is displayed), the StreamSQL Editor cannot typecheck the application. Therefore, a datasource must be configured within the project's sbd.sbconf configuration file before the APPLY JDBC statement can be coded.

The following sbd.sbconf entry configures a database connection to a Derby database:

    <data-sources>
        <data-source name="myDB" type="jdbc">
            <uri value="jdbc:derby://host:port/db_name;user=userName;password=pwd"/>
            <driver value="org.apache.derby.jdbc.ClientDriver"/>
            <param name="jdbc-timeout" value="15000"/>
            <param name="jdbc-max-column-size" value="2048"/>
        </data-source>
    </data-sources>

Where myDB is the datasource name used in the configuration file; host:port is the name, or IP address, and port of the computer hosting the relational database management system; db_name is the name of the target database; and userName and pwd are the user name and password of a user authorized to use this database instance.

In the following StreamSQL code, the application references the datasource defined above, then queries a table, and finally sends the result to the application's output stream:

    CREATE INPUT STREAM in (symbol string(4), price double, SKU int);
    CREATE OUTPUT STREAM out;

    APPLY JDBC myDB "SELECT * FROM table WHERE column_name={symbol}" FROM in INTO out;

Within the SQL statement, string literals are demarcated by single quotation marks ('). Since curly braces ({ and }) are used to demarcate parameters, curly braces used within the SQL statement must be escaped with the backslash (\) character. Use of stored procedures within the SQL statement is not supported.
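
As a hedged illustration of the quoting rule for string literals, the following sketch combines a SQL string literal with a stream field reference. The table name quotes, the columns exchange and ticker, and the stream names are hypothetical; myDB is the datasource configured above.

    CREATE INPUT STREAM lookups (symbol string(4), price double);
    CREATE OUTPUT STREAM matches;

    -- 'NYSE' is a SQL string literal; {symbol} refers to the symbol field of the incoming tuple.
    APPLY JDBC myDB "SELECT * FROM quotes WHERE exchange='NYSE' AND ticker={symbol}" FROM lookups INTO matches;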

The APPLY JDBC statement processes data from both the external database and an input stream in the application, as the preceding example shows:

  • The SQL statement that will be run against the database is enclosed within double quotation marks. Note that operations on the datasource are done using SQL instead of StreamSQL statements. The syntax of the SQL statement depends on the type of database in use, and is not documented by StreamBase.
  • The result set returned by the SQL operation is written to the output stream referenced in the INTO clause.
  • Within the SQL statement, values in the stream are accessed by enclosing the field name in { and } curly braces. In the example, the WHERE clause matches the value of the symbol field in the stream against the column_name column in the database table.
  • The FROM clause specifies the application stream whose field values are used in the query.
  • The APPLY JDBC statement always passes through all of the input stream's fields. This is different from the Query operator in an EventFlow application, where input schema fields may be optionally suppressed by choosing the explicitly specified fields option in the Pass Input tab. To suppress one or more input stream fields in the output, consider either adding a SELECT statement after the APPLY JDBC statement, or writing an EventFlow application instead of a StreamSQL application.

    The following example shows a SELECT statement that removes fields from the output of the preceding APPLY JDBC statement:

        SELECT symbol, price FROM out => CREATE OUTPUT STREAM subsetout;

    In this example, the subsetout stream will contain only the symbol and price fields from the output of the preceding APPLY JDBC statement.

Java Embedded Adapters

To use a Java embedded adapter within a StreamSQL application, provide, within quotation marks, the full package and class name of the implementation class, and enclose within parentheses the parameters used by this class; these are the same parameters that are set on the Adapter Settings tab of the EventFlow embedded adapter. Parameter values that are strings must be enclosed within double quotation marks. Parameter values are not positionally defined and may be entered in any sequence.

If the adapter reads data from an external source (input adapters: CSV File Reader, CSV Socket Reader, Regular Expression File Reader, Regular Expression Socket Reader, JMS Reader, and StreamBase to StreamBase), then an INTO clause identifies the named stream that will receive tuples from the adapter. If the adapter writes data to an external destination (output adapters: CSV File Writer, Email Sender, XML File Writer, and XML over HTTP File Writer), then a FROM clause identifies the named stream from which the adapter receives tuples.

CSV File Reader Embedded Adapter

The Java class that implements this adapter is: com.streambase.sb.adapter.csvreader.CSVReader. The parameter_list includes the following parameters:

Parameter | Description
FileName | The name of the file from which data will be read one line at a time. This file must be located either in the same directory in which the StreamBase server is started or in a directory identified in a <operator-resource-search directory> element within the <global> section of the configuration file used by the StreamBase server running the application. This parameter is required.
Delimiter | The character used to separate values in the data file. This parameter is optional and defaults to a comma; if desired, enter a string value (for example, ",") to explicitly specify the parameter's value.
QuoteChar | The character used within the file to delimit string values. This parameter is optional and defaults to a double quotation mark; if desired, enter a string (for example, "\""; note how the \ is used to escape the " that represents the desired QuoteChar) to explicitly specify the parameter's value.
TimestampFormat | The string format used to represent timestamp values. This parameter is optional and defaults to "MM/dd/yyyy hh:mm:ss aa".
Period | The time to wait, in milliseconds, between the processing of records. This parameter is optional.
Repeat | The number of times to iterate over the CSV file; the value 0 means iterate indefinitely. This parameter is optional.
Schema | A parenthesized, comma-separated list of field names and types in the tuple produced by the adapter. The sequence of types must be the same as the data in the input file. For example: Schema=(TS timestamp, Username string(20), IPAddress string(15)). This parameter is required.
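
The following sketch shows how the CSV File Reader might be referenced with its two required parameters plus an explicit delimiter. The file name logins.csv and the stream name Logins are hypothetical, and declaring the receiving stream with CREATE OUTPUT STREAM is an assumption modeled on the JDBC example earlier in this topic.

    CREATE OUTPUT STREAM Logins;

    -- An input adapter has only an INTO clause; Schema follows the form shown in the parameter description.
    APPLY JAVA "com.streambase.sb.adapter.csvreader.CSVReader"
      (FileName="logins.csv",
       Delimiter=",",
       Schema=(TS timestamp, Username string(20), IPAddress string(15)))
      INTO Logins;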

CSV Socket Reader Embedded Adapter

The Java class that implements this adapter is: com.streambase.sb.adapter.csvreader.CSVSocketReader. The parameter_list includes the following parameters:

Parameter | Description
Hostname | A string containing either the host name or IP address of the computer sending data.
Port | An integer used to specify the TCP port to use to receive input.
Delimiter | The character used to separate values in the data file. This parameter is optional and defaults to a comma; if desired, enter a string value (for example, ",") to explicitly specify the parameter's value.
QuoteChar | The character used within the file to delimit string values. This parameter is optional and defaults to a double quotation mark; if desired, enter a string (for example, "\""; note how the \ is used to escape the " that represents the desired QuoteChar) to explicitly specify the parameter's value.
TimestampFormat | The string format used to represent timestamp values. This parameter is optional and defaults to "MM/dd/yyyy hh:mm:ss aa".

JMS Reader Embedded Adapter

The Java class that implements this adapter is: com.streambase.sb.adapter.jms.enqueue.JMSReader. The parameter_list includes the following parameters:

Parameter | Description
ConfigFile | The name of the configuration file used by the adapter. This file must be located either in the same directory in which the StreamBase server is started or in a directory identified in a <operator-resource-search directory> element within the <global> section of the configuration file used by the StreamBase server running the application. This parameter is required.
Schema | A parenthesized, comma-separated list of field names and types in the tuple produced by the adapter. The sequence of types must be the same as the data in the input message. For example: Schema=(TS timestamp, Username string(20), IPAddress string(15)). This parameter is required.

Regular Expression Reader Embedded Adapter

The Java class that implements this adapter is: com.streambase.sb.adapter.regexreader.RegexReader. The parameter_list includes the following parameters:

Parameter | Description
FileName (resource) | The file to read and parse. This file is read one line at a time. Each line is parsed using the Format property and emits one tuple. This file must be located either in the same directory in which the StreamBase server is started or in a directory identified in a <operator-resource-search directory> element within the <global> section of the configuration file used by the StreamBase server running the application. This parameter is required.
Format | The regular expression used to parse the input file. This must be a Java regular expression as expected by the java.util.regex.Pattern class. For example, "([^,]*),([^,]*)" could be used to parse a simple, two-field CSV file. This parameter is required.
Period | The rate, in milliseconds, at which to read lines from the file and emit tuples. Specify 0 or omit this property to emit tuples as quickly as possible. This parameter is optional.
Repeat | The number of times to repeat the input file. If omitted or set to 1, the adapter reads the input file once and then stops emitting tuples. If set to 0, the adapter repeats the input file indefinitely. This parameter is optional.
DropMismatches | If true, records that do not match the Format regular expression are ignored and the next record is immediately examined. Otherwise, a tuple with all fields set to null is emitted when a non-matching input line is encountered. Acceptable values are true or false.
TimestampFormat | The format, expressed as a string, used to parse timestamp fields extracted from the input file. This should be in the form expected by the SimpleDateFormat class, for example "MM/dd/yyyy". For more information, see the com.streambase.sb.adapter.Adapter class description in the StreamBase Java API documentation.
Schema | A parenthesized, comma-separated list of field names and types in the tuple produced by the adapter. The sequence of types must be the same as the data in the input message. For example: Schema=(TS timestamp, Username string(20), IPAddress string(15)). This parameter is required.
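
As a sketch of the Regular Expression Reader, the statement below uses the two-field pattern from the Format description. The file name pairs.txt, the stream name Pairs, and the schema field names are hypothetical; declaring the receiving stream with CREATE OUTPUT STREAM is an assumption modeled on the JDBC example earlier in this topic.

    CREATE OUTPUT STREAM Pairs;

    -- Each capture group in Format is assumed to fill one Schema field, in order.
    APPLY JAVA "com.streambase.sb.adapter.regexreader.RegexReader"
      (FileName="pairs.txt",
       Format="([^,]*),([^,]*)",
       DropMismatches=true,
       Schema=(Name string(20), Value string(20)))
      INTO Pairs;

With DropMismatches set to true, lines that do not contain a comma are skipped rather than emitted as all-null tuples.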

Regular Expression Socket Reader Embedded Adapter

The Java class that implements this adapter is: com.streambase.sb.adapter.regexreader.RegexSocketReader. The parameter_list includes the following parameters:

Parameter | Description
Format | The regular expression used to parse the input. This must be a Java regular expression as expected by the java.util.regex.Pattern class. For example, "([^,]*),([^,]*)" could be used to parse simple, two-field CSV input. This parameter is required.
DropMismatches | If true, records that do not match the Format regular expression are ignored and the next record is immediately examined. If false, a tuple with all fields set to null is emitted when a non-matching input line is encountered. This parameter is optional and defaults to true.
TimestampFormat | The format used to parse timestamp fields extracted from the input. This should be in the form expected by the SimpleDateFormat class. For more information, see the com.streambase.sb.adapter.Adapter class description in the StreamBase Java API documentation. This string parameter is optional.
Schema (schema) | A parenthesized, comma-separated list of field names and types in the tuple produced by the adapter. The sequence of types must be the same as the data in the input message. For example: Schema=(TS timestamp, Username string(20), IPAddress string(15)). This parameter is required.
Hostname | A string containing either the host name or IP address of the computer sending data.
Port | An integer used to specify the TCP port to use to receive input.

StreamBase to StreamBase Embedded Adapter

The Java class that implements this adapter is: com.streambase.sb.adapter.sbd2sbdinput.SBD2SBDInput. The parameter_list includes the following parameters:

Parameter | Description
UpstreamHost | A string containing the host name or IP address of the upstream application.
UpstreamPort | A string containing the TCP port number of the upstream application.
UpstreamStream | A string containing the name of the stream to read from in the upstream application. If no stream with this name is present in the upstream application, or the stream's schema is incompatible with the schema configured in the adapter, the downstream application behaves as though the upstream application is inaccessible. The ReconnectInterval parameter determines whether the downstream application attempts to reconnect periodically until a compatible stream is present.
ReconnectInterval | An integer representing the period, in seconds, that the downstream application waits between reconnect attempts after the upstream application fails or no compatible stream is present. Set this parameter to zero to disable reconnection. When reconnection is disabled, the upstream application must be started first and must have a compatible output stream from which the downstream application can read.
Schema | The schema to output into the downstream application. This schema must match that of the output stream in the upstream application.
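
A sketch of a downstream application reading from an upstream server might look as follows. The host, port, stream names, and schema are hypothetical, and declaring the receiving stream with CREATE OUTPUT STREAM is an assumption modeled on the JDBC example earlier in this topic. Note that UpstreamPort is documented as a string, so it is quoted here, while ReconnectInterval is an integer.

    CREATE OUTPUT STREAM UpstreamTrades;

    -- Retry every 10 seconds until a compatible upstream stream named Trades is available.
    APPLY JAVA "com.streambase.sb.adapter.sbd2sbdinput.SBD2SBDInput"
      (UpstreamHost="localhost",
       UpstreamPort="10000",
       UpstreamStream="Trades",
       ReconnectInterval=10,
       Schema=(symbol string(4), price double))
      INTO UpstreamTrades;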

CSV File Writer Embedded Adapter

The Java class that implements this adapter is: com.streambase.sb.adapter.csvwriter.CSVWriter. The parameter_list includes the following parameters:

Parameter | Description | Acceptable Values
FileName | Path and name of the file into which the adapter will write data. If the adapter is configured with a maximum file size, the date and time are appended to this file name when that size is reached. On Windows, the path separator character (\) must be escaped (for example, C:\\myDirectory\\myFile.csv). Alternatively, forward slashes may be used on Windows (C:/myDirectory/myFile.csv) as well as on UNIX and Linux. | "path/file_name.extension"
IncludeHeaderInFile | Include an optional row at the top of each file with the name of each column. | true or false
IfFileDoesntExist | Action to take if the specified CSV file does not exist when the adapter is started. | "Fail" or "Create new file"
IfFileExists | Action to take if the specified CSV file already exists when the adapter is started. | "Truncate existing file", "Fail", or "Append to existing file"
FieldDelimiter | Character used to mark the end of one field and the beginning of another. | Comma: ","
StringQuoteCharacter | Character to use to quote strings when they contain the field delimiter. | Double quotation mark: "\""
FlushInterval | How often to force tuples to disk. Defined in seconds. | Integer values
NullValueRepresentation | String to write when a field is null. | "null"
ThrottleErrorMessages | Only show a given error message once. | true or false
MaxFileSize | Maximum size, in bytes, of the file on disk. If the file reaches this limit, it is renamed with the current timestamp and new data is written to the current FileName. | Integer values; the value 0 indicates no rolling file
StringQuoteOption | Determines when string fields are quoted in the CSV file. | "Never quote", "Always quote", or "Quote if necessary"
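
The following sketch writes a stream to a CSV file using a handful of the parameters above. The stream name, schema, and file path are hypothetical; the forward-slash path form avoids the need to escape backslashes on Windows.

    CREATE INPUT STREAM Trades (symbol string(4), price double);

    -- An output adapter has only a FROM clause. String values are quoted; booleans and integers are not.
    APPLY JAVA "com.streambase.sb.adapter.csvwriter.CSVWriter"
      (FileName="C:/myDirectory/trades.csv",
       IncludeHeaderInFile=true,
       IfFileExists="Append to existing file",
       FlushInterval=5)
      FROM Trades;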

Email Sender Embedded Adapter

The Java class that implements this adapter is: com.streambase.sb.adapter.emailsender.EmailSender. Two parameters are needed to configure each adapter setting. The first parameter specifies whether the configuration value is contained within the tuple being processed or specified in an associated second parameter. The second parameter provides either the name of the tuple field that holds the configuration value or a static value for the configuration value. The parameter_list includes the following parameters:

Parameter | Description | Acceptable Values
FromContains | Whether the configuration value is contained within the tuple being processed or specified in an associated second parameter. | Either the string literal "Static text" or the string literal "Schema field name"
FromValue | Either a string containing the From value or the field name in the tuple from which the From value should be extracted. | Enter, within quotation marks, the From email address or the name of the tuple field that contains this value.
ReplyToContains | Whether the configuration value is contained within the tuple being processed or specified in an associated second parameter. | Either the string literal "Static text" or the string literal "Schema field name"
ReplyTo | Either a string containing the Reply To value or the field name in the tuple from which the Reply To value should be extracted. | Enter, within quotation marks, the Reply To email address or the name of the tuple field that contains this value.
ToContains | Whether the configuration value is contained within the tuple being processed or specified in an associated second parameter. | Either the string literal "Static text" or the string literal "Schema field name"
To | Either a string containing the To value or the field name in the tuple from which the To value should be extracted. | Enter, within quotation marks, the To email address or the name of the tuple field that contains this value.
CcContains | Whether the configuration value is contained within the tuple being processed or specified in an associated second parameter. | Either the string literal "Static text" or the string literal "Schema field name"
Cc | Either a string containing the Cc value or the field name in the tuple from which the Cc value should be extracted. | Enter, within quotation marks, the Cc email address or the name of the tuple field that contains this value.
BccContains | Whether the configuration value is contained within the tuple being processed or specified in an associated second parameter. | Either the string literal "Static text" or the string literal "Schema field name"
Bcc | Either a string containing the Bcc value or the field name in the tuple from which the Bcc value should be extracted. | Enter, within quotation marks, the Bcc email address or the name of the tuple field that contains this value.
SubjectContains | Whether the configuration value is contained within the tuple being processed or specified in an associated second parameter. | Either the string literal "Static text" or the string literal "Schema field name"
Subject | Either a string containing the Subject value or the field name in the tuple from which the Subject value should be extracted. | Enter, within quotation marks, the subject of the email or the name of the tuple field that contains this value.
BodyContains | Whether the configuration value is contained within the tuple being processed or specified in an associated second parameter. | Either the string literal "Static text" or the string literal "Schema field name"
Body | Either a string containing the Body value or the field name in the tuple from which the Body value should be extracted. | Enter, within quotation marks, the body of the email message or the name of the tuple field that contains this text.
SmtpPort | The port number on which to contact the SMTP mail server. | Port number
SmtpServer | The URL or IP address of the SMTP mail server. | URL or IP address
ThrottleErrorMessages | Determines whether the adapter suppresses duplicate error messages. | true or false
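
To make the paired-parameter scheme concrete, the sketch below sends one message per tuple, taking the sender and subject from static text and the recipient and body from tuple fields. The stream, its fields, the addresses, and the server name are hypothetical; passing SmtpPort as an unquoted integer is an assumption, since the table lists only "Port number" as its acceptable value.

    CREATE INPUT STREAM Alerts (recipient string(64), message string(200));

    -- "Static text" means the paired parameter holds the value itself;
    -- "Schema field name" means the paired parameter names the tuple field that holds it.
    APPLY JAVA "com.streambase.sb.adapter.emailsender.EmailSender"
      (FromContains="Static text", FromValue="alerts@example.com",
       ToContains="Schema field name", To="recipient",
       SubjectContains="Static text", Subject="StreamBase alert",
       BodyContains="Schema field name", Body="message",
       SmtpServer="smtp.example.com", SmtpPort=25)
      FROM Alerts;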

XML File Writer Embedded Adapter

The Java class that implements this adapter is: com.streambase.sb.adapter.xmlfilewriter.XMLFileWriter. The parameter_list includes the following parameters:

Parameter | Description | Acceptable Values
FileName | Path and name of the file into which the adapter will write data. This parameter is required. | "path/file_name.extension"
truncateFile | If true, truncate the file on startup. If false, append to an existing file. This parameter is required. | true or false
maxFileSize | An integer that sets the maximum XML file size in bytes before closing the file and opening another. This parameter is optional. | An integer; 0 indicates unlimited file size
bufferSize (optional int) | An integer that sets the buffer size. This parameter is optional. | An integer; 0 indicates unlimited buffer size
flushInterval | An integer that sets how frequently to force a flush, in milliseconds. This parameter is optional. | An integer; 0 indicates always flush and write immediately
charset | A string identifying the charset for the XML file. This parameter is optional; the default is the system property streambase.tuple-charset. | "US-ASCII"
escape | If true, XML encode tuple data. The default is false. | true or false
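
A brief sketch of the XML File Writer follows, using the two required parameters and two optional ones. The stream name, schema, and file name are hypothetical. Note that, apart from FileName, the parameter names for this adapter begin with a lowercase letter as listed in the table.

    CREATE INPUT STREAM Events (id int, detail string(100));

    -- Append to events.xml, never roll the file (maxFileSize=0), and XML-encode tuple data.
    APPLY JAVA "com.streambase.sb.adapter.xmlfilewriter.XMLFileWriter"
      (FileName="events.xml", truncateFile=false, maxFileSize=0, escape=true)
      FROM Events;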

XML Over HTTP File Writer Embedded Adapter

The Java class that implements this adapter is: com.streambase.sb.adapter.xmloverhttp.XMLOverHTTP. The parameter_list includes the following parameters:

Parameter | Description | Acceptable Values
URL (string) | A string containing the URL to which the HTTP POST request is sent. This parameter is required. | "URL"
flushInterval | An integer indicating how often to deliver messages to the URL, in milliseconds. If set to 0, messages are delivered immediately. This parameter is optional. See the maxMessageCache parameter for the case where this parameter is required. | An integer >= 0
maxMessageCache | An integer that sets the maximum number of messages to queue each interval. If set to 0, unlimited messages are cached per interval. This parameter is optional. If the maxMessageCache parameter is set, the flushInterval parameter must also be set and its value must be greater than 0. | An integer >= 0
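
The dependency between maxMessageCache and flushInterval can be sketched as follows. The stream name, schema, and URL are hypothetical. Because maxMessageCache is set, flushInterval is also set to a value greater than 0.

    CREATE INPUT STREAM Events (id int, detail string(100));

    -- Deliver queued messages once per second, caching at most 500 messages per interval.
    APPLY JAVA "com.streambase.sb.adapter.xmloverhttp.XMLOverHTTP"
      (URL="http://collector.example.com/events", flushInterval=1000, maxMessageCache=500)
      FROM Events;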
