Aggregate Operator Field Dimension Sample

This topic describes AggregateByField.sbapp, the Aggregate operator field dimension sample.

Importing This Sample into StreamBase Studio

In StreamBase Studio, import this sample with the following steps:

  • From the top menu, click File > Load StreamBase Sample.

  • Select operator from the Applications list.

  • Click OK.

StreamBase Studio creates a single project for the operator samples.

Sample Location

By default, the sample files are installed in:

  • On Windows: C:\Program Files\StreamBase Systems\StreamBase.n.m\sample\operator

  • On UNIX: /opt/streambase/sample/operator

When you load the sample into StreamBase Studio, Studio copies the sample project's files to your Studio workspace. StreamBase Systems recommends that you use the workspace copy of the sample, especially on UNIX, where you may not have write access to /opt/streambase. In the default installation, the path to this sample in your Studio workspace is:

UNIX:       
  ~/streambase-studio-n.m-workspace/sample_operator
Windows XP:
  C:\Documents and Settings\username\My Documents\StreamBase Studio n.m Workspace\
      sample_operator
Windows Vista:
  C:\Users\username\Documents\StreamBase Studio n.m Workspace\
      sample_operator

Running the Field-Based Aggregate Sample

In this sample, the Aggregate operator uses a field-based dimension to sum the volume of trades in each stock over 30-second windows that advance every 30 seconds. The sample also shows that the aggregation works correctly when tuples for different stocks are intermixed.
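The window arithmetic behind this description can be sketched in Python. This is a simplified model for illustration, not StreamBase code. It assumes that windows open at Offset + k × Advance for every integer k, and that a window [start, start + size) contains a tuple when start ≤ Time < start + size. The function name `window_starts` is hypothetical, and the sketch ignores the Open windows before first tuple option.

```python
def window_starts(time, advance=30, size=30, offset=0):
    """Return the lower bounds of all windows [start, start + size) that contain time.

    Simplified model (assumption): windows open at offset + k * advance
    for every integer k.
    """
    # Latest window start at or before this time value.
    start = offset + ((time - offset) // advance) * advance
    starts = []
    # Walk backward while the window still extends past this time value.
    while start + size > time:
        starts.append(start)
        start -= advance
    return sorted(starts)


if __name__ == "__main__":
    print(window_starts(7))               # [0]: one window, starting at 0
    print(window_starts(40))              # [30]: one window, starting at 30
    print(window_starts(40, advance=15))  # [15, 30]: two overlapping windows
```

With Advance equal to the window size, as in this sample, every Time value falls into exactly one window. With an Advance of 15, a tuple at Time 40 would belong to two overlapping windows.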

Running AggregateByField.sbapp in StreamBase Studio

  1. In the Package Explorer, double-click to open the AggregateByField.sbapp application. Make sure the application is the currently active tab in the EventFlow Editor.

  2. Click the Run button. This opens the SB Test/Debug perspective and starts the application.

  3. In the Application Output view, select the OutputChunkedTrades output stream. No output is displayed at this point, but the dequeuer is prepared to receive output. This view will eventually show the output of the application.

  4. In the Manual Input view, enter 7, AMAT, and 100 in the Time, Symbol, and Volume fields, respectively. Note that strings in StreamBase are case sensitive, so "AMAT" is not the same as "amat".

  5. Click Send Data, and observe that no output is displayed yet in the Application Output view.

  6. Enter 10, AMAT, and 100 in the Time, Symbol, and Volume fields, respectively.

  7. Click Send Data, and observe that no output is displayed yet in the Application Output view.

  8. Enter 20, AMAT, and 200 in the Time, Symbol, and Volume fields, respectively.

  9. Click Send Data, and observe that no output is displayed yet in the Application Output view.

  10. Enter 40, AMAT, and 100 in the Time, Symbol, and Volume fields, respectively.

  11. Click Send Data, and observe this line in the Application Output view:

    Symbol=AMAT, TimeChunk=0, TotalVolume=400

    Tip

    If output data is too long to easily see in the Application Output table, click a row to display its field data in the Display Fields pane below the table.

  12. Enter 41, AMAT, and 100 in the Time, Symbol, and Volume fields, respectively.

  13. Click Send Data, and observe that no output is displayed in the Application Output view.

  14. Enter 45, INTC, and 100 in the Time, Symbol, and Volume fields, respectively.

  15. Click Send Data, and observe that no output is displayed in the Application Output view.

  16. Enter 50, AMAT, and 200 in the Time, Symbol, and Volume fields, respectively.

  17. Click Send Data, and observe that no output is displayed in the Application Output view.

  18. Enter 55, INTC, and 300 in the Time, Symbol, and Volume fields, respectively.

  19. Click Send Data, and observe that no output is displayed in the Application Output view.

  20. Enter 65, AMAT, and 100 in the Time, Symbol, and Volume fields, respectively.

  21. Click Send Data, and observe these lines in the Application Output view:

    Symbol=AMAT, TimeChunk=30, TotalVolume=400
    Symbol=INTC, TimeChunk=30, TotalVolume=400

  22. When done, press F9 or click the Stop Running Application button.
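To see why output appears only after the tuples at Time 40 and Time 65, the following Python sketch replays the same nine tuples through a simplified model of the operator. It is an illustration under stated assumptions, not StreamBase's implementation. It assumes 30-second tumbling windows keyed by (Symbol, window start), and that an open window in any group closes and emits as soon as a tuple arrives whose Time is at or beyond that window's end, which is the behavior the INTC window shows in step 21. The names `TRADES` and `aggregate_by_field` are hypothetical.

```python
WINDOW = 30  # Advance and "Close and emit every" are both 30 in this sample

# (Time, Symbol, Volume) tuples, in the order entered in the steps above.
TRADES = [
    (7, "AMAT", 100), (10, "AMAT", 100), (20, "AMAT", 200),
    (40, "AMAT", 100), (41, "AMAT", 100), (45, "INTC", 100),
    (50, "AMAT", 200), (55, "INTC", 300), (65, "AMAT", 100),
]


def aggregate_by_field(trades):
    """Return (Symbol, TimeChunk, TotalVolume) tuples in the order windows close."""
    open_windows = {}  # (symbol, window start) -> running sum of Volume
    emitted = []
    for time, symbol, volume in trades:
        # Close every open window, in any group, that ends at or before this Time.
        # Windows close in the order they were opened (dict insertion order).
        for key in [k for k in open_windows if k[1] + WINDOW <= time]:
            emitted.append((key[0], key[1], open_windows.pop(key)))
        # Add this tuple to its own group's window; start is the window's lower boundary.
        start = (time // WINDOW) * WINDOW
        open_windows[(symbol, start)] = open_windows.get((symbol, start), 0) + volume
    return emitted


if __name__ == "__main__":
    for symbol, chunk, total in aggregate_by_field(TRADES):
        print(f"Symbol={symbol}, TimeChunk={chunk}, TotalVolume={total}")
```

Run as a script, this model prints the same three lines that the Application Output view shows. The AMAT window that opens at Time 60 stays open, because no tuple with a Time of 90 or later arrives to close it.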

Running AggregateByField.sbapp in Terminal Windows

This section describes how to run the sample in UNIX terminal windows or Windows command prompt windows. On Windows, be sure to use the StreamBase Command Prompt from the Start menu as described in the Test/Debug Guide, not the default command prompt.

  1. Open three terminal windows on UNIX, or three StreamBase Command Prompts on Windows. In each window, navigate to the directory where the sample is installed, or to your workspace copy of the sample, as described above.

  2. In window 1, type:

    sbd AggregateByField.sbapp

    The window shows:

    notice[StreamBaseServer] listening on port 10000

  3. In window 2, type:

    sbc dequeue OutputChunkedTrades

    No output is displayed at this point, but the dequeuer is prepared to receive output. This window will eventually show the output of the application.

  4. In window 3, type:

    sbc enqueue TradesIn

    The sbc command is now waiting for keyboard input. Type:

    7,AMAT,100

    No output is displayed yet in the dequeue window.

  5. Type:

    10,AMAT,100

    No output is displayed yet in the dequeue window.

  6. Type:

    20,AMAT,200

    No output is displayed yet in the dequeue window.

  7. Type:

    40,AMAT,100

    Observe this line in the dequeue window:

    AMAT,0,400

  8. Type:

    41,AMAT,100

    No output is displayed in the dequeue window.

  9. Type:

    45,INTC,100

    No output is displayed in the dequeue window.

  10. Type:

    50,AMAT,200

    No output is displayed in the dequeue window.

  11. Type:

    55,INTC,300

    No output is displayed in the dequeue window.

  12. Type:

    65,AMAT,100

    Observe these lines in the dequeue window:

    AMAT,30,400
    INTC,30,400

  13. In window 3, press Control-Z (Windows) or Control-D (UNIX).

    The sbc process will exit.

  14. In window 3, type the following command to terminate the server and dequeuer:

    sbadmin shutdown
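Instead of typing each tuple, you can send the same input to sbc enqueue on its standard input. The following Python sketch assumes that sbc is on your PATH (for example, in a StreamBase Command Prompt) and that sbd is already running on the default port, as in step 2. Closing standard input after the last line stands in for pressing Control-D or Control-Z. The helper name `build_enqueue_input` is hypothetical.

```python
import shutil
import subprocess

# (Time, Symbol, Volume) tuples, in the order typed in the steps above.
TRADES = [
    (7, "AMAT", 100), (10, "AMAT", 100), (20, "AMAT", 200),
    (40, "AMAT", 100), (41, "AMAT", 100), (45, "INTC", 100),
    (50, "AMAT", 200), (55, "INTC", 300), (65, "AMAT", 100),
]


def build_enqueue_input(trades):
    """Format tuples as the comma-separated lines that sbc enqueue reads."""
    return "".join(f"{time},{symbol},{volume}\n" for time, symbol, volume in trades)


def main():
    if shutil.which("sbc") is None:
        print("sbc not found on PATH; run this from a StreamBase terminal or command prompt")
        return
    # Passing input= writes all lines, then closes sbc's standard input (end of input).
    subprocess.run(
        ["sbc", "enqueue", "TradesIn"],
        input=build_enqueue_input(TRADES),
        text=True,
        check=True,
    )


if __name__ == "__main__":
    main()
```

With the dequeuer from step 3 still running in window 2, it should show AMAT,0,400, AMAT,30,400, and INTC,30,400, as in the interactive steps.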

How We Created the AggregateByField Sample

  1. Launched StreamBase Studio.

  2. Created (or subsequently used) the sample_operator project.

  3. In the SB Authoring perspective, selected File > New > EventFlow Application from the top menu. Selected the sample_operator project and entered AggregateByField for the diagram name.

  4. Created an input stream:

    1. Dragged an input stream from the palette to the EventFlow Editor.

    2. Clicked the stream in the EventFlow Editor, which opened the Input Stream Properties view.

    3. On the General tab, Name: TradesIn

    4. On the Edit Schema tab, added:

      • Field Name: Time, Type: int

      • Field Name: Symbol, Type: string, Size: 5

      • Field Name: Volume, Type: int

  5. Created an Aggregate operator:

    1. Dragged an Aggregate operator from the palette to the EventFlow Editor.

    2. On the General tab, Name: AggregateTrades

    3. Connected the TradesIn input stream to the AggregateTrades operator.

  6. Set up the AggregateTrades operator:

    1. On the Dimension tab, clicked the Add button. In the Edit Dimension dialog, added:

      Name: AggregateTradesDim

      Type: field. In the pull-down list of possible value fields, selected Time.

      The buffer for each window holds the tuples whose Time values fall within that window's range. When a tuple arrives whose Time value lies beyond the range of an open window, the aggregate expressions are evaluated, a tuple containing the results is emitted, and the window is closed. (See also the Aggregate Operator Tuple Dimension Sample.)

      Opening policy: Open per: Advance: 30 and Offset: 0

      These settings control when new windows open for each group of tuples. An Advance value of 30 opens a new window every 30 seconds, as measured by the Time field; because Offset is 0, windows start at Time values 0, 30, 60, and so on. Because the "Close and emit" value is also 30 seconds, only one window is open at a time for each group (see "Group by" below). If Advance were 15, a new window would open every 15 seconds and stay open for 30 seconds, so consecutive windows would overlap.

      Window size: Close and emit every 30

      This indicates that each window spans 30 seconds (as measured by the Time field of each tuple), and its buffer contains the tuples whose Time field falls within that 30-second period.

      Emission policy: Selected "No intermediate emissions based on this dimension."

      Emission policy allows tuples to be emitted before the window closes. For example, one could emit a tuple every second during the 30-second window, rather than waiting for the window to close.

      Optional windows: Unchecked the Open windows before first tuple checkbox.

      When set, this option creates windows which encompass the values that would have occurred before the arrival of the first tuple. For example, where Advance is less than Size, additional windows would be opened to include the first tuple; these windows would start before the "Time" in the first tuple.

      At this point our Edit Dimension dialog looks like this:

      Clicked OK.

    2. On the Aggregate Functions tab, unchecked the Output all input fields option. Then added:

      Output Field Name: TimeChunk

      Expression: openval('AggregateTradesDim')

      Returns the lower boundary of the window. In this case, because the dimension's offset is 0, each window starts at a multiple of 30 seconds.

      Output Field Name: TotalVolume

      Expression: sum(Volume)

      Calculates the total volume for all the trades represented by tuples in the window. For details about the available aggregate functions, see the StreamBase Expression Language and Functions topic in the Authoring Guide.

      For example:

    3. On the Group Options tab, added:

      Output Field Name: Symbol

      Expression: Symbol

      Creates a separate window for each distinct value of the Symbol field.

      For example:

      Note that the Output Field Name can differ from the input field named in the Expression column. This is useful when the Expression is more complicated than a single field name.

    That completed the definition of this field-based aggregate operator.

  7. Created an output stream:

    1. Dragged an output stream from the palette to the EventFlow Editor.

    2. On the General tab, Name: OutputChunkedTrades

    3. Connected the AggregateTrades operator to the OutputChunkedTrades output stream.
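As a batch cross-check of the configuration above, the following Python sketch computes TimeChunk (the window's lower boundary, which openval('AggregateTradesDim') returns for 30-second windows with offset 0) and TotalVolume (sum(Volume)) for each Symbol group over the complete set of sample trades. It is a hypothetical illustration, not generated from the EventFlow application; the names `batch_totals` and `TRADES` are assumptions.

```python
from collections import defaultdict

WINDOW = 30  # Advance and "Close and emit every" are both 30; Offset is 0

# (Time, Symbol, Volume) tuples used in the sample runs.
TRADES = [
    (7, "AMAT", 100), (10, "AMAT", 100), (20, "AMAT", 200),
    (40, "AMAT", 100), (41, "AMAT", 100), (45, "INTC", 100),
    (50, "AMAT", 200), (55, "INTC", 300), (65, "AMAT", 100),
]


def batch_totals(trades):
    """Map (Symbol, TimeChunk) to TotalVolume, computed over all trades at once."""
    totals = defaultdict(int)
    for time, symbol, volume in trades:
        time_chunk = (time // WINDOW) * WINDOW  # lower boundary of the window
        totals[(symbol, time_chunk)] += volume  # sum(Volume), grouped by Symbol
    return dict(totals)


if __name__ == "__main__":
    for (symbol, chunk), total in sorted(batch_totals(TRADES).items()):
        print(symbol, chunk, total)
```

The batch view includes one more entry than the streaming output: the AMAT window starting at 60, with a total of 100. In the running application, a window's result reaches OutputChunkedTrades only once the window closes.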