This topic explains how the Aggregate operator works, and describes the actions you can take on each tab of the operator's Properties view.
The Aggregate operator is used to compute aggregations over moving windows of tuple values. Each window is a view on a part of the input data. For example, you could sum the volumes of trades of a stock over 30-second intervals. Or, you might calculate a moving average of prices for each of four successive trades of a given currency.
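As a rough illustration of the first example, the following Python sketch totals trade volumes per 30-second interval. It is not StreamBase code: the function name `volume_per_interval`, the `(timestamp, volume)` tuple layout, and the sample values are all assumptions made for this sketch, and it buckets on a timestamp value carried in each tuple rather than on arrival time.

```python
def volume_per_interval(trades, seconds=30):
    """Sum trade volumes per fixed interval (hypothetical helper).

    `trades` is an iterable of (timestamp_in_seconds, volume) pairs.
    """
    totals = {}
    for timestamp, volume in trades:
        # Map each trade to the start time of the interval that contains it.
        bucket = int(timestamp // seconds) * seconds
        totals[bucket] = totals.get(bucket, 0) + volume
    return totals


trades = [(0, 100), (12, 50), (31, 70), (59, 30), (60, 5)]
print(volume_per_interval(trades))  # {0: 150, 30: 100, 60: 5}
```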
The Aggregate operator accepts a single input stream. It maintains the state of one or more windows according to policies defined in the operator's properties. You define window policies in one or more dimensions. A dimension specifies:
- When a window closes. For example, a window might close when it contains some number of tuples or when some number of seconds elapse.
- How much each new window advances relative to the previous one.
- The type of aggregation to perform, which is an operation based on one of the following:
  - The number of tuples in the window
  - The time tuples arrive
  - A field in the input tuples
  - A predicate expression

When the conditions prescribed in the dimension's policies evaluate to true, the operator performs one or more calculations on the tuples
that currently occupy the Aggregate's window or windows, based on aggregate
expressions you specify in the Aggregate Functions tab. The calculations performed
can include aggregate functions, such as sum(Volume) or
avg(PricePerShare). You can also group output tuples
based on a particular input field, such as the value of a stock symbol field. The
result of the aggregation is emitted on the Aggregate operator's output port.
Note
Aggregate windows count tuples starting from zero.
The sections that follow describe the Aggregate operator's Properties view fields and settings. The examples at the end of this topic illustrate some Aggregate operations. For information about the available built-in aggregate functions, see Aggregate Functions Overview on the StreamBase Expression Language Functions page.
There is no substitute for hands-on experience, so we encourage you to run the installed Aggregate operator samples, which are described in the Operator Sample Group section of the Samples Guide. Also see the StreamBase demo applications by switching to the SB Demos Perspective in StreamBase Studio. Several of the demos use Aggregate operators.
In StreamBase, aggregate expressions can consist of:
- Aggregate functions only, such as count(fieldname)
- A combination of simple expressions and one or more aggregate functions, such as sum(field-x + field-y) > 5000
Aggregate functions are stateful functions, which means they accumulate state internally during the course of their evaluation. Aggregate expressions are evaluated in two steps, increment and calculate.
In the increment step, new data is added to the state of aggregate functions by evaluating their arguments. In the example above, field-x + field-y is evaluated as a simple expression against the current row or tuple, with its result added to the state of the sum() aggregate function.
In the calculate step, aggregate functions return their result based on applying those functions to the accumulated state.
The results of an aggregate function might be further compared or combined (as seen
in the sum() > 5000 of
the second example above). In this case, the aggregate results are combined or
compared by evaluating the simple expression logic that combines them. In the
example, this means testing whether the accumulated result is greater than 5000.
For some aggregate expressions, the calculate step is performed for every value that is incremented into the expression's state. In other expressions, the calculate step is performed only once, after all the data has been incremented into the expression.
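The two-step evaluation described above can be sketched in Python. This is only a conceptual model, not how StreamBase implements aggregate functions: the `SumState` class, the `evaluate` function, and the field names `field_x` and `field_y` are hypothetical names chosen for the sketch. Here the calculate step runs once, after all rows have been incremented.

```python
class SumState:
    """Hypothetical stand-in for the state kept by a sum() aggregate function."""

    def __init__(self):
        self.total = 0

    def increment(self, value):
        # Increment step: fold one evaluated argument into the state.
        self.total += value

    def calculate(self):
        # Calculate step: return the result from the accumulated state.
        return self.total


def evaluate(rows, threshold=5000):
    """Model of sum(field_x + field_y) > threshold over a list of dict rows."""
    state = SumState()
    for row in rows:
        # The argument is a simple expression evaluated against the current row.
        state.increment(row["field_x"] + row["field_y"])
    # The comparison is a simple expression applied to the aggregate result.
    return state.calculate() > threshold


rows = [
    {"field_x": 1000, "field_y": 1500},
    {"field_x": 2000, "field_y": 1000},
]
print(evaluate(rows))  # True, because the accumulated sum is 5500
```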
Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.
Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
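The naming rule for the Name field can be expressed as a regular expression. The Python sketch below is an illustration only: `is_valid_component_name` is a hypothetical helper, it assumes that "alphabetic characters" means ASCII letters, and it does not check that the name is unique in the application.

```python
import re

# First character: a letter or underscore. Remaining characters: letters,
# digits, or underscores. ASCII-only matching is an assumption of this sketch.
NAME_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_component_name(name):
    """Return True if `name` satisfies the character rules for component names."""
    return NAME_PATTERN.fullmatch(name) is not None


for candidate in ["Aggregate_1", "_totals", "1stAggregate", "my-aggregate"]:
    print(candidate, is_valid_component_name(candidate))
```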
The Dimensions tab allows you to add, edit, or remove dimensions from an Aggregate
operator. As explained above, a window can be opened or closed (or prevented from
opening or closing) when the conditions specified in a dimension evaluate to
true.
You can define more than one dimension to cover different run-time conditions. In the case of multiple dimensions, the Aggregate operator uses the first dimension whose rules can be applied. The selected dimension, and only that dimension, applies to all the Aggregate operator windows. Notice that even an Aggregate operator that specifies more than one dimension in the Dimensions tab still specifies a single set of aggregate functions in the Aggregate Functions tab. That is, you cannot in the same operator apply one aggregate function to one dimension, then a different aggregate function to another dimension.
The Dimensions tab shows the information specified for each dimension in a simple text summary, as seen in this example:
Open the Edit Dimension dialog in one of the following ways. In the Properties view for an Aggregate operator, in the Dimensions tab:
- Double-click an existing dimension row.
- Select an existing dimension row and click the Edit button.
- Click the Add button.
Then, in the resulting Edit Dimension dialog, select one of the values in the Type drop-down list:
The dialog then reconfigures itself to conform to the selected dimension type. The options are:
Important
StreamBase Systems recommends avoiding the time-based dimension for most applications. See the Caution on Aggregate Operator: Time-Based Dimension Options for an explanation and suggested alternatives.
Use the Aggregate Functions tab to specify one or more aggregate expressions that will be evaluated in each window. The result of each specified aggregate function is added as a field to the operator's output stream. You can add, replace, or remove fields in the output stream.
For an Aggregate operator newly placed on the EventFlow canvas, the grid in this tab has a single entry with the following default values:
| Action | Field Name | Expression |
|---|---|---|
| Add | * | lastval(*) |
Note
In StreamBase releases before 7.0, the default expression function was firstval(*).
In the example below, the first expression calculates the average of prices in the window. The next two expressions capture the start and end time of the window (the argument indicates a time-based dimension).
The Aggregate Functions grid has the following editing features:
- The grid is resizable. Grab the bottom row with the mouse to resize it within the Aggregate Functions tab to show fewer rows or to show more rows without scroll bars.
- To add a grid row, use the green plus button, then select the type of action in the Action drop-down list.
- Available Action column entries for each grid row are: Add, Replace, Remove, and Declare.
- When using the Remove action, you must specify a field name, but you cannot enter an expression.
- When using the Add and Replace actions, you must specify a field name, and you must enter an expression in the Expression field for that row.
- Use the Declare action to define a local variable that has the narrow scope of this grid in this operator. The variable can then be used to save typing in expressions later in the same grid. Use the Field Name column to name your variable; use the Expression column to specify an expression that defines your variable. Expressions for declared variables are evaluated as necessary to compute the output fields that use the variable. In many cases expressions are evaluated once per output tuple. However, expressions for unused declared variables are not evaluated, and in some cases an expression for a declared variable may be evaluated multiple times.
- Expressions can contain aggregate functions and constants. Besides field names that are part of aggregate function calls, direct references to input stream fields must be grouped, as described in Properties: Group Options Tab.
- For a list of aggregate functions, see StreamBase Expression Language Functions.
The following table describes the buttons at the top of the Field grids.
| Name | Description |
|---|---|
| Add | Adds a row below the currently selected row, or to the end of the grid if no row is selected. Click the arrow on the button's right to specify whether the row should be added above or below the currently selected row. When you add a row, the newly created row is highlighted. To start entering information, click in the cell you want to edit. (Some cells are not user-editable.) |
| Remove | Removes the currently selected row. Click the arrow on the button's right to remove all rows or all selected rows. |
| Move Up, Move Down | Move Up moves the selected row up by one row. Move Down moves the selected row down by one row. |
By default, aggregate windows are controlled by the dimensions you define. The Group Options tab allows you to further control windows by grouping on a column of input. Each group has its own independent windows based on one or more fields. For example, if you group on a field named ID, a separate window is created for each unique value of ID that the Aggregate operator receives. If you group on multiple fields, a separate window is created for each unique combination of values of all the group-by fields.
A group that you define here can also be referenced in an expression in the Aggregate Functions tab.
Click the Add button to add a row for each group-by that you want to define. In each row:
- In the Output Field Name column, enter the name of the field as you want it to appear in this operator's output stream.
- In the Expression column, enter the name of the input field that you want to group on, or enter an expression that resolves to an input field name.
In the following example, the windows generated by the operator's dimensions are to
be further subdivided into groups by unique received values of the Symbol field, and the resulting field in the output tuple is named
TotalForSymbol. You can suppress appending any
Group-By fields to the output tuple by selecting the
Omit Group By fields in output check box.
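To illustrate how grouping gives each key value its own independent window, here is a Python sketch. It is a simplified model rather than StreamBase behavior in full: the function `grouped_sums`, the `Volume` field, and the sample trades are assumptions, and each group uses non-overlapping windows that close after a fixed number of tuples.

```python
from collections import defaultdict


def grouped_sums(tuples, key="Symbol", field="Volume", size=2):
    """Emit (group value, sum) each time a group's window fills (hypothetical)."""
    windows = defaultdict(list)  # one independent window per unique key value
    for t in tuples:
        window = windows[t[key]]
        window.append(t[field])
        if len(window) == size:
            yield t[key], sum(window)
            # Close the full window; the group's next tuple opens a new one.
            del windows[t[key]]


trades = [
    {"Symbol": "IBM", "Volume": 100},
    {"Symbol": "MSFT", "Volume": 50},
    {"Symbol": "IBM", "Volume": 200},
    {"Symbol": "MSFT", "Volume": 25},
    {"Symbol": "IBM", "Volume": 10},
]
print(list(grouped_sums(trades)))  # [('IBM', 300), ('MSFT', 75)]
```

The last IBM trade produces no output in this sketch because its window is still waiting for a second tuple.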
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Caution
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
In general, in an Aggregate operator window, null field values are not included in
the calculation. For example: in an average, such as avg(price) of ten tuples in an aggregate's window, if one of the
values is null, then the average is calculated for the remaining nine values. Null
handling for the andall() and orall()
functions does not follow this general rule.
Independent of the preceding paragraph, key fields with null values still constitute
a valid group for the purposes of grouping aggregate results. For example, let's say
a simple Aggregate operator averages prices for every three tuples with schema
{symbol, price}, and groups the results by symbol. If
three tuples arrive with price information but an empty symbol field, the operator still reports an average of those three
for the null group.
For more information, see Using Nulls.
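Both null-handling rules can be modeled in a few lines of Python, with None standing in for a null value. This sketch is illustrative only: the helper names are hypothetical, and returning None when every value in the window is null is an assumption of the sketch, not documented behavior.

```python
from collections import defaultdict


def avg_ignoring_nulls(values):
    """Average the non-null values; returns None if all are null (assumption)."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None


def avg_by_symbol(tuples):
    """Group (symbol, price) pairs by symbol; a null symbol is a valid group."""
    groups = defaultdict(list)
    for symbol, price in tuples:
        groups[symbol].append(price)  # None works as a dictionary key
    return {symbol: avg_ignoring_nulls(prices) for symbol, prices in groups.items()}


print(avg_ignoring_nulls([10, None, 20]))  # 15.0
print(avg_by_symbol([(None, 1.0), (None, 2.0), (None, 3.0)]))  # {None: 2.0}
```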
This example demonstrates the basic Aggregate operation by tracing the windowing action in a simple application with these characteristics:
- Each tuple in the input stream is based on a schema that contains a single integer field.
- The Aggregate operator has a single tuple-based dimension. The dimension is set to advance each window by one tuple, and each window is set to close and emit after three tuples. There are no group-by settings.
- The sum() aggregate function calculates the total value of all integers in each window.
The following diagrams show a succession of tuples arriving on an input stream. At each step, we depict the arrival of a tuple, the windows that are open at that time, the aggregate function performed on each window, and any output that occurs.
- When the first tuple arrives on the input stream, a window is immediately opened. Consider the following points:
  - The new window contains the data for the first tuple, but it has room for three tuples, since the dimension sets the window size to 3.
  - The diagram shows the aggregation function that is performed on the window (even though here it is only a sum of one number). As more data is enqueued, the window keeps a running sum of tuple values.
  - Because the window is not full, no output occurs. Remember that the dimension is set to emit when the size is 3.
- When a new tuple arrives, a second window is opened and the new tuple is loaded in both windows. This is in accordance with the dimension, which sets windows to advance by one tuple. If the advance had been set to 2, a new window would open every two tuples (in the next step) instead.

  Notice the positions of the two windows relative to the tuples in the input stream. For readability, the diagram represents each window separately. In fact, both windows operate on the same data. A more realistic representation might show a succession of overlapping windows moving over a single stream of data.

  Also notice that the aggregate result changes for Window 1, and that there is still no output from the operator.
- When the third tuple arrives, a new window is created and all the windows are updated. For the first time, we see a tuple released on the operator's output port. The output tuple contains the aggregate result from Window 1, which is now full. As soon as the output tuple is emitted, the Aggregate operator closes the window.
- When the fourth tuple arrives, a new window is started, as before. Notice that Window 1, which we said was closed in the previous step, is gone now. At the same time, the new tuple causes Window 2 to reach its size limit. This triggers the emission of Window 2's aggregate result in an output tuple, and Window 2 is closed.
- The last step shown is just like the preceding one: input causes a window to become full and release its aggregate result on the output port. And again, a new window opens one tuple ahead of the preceding window.
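The windowing steps traced above can be approximated with a short Python simulation. It is a sketch under stated assumptions, not the operator's implementation: the function `sliding_sums` and the input values 1 through 5 are hypothetical, since the original diagrams' values are not reproduced here.

```python
def sliding_sums(values, size=3, advance=1):
    """Yield the sum of each tuple-based window as it fills and closes."""
    open_windows = []  # each entry holds the values of one open window
    for index, value in enumerate(values):
        # A new window opens every `advance` tuples, counting from zero.
        if index % advance == 0:
            open_windows.append([])
        # Every open window receives the new tuple.
        for window in open_windows:
            window.append(value)
        # The oldest window fills first; emit its result, then close it.
        if open_windows and len(open_windows[0]) == size:
            yield sum(open_windows.pop(0))


print(list(sliding_sums([1, 2, 3, 4, 5])))             # [6, 9, 12]
print(list(sliding_sums([1, 2, 3, 4, 5], advance=2)))  # [6, 12]
```

With an advance of 1, the first output appears only when the third tuple arrives, and each later tuple closes exactly one window, which matches the sequence of steps described above.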
This example demonstrates a dimension and related Aggregate function that generate a sequential count of received tuples on the output stream. In current StreamBase releases, the same functionality would be better handled with the Sequence operator, but this example is nevertheless instructive.
In this example, notice that we selected these two options:
- Opening policy: Do not open window based on this dimension
- Window size: Do not close window based on this dimension
That combination of settings creates a window that never closes (while StreamBase Server is running, of course). Also notice the setting of the Emission policy, which gives you the option of emitting output before the window closes. Here, the policy is set to Intermediate emission every 1 tuple. This means that, as every tuple arrives, one tuple is emitted.
In the Aggregate Functions tab, the operator is set to use the count() aggregate function:
As each tuple arrives in the Aggregate operator, it is counted. Because each arriving tuple
causes a tuple to be emitted, each emitted tuple contains a value in the sequence_num field that is one greater than the value in the previously emitted tuple.
To understand how the count() function is used here,
and for further information on aggregate functions in general, see the Aggregate Functions
Overview section of the Expression Language Functions topic in the StreamBase References.
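The behavior of this example, a window that never closes combined with an emission on every tuple, resembles a running counter. The Python sketch below models it; `sequence_numbers` is a hypothetical name, and starting the sequence at 1 assumes that the count reflects the number of tuples seen so far, including the current one.

```python
def sequence_numbers(tuples):
    """Yield a running count, one value per input tuple."""
    count = 0  # state of the single window, which never closes
    for _ in tuples:
        count += 1   # increment step for the count
        yield count  # intermediate emission on every tuple


print(list(sequence_numbers(["a", "b", "c"])))  # [1, 2, 3]
```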
Look for further examples of using the Aggregate operator in the Operator Sample Group provided with your StreamBase installation.
- StreamSQL: CREATE WINDOW Statement, StreamSQL Tutorial
