Data Parallelism in StreamBase Applications
Authors: Richard Tibbetts, Dr. John Lifter
StreamBase Systems
26-February-2007
Applicable To: StreamBase 3.7, 5.0
In a StreamSQL EventFlow application, many operators and module references can be configured to run concurrently in a separate thread. This feature is enabled on the Concurrency tab of the component's Properties view by checking the Run this component in a separate thread checkbox.
If you know that portions of your StreamBase application can run without dependencies on the other streaming data in your application, you may be able to improve the overall throughput by selecting this concurrency option. Each EventFlow module reference or operator that has concurrency enabled will run in its own processing thread. On a Symmetric Multi-Processing (SMP) machine, the threads are distributed automatically across the available processors. Optionally, you can assign the processing of components marked for concurrency to run on separate physical machines in a cluster.
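As a rough analogy only, and not a description of how the StreamBase runtime is built, the following Python sketch models two components that each run in their own thread and hand tuples to each other through queues. The stage names, fields, and the STOP sentinel are hypothetical. Because CPython threads share a global interpreter lock, the sketch shows the structure of thread-per-component processing rather than the multiprocessor speedup described above.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel that tells a stage to shut down

def run_stage(name, func, inbox, outbox):
    """Run one 'component' in its own thread: read tuples, transform, forward."""
    def loop():
        while True:
            item = inbox.get()
            if item is STOP:
                outbox.put(STOP)  # propagate shutdown to the next stage
                return
            outbox.put(func(item))
    t = threading.Thread(target=loop, name=name, daemon=True)
    t.start()
    return t

# Two stages, each in its own thread, connected by queues.
q_in, q_mid, q_out = queue.Queue(), queue.Queue(), queue.Queue()
t1 = run_stage("map_upper", lambda tup: {**tup, "name": tup["name"].upper()}, q_in, q_mid)
t2 = run_stage("map_len", lambda tup: {**tup, "name_len": len(tup["name"])}, q_mid, q_out)

for name in ["alice", "bob"]:
    q_in.put({"name": name})
q_in.put(STOP)

# Collect output until the shutdown sentinel arrives.
results = []
while (item := q_out.get()) is not STOP:
    results.append(item)
t1.join()
t2.join()
print(results)  # prints: [{'name': 'ALICE', 'name_len': 5}, {'name': 'BOB', 'name_len': 3}]
```

Each stage handles its input in arrival order, so tuples leave the pipeline in the order they entered, while the two stages themselves overlap in time.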
The concurrency option is not available for the Query, Lock, and Unlock operators. The Query operator cannot run in its own thread because it depends on a connected Query Table data construct. The Lock and Unlock operators do not offer the option because they work with a Lock Set data construct and function as a unit to protect critical data.
In a StreamSQL application, only modules can be run in separate threads. Concurrency is enabled by including the PARALLEL keyword in the APPLY MODULE statement. If you convert an EventFlow application into a StreamSQL application, each operator marked for concurrency must be placed in its own module, and that module is then marked for concurrency.
Beginning with the StreamBase v3.7 release, the concurrency option has been extended to include data parallelism: the ability to instantiate, within a StreamBase application, multiple instances of a StreamSQL EventFlow operator or of an EventFlow or StreamSQL module. When data parallelism is enabled, the StreamBase runtime engine increases application throughput by transparently distributing tuples across the multiple instances of the operator or module. All tuples for which the Key expression yields the same value are guaranteed to be sent to the same operator or module instance. Only operators or modules configured to run in a separate thread can also be configured for data parallelism.
In an EventFlow application, the Concurrency tab of the Properties view has three data parallelism controls, which are enabled only after you select the Run this component in a separate thread option. Checking the Run in parallel threads checkbox enables the Number of threads: spinner and the Key expression: text box. Use these controls to set the number of instances of the component to instantiate and an expression, derived from fields in the incoming tuple, that evaluates to an integer value. StreamBase uses the resulting integer to select which instance processes the tuple.
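The routing rule can be sketched in Python under one stated assumption: the integer produced by the key expression is mapped to an instance with a modulo operation. The article says only that StreamBase uses the integer to select an instance, not which mapping it applies, so select_instance and dispatch are hypothetical helpers. The sketch shows the property that matters: tuples with equal key values always land on the same instance.

```python
from collections import defaultdict

def select_instance(key_value: int, num_instances: int) -> int:
    """Map an integer key to an instance index in [0, num_instances).

    ASSUMPTION: modulo is used for illustration only; the actual
    StreamBase mapping is not described in this article.
    """
    return key_value % num_instances

def dispatch(tuples, key_expr, num_instances):
    """Group tuples by the instance index that would process them."""
    per_instance = defaultdict(list)
    for tup in tuples:
        per_instance[select_instance(key_expr(tup), num_instances)].append(tup)
    return dict(per_instance)

orders = [
    {"customerNumber": 101, "amount": 5.0},
    {"customerNumber": 202, "amount": 7.5},
    {"customerNumber": 101, "amount": 1.25},
]
# Key expression: the integer customerNumber field, with two instances.
routed = dispatch(orders, key_expr=lambda t: t["customerNumber"], num_instances=2)
print(routed)  # both customerNumber 101 tuples are grouped under instance 1
```

Python's % returns a non-negative result for a positive divisor, so negative keys still map into the valid range; a partitioner written in a language whose remainder can be negative would need to account for that.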
In this example, concurrency and data parallelism have been enabled for a Map operator, and the StreamBase application will create two instances of this operator. As each tuple is processed, StreamBase uses the value in the customerNumber field to select which instance of the Map operator processes the tuple, and it guarantees that tuples with the same customerNumber value are processed by the same instance.

The key expression must evaluate to an integer value. If customerNumber is an integer field, it can be used as is. If it is a double, use the int(double) type conversion function to transform it into an integer.
String fields are converted into an integer value with the function hash(string_field_name). In this example, the key expression would become hash(customerNumber).
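A short Python sketch of the two conversions follows. Python's int() stands in for int(double) and truncates toward zero; this article does not say whether StreamBase truncates or rounds. zlib.crc32 stands in for hash(string) purely because it is deterministic; StreamBase's actual hash algorithm is not specified here. int_key, hash_key, and instance_for are hypothetical names, and instance_for assumes modulo routing.

```python
import zlib

def int_key(value: float) -> int:
    # Stand-in for int(double): Python's int() truncates toward zero.
    # (Assumption: StreamBase's exact rounding behavior is not described here.)
    return int(value)

def hash_key(value: str) -> int:
    # Stand-in for hash(string): CRC-32 is used only because it is deterministic.
    # (Assumption: this is not StreamBase's actual hash algorithm.)
    return zlib.crc32(value.encode("utf-8"))

def instance_for(key: int, num_instances: int) -> int:
    # Hypothetical modulo routing from an integer key to an instance index.
    return key % num_instances

# The same string key always yields the same integer, and thus the same instance.
first = instance_for(hash_key("C-1001"), 2)
second = instance_for(hash_key("C-1001"), 2)
print(int_key(42.9), first == second)  # prints: 42 True
```

Python's built-in hash() is avoided because string hashing is randomized per process unless PYTHONHASHSEED is set, which would make the key-to-instance mapping change between runs.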
Other concurrency-capable StreamSQL EventFlow operators and module references are configured in the same way.
When the application starts, it creates multiple instances of each such operator or module reference. You can confirm that the instances have been created by running the sbmonitor or jsbmonitor applications and observing the multiple entries for the operators or module references. Note that the designation default refers to the container hosting the application; see The StreamBase Server Container Model for more information on the StreamBase container model.
In a StreamSQL application, only modules can be configured to run in a separate thread and use data parallelism. A module is included in a StreamSQL application with the APPLY MODULE statement. To use data parallelism, include the PARALLEL instance_number BY field_name clause immediately after the APPLY keyword. For instance_number, supply an integer value, the equivalent of the number selected in the Number of threads: spinner control; field_name is the name of an integer field in the incoming tuple.
If customerNumber in the previous example is an integer field, the statement has the following form:
APPLY PARALLEL 2 BY customerNumber MODULE ...
If customerNumber is not an integer, you need to use one of the type conversion functions, int(double) or hash(string), to convert the field into an integer. The conversion function cannot be used directly within the APPLY statement. Instead, invoke it within a SELECT statement and add the resulting integer field to the tuple passed into the module, as summarized in the following code example.
CREATE INPUT STREAM in (customerNumber string(10), ...);
CREATE OUTPUT STREAM out;
-- Derive an integer key from the string field.
CREATE STREAM s AS
SELECT *, hash(customerNumber) AS hash_value
FROM in;
-- Run two instances of the module, routing tuples by hash_value.
APPLY PARALLEL 2 BY hash_value MODULE "module_name"
FROM module_input_stream = s
INTO module_output_stream = out;
Each operator, module, or StreamSQL APPLY statement configured for data parallelism instantiates multiple instances, which you can confirm by running the sbmonitor or jsbmonitor utilities and noting the replica instances of the operators or their StreamSQL equivalents. When a module is configured for data parallelism, all components within it are replicated, including data constructs (Query Table, JDBC Table, Chronicle Connection, Materialized Windows, Lock Set). However, data constructs are not listed in the sbmonitor or jsbmonitor output, so keep in mind that each replicated module instance contains its own separate instance of each of these data constructs.
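The practical effect of replicated data constructs can be sketched in Python, with a dict standing in for each replica's Query Table. ModuleInstance and the modulo routing are hypothetical illustrations, not StreamBase APIs. Per-customer totals stay correct because every tuple for a given key reaches the same replica, but no single replica holds all rows.

```python
class ModuleInstance:
    """Hypothetical module replica with its own private 'Query Table' (a dict)."""

    def __init__(self):
        self.table = {}  # each replica gets a separate copy of the data construct

    def process(self, tup):
        # Keep a running total per customer in this replica's own table.
        cust = tup["customerNumber"]
        self.table[cust] = self.table.get(cust, 0) + tup["amount"]

NUM_INSTANCES = 2
replicas = [ModuleInstance() for _ in range(NUM_INSTANCES)]

orders = [
    {"customerNumber": 101, "amount": 5},
    {"customerNumber": 202, "amount": 7},
    {"customerNumber": 101, "amount": 1},
]
for tup in orders:
    # ASSUMPTION: modulo routing on an integer key, for illustration only.
    replicas[tup["customerNumber"] % NUM_INSTANCES].process(tup)

print([r.table for r in replicas])  # prints: [{202: 7}, {101: 6}]
```

A query that needs rows for every key, such as a grand total across all customers, would have to combine results from all replicas; here that total is 13, spread across two separate tables.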