Aggregate Operator Time Dimension Sample

This topic describes AggregateByTime.sbapp, the Aggregate operator time dimension sample.

Importing This Sample into StreamBase Studio

In StreamBase Studio, import this sample with the following steps:

  • From the top menu, click File > Load StreamBase Sample.

  • Select operator from the Applications list.

  • Click OK.

StreamBase Studio creates a single project for all the operator samples.

Sample Location

By default, the sample files are installed in:

  • On Windows: C:\Program Files\StreamBase Systems\StreamBase.n.m\sample\operator

  • On UNIX: /opt/streambase/sample/operator

When you load the sample into StreamBase Studio, Studio copies the sample project's files to your Studio workspace. StreamBase Systems recommends that you use the workspace copy of the sample, especially on UNIX, where you may not have write access to /opt/streambase. In the default installation, the path to this sample in your Studio workspace is:

UNIX:       
  ~/streambase-studio-n.m-workspace/sample_operator
Windows XP:
  C:\Documents and Settings\username\My Documents\StreamBase Studio n.m Workspace\sample_operator
Windows Vista:
  C:\Users\username\Documents\StreamBase Studio n.m Workspace\sample_operator

Running the Aggregate Time-Based Sample

This sample demonstrates the time-based Aggregate operator, which uses elapsed time to manage its windows. In this example, the operator uses 2-second windows to compute the average price per share for each stock symbol.
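To make the windowing concrete, the following Python sketch computes which 2-second window a given time falls into. It is not StreamBase code: it assumes that window starts fall on multiples of the advance value plus the offset, which is consistent with the even-second start and end times in the sample output. The function name `window_bounds` is hypothetical.

```python
import math

def window_bounds(t, advance=2.0, offset=0.0):
    """Return (start, end) of the tumbling window that contains time t (seconds).

    Assumption: window starts fall on offset + k * advance for integer k.
    Because the window size equals the advance, windows never overlap.
    """
    k = math.floor((t - offset) / advance)
    start = offset + k * advance
    return start, start + advance

print(window_bounds(5.3))
print(window_bounds(6.0))
```

A time exactly on a boundary, such as 6.0, belongs to the window that starts there, so each time maps to exactly one window.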

Running AggregateByTime.sbapp in StreamBase Studio

  1. In the Package Explorer, double-click to open the AggregateByTime.sbapp application. Make sure the application is the currently active tab in the EventFlow Editor.

  2. Click the Run button. This opens the SB Test/Debug perspective and starts the application.

  3. No output is displayed in the Application Output view yet, but the dequeuer is prepared to receive output. This view will eventually show the output of the application.

  4. In the Feed Simulations view, right-click AggregateByTime.sbfs and select Run Feed Simulation.

  5. Observe application output in the Application Output view. The format is similar to the following:

    Symbol=MSFT, TimeBasedAverage=26.05, start_time=2007-04-26 13:05:26.000-0400, 
        end_time=2007-04-26 13:05:28.000-0400
    Symbol=AAPL, TimeBasedAverage=43.95, start_time=2007-04-26 13:05:26.000-0400, 
        end_time=2007-04-26 13:05:28.000-0400

    Tip

    If output rows are too long to see all the data, click a row to display its fields in the Display Fields pane below the table.

    The average, start time, and end time values vary with the rate and timing of the input. For example, the first time you run the sample, the first tuple may arrive just before a 2-second boundary, and the second time it may arrive just after that boundary. The averages differ because the tuples fall into different windows.

  6. When done, press F9 or click the Stop Running Application button.
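The following Python sketch illustrates why the averages differ between runs. It is not StreamBase code: it assumes windows start at multiples of 2 seconds and feeds the same three made-up prices with two slightly different sets of arrival times. The helper `window_averages` is hypothetical.

```python
from collections import defaultdict

def window_averages(trades, advance=2.0):
    """Average the prices in each tumbling window of `advance` seconds.

    `trades` is a list of (time_seconds, price) pairs for a single symbol.
    Returns {window_start: average_price}.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for t, price in trades:
        start = (t // advance) * advance  # start of the window this tuple falls in
        sums[start] += price
        counts[start] += 1
    return {start: sums[start] / counts[start] for start in sums}

prices = [26.0, 26.1, 26.2]
# Run 1: tuples arrive at 0.9 s, 1.8 s, and 2.7 s.
run1 = window_averages(list(zip([0.9, 1.8, 2.7], prices)))
# Run 2: the same tuples arrive 0.3 s later, so the second one crosses the boundary.
run2 = window_averages(list(zip([1.2, 2.1, 3.0], prices)))
print(run1)
print(run2)
```

In run 1 the first window averages two prices; in run 2 it contains only one, so both windows report different averages even though the input prices are identical.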

Running AggregateByTime.sbapp in Terminal Windows

This section describes how to run the sample in UNIX terminal windows or Windows command prompt windows. On Windows, be sure to use the StreamBase Command Prompt from the Start menu as described in the Test/Debug Guide, not the default command prompt.

  1. Open three terminal windows on UNIX, or three StreamBase Command Prompts on Windows. In each window, navigate to the directory where the sample is installed, or to your workspace copy of the sample, as described above.

  2. In window 1, type:

    sbd AggregateByTime.sbapp

    The window shows:

    notice[StreamBaseServer] listening on port 10000

  3. In window 2, type:

    sbc dequeue AvgOut

    No output is displayed at this point, but the dequeuer is prepared to receive output. This window will eventually show the output of the application.

  4. In window 3, type:

    sbfeedsim AggregateByTime.sbfs

    Data starts flowing into the application.

    Observe several lines in the dequeue window. The format is similar to the following:

    MSFT,26.05,2007-04-26 13:29:52.000-0400,2007-04-26 13:29:54.000-0400
    AAPL,43.95,2007-04-26 13:29:52.000-0400,2007-04-26 13:29:54.000-0400

    The average, start time, and end time values vary depending on where the tuples fall relative to wall-clock time. For example, the first time you run the sample, the first tuple may arrive just before a 2-second boundary, and the second time it may arrive just after that boundary. The averages differ because the tuples fall into different windows, and the start and end times differ because each run takes place at a different wall-clock time.

  5. In window 3, type sbadmin shutdown to terminate the daemon and dequeuer.
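In this sample, each line printed by sbc dequeue lists Symbol, TimeBasedAverage, start_time, and end_time, separated by commas. If you capture that output for later analysis, the following Python sketch parses it and checks that each window spans 2 seconds. It assumes the plain comma-separated format shown above with no quoted fields; the helper `parse_avg_out` and the captured text are illustrative only.

```python
import csv
import io
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f%z"  # e.g. 2007-04-26 13:29:52.000-0400

def parse_avg_out(text):
    """Parse captured AvgOut lines into dicts (assumes unquoted CSV fields)."""
    rows = []
    for fields in csv.reader(io.StringIO(text)):
        if not fields:  # skip blank lines
            continue
        symbol, avg, start, end = fields
        rows.append({
            "Symbol": symbol,
            "TimeBasedAverage": float(avg),
            "start_time": datetime.strptime(start, TS_FORMAT),
            "end_time": datetime.strptime(end, TS_FORMAT),
        })
    return rows

captured = """MSFT,26.05,2007-04-26 13:29:52.000-0400,2007-04-26 13:29:54.000-0400
AAPL,43.95,2007-04-26 13:29:52.000-0400,2007-04-26 13:29:54.000-0400
"""

for row in parse_avg_out(captured):
    width = (row["end_time"] - row["start_time"]).total_seconds()
    print(row["Symbol"], row["TimeBasedAverage"], width)
```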

How We Created the AggregateByTime Sample

  1. Launched StreamBase Studio.

  2. Created (or subsequently used) the sample_operator project.

  3. From the top menu, in the SB Authoring perspective, selected File > New > EventFlow Application. Selected the sample_operator project and entered AggregateByTime as the application name.

  4. Created an Input Stream:

    1. Dragged an Input Stream from the palette to the EventFlow Editor.

    2. Clicked the stream in the EventFlow Editor, which opened the Input Stream Properties dialog window.

    3. On the General tab, Name: TradesIn

    4. On the Edit Schema tab, added:

      • Field Name: Symbol, Type: string, Size: 5

      • Field Name: PricePerShare, Type: double

    5. Clicked OK.

  5. Created an Aggregate operator:

    1. Dragged an Aggregate operator from the palette to the EventFlow Editor.

    2. On the General tab, Name: AggregateAvgByTime.

    3. Connected the TradesIn input stream to the AggregateAvgByTime operator.

  6. Set up the AggregateAvgByTime operator:

    1. On the Dimension tab, clicked the Add button. In the Edit Dimension dialog, added:

      Name: AggregateAvgByTimeDim

      Type: time

      The window is opened and evaluated based on elapsed time. When the specified time has elapsed, the aggregate calculations are performed, a tuple containing the results is emitted, and the window is closed. (See also the Aggregate Operator Field Dimension Sample.)

      Opening policy: Open per: Advance: 2 and Offset: 0

      This specifies when new windows open: an Advance value of 2 opens a new window every two seconds, and an Offset of 0 leaves the window boundaries unshifted. Because the advance value equals the window size, windows do not overlap.

      Window size: Close and emit every 2

      The number of seconds after which the window closes and emits its result.

      Emission policy: Selected "No intermediate emissions based on this dimension."

      An emission policy allows tuples to be emitted before the window closes. For example, an intermediate emission policy could emit a tuple every second during the 2-second window, rather than waiting for the window to close.

      Optional windows: Not applicable for time-based windows.

      At this point, the Edit Dimension dialog for AggregateAvgByTimeDim looks like this:

      [Figure: Edit Dimension dialog for AggregateAvgByTimeDim]

    2. On the Aggregate Functions tab, cleared the Output all input fields option. Then added:

      Output Field Name: TimeBasedAverage

      Expression: avg(PricePerShare)

      Calculates the average price per share of all the tuples in the window. For details about the available aggregate functions, see the StreamBase Expression Language and Functions topic in the Authoring Guide.

      Output Field Name: start_time

      Expression: openval('AggregateAvgByTimeDim')

      Shows the lower boundary of the window (not necessarily the arrival time of the first tuple in the window).

      Output Field Name: end_time

      Expression: closeval('AggregateAvgByTimeDim')

      Shows the upper boundary of the window (not necessarily the arrival time of the last tuple in the window).

    3. On the Group Options tab, added:

      Output Field Name: Symbol

      Expression: Symbol

      Creates a separate window for each distinct value of the Symbol field. Note that the Output Field Name need not match the input field named in the expression; choosing a different name is most useful when the expression is more complex than a single field reference.

  7. Created an output stream:

    1. Dragged an Output Stream from the palette to the EventFlow Editor.

    2. On the General tab, Name: AvgOut

    3. Connected the AggregateAvgByTime operator to the AvgOut output stream.
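The behavior of the finished application can be approximated outside StreamBase with the following Python sketch. It models TradesIn tuples as (time, Symbol, PricePerShare), keeps one 2-second window per Symbol (the Group Options setting), and emits Symbol, TimeBasedAverage, start_time, and end_time when a window closes, loosely mirroring avg(), openval(), and closeval(). It is a simplification, not the operator's implementation: it assumes tuples arrive in time order and closes windows only when the caller advances the clock, whereas the StreamBase operator closes windows as wall-clock time elapses. The class `TimeAggregate` and the prices are made up for illustration.

```python
ADVANCE = 2.0  # Opening policy Advance 2, Offset 0; window size 2 (seconds)

class TimeAggregate:
    """Hypothetical model of AggregateAvgByTime: tumbling time windows grouped by Symbol."""

    def __init__(self, advance=ADVANCE):
        self.advance = advance
        self.open = {}  # Symbol -> [window_start, price_sum, count]

    def _emit(self, symbol, start, total, count):
        # One AvgOut tuple: (Symbol, TimeBasedAverage, start_time, end_time)
        return (symbol, total / count, start, start + self.advance)

    def advance_clock(self, now):
        """Close and emit every window whose end boundary is at or before `now`."""
        out = []
        for symbol in list(self.open):
            start, total, count = self.open[symbol]
            if start + self.advance <= now:
                out.append(self._emit(symbol, start, total, count))
                del self.open[symbol]
        return out

    def process(self, t, symbol, price):
        """Accept one TradesIn tuple at time t; return any AvgOut tuples emitted."""
        out = self.advance_clock(t)
        start = (t // self.advance) * self.advance
        window = self.open.setdefault(symbol, [start, 0.0, 0])
        window[1] += price
        window[2] += 1
        return out

trades = [(0.5, "MSFT", 26.0), (0.7, "AAPL", 43.9), (1.5, "MSFT", 26.1),
          (1.9, "AAPL", 44.0), (2.4, "MSFT", 26.3)]
agg = TimeAggregate()
emitted = []
for t, sym, px in trades:
    emitted.extend(agg.process(t, sym, px))
emitted.extend(agg.advance_clock(4.0))  # let time pass so the last window closes
for row in emitted:
    print(row)
```

Keying the open windows by Symbol is what gives each symbol its own average, just as the Group Options tab does in the sample.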