When defining the dimensions of a window in a StreamBase EventFlow aggregate operator, or when writing a StreamSQL window definition, you must provide values for the open/advance and/or close/size entries. These values are entered at design time, and there is no facility to change them while the application is running. You may, however, have an application in which you want to alter a window's dimensions without stopping, editing, and then restarting the application. Although there is no way to accomplish this with a Tuple or Time based window, a Field based window that uses a numeric field to delimit the window's dimensions supports this use case.
To fulfill this objective, you must have some way of changing, on demand, the value of the field that controls the window's opening and closing. Additionally, to avoid the possibility that windows of unintended dimensions are created during the reconfiguration process, you must ensure that a window's dimensions are changed only in the interval between the closing of one window and the opening of the next.
This article will develop a StreamBase EventFlow application that adds a controllable, incrementable numeric field to a tuple. The application may be used as a module in other StreamBase applications where you want the capability of managing the dimensions of a downstream Field based window so that it behaves as a reconfigurable Tuple based window. While a comparable application could be written using the StreamSQL text based paradigm (see A StreamSQL Version of the ControlledWindowSize EventFlow Application), the EventFlow implementation is a little more versatile and suitable for use as a module in both EventFlow and StreamSQL text based applications.
Let's consider the situation in which, although the downstream window dimension will be Field based, the design requires the ability to change the number of tuples consumed by each window. That is, the application will control a Field based window dimension so that it functions as a reconfigurable Tuple based window dimension. This application must manage four numeric values:
- The size (that is, number of tuples) of the currently opened window.
- A window identifier.
- The desired size (again, number of tuples) of the next window to open.
- The number of tuples in the current window.
In this example, all four values are integers, but if your application requires a larger range of window identifiers, the window identifier may be a double data type.
At runtime, as required, the user of the application changes the desired size of the next window. When the current window closes, that is, when the number of tuples in the current window equals the size of the current window, the application will increment the window identifier and then, if necessary, update the current window size setting to the desired size of the next window. If the desired window size has not changed, the next window will be the same size as the previous window; if the desired window size has changed, the new window will have a different size. The new window size may be either smaller or larger than the current window size.
The easiest way to manage these four variables is by using an in memory query table. The table has only one row and five columns: a primary key column and one column for each variable. As a tuple enters the application, its schema will be modified so that it includes all of the fields from the query table as well as its original fields and a new field containing the window identifier. Then the application determines if the current window is ready to close. If so, the application increments the window identifier field and then updates the values in the query table; if the window is not ready to close, the window identifier and query table values are not altered. Finally, the application removes the query table derived fields from the tuple and emits a tuple containing the original fields and the window identifier. A downstream aggregate operator will close and open windows whenever the value in the window identifier changes.
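The per-tuple logic described above can be approximated outside StreamBase. The following Python sketch is only a simulation under stated assumptions: the class and field names are hypothetical, the single table row is modelled as a plain object, and a window is treated as ready to close when the count recorded before the arriving tuple already equals the current size, so the arriving tuple opens the next window.

```python
from dataclasses import dataclass


@dataclass
class WindowControl:
    """Stand-in for the single row of the control table (field names are hypothetical)."""
    current_size: int     # size, in tuples, of the currently open window
    window_id: int        # identifier of the currently open window
    desired_size: int     # size requested for the next window to open
    tuple_count: int = 0  # tuples already placed in the current window


def tag_tuple(row: WindowControl, data: dict) -> dict:
    """Return a copy of `data` with a WindowId field, updating `row` in place."""
    if row.tuple_count == row.current_size:
        # Assumed reading: the current window is full, so close it, adopt the
        # desired size, and let this tuple open the next window.
        row.window_id += 1
        row.current_size = row.desired_size
        row.tuple_count = 0
    row.tuple_count += 1
    # Original fields pass through unchanged; only WindowId is added.
    return {**data, "WindowId": row.window_id}


row = WindowControl(current_size=3, window_id=0, desired_size=3)
ids = [tag_tuple(row, {"value": i})["WindowId"] for i in range(3)]
row.desired_size = 2  # takes effect only when the next window opens
ids += [tag_tuple(row, {"value": i})["WindowId"] for i in range(3, 7)]
print(ids)  # [0, 0, 0, 1, 1, 2, 2]
```

In this run the first window holds three tuples and the following windows hold two each, because the changed size is applied only at a window boundary.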
Since this application is being developed as a reusable module, it is extremely important that all of the fields originally included in the input tuple are passed unaltered to the output stream. This is accomplished by configuring each operator to implicitly pass all input fields to its emitted tuple.
The EventFlow Application Diagram
The following diagram presents the application's processing logic.

The WindowControl Query Table
This table has the structure illustrated in the following figure. The column Row is the primary key column and, since there is only one row in this table, will always be equal to 1 (or any other integer value you select).

The SetWindowSize Query Operator
On the Query Settings tab, this operator is configured to perform a write operation into the query table row with primary index value 1. This is the only row in the WindowControl query table. The important configuration information is on the Operation Settings tab where an update is used to set the desired size of the next window to a new value. Notice that the values in the other columns are not changed. If the table is empty, this operator inserts a row setting each of the columns to an appropriate starting value. The Output Settings tab has been configured so that a tuple is not emitted from this operator.
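The update-or-insert behaviour of this operator might be sketched in Python as follows. The table is modelled as a dictionary keyed by the Row value, and the column names other than the primary key, as well as the starting values used on insert, are assumptions chosen for illustration; the source only says the starting values are "appropriate".

```python
def set_window_size(table: dict, new_size: int, row_key: int = 1) -> None:
    """Record the desired size of the next window; nothing is emitted downstream."""
    row = table.get(row_key)
    if row is None:
        # Table is empty: insert the single row with assumed starting values.
        table[row_key] = {
            "CurrentSize": new_size,   # first window uses the requested size
            "LastWindowId": 0,         # hypothetical initial window identifier
            "DesiredSize": new_size,
            "TupleCount": 0,
        }
    else:
        # Row exists: change only the desired size; other columns are untouched.
        row["DesiredSize"] = new_size
```

Calling `set_window_size(table, 10)` on an empty table creates the row, while a later `set_window_size(table, 5)` changes only the desired size, leaving the size of the currently open window alone.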

The Control Input Stream
Tuples enqueued to this input stream have a single field, named WindowSize, which is the desired size of the window. Since the design of this application requires setting the initial window size before the remainder of the application can perform as desired, a tuple must be enqueued to this stream immediately after the application starts. In an actual deployment, you might want to use the 'One Shot' Embedded Adapter to set the initial window size and use the Control input stream to change the window size.

The MatchingData Input Stream
Tuples to be processed by the components downstream from this application enter through this stream. In order to write this application, a schema must be assigned to this stream. However, when this application is used as a module, the schema assigned to this input stream will be overridden by the schema of the tuple submitted by components upstream of this application. Therefore, in this EventFlow implementation the actual schema assigned to the MatchingData input stream does not matter. Consequently, the schema associated with this input stream simply includes a single integer field named value.
The ReadWindowSizeSetTupleId Query Operator
On the Query Settings tab, this operator is configured to perform a write operation into a row with primary index value 1. This is the only row in the WindowControl query table. The important configuration information is on the Operation Settings and Output Settings tabs. An update is used to increment the number of tuples in the current window. Note that if the row does not already exist in the query table, the update operation is aborted.

The emitted tuple will contain all of the fields from the input tuple and the query table.

The schema of the emitted tuple contains many fields. All of the fields derived from the query table will eventually be dropped from the tuples emitted onto the DataOutWithWindowId output stream. When the application is used as a module, the placeholder field value is replaced by the fields of the tuple being processed.

The GenerateWindowId Map Operator
This operator passes all fields from the input tuple and adds another field, named WindowId, to the tuple. Ultimately, this field, along with the fields in the tuple being processed, will be included in the tuples emitted onto the DataOutWithWindowId output stream.

Note how the expression checks whether the current window is ready to close; if so, the expression increments the window identifier, which causes the downstream window to close and a new window to open.
The OnlyUpdateOnChange Filter Operator
This operator checks whether the window identifier has been incremented. If true, it passes a tuple to the UpdateWindowId query operator, which leads to an update of the WindowControl query table.
The expression used as the predicate is:
t_LastWindowId != WindowId
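The map expression and the filter predicate can be pictured together with a small Python sketch. The field name t_LastWindowId comes from the predicate above; t_TupleCount and t_CurrentSize are hypothetical names for the other table-derived fields, and the "ready to close" test shown here is an assumed form of the expression, which the text describes but does not spell out.

```python
def generate_window_id(t: dict) -> dict:
    """Pass every input field through and add WindowId (GenerateWindowId sketch)."""
    # Assumed test: the window is ready to close when its count equals its size.
    window_full = t["t_TupleCount"] == t["t_CurrentSize"]
    window_id = t["t_LastWindowId"] + 1 if window_full else t["t_LastWindowId"]
    return {**t, "WindowId": window_id}


def only_update_on_change(t: dict) -> bool:
    """Predicate of the OnlyUpdateOnChange filter: true only when the id moved."""
    return t["t_LastWindowId"] != t["WindowId"]
```

Only tuples for which `only_update_on_change` returns true would continue to the table update, so the table is written once per window rather than once per tuple.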
The UpdateWindowId Query Operator
On the Query Settings tab, this operator is configured to perform a write operation into a row with primary index value 1. The important configuration information is on the Operation Settings tab. An update is used to copy the value in the desired window size column into the column containing the current window size and to change the value in the window identifier column to the new value supplied by the GenerateWindowId map operator. Note that if the row does not already exist in the query table, the update operation is aborted. The combination of the GenerateWindowId map operator, OnlyUpdateOnChange filter operator, and UpdateWindowId query operator provides the logic that changes, if necessary, the number of tuples that will be included in the Field based window in a downstream aggregate operator.
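As a rough illustration, the update performed by this operator could look like the Python sketch below. The table is modelled as a dictionary keyed by the Row value and the column names are hypothetical. The text does not say how the count of tuples in the current window is reset, so the reset shown here is an assumption added to keep the sketch complete.

```python
def update_window_id(table: dict, t: dict, row_key: int = 1) -> bool:
    """Apply the window rollover to the control row; return False if aborted."""
    row = table.get(row_key)
    if row is None:
        return False  # row missing: the update is aborted, as the text notes
    row["CurrentSize"] = row["DesiredSize"]  # adopt the requested size
    row["LastWindowId"] = t["WindowId"]      # id supplied by GenerateWindowId
    # Assumption (not stated in the source): restart the count so that it
    # includes the tuple that opened the new window.
    row["TupleCount"] = 1
    return True
```

Because the desired size is copied only at this point, a size change requested in the middle of a window cannot alter the window that is already open.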

The TidyUp Map Operator
This operator removes all the fields derived from the WindowControl query table from the tuple that will be emitted on the output stream DataOutWithWindowId.
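A minimal Python sketch of this clean-up step follows. It assumes that every field derived from the query table carries the prefix t_, as the field t_LastWindowId suggests; the prefix convention for the other fields is an assumption.

```python
def tidy_up(t: dict, prefix: str = "t_") -> dict:
    """Drop table-derived fields, keeping the original fields and WindowId."""
    return {name: value for name, value in t.items() if not name.startswith(prefix)}
```

Note that a prefix-based rule like this one would also drop any original input field whose name happens to start with the same prefix, so the prefix should be chosen to avoid collisions with upstream schemas.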

The DataOutWithWindowId Output Stream
Tuples emitted by this application will include an integer field named WindowId and the fields from the tuples originally enqueued to the application.
This EventFlow application (ControlledWindowSize.sbapp) may be used as a module in either another EventFlow application diagram or a StreamSQL text based application, as illustrated in the following figure. When combined with a downstream aggregate operator, or a StreamSQL SELECT statement against a windowed stream, that uses a Field based dimension, this module mimics the functionality of a Tuple based window.
EventFlow Application (figure)
StreamSQL Application
CREATE INPUT STREAM ControlStream (
WindowSize int
);
CREATE INPUT STREAM DataStream (
x int,
y string(4),
z timestamp
);
CREATE OUTPUT STREAM AggregatedDataStream;
CREATE STREAM ModuleOutputStream;
APPLY MODULE "ControlledWindowSize.sbapp"
FROM Control = ControlStream,
MatchingData = DataStream
INTO DataOutWithWindowId = ModuleOutputStream;
SELECT count() AS numberOfTuples
FROM ModuleOutputStream
[SIZE 1 ADVANCE 1 ON WindowId OFFSET 0]
INTO AggregatedDataStream;
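To see what a downstream window with SIZE 1 and ADVANCE 1 on WindowId produces, the following Python sketch counts tuples per run of equal WindowId values. It is a simulation only, not StreamBase code, and it treats the end of the input as closing the last window, whereas a running application would wait for the identifier to change.

```python
from itertools import groupby


def count_per_window(tuples):
    """Return (WindowId, numberOfTuples) for each consecutive run of WindowId."""
    return [
        (window_id, sum(1 for _ in group))
        for window_id, group in groupby(tuples, key=lambda t: t["WindowId"])
    ]


tagged = [{"x": i, "WindowId": wid} for i, wid in enumerate([0, 0, 0, 1, 1])]
print(count_per_window(tagged))  # [(0, 3), (1, 2)]
```

Each change in WindowId ends one group and starts the next, which mirrors how the Field based window closes and opens when the identifier changes.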
When configuring the ControlledWindowSize module reference in the EventFlow application, you must select the Override module input schemas with incoming schemas checkbox on the Properties view, Input Ports tab. For the StreamSQL version of this application, using the incoming schema to override the module input stream schema is the default behavior, so the APPLY statement does not require a specific directive.

Note that the input stream ControlStream has the same schema as the module's input stream Control while the input stream DataStream has a schema completely different than the module's input stream MatchingData. Since tuples enqueued to ControlStream provide a value that is used by the module – the new window size – they must include a field with the same name and type as the field required by the module. However, since the fields in the tuples enqueued to DataStream are not used within the module, the schema associated with these tuples is not restricted in any way.
The example illustrated in the preceding figures uses a downstream aggregate operator (ConfigurableAggregate), or StreamSQL SELECT statement against a windowed stream, with a Field based window definition where SIZE and ADVANCE are both set to one. This combination of module and aggregate operator (or SELECT statement) yields an outcome equivalent to a Tuple based window where SIZE and ADVANCE are both set to the same value. With a judicious choice of WindowSize, SIZE, and ADVANCE it is possible for the ControlledWindowSize module and downstream Field based window to mimic any Tuple based dimension setting.
For example, setting:
- WindowSize to 10 tuples, SIZE to 1, and ADVANCE to 1 is equivalent to a Tuple based dimension with SIZE and ADVANCE set to 10 tuples.
- WindowSize to 5 tuples, SIZE to 2, and ADVANCE to 1 is equivalent to a Tuple based dimension with SIZE set to 10 tuples and ADVANCE set to 5 tuples.
- WindowSize to 2 tuples, SIZE to 5, and ADVANCE to 1 is equivalent to a Tuple based dimension with SIZE set to 10 tuples and ADVANCE set to 2 tuples.
From these examples, you can observe that, when the ControlledWindowSize module is combined with a downstream Field based dimension, WindowSize * SIZE gives the SIZE setting and WindowSize * ADVANCE gives the ADVANCE setting of the comparable Tuple based dimension.
A StreamSQL text based version of the ControlledWindowSize EventFlow application can be easily developed (or simply generated from the existing EventFlow application). This version of the application can also be used as a module in either an EventFlow application diagram or another StreamSQL application. However, there is a slight difference in the way a module based on a .ssql file and a module based on a .sbapp file interpret the Override module input schemas with incoming schemas directive. The input stream schema of a .ssql based module must include at least one field that is also included in the schema of the stream that enqueues tuples. In this example, the module input stream MatchingData would need to have at least one field that is also included in the schema of DataStream; this restriction does not exist for a .sbapp based module.
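The relationship can be checked with a small Python simulation. The sketch assumes a fixed WindowSize, window identifiers that start at 0, and Field based windows that cover the identifier ranges [k * ADVANCE, k * ADVANCE + SIZE); the function name and the choice to report only complete windows are illustrative.

```python
def field_windows(ids, size, advance):
    """List the tuple positions covered by each complete Field based window."""
    windows = []
    k = 0
    # Keep only windows whose whole identifier range appears in the input.
    while k * advance + size - 1 <= max(ids):
        low, high = k * advance, k * advance + size
        windows.append([pos for pos, wid in enumerate(ids) if low <= wid < high])
        k += 1
    return windows


window_size = 5                                # tuples per window identifier
ids = [i // window_size for i in range(30)]    # identifiers 0..5
windows = field_windows(ids, size=2, advance=1)
print([len(w) for w in windows])   # [10, 10, 10, 10, 10]
print([w[0] for w in windows])     # [0, 5, 10, 15, 20]
```

With WindowSize 5, SIZE 2, and ADVANCE 1, every window holds 10 tuples and successive windows start 5 tuples apart, matching a Tuple based dimension with SIZE 10 and ADVANCE 5.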
You may download a StreamBase application illustrating these concepts. Right click on the hyperlink and save the .sbapp file to a convenient location; make certain you save the file with the extension .sbapp. Now import this application file into a StreamBase project.