Developers: Controlling Window Size — Part 2


Author: Simon Keen
Contributor: Dr. John Lifter
StreamBase Systems
7-May-2007

Introduction 

When defining the dimensions of a window within a StreamBase EventFlow aggregate operator, or when writing a StreamSQL window definition, you must provide values for the open/advance and/or close/size entries. These values are entered at design time, and there is no facility to change them while the application is running. You may, however, have an application in which you want to alter a window's dimensions without stopping, editing, and restarting it. Although there is no way to do this with a Tuple or Time based window, a Field based window that uses a numeric field to delimit the window's dimensions supports this use case.

To achieve this, you must have some way of changing, on demand, the value of the field that controls when the window opens and closes. Additionally, to avoid the possibility that windows of unintended dimensions are created during reconfiguration, you must ensure that a window's dimensions change only in the interval between the closing of one window and the opening of the next.

This article will develop a StreamBase EventFlow application that adds a controllable, incrementable numeric field to a tuple. The application may be used as a module in other StreamBase applications where you want the capability of managing the dimensions of a downstream Field based window so that it behaves as a reconfigurable Time based window. While a comparable application could be written using the StreamSQL text based paradigm (see A StreamSQL Version of the TimeBasedControlledWindowSize EventFlow Application), the EventFlow implementation is a little more versatile and suitable for use as a module in both EventFlow and StreamSQL text based applications.

The EventFlow Application 

Let's consider the situation in which, although the downstream window dimension will be Field based, the design requires the ability to change the length of time each window remains open. That is, the application will control a Field based window dimension so that it functions as a reconfigurable Time based window dimension. This application must manage four values:

  • The time in seconds that the current window will remain open.
  • A window identifier.
  • The desired time in seconds that the next window should remain open.
  • The time at which the current window opened.

In this example, the first three values are integers, but if your application requires a larger range of window identifiers, the window identifier may be a double data type. The fourth value is a timestamp data type.

At runtime, the user of the application changes the desired duration of the next window as required. When the current window closes, that is, when the time the current window has been open reaches the current window duration setting, the application increments the window identifier and then, if necessary, updates the current window duration setting to the desired duration of the next window. If the desired duration has not changed, the next window remains open for the same length of time as the previous window; if it has changed, the new window has a different duration, which may be either shorter or longer than the current one.

The easiest way to manage these four variables is with an in-memory query table. The table has only one row and five columns: a primary key column and one column for each variable. As a tuple enters the application, its schema is extended so that it includes all of the fields from the query table in addition to its original fields, plus a new field containing the window identifier. The application then determines whether the current window is ready to close. If so, it increments the window identifier field and updates the values in the query table; if not, the window identifier and query table values are left unchanged. Finally, the application removes the query table derived fields from the tuple and emits a tuple containing the original fields and the window identifier. A downstream aggregate operator closes the current window and opens a new one whenever the value of the window identifier field changes.
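The per-tuple logic described above can be modeled outside StreamBase. The following is a minimal Python sketch, not StreamBase code: the WindowControl dataclass stands in for the one-row query table, and its attribute names, the use of plain seconds for time, and the choice to open the new window at the timestamp of the tuple that closed the old one are all assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class WindowControl:
    """Stand-in for the single row of the WindowControl query table (hypothetical names)."""
    current_duration: int  # seconds the current window stays open
    window_id: int         # identifier of the current window
    desired_duration: int  # seconds the next window should stay open
    opened_at: float       # time (in seconds) at which the current window opened


def process(table: WindowControl, tuple_in: dict, now: float) -> dict:
    """Tag one data tuple with WindowId, closing the current window if it has expired."""
    if now - table.opened_at >= table.current_duration:
        # The current window is ready to close: start the next window and apply
        # the desired duration only at this boundary, never mid-window.
        table.window_id += 1
        table.current_duration = table.desired_duration
        table.opened_at = now  # assumption: next window opens at this tuple's time
    # Emit the original fields plus WindowId; table-derived fields are not copied.
    return {**tuple_in, "WindowId": table.window_id}
```

In this model, changing desired_duration while a window is open has no effect until that window closes, which is the property the design relies on to avoid windows of unintended dimensions.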

Since this application is being developed as a reusable module, it is extremely important that all of the fields originally included in the tuple are passed unaltered to the output stream. This is accomplished by configuring each operator to implicitly pass all input fields to its emitted tuple.

The EventFlow Application Diagram

The following diagram presents the application's processing logic.

EventFlow Application Diagram

The WindowControl Query Table

This table has the structure illustrated in the following figure. The column Row is the primary key column and, since there is only one row in this table, will always be equal to 1 (or any other integer value you select).

WindowControl Query Table

The SetWindowSize Query Operator

On the Query Settings tab, this operator is configured to perform a write operation into the query table row with primary index value 1. This is the only row in the WindowControl query table. The important configuration information is on the Operation Settings tab where an update is used to set the desired duration of the next window to a new value. Notice that the values in the other columns are not changed. If the table is empty, this operator inserts a row setting each of the columns to an appropriate starting value. The Output Settings tab has been configured so that a tuple is not emitted from this operator.

SetWindowSize Query Operator
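The SetWindowSize behavior amounts to an "update one column, or insert the row if it is missing" operation. A minimal Python sketch, modeling the table row as a dictionary (None when the table is empty); the column names CurrentSize, LastWindowId, DesiredSize, and OpenedAt, and the starting values used on insert, are hypothetical rather than taken from the application's figure.

```python
from typing import Optional


def set_window_size(row: Optional[dict], window_size: int, now: float) -> dict:
    """Model of SetWindowSize: change only the desired duration, or seed the row."""
    if row is None:
        # Empty table: insert the single row (Row = 1) with starting values.
        # These particular starting values are an assumption.
        return {"Row": 1, "CurrentSize": window_size, "LastWindowId": 0,
                "DesiredSize": window_size, "OpenedAt": now}
    # Row exists: update the desired duration; all other columns are unchanged.
    return {**row, "DesiredSize": window_size}
```

Like the operator, the function produces no output tuple; it only yields the new state of the row.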

The Control Input Stream

Tuples enqueued to this input stream have a single field, named WindowSize, which specifies the desired duration, in seconds, of the next window. Because the rest of the application cannot function as designed until the initial window size is set, a tuple must be enqueued to this stream immediately after the application starts. In an actual deployment, you might use the One Shot Embedded Adapter to set the initial window size and the Control input stream to change it afterward.

One Shot Embedded Input Adapter

The MatchingData Input Stream and AddTimestamp Map Operator

Tuples to be processed by the components downstream from this application enter through the MatchingData stream. To build this application, a schema must be assigned to this stream. When the application is used as a module, the schema assigned to this input stream is overridden by the schema of the tuples submitted by components upstream of the module. However, the schema you assign to this stream at design time depends on whether window durations are based on the system time of the computer running the application or on a timestamp field in the tuples being processed.

If you want to use the system clock, the actual schema assigned to the MatchingData input stream does not matter, because the timestamp field named ts, which this application uses to manage the window dimensions, is added by the AddTimestamp map operator as shown in the following figure. The schema associated with this input stream can simply include a single integer field named value. This is the approach used in developing this example.

AddTimeStamp Map Operator

If you want to use a timestamp field from the enqueued tuple, the schema assigned to MatchingData must include a timestamp field with the same name as the enqueued tuple's timestamp field. Since the remainder of this application requires a timestamp field named ts, use the AddTimestamp map operator to add this field, setting its value to the value of the tuple's timestamp field. In the following figure, the value in the tuple's timestamp field, named tuple_ts_field_name, is copied into the added field.

AddTimeStamp Map Operator
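Both AddTimestamp configurations produce the same result: an extra field named ts. A minimal Python sketch of the two choices, assuming time is represented as seconds (time.time() stands in for the system clock; the function name add_timestamp is hypothetical):

```python
import time
from typing import Optional


def add_timestamp(tuple_in: dict, ts_field: Optional[str] = None) -> dict:
    """Add the ts field that the rest of the module relies on.

    ts_field=None : use the system clock (the approach used in this example).
    ts_field=name : copy the value of the tuple's own timestamp field.
    """
    ts = time.time() if ts_field is None else tuple_in[ts_field]
    # All original fields pass through unchanged; only ts is added.
    return {**tuple_in, "ts": ts}
```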

The ReadWindowSizeSetTupleId Query Operator

On the Query Settings tab, this operator is configured to perform a write operation into a row with primary index value 1. This is the only row in the WindowControl query table. The important configuration information is on the Operation Settings and Output Settings tabs. An update is used to increment the number of tuples in the current window. Note that if the row does not already exist in the query table, the update operation is aborted.

ReadWindowSizeSetTupleId Query Operator

The emitted tuple will contain all of the fields from the input tuple and the query table.

ReadWindowSizeSetTupleId Query Operator

The schema of the emitted tuple contains many fields. All of the fields derived from the query table, as well as the timestamp field ts, will eventually be dropped from the tuples emitted onto the DataOutWithWindowId output stream. The field value, which exists only in the design-time schema of MatchingData, will be replaced by the fields of the tuples actually being processed when the application is used as a module.

ReadWindowSizeSetTupleId Query Operator
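The shape of the tuple emitted by ReadWindowSizeSetTupleId can be sketched as a merge of the input fields with the query table row. In this minimal Python model the table columns are given a t_ prefix; that naming is an assumption suggested by the t_LastWindowId field used later in the filter predicate, and the column names themselves are hypothetical.

```python
def read_window_control(tuple_in: dict, row: dict) -> dict:
    """Model of the ReadWindowSizeSetTupleId output: input fields plus table fields.

    Every input field is passed through, and every table column is appended
    with a t_ prefix (assumed naming).
    """
    return {**tuple_in, **{"t_" + name: value for name, value in row.items()}}
```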

The GenerateWindowId Map Operator

This operator passes all fields from the input tuple and adds another field, named WindowId, to the tuple. Ultimately, this field, along with the fields in the tuple being processed, will be included in the tuples emitted onto the DataOutWithWindowId output stream.

GenerateWindowId Map Operator

Note how the expression checks whether the current window is ready to close; if it is, the expression increments the window identifier, which causes a downstream window to close and a new window to open.
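The actual expression is shown in the figure; its logic can be sketched in Python as a conditional on the elapsed time. The field names t_OpenedAt and t_CurrentSize are hypothetical stand-ins for the query table derived fields, t_LastWindowId is the name used in the filter predicate below, and time is modeled as plain seconds.

```python
def generate_window_id(tup: dict) -> dict:
    """Model of GenerateWindowId: pass all fields through and add WindowId."""
    # Window is ready to close once it has been open for the current duration.
    expired = tup["ts"] - tup["t_OpenedAt"] >= tup["t_CurrentSize"]
    window_id = tup["t_LastWindowId"] + 1 if expired else tup["t_LastWindowId"]
    return {**tup, "WindowId": window_id}
```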

The OnlyUpdateOnChange Filter Operator

This operator checks whether the window identifier has been incremented. If true, it passes a tuple to the UpdateWindowId query operator, which leads to an update of the WindowControl query table.

The expression used as the predicate is:

  t_LastWindowId != WindowId

The UpdateWindowId Query Operator

On the Query Settings tab, this operator is configured to perform a write operation into a row with primary index value 1. The important configuration information is on the Operation Settings tab. An update is used to copy the value in the desired window size column into the column containing the current window size, and to change the value in the window identifier column to the new value supplied by the GenerateWindowId map operator. Note that if the row does not already exist in the query table, the update operation is aborted. The combination of the GenerateWindowId map operator, OnlyUpdateOnChange filter operator, and UpdateWindowId query operator provides the logic that changes, if necessary, the dimensions of a Field based window in a downstream aggregate operator.

UpdateWindowId Query Operator
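The filter and the update work as a pair: only tuples whose WindowId differs from the stored identifier reach the update. A minimal Python sketch, with the table row as a dictionary; the column names are hypothetical, and recording the new window's opening time from the tuple's ts is an assumption, since the text above does not say which operator maintains that column.

```python
def only_update_on_change(tup: dict) -> bool:
    """Model of the OnlyUpdateOnChange predicate: t_LastWindowId != WindowId."""
    return tup["t_LastWindowId"] != tup["WindowId"]


def update_window_id(table: dict, tup: dict) -> None:
    """Model of UpdateWindowId, applied only to tuples that pass the filter."""
    table["CurrentSize"] = table["DesiredSize"]  # next window uses the desired duration
    table["LastWindowId"] = tup["WindowId"]      # value computed by GenerateWindowId
    table["OpenedAt"] = tup["ts"]                # assumption: window opens at this tuple's ts
```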

The TidyUp Map Operator

This operator removes all the fields derived from the WindowControl query table and the timestamp field added by the AddTimestamp map operator from the tuple that will be emitted on the output stream DataOutWithWindowId.

TidyUp Map Operator
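Because the module must pass the caller's fields through unaltered, the clean-up should drop an explicit list of module-added fields rather than, say, every field with a given prefix. A minimal Python sketch; the names in ADDED_FIELDS are hypothetical apart from ts and t_LastWindowId, which appear in the text.

```python
# Hypothetical names of the fields added inside the module.
ADDED_FIELDS = {"ts", "t_Row", "t_CurrentSize", "t_LastWindowId", "t_DesiredSize", "t_OpenedAt"}


def tidy_up(tup: dict) -> dict:
    """Model of TidyUp: keep the original fields and WindowId, drop module-added fields."""
    return {name: value for name, value in tup.items() if name not in ADDED_FIELDS}
```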

The DataOutWithWindowId Output Stream

Tuples emitted by this application will include an integer field named WindowId and the fields from the tuples originally enqueued to the application.

Using the EventFlow Application as a Module in Another StreamBase Application 

This EventFlow application, TimeBasedControlledWindowSize.sbapp, may be used as a module in either another EventFlow application diagram or a StreamSQL text based application, as illustrated in the following figures. When combined with a downstream aggregate operator, or a StreamSQL SELECT statement against a windowed stream, that uses a Field based dimension, this module mimics the functionality of a Time based window.

EventFlow Application

EventFlow Application

StreamSQL Application

   CREATE INPUT STREAM ControlStream (
       WindowSize int
   );
   CREATE INPUT STREAM DataStream (
       x int,
       y string(4),
       z double
   );

   CREATE OUTPUT STREAM AggregatedDataStream;

   CREATE STREAM ModuleOutputStream;

   APPLY MODULE "TimeBasedControlledWindowSize.sbapp"
     FROM Control = ControlStream,
          MatchingData = DataStream
     INTO DataOutWithWindowId = ModuleOutputStream;

   SELECT count() AS numberOfTuples
     FROM ModuleOutputStream
             [SIZE 1 ADVANCE 1 ON WindowId OFFSET 0]
     INTO AggregatedDataStream;
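The SELECT statement above counts the tuples that share each WindowId value. The following Python sketch models what AggregatedDataStream would receive; it assumes that a SIZE 1 ADVANCE 1 window on WindowId emits its result when a tuple with a different WindowId arrives, so the last, still-open window has not produced a result yet. The window identifier is kept in the output only for readability.

```python
def count_per_window(window_ids: list[int]) -> list[tuple[int, int]]:
    """Count consecutive tuples per WindowId, emitting each count when its window closes."""
    results: list[tuple[int, int]] = []
    current, count = None, 0
    for wid in window_ids:
        if current is not None and wid != current:
            results.append((current, count))  # previous window closes
            count = 0
        current = wid
        count += 1
    return results  # the still-open last window is not reported
```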
		 

When configuring the TimeBasedControlledWindowSize module reference in the EventFlow application, you must select the Override module input schemas with incoming schemas checkbox on the Properties view, Input Ports tab. For the StreamSQL version of this application, using the incoming schema to override the module input stream schema is the default behavior, so the APPLY statement does not require a specific directive.

Module Reference, Input Ports Tab

Note that the input stream ControlStream has the same schema as the module's input stream Control, while the input stream DataStream has a schema completely different from that of the module's input stream MatchingData. Since tuples enqueued to ControlStream provide a value that is used by the module (the new window size), they must include a field with the same name and type as the field required by the module. However, since the fields in the tuples enqueued to DataStream are not used within the module, the schema associated with these tuples is not restricted in any way.

The example illustrated in the preceding figures uses a downstream aggregate operator (ConfigurableAggregate), or a StreamSQL SELECT statement against a windowed stream, with a Field based window definition where SIZE and ADVANCE are both set to one. This combination of module and aggregate operator (or SELECT statement) yields an outcome equivalent to a Time based window where SIZE and ADVANCE are both set to the same value, which is the duration specified in the TimeBasedControlledWindowSize module. With a judicious choice of WindowSize, SIZE, and ADVANCE, the TimeBasedControlledWindowSize module and a downstream Field based window can mimic any Time based dimension whose SIZE and ADVANCE are whole numbers of seconds, for example by choosing a WindowSize that divides both values evenly.

For example, setting:

  • WindowSize to 10 seconds, SIZE to 1, and ADVANCE to 1 is equivalent to a Time based dimension with SIZE and ADVANCE set to 10 seconds.
  • WindowSize to 5 seconds, SIZE to 2, and ADVANCE to 1 is equivalent to a Time based dimension with SIZE set to 10 seconds and ADVANCE set to 5 seconds.
  • WindowSize to 2 seconds, SIZE to 5, and ADVANCE to 1 is equivalent to a Time based dimension with SIZE set to 10 seconds and ADVANCE set to 2 seconds.

From these examples, you can observe that, for the combination of the TimeBasedControlledWindowSize module and a downstream Field based dimension, the comparable Time based dimension has a SIZE of WindowSize * SIZE and an ADVANCE of WindowSize * ADVANCE.
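The relationship can be written as a one-line function and checked against the settings listed above. This Python sketch is only the arithmetic; the function name is hypothetical. The docstring gives the reasoning: each WindowId value spans WindowSize seconds, so a Field based window covering SIZE consecutive identifiers spans WindowSize * SIZE seconds.

```python
def equivalent_time_window(window_size: int, size: int, advance: int) -> tuple[int, int]:
    """Return the Time based (SIZE, ADVANCE), in seconds, that the module plus a
    Field based window ON WindowId mimics.

    Each WindowId value spans window_size seconds, so covering `size` consecutive
    identifiers spans size * window_size seconds, and advancing by `advance`
    identifiers moves the window by advance * window_size seconds.
    """
    return window_size * size, window_size * advance
```

The correspondence assumes tuples keep arriving: because the module increments WindowId by one only when a tuple closes a window, a quiet period longer than WindowSize still counts as a single identifier.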

A StreamSQL Version of the TimeBasedControlledWindowSize EventFlow Application 

A StreamSQL text based version of the TimeBasedControlledWindowSize EventFlow application can be easily developed (or simply generated from the existing EventFlow application). This version of the application can also be used as a module in either an EventFlow application diagram or another StreamSQL application. However, there is a slight difference in the way a module based on a .ssql file and a module based on a .sbapp file interpret the Override module input schemas with incoming schemas directive. The input stream schema of a .ssql based module must include at least one field that is also included in the schema of the stream that enqueues tuples to it. In this example, the module input stream MatchingData would need at least one field that is also included in the schema of DataStream; this restriction does not exist for a .sbapp based module.
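The .ssql restriction can be expressed as a simple schema check. In this Python sketch, schemas are dictionaries mapping field names to type names; treating a "shared field" as one with the same name and the same type is an assumption, and the function name is hypothetical.

```python
def ssql_override_allowed(module_schema: dict[str, str], incoming_schema: dict[str, str]) -> bool:
    """True if the module input schema shares at least one field (name and type,
    assumed) with the schema of the stream that enqueues tuples to it."""
    return any(incoming_schema.get(name) == ftype for name, ftype in module_schema.items())
```

With the example schemas, a MatchingData schema containing only value int shares no field with DataStream (x int, y string(4), z double), so a .ssql based module would need, for example, an x int field added to MatchingData.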

Download Application File 

You may download a StreamBase application illustrating these concepts. Right-click the hyperlink and save the .sbapp file to a convenient location; make certain you save the file with the extension .sbapp. Then import this application file into a StreamBase project.
