Using the Aggregate Operator

This topic explains how the Aggregate operator works, and describes the actions you can take on each tab of the operator's Properties View.

Introduction

The Aggregate operator is used to compute aggregations over moving windows of tuple values. Each window is a view on a part of the input data. For example, you could sum the trade volumes of a stock over 30-second intervals, or calculate a moving average of a stock's price over every four successive trades.
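The two window styles mentioned above can be sketched outside StreamBase. The following Python is a minimal illustration, not StreamBase code. It assumes that trades for a single stock arrive in timestamp order as hypothetical `(timestamp_seconds, symbol, price, volume)` tuples, and the function names `volume_per_30s` and `moving_avg_price` are invented for this sketch.

```python
from collections import deque
from typing import Iterable, Iterator

# Hypothetical trade record: (timestamp_seconds, symbol, price, volume).
Trade = tuple[float, str, float, int]


def volume_per_30s(trades: Iterable[Trade], interval: float = 30.0) -> Iterator[tuple[float, int]]:
    """Sum volumes over consecutive, non-overlapping time intervals.

    Yields (interval_start, total_volume) each time an interval closes.
    Assumes trades arrive in timestamp order. Intervals with no trades
    are emitted with a total of 0; an interval still open when the input
    ends is not emitted.
    """
    start = None
    total = 0
    for ts, _symbol, _price, volume in trades:
        if start is None:
            start = ts - (ts % interval)  # align the first interval to a multiple of `interval`
        while ts >= start + interval:     # current interval has elapsed: emit it and advance
            yield start, total
            start += interval
            total = 0
        total += volume


def moving_avg_price(trades: Iterable[Trade], size: int = 4) -> Iterator[float]:
    """Average price over each run of `size` successive trades (advance of one trade)."""
    window: deque = deque(maxlen=size)  # oldest price drops out automatically
    for _ts, _symbol, price, _volume in trades:
        window.append(price)
        if len(window) == size:
            yield sum(window) / size
```

The first function models non-overlapping, time-based windows; the second models tuple-count windows that advance by one tuple, so consecutive windows overlap.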

The Aggregate operator accepts a single input stream. It maintains one or more windows according to policies that are defined in the operator's properties. You define window policies in one or more dimensions. A dimension specifies:

  • When a window closes. For example, when it contains some number of tuples or some number of seconds elapse.

  • How much each new window advances relative to the previous one.

  • The type of aggregation to perform: an operation based on one of the following:

    • The number of tuples in the window

    • The time tuples arrive

    • A field in the input tuples

When the conditions prescribed in the dimension's policies are true, the operator performs one or more calculations on the tuples that currently occupy the Aggregate's window, based on expressions that you specify. The calculations performed can include aggregating functions, such as sum(Volume) or avg(PricePerShare). You can also group tuples based on a particular input field (for example, the value of a stock's Symbol field). The result of the aggregation is released on the Aggregate operator's output port.
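As a rough illustration of grouping inside one window, the Python sketch below computes the equivalent of sum(Volume) and avg(PricePerShare) separately for each Symbol value. Tuples are modeled as plain dicts, and the output field names `TotalVolume` and `AvgPrice` are hypothetical; this is not how StreamBase evaluates expressions internally.

```python
from collections import defaultdict


def aggregate_window(window: list[dict]) -> list[dict]:
    """Compute sum(Volume) and avg(PricePerShare) per Symbol for one window's tuples."""
    groups: dict = defaultdict(list)
    for t in window:
        groups[t["Symbol"]].append(t)  # bucket tuples by their Symbol value
    results = []
    for symbol, rows in groups.items():  # groups appear in first-seen order
        results.append({
            "Symbol": symbol,
            "TotalVolume": sum(r["Volume"] for r in rows),
            "AvgPrice": sum(r["PricePerShare"] for r in rows) / len(rows),
        })
    return results
```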

The sections that follow describe the fields and settings of the Aggregate operator's Properties view in detail. The examples at the end illustrate some Aggregate operations. For information about the available built-in aggregating functions, see the Aggregating Functions section of the StreamBase Expression Language and Functions topic.

There is no substitute for hands-on experience, so we encourage you to run the installed Aggregate operator samples, which are described in the Samples Guide. Also see the StreamBase demo applications by switching to the Demo Perspective in StreamBase Studio. Several of the demos use Aggregate operators.

General Tab

Name: Every application component must have a unique name. Use this field to specify or change the component's name. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.

Enable Error Output Port: Check this box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.

Description: Optionally, enter a description to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.

Dimensions Tab

The Dimensions tab allows you to add, edit, or remove dimensions from an Aggregate operator. When you click the Add button to define a new dimension, or select an existing dimension and then click the Edit button, StreamBase Studio displays its Edit Dimension dialog.

As explained in the Introduction, a window can be opened or closed (or prevented from opening or closing) when the conditions specified in the dimension are true.

You can define more than one dimension to cover different run-time conditions. In that case, the Aggregate operator uses the first dimension whose rules can be applied. The selected dimension, and only that dimension, applies to all the Aggregate operator windows. Note that even a multi-dimension Aggregate contains a single set of one or more aggregating functions that are applied to one or more fields in the Aggregate's windows. That is, you cannot apply one type of aggregating function on one dimension, and a different type of aggregating function on another dimension.

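In the Edit Dimension dialog, first choose the Type option, which specifies how the aggregation should be done: based on the number of tuples in the window, the time tuples arrive, or the value of a field in the input tuples.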

Aggregate Functions Tab

The Aggregate Functions tab allows you to specify one or more aggregate expressions that will be evaluated in each window. You can define output fields using either or both of two methods:

  • Select Output all input fields to automatically use the input fields in the output schema. Then, configure the output fields:

    • Specify an aggregate function. The dropdown control contains firstval by default, and you can also choose the lastval function; both of these functions work with all fields. Alternatively, you can enter a different aggregate function, provided that it applies to every data type in the input schema. For example, do not use the avg function if one of the schema fields is a string.

    • Specify a prefix to add to the input field name to form the name of the corresponding output field. By default, the prefix is input_. Optionally, you can change the prefix or omit it.

    If the Output all input fields option is not selected, input fields are not automatically included in the output, and you must specify the output fields you want. Whether or not this option is selected, you can add output fields manually, as described in the next step.

  • Click the Add button to manually add output fields. In each row, define an expression for the output field.
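To make the Output all input fields option concrete, here is a hedged Python model of what it produces for one window: the chosen function is applied to each input field, and the result is stored under the prefixed field name. The `firstval` and `lastval` helpers below are simple stand-ins named after the StreamBase functions, not their actual implementations, and the sketch assumes a non-empty window.

```python
from typing import Any, Callable


def firstval(values: list[Any]) -> Any:
    """Stand-in: the first value that entered the window."""
    return values[0]


def lastval(values: list[Any]) -> Any:
    """Stand-in: the most recent value in the window."""
    return values[-1]


def output_all_input_fields(window: list[dict[str, Any]],
                            agg: Callable[[list[Any]], Any] = firstval,
                            prefix: str = "input_") -> dict[str, Any]:
    """Apply one aggregate function to every input field; name each output prefix + field."""
    fields = window[0].keys()  # assumes a non-empty window with a uniform schema
    return {prefix + name: agg([t[name] for t in window]) for name in fields}
```

Passing a numeric-only function such as `statistics.mean` as `agg` would fail on a string field like Symbol, which is the Python analogue of the warning about avg above.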

In the example below, the first expression calculates the average of prices in the window and multiplies it by two. The next two expressions capture the start and end time of the window (the argument indicates a time-based dimension). The last expression sums the stock symbols in the window.

Expressions can contain aggregate functions and constants. Apart from field names used as arguments in aggregate function calls, any direct reference to an input stream field must be to a grouped field, as described in the Group Options Tab section.

For details about aggregate expressions and available functions, see the StreamBase Expression Language and Functions topic in the Authoring Guide.

Group Options Tab

By default, aggregate windows are controlled by the dimensions you define. The Group Options tab allows you to further subdivide windows by grouping on one or more input fields, so that each group has its own independent windows. For example, if you group on a field named id, a separate window is created for each unique value of id that the Aggregate operator receives. If you group on multiple fields, a separate window is created for each unique combination of values of all the fields.
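The sketch below models independent per-group windows in Python. For simplicity it assumes a tuple-count window that closes after `size` tuples of the same group and does not overlap with the next window (an advance equal to the size); the function and parameter names are hypothetical. The group key is a Python tuple of field values, so grouping on several fields yields one window per unique combination of values.

```python
from collections import defaultdict
from typing import Iterable, Iterator


def grouped_tumbling_sum(tuples: Iterable[dict], group_fields: tuple[str, ...],
                         value_field: str, size: int) -> Iterator[tuple[tuple, int]]:
    """Keep an independent window per unique combination of group_fields values.

    Each group's window closes after `size` tuples of that group, emitting
    (group_key, sum of value_field); a fresh window then starts for that group.
    """
    windows: dict = defaultdict(list)
    for t in tuples:
        key = tuple(t[f] for f in group_fields)  # one key per unique value combination
        windows[key].append(t[value_field])
        if len(windows[key]) == size:            # only this group's window is affected
            yield key, sum(windows[key])
            windows[key] = []
```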

A group that you define here can also be referenced in an expression in the Aggregate Functions Tab.

Click the Add button to add a row for each group you want to define. In each row:

  • In the Output Field Name column, enter the name you want to appear in the output stream of the operator for this field.

  • In the Expression column, enter the input field that you want to group on.

In the following example, the windows generated by the operator's dimensions will be further subdivided into groups by unique values of Symbol; the resulting output field will also be named Symbol.

Dynamic Variables Tab

The Dynamic Variables tab allows you to define variables for this operator that can then be used in one of its expressions. A dynamic variable can be updated by any input stream or output stream in your application. For more information, see Using Dynamic Variables.

Concurrency Tab

Run this component in a separate thread

This option causes the server to process the component's requests concurrently with other processing in the application. On an SMP machine, the processing of the threads can be distributed automatically across multiple processors.

If this is a compute-intensive component and you know that it can run without data dependencies on other components in the StreamBase application, you may be able to improve performance by enabling this option.

Caution

These features are not suitable for every application. For details, see Execution Order, Concurrency, and Parallelism. That topic includes important guidelines for the use of these features.

Run in parallel threads

If you checked the first option, you can also choose this option, which causes the server to run multiple instances of this component, each in its own thread. At run time, tuples are dispatched to particular instances based on the Key Expression value, which must evaluate to an int.
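The following Python sketch illustrates key-based dispatch in general terms. The mapping from key to instance (the key modulo the number of instances) is an assumption chosen for illustration; this section does not say how StreamBase Server actually assigns keys to instances. What any deterministic mapping of this kind guarantees is that tuples with the same key value always reach the same instance.

```python
from collections import defaultdict
from typing import Callable


def dispatch(tuples: list[dict], key_expr: Callable[[dict], int],
             num_instances: int) -> dict[int, list[dict]]:
    """Route each tuple to one of num_instances queues by an int key.

    ASSUMPTION: modulo routing is used only for illustration; it is not
    necessarily how StreamBase Server maps keys to instances.
    """
    queues: dict = defaultdict(list)
    for t in tuples:
        key = key_expr(t)
        if not isinstance(key, int):
            raise TypeError("key expression must evaluate to an int")
        queues[key % num_instances].append(t)  # same key -> same instance every time
    return dict(queues)
```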

Null Values

In an Aggregate operator window, if a field's value is null, the null value is not included in the calculation. For example, if an aggregate's window contains ten tuples and one of them has a null price, avg(price) is calculated over the remaining nine values.
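A minimal Python model of this rule follows, with None standing in for a StreamBase null. Returning None when every value in the window is null is an assumption of this sketch; the text above does not cover that case.

```python
from typing import Optional


def avg_ignoring_nulls(values: list[Optional[float]]) -> Optional[float]:
    """Average of the non-null values only; nulls are skipped, not counted as zero."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return None  # ASSUMPTION: an all-null window yields null
    return sum(non_null) / len(non_null)
```

With ten values of which one is None, the divisor is 9, not 10.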

Key fields that are null are accepted as a group. The aggregating function still calculates a value for the window, but that value may belong to a group of tuples whose key value is unknown (for example, tuples whose stock Symbol is missing). Also, tuples with the key fields set to null are dropped and are not included in the aggregate's calculation.

For more information, see Using Nulls.

Example 1: Simple Aggregate Operation

This example demonstrates the basic Aggregate operation by tracing the windowing action in a simple application with these characteristics:

  • Each tuple in the input stream contains a single integer.

  • The Aggregate operator has a single, tuple-based dimension. The dimension is set to advance each window by one tuple, and each window is set to close and emit after 3 tuples.

  • The sum() aggregating function calculates the total value of all the integers in each window.

The following diagrams show a succession of tuples arriving on an input stream. At each step, we depict the arrival of a tuple, the windows that are open at that time, the aggregate function performed on each window, and any output that occurs.

  1. When the first tuple arrives on the input stream, a window is immediately opened. You should notice a few things:

    • The new window contains the data for the first tuple, but it has "room" for three tuples: recall that the dimension sets the window size to 3.

    • The diagram shows the aggregation function that is performed on the window (even though here it is only a sum of one number). As more data is enqueued, the window will keep a running sum of tuple values.

    • Because the window is not "full", no output occurs. Recall that the dimension is set to emit when the size is 3.

  2. When a new tuple arrives, a second window is opened and the new tuple is loaded into both windows, because the dimension sets windows to advance by one tuple. If the advance had been set to 2, no window would open at this step; the next window would open with the third tuple instead.

    Notice the positions of the two windows relative to the tuples in the input stream. For readability, the diagram represents each window separately. In fact, both windows operate on the same data. A more realistic representation might show a succession of overlapping windows moving over a single stream of data.

    Also notice that the aggregate result changes for Window 1, and that there is still no output from the operator.

  3. When the third tuple arrives, a new window is created and all the windows are updated. For the first time, we see a tuple released on the operator's output port. The output tuple contains the aggregate result from Window 1, which is now full. As soon as the output is released, the Aggregate operator closes the window.

  4. When the fourth tuple arrives, a new window is started, as before. Notice that Window 1, which we said was closed in the previous step, is gone now. At the same time, the new tuple causes Window 2 to reach its size limit. This triggers the release of Window 2's aggregate result in an output tuple, and Window 2 is closed.

  5. The last step shown is just like the preceding one: input causes a window to become full and release its aggregate result on the output port. And again, a new window opens one tuple ahead of the preceding window.
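The walkthrough above can be reproduced with a short Python simulation of a tuple-based dimension. It is an illustrative model rather than StreamBase's implementation, and the input values used with it are hypothetical, not the values shown in the diagrams.

```python
from typing import Iterable, Iterator


def sliding_sum(values: Iterable[int], size: int = 3, advance: int = 1) -> Iterator[int]:
    """Trace the tuple-based dimension from Example 1.

    A new window opens every `advance` tuples; each arriving tuple is added to
    every open window; a window that reaches `size` tuples emits its sum and closes.
    """
    open_windows: list = []
    for i, v in enumerate(values):
        if i % advance == 0:
            open_windows.append([])          # open a new window
        for w in open_windows:
            w.append(v)                      # the tuple lands in every open window
        if open_windows and len(open_windows[0]) == size:
            yield sum(open_windows.pop(0))   # oldest window is full: emit, then close
```

With inputs 1 through 5 and the default size of 3 and advance of 1, the simulation emits 6, 9, and 12: one output per arriving tuple from the third tuple onward, as in steps 3 through 5. Setting `advance=2` opens a window only on every second tuple, so fewer windows fill and fewer outputs appear.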

Example 2: Counting Tuples

This example demonstrates a dimension and related Aggregate function that generate a sequence (a count) on the output stream.

In this example, notice that we have selected these two options:

  • Opening policy: Do not open window based on this dimension

  • Window size: Do not close window based on this dimension

That combination of settings creates a window that never closes while StreamBase Server is running. Also notice the Emission policy setting, which gives you the option of emitting output before the window closes. Here, the policy is set to Intermediate emission every 1, which means that each arriving tuple causes one output tuple, containing the current sequence number, to be emitted.

Now, look at the Aggregate Functions tab in the operator's Properties View, which is set to use the count aggregating function:

As each tuple arrives in the Aggregate operator, it is counted. Because each arriving tuple causes a tuple to be emitted, the value in each emitted tuple's sequence_num field is 1 greater than the value in the previous output tuple. To understand how we have used the count() function (and for details about all the aggregate functions), see the StreamBase Expression Language and Functions topic in the Authoring Guide.
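The observable behavior of this example can be modeled in a few lines of Python: a single window that never closes, with intermediate emission every `emit_every` tuples. The function name is hypothetical, and the sketch only mimics the output described above; it is not how StreamBase implements count().

```python
from typing import Iterable, Iterator


def running_count(tuples: Iterable[object], emit_every: int = 1) -> Iterator[dict]:
    """One window for the whole run: it is never closed by this dimension.

    Intermediate emission every `emit_every` tuples outputs the current count.
    """
    count = 0
    for _ in tuples:
        count += 1
        if count % emit_every == 0:
            yield {"sequence_num": count}  # each output is 1 greater than the last when emit_every=1
```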

Note

More Aggregate examples are available in the StreamBase Studio sample operator applications.