Developers: Controlling Window Size — Part 3

Author: Simon Keen
Contributor: Dr. John Lifter
StreamBase Systems
3-May-2007

Introduction

In many StreamBase applications, data is read from a query table, a JDBC table data construct, or a materialized window. These queries frequently return a result set with more than one row, which results in multiple tuples being emitted from the query operator. Your application might then aggregate the information contained in this stream of tuples, as illustrated in the following figure.

EventFlow Application Diagram

Beginning with StreamBase 3.7, the query operator (or a StreamSQL SELECT statement whose FROM clause references both a stream and a query table or materialized window) can execute aggregate functions directly on the result set. This eliminates the need for a downstream aggregate operator (or a StreamSQL SELECT statement whose FROM clause references a windowed stream). This feature, described in the article Aggregation over Query Tables and Materialized Windows, is useful if you do not want to pass field values from the input tuple past this EventFlow operator or StreamSQL statement. If you are reading data from a JDBC table data construct, or you want to pass field values from the input tuple past this step in your application, you cannot execute the aggregate functions as part of the query; instead, you must continue to pass the individual tuples from the query to a separate aggregation step.

Because sequential read operations may each return a result set containing a different number of rows, and therefore send a different number of tuples to the downstream aggregation, your application needs some mechanism that distinguishes the tuples belonging to one result set from those belonging to the next. This allows the aggregate functions to be applied independently to each result set's data.

The approach typically used to differentiate tuples from sequential result sets is to add an identifier field to each tuple before executing the query and then pass this field through to the emitted tuples. Because each input tuple receives a unique identifier, all tuples derived from the same result set share that identifier, and the aggregate functions can be applied to the set of tuples derived from a single result set. This concept is illustrated in the following figure.
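The identifier technique can be modeled outside StreamBase with a short Python sketch. This is not StreamBase code: the TABLE contents, the query function, and the helpers tag_and_query and aggregate_by_id are hypothetical stand-ins for the query table, the ReadTable query operator, and the AggregateByID aggregate operator, and a plain counter stands in for however AddIdentifier actually generates TickNum.

```python
from itertools import count, groupby

# Hypothetical query table: each key returns a variable number of rows (possibly none).
TABLE = {"IBM": [10, 20, 30], "MSFT": [5], "ORCL": []}

def query(key):
    """Stand-in for the query operator: return the result set for one input tuple."""
    return TABLE.get(key, [])

def tag_and_query(input_tuples):
    """Give each input tuple a unique TickNum, then copy it onto every row
    the query emits for that tuple."""
    tick = count(1)
    out = []
    for tup in input_tuples:
        tick_num = next(tick)  # unique per input tuple
        for value in query(tup["symbol"]):
            out.append({"TickNum": tick_num, "symbol": tup["symbol"], "value": value})
    return out

def aggregate_by_id(rows):
    """Sum 'value' separately for each TickNum (rows arrive grouped by TickNum)."""
    return {k: sum(r["value"] for r in grp)
            for k, grp in groupby(rows, key=lambda r: r["TickNum"])}

rows = tag_and_query([{"symbol": "IBM"}, {"symbol": "MSFT"}, {"symbol": "IBM"}])
print(aggregate_by_id(rows))  # {1: 60, 2: 5, 3: 60}
```

Note that the two IBM queries return identical rows but are still aggregated separately, because their tuples carry different TickNum values.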

EventFlow Application Diagram

The AddIdentifier aggregate operator includes a Tuple based window that emits after each tuple arrives.

AddIdentifier Aggregate Operator

This operator uses the lastval function to pass all tuple fields and adds a new field, typically named TickNum, to the emitted tuple. The TickNum field uniquely identifies each tuple before it triggers the query.

AddIdentifier Aggregate Operator

The ReadTable query operator is then configured to pass the TickNum field, as well as any desired fields from the input tuple and each result set row, to the emitted tuples. As a result, the downstream AggregateByID aggregate operator can use the TickNum field to define a Field based dimension that applies aggregate expressions to the tuples from each result set.

AggregateByID Aggregate Operator

The AggregateByID aggregate operator emits its result tuple whenever the value of the TickNum field changes. As a result, the tuple containing the aggregate function values for the result set derived from one query is not emitted until the first tuple of a following result set enters the AggregateByID operator. This is not a problem when tuples arrive at the AddIdentifier aggregate operator continuously, but if that stream is interrupted, the result for the most recent query is held in the AggregateByID operator, and its emission is delayed until a tuple from a later result set arrives.
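The delay can be reproduced with a simplified Python model of an aggregate that closes its window only when a tuple with a different TickNum arrives. The class EmitOnChange is hypothetical and illustrates the behavior described above; it is not the StreamBase implementation.

```python
class EmitOnChange:
    """Simplified model: the window for one TickNum closes only when a tuple
    with a different TickNum arrives."""

    def __init__(self):
        self.current_id = None  # TickNum of the open window
        self.total = 0          # running sum for the open window

    def push(self, tick_num, value):
        """Add one tuple; return (TickNum, sum) for the window it closes, or None."""
        emitted = None
        if self.current_id is not None and tick_num != self.current_id:
            emitted = (self.current_id, self.total)
            self.total = 0
        self.current_id = tick_num
        self.total += value
        return emitted

agg = EmitOnChange()
for tick_num, value in [(1, 10), (1, 20), (1, 30), (2, 5)]:
    result = agg.push(tick_num, value)
    if result is not None:
        print("emitted", result)  # prints only: emitted (1, 60)

# If the input stream now pauses, the sum for TickNum 2 stays pending.
print("pending", agg.current_id, agg.total)  # pending 2 5
```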

This article describes how delays of this type can be avoided.

Using a Single Tuple to Open and Close a Window  
The trick to avoiding any sort of delay in emitting tuples from the AggregateByID operator is to use the same tuple to both open and close its associated window. The following application diagram illustrates this technique.

EventFlow Application Diagram

The DataIn Input Stream

The schema assigned to this stream describes the tuples enqueued to this application.

The AddIdentifier Aggregate Operator

The AddIdentifier aggregate operator adds the TickNum field to each incoming tuple. The ReadTableFirst split operator then sends a copy of each tuple along two downstream pathways. It first submits a copy of the tuple to the upper pathway, which performs the query and sends the tuples derived from the result set downstream into the AggregateByID aggregate operator. After all the tuples derived from that result set have been processed, the split operator submits a copy of the original tuple to the lower pathway, which also forwards the tuple to the AggregateByID operator. So how does this series of steps allow a single tuple to both open and close the window in the AggregateByID operator? The answer lies in the processing performed within the RealDataWindowId and SetCloseWindowId map operators.
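The ordering the ReadTableFirst split operator provides can be modeled in Python as plain, synchronous function calls: each branch, including everything downstream of it, runs to completion before the next branch starts. The split, upper, and lower functions below are hypothetical stand-ins used only to show that ordering.

```python
def split(tup, *branches):
    """Deliver a copy of the same tuple to each branch, in order. Each branch
    finishes (including all downstream work) before the next one starts."""
    for branch in branches:
        branch(dict(tup))  # each branch gets its own copy

trace = []

def upper(tup):
    # Stand-in for the query path: emit three result-set rows downstream.
    for row in range(3):
        trace.append(("row", tup["TickNum"], row))

def lower(tup):
    # Stand-in for the closing path: emit one tuple after all the rows.
    trace.append(("close", tup["TickNum"]))

split({"TickNum": 7}, upper, lower)
print(trace)  # [('row', 7, 0), ('row', 7, 1), ('row', 7, 2), ('close', 7)]
```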

The RealDataWindowId Map Operator

This operator passes all input fields to its emitted tuple, adding a new field named WindowId. The value assigned to this field is TickNum * 2. Multiplication by two ensures that WindowId will be an even value.

RealDataWindowId Map Operator

The modified tuple is then passed to the ReadTable query operator.

The ReadTable Query Operator

This operator performs the read operation and passes the WindowId field through to each emitted tuple. Consequently, all tuples derived from a common result set carry the same even value in the WindowId field.

The AggregateByID Aggregate Operator

This operator includes a Field based dimension on the WindowId field with opening and closing values set to one. Because each window therefore spans a single WindowId value, the operator applies its aggregate expressions to the collection of tuples derived from one result set. The first tuple that arrives with a larger WindowId value closes the window, and the operator emits its result tuple.
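A window that spans one WindowId value can be sketched in Python as follows. FieldWindow is a hypothetical, simplified model (a window opened at value v covers v up to but not including v + 1, and it is opened by whichever tuple arrives when no window is open); it is not the StreamBase engine.

```python
class FieldWindow:
    """Simplified model of a Field based dimension on WindowId whose window
    spans one value: a window opened at v closes on the first tuple with
    WindowId >= v + 1."""

    SIZE = 1  # opening and closing values of one

    def __init__(self):
        self.open_at = None  # WindowId value that opened the current window
        self.values = []     # values aggregated in the current window

    def push(self, window_id, value):
        """Add one tuple; return (open_at, sum) for the window it closes, else None."""
        closed = None
        if self.open_at is not None and window_id >= self.open_at + self.SIZE:
            closed = (self.open_at, sum(self.values))
            self.open_at, self.values = None, []
        if self.open_at is None:
            self.open_at = window_id  # the arriving tuple opens the next window
        self.values.append(value)
        return closed

w = FieldWindow()
for window_id, value in [(2, 10), (2, 20), (4, 5)]:
    closed = w.push(window_id, value)
    if closed is not None:
        print("emitted", closed)  # emitted (2, 30), when the WindowId 4 tuple arrives
```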

AggregateById Aggregate Operator

After all the tuples corresponding to a result set enter the AggregateById operator, the split operator submits a copy of the initiating tuple to the lower pathway, where it enters the SetCloseWindowId map operator.

The SetCloseWindowId Map Operator

This operator passes all input fields to its emitted tuple, adding a new field named WindowId. The value assigned to this field is (TickNum * 2) + 1, which ensures that WindowId will be an odd value.
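The relationship between the two WindowId values can be written out as a short worked inequality. The symbols below are introduced here for illustration: k is the TickNum of one input tuple, W_data and W_close are the values assigned by the RealDataWindowId and SetCloseWindowId map operators, and TickNum is assumed to take strictly increasing integer values (the article states only that it is unique).

```latex
\begin{aligned}
W_{\mathrm{data}}(k)  &= 2k \\
W_{\mathrm{close}}(k) &= 2k + 1 \\
W_{\mathrm{data}}(k) = 2k \;<\; W_{\mathrm{close}}(k) &= 2k + 1 \;<\; 2(k + 1) = W_{\mathrm{data}}(k + 1)
\end{aligned}
```

Under that assumption, a window of size one opened at 2k contains only the data tuples of query k; the closing tuple's value 2k + 1 is the smallest value that can close it, and data from the next query (at least 2k + 2) can never fall into it.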

SetCloseWindowId Map Operator

The modified tuple is then passed to the AggregateById operator. Because its WindowId value is one greater than the WindowId of the tuples derived from the result set in the upper pathway, the aggregate operator closes its window and emits its results immediately, without waiting for the next input tuple. The closing tuple then opens a new window of its own. That window is closed by the next tuple to arrive, and because its result carries an odd WindowId value, it is discarded by the SelectTuple filter operator.

The CombineStreams Union Operator

Both input streams to this operator must have identical schemas, so make certain that the schemas of the streams emitted by the ReadTable query operator and the SetCloseWindowId map operator match. Depending on which tuple and table fields you choose to emit from the ReadTable query operator, you may need to configure the SetCloseWindowId operator to add the missing fields to its emitted tuple; simply initialize any such fields with a null value.
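The padding step can be sketched in Python by treating a schema as an ordered list of field names, ignoring field types. READTABLE_FIELDS and pad_to_schema are hypothetical; your actual field list depends on which tuple and table fields the ReadTable query operator emits, and None stands in for a StreamBase null.

```python
# Hypothetical fields emitted by the ReadTable query operator.
READTABLE_FIELDS = ["symbol", "TickNum", "WindowId", "price", "quantity"]

def pad_to_schema(tup, fields):
    """Return a copy of tup containing exactly `fields`, in order; any field
    the tuple lacks is set to None (the analogue of a null value)."""
    extra = set(tup) - set(fields)
    if extra:
        raise ValueError(f"fields not in target schema: {sorted(extra)}")
    return {name: tup.get(name) for name in fields}

# A tuple as it might leave SetCloseWindowId before padding.
close_tuple = {"symbol": "IBM", "TickNum": 3, "WindowId": 7}
padded = pad_to_schema(close_tuple, READTABLE_FIELDS)
print(padded)
# {'symbol': 'IBM', 'TickNum': 3, 'WindowId': 7, 'price': None, 'quantity': None}
```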

The SelectTuple Filter Operator

This operator passes only tuples in which the value of the WindowId field is even. These are the AggregateById results computed over tuples emitted by the ReadTable query operator. Results from the windows opened by tuples from the SetCloseWindowId map operator have odd WindowId values and are dropped.

The predicate within the SelectTuple filter operator is:

  WindowId % 2 == 0

The TidyUp Map Operator

Use this operator to remove any unnecessary fields, such as TickNum or WindowId, from the result tuple.
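The complete flow can be simulated end to end in Python to check that each query's result is emitted while that query's input tuple is still being processed. run_pipeline is a hypothetical, simplified model of the whole application: it assumes the split ordering and one-value window behavior described above, and it assumes the aggregate sum ignores the null value on the closing tuple. Each emitted result is paired with the TickNum that was being processed when it was emitted.

```python
def run_pipeline(symbols, table):
    """Push each input symbol through a model of the application and return
    (TickNum being processed, result tuple) pairs that reach DataOut."""
    out = []
    open_at, rows = None, []  # AggregateById window state
    current_tick = None

    def aggregate(tup):
        nonlocal open_at, rows
        # Field based dimension on WindowId spanning one value.
        if open_at is not None and tup["WindowId"] >= open_at + 1:
            result = {"WindowId": open_at,
                      "symbol": rows[0]["symbol"],
                      "total": sum(r["value"] for r in rows if r["value"] is not None)}
            open_at, rows = None, []
            if result["WindowId"] % 2 == 0:  # SelectTuple predicate
                # TidyUp: drop WindowId before DataOut.
                out.append((current_tick, {k: v for k, v in result.items() if k != "WindowId"}))
        if open_at is None:
            open_at = tup["WindowId"]
        rows.append(tup)

    for tick_num, symbol in enumerate(symbols, start=1):  # AddIdentifier
        current_tick = tick_num
        base = {"symbol": symbol, "TickNum": tick_num}
        # Upper pathway: RealDataWindowId, then one tuple per ReadTable row.
        for value in table.get(symbol, []):
            aggregate({**base, "WindowId": tick_num * 2, "value": value})
        # Lower pathway: SetCloseWindowId, with the table field padded to null.
        aggregate({**base, "WindowId": tick_num * 2 + 1, "value": None})
    return out

results = run_pipeline(["IBM", "MSFT"], {"IBM": [10, 20, 30], "MSFT": [5]})
print(results)
# [(1, {'symbol': 'IBM', 'total': 60}), (2, {'symbol': 'MSFT', 'total': 5})]
```

The IBM result is paired with TickNum 1, showing it leaves the model before the MSFT input tuple is processed; even the final MSFT result is emitted without any further input.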

The DataOut Output Stream

The schema associated with this stream is derived from the fields produced by the AggregateById operator, less any fields removed by the TidyUp map operator.

Download Application File

You may download a StreamBase application illustrating these concepts. Right-click the hyperlink and save the .sbapp file to a convenient location, making certain the file is saved with the .sbapp extension. Then import this application file into a StreamBase project.