Join Operator Sample

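This sample uses a Join operator to combine trades from a Reuters feed and a Comstock feed. Two trades are joined when they are for the same stock, their per-share prices differ by $1 or more, and their time values are within 60 seconds of each other. Each input stream contains a symbol field, a price-per-share field, and a time field: Symbol_R, PricePS_R, and Time_R on the Reuters stream, and Symbol_C, PricePS_C, and Time_C on the Comstock stream.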

Importing This Sample into StreamBase Studio

In StreamBase Studio, import this sample with the following steps:

  • From the top menu, click File → Load StreamBase Sample.

  • Select operator from the Applications list.

  • Click OK.

StreamBase Studio creates a single project for the operator samples.

Sample Location

By default, the sample files are installed in:

  • On Windows: C:\Program Files\StreamBase Systems\StreamBase.n.m\sample\operator

  • On UNIX: /opt/streambase/sample/operator

When you load the sample into StreamBase Studio, Studio copies the sample project's files to your Studio workspace. StreamBase Systems recommends that you use the workspace copy of the sample, especially on UNIX, where you may not have write access to /opt/streambase. In the default installation, the path to this sample in your Studio workspace is:

UNIX:       
  ~/streambase-studio-n.m-workspace/sample_operator
Windows XP:
  C:\Documents and Settings\username\My Documents\StreamBase Studio n.m Workspace\
      sample_operator
Windows Vista:
  C:\Users\username\Documents\StreamBase Studio n.m Workspace\
      sample_operator

Running Join.sbapp in StreamBase Studio

  1. In the Package Explorer, double-click to open the Join.sbapp application. Make sure the application is the currently active tab in the EventFlow Editor.

  2. Click the Run button. This opens the SB Test/Debug perspective and starts the application.

  3. In the Application Output view, select the JoinOut output stream. No output is displayed at this point, but the dequeuer is prepared to receive output. This view will eventually show the output of the application.

  4. In the Manual Input view, select the ReutersIn input stream.

  5. Enter intc, 23.0, and 10 in the Symbol_R, PricePS_R, and Time_R fields, respectively.

  6. Click Send Data, and observe that no output is displayed yet in the Application Output view.

  7. Select the ComstockIn input stream.

  8. Enter intc, 23.5, and 20 in the Symbol_C, PricePS_C, and Time_C fields, respectively.

  9. Click Send Data, and observe that no output is displayed yet in the Application Output view.

  10. Select the ReutersIn input stream.

  11. Enter intc, 24.5, and 30 in the Symbol_R, PricePS_R, and Time_R fields, respectively.

  12. Click Send Data, and observe that this data appears in the Application Output view:

    DeltaPricePS=1.0, PricePS_R=24.5, PricePS_C=23.5, Time_R=30
    
  13. Select the ComstockIn input stream.

  14. Enter amat, 34.5, and 40 in the Symbol_C, PricePS_C, and Time_C fields, respectively.

  15. Click Send Data, and observe that no further output is displayed yet in the Application Output view.

  16. Enter intc, 26.5, and 50 in the Symbol_C, PricePS_C, and Time_C fields, respectively.

  17. Click Send Data, and observe that this data appears in the Application Output view:

    DeltaPricePS=-3.5, PricePS_R=23.0, PricePS_C=26.5, Time_R=10
    DeltaPricePS=-2.0, PricePS_R=24.5, PricePS_C=26.5, Time_R=30
    
  18. Select the ReutersIn input stream.

  19. Enter amat, 32.5, and 60 in the Symbol_R, PricePS_R, and Time_R fields, respectively.

  20. Click Send Data, and observe that this data appears in the Application Output view:

    DeltaPricePS=-2.0, PricePS_R=32.5, PricePS_C=34.5, Time_R=60
    
  21. Select the ComstockIn input stream.

  22. Enter intc, 27.5, and 100 in the Symbol_C, PricePS_C, and Time_C fields, respectively.

  23. Click Send Data, and observe that no further output is displayed yet in the Application Output view.

  24. When done, press F9 or click the Stop Running Application button.
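
The session above can be reproduced outside StreamBase with a small simulation. The following Python sketch is only an illustration of the join logic as this sample describes it, not StreamBase's implementation: the names Trade, matches, and run are hypothetical, the join range is assumed to be inclusive (the sample data has no boundary case), and window eviction is ignored because every tuple is simply kept.

```python
from dataclasses import dataclass

JOIN_RANGE = 60  # seconds, taken from the sample's Join Range setting


@dataclass
class Trade:
    symbol: str
    price: float
    time: int


def matches(r: Trade, c: Trade) -> bool:
    """Join range check plus the sample's predicate.

    Assumption: the range is inclusive at +/- 60.
    """
    in_range = abs(r.time - c.time) <= JOIN_RANGE
    return in_range and r.symbol == c.symbol and abs(r.price - c.price) >= 1.0


def run(events):
    """Feed (stream_name, Trade) pairs in arrival order; return emitted rows.

    Each row is (DeltaPricePS, PricePS_R, PricePS_C, Time_R).
    """
    reuters, comstock, out = [], [], []
    for stream, trade in events:
        if stream == "ReutersIn":
            reuters.append(trade)
            pairs = [(trade, c) for c in comstock]
        else:
            comstock.append(trade)
            pairs = [(r, trade) for r in reuters]
        for r, c in pairs:
            if matches(r, c):
                out.append((r.price - c.price, r.price, c.price, r.time))
    return out


# The tuples entered in the Manual Input view, in the same order.
EVENTS = [
    ("ReutersIn", Trade("intc", 23.0, 10)),
    ("ComstockIn", Trade("intc", 23.5, 20)),
    ("ReutersIn", Trade("intc", 24.5, 30)),
    ("ComstockIn", Trade("amat", 34.5, 40)),
    ("ComstockIn", Trade("intc", 26.5, 50)),
    ("ReutersIn", Trade("amat", 32.5, 60)),
    ("ComstockIn", Trade("intc", 27.5, 100)),
]

for row in run(EVENTS):
    print(row)
```

The first Reuters and Comstock tuples differ by only 0.5, so they produce no output, and the final Comstock tuple at time 100 is more than 60 seconds away from both intc tuples on the Reuters stream.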

Running Join.sbapp in Terminal Windows

This section describes how to run the sample in UNIX terminal windows or Windows command prompt windows. On Windows, be sure to use the StreamBase Command Prompt from the Start menu as described in the Test/Debug Guide, not the default command prompt.

  1. Open three terminal windows on UNIX, or three StreamBase Command Prompts on Windows. In each window, navigate to the directory where the sample is installed, or to your workspace copy of the sample, as described above.

  2. In window 1, enter:

    sbd Join.sbapp

    A message indicates that the StreamBase Server is listening and waiting for tuples.

  3. In window 2, enter:

    sbc dequeue JoinOut

    No output is displayed at this point, but the dequeuer is prepared to receive output. This window will eventually show the output of the application.

  4. In window 3, enter:

    sbc enqueue ReutersIn

    The sbc command is now awaiting keyboard input in window 3.

  5. Enter these two lines, pressing Enter after each:

    intc, 23.0, 10

    intc, 23.5, 20

    No output is displayed yet in the dequeue window.

  6. Enter a new line:

    intc, 24.5, 30

    Observe this output in the dequeue window:

    1,24.5,23.5,30
    
  7. Enter:

    amat, 34.5, 40

    No further output is displayed yet in the dequeue window.

  8. Enter:

    intc, 26.5, 50

    Observe this output in the dequeue window:

    -3.5,23,26.5,10
    -2,24.5,26.5,30
    
  9. Enter:

    amat, 32.5, 60

    Observe this output in the dequeue window:

    -2,32.5,34.5,60
    
  10. Enter:

    intc, 27.5, 100

    No further output is displayed in the dequeue window.

  11. In window 3, type Ctrl-Z (Windows) or Ctrl-D (UNIX) to exit the sbc enqueue session.

  12. In window 3, type the following command to terminate the server and dequeuer:

    sbadmin shutdown
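
The dequeue window prints each output tuple as a comma-separated line, in the order of the Join operator's output fields: DeltaPricePS, PricePS_R, PricePS_C, Time_R. If you capture those lines for checking, a short Python sketch such as the following can turn them back into named fields. The names JoinOutRow and parse_join_out are hypothetical helpers for illustration, and the sketch assumes the lines look exactly like the ones shown above.

```python
import csv
from typing import NamedTuple


class JoinOutRow(NamedTuple):
    delta_price_ps: float  # DeltaPricePS
    price_ps_r: float      # PricePS_R
    price_ps_c: float      # PricePS_C
    time_r: int            # Time_R


def parse_join_out(lines):
    """Parse comma-separated dequeue lines into JoinOutRow values."""
    rows = []
    for fields in csv.reader(lines):
        if not fields:  # skip blank lines
            continue
        delta, price_r, price_c, time_r = (f.strip() for f in fields)
        rows.append(
            JoinOutRow(float(delta), float(price_r), float(price_c), int(time_r))
        )
    return rows


# Lines copied from the dequeue window in the steps above.
DEQUEUED = [
    "1,24.5,23.5,30",
    "-3.5,23,26.5,10",
    "-2,24.5,26.5,30",
    "-2,32.5,34.5,60",
]

rows = parse_join_out(DEQUEUED)
for row in rows:
    # DeltaPricePS is defined as PricePS_R - PricePS_C.
    assert row.delta_price_ps == row.price_ps_r - row.price_ps_c
    print(row)
```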


How We Created the Join Sample

  1. Launched StreamBase Studio.

  2. Created (or subsequently used) the sample_operator project.

  3. From the top menu, in the SB Authoring perspective, selected File → New → EventFlow Application. Selected the sample_operator project and entered Join as the file name.

  4. Created an input stream:

    1. Dragged an input stream from the palette to the canvas.

    2. Clicked it on the canvas, which invoked the Input Stream Properties view.

    3. On the General tab, Name: ReutersIn

    4. On the Edit Schema tab, added

      • Field Name: Symbol_R, Type: string, Size: 5

      • Field Name: PricePS_R, Type: double

      • Field Name: Time_R, Type: int

  5. Created a second input stream:

    1. Dragged an input stream from the palette to the canvas.

    2. Clicked it on the canvas, which invoked the Input Stream Properties view.

    3. On the General tab, Name: ComstockIn

    4. On the Edit Schema tab, added

      • Field Name: Symbol_C, Type: string, Size: 5

      • Field Name: PricePS_C, Type: double

      • Field Name: Time_C, Type: int

  6. Created a Join operator:

    1. Dragged a Join operator from the palette to the canvas.

    2. On the General tab, Name: Join

    3. Connected the ReutersIn input stream to the top input port of the Join operator and the ComstockIn input stream to the bottom input port.

  7. Set up the Join operator:

    1. On the Join Settings tab:

      • Join by: values

      • Tuple timeout: We did not enter a value in this sample. The default is no timeout.

        The tuple timeout is the maximum time that a tuple can stay in the window. When the time expires, the tuple is flushed from the buffer.

      • Predicate: Symbol_R == Symbol_C && abs(PricePS_R - PricePS_C) >= 1.0

    2. On the Value-based Settings tab:

      • Ordering field:

        • First Stream: Time_R

        • Second Stream: Time_C

          Each input stream has one window, which holds tuples according to the values of that stream's ordering field. When a tuple arrives on either stream and its ordering value is within the join range of a tuple in the other stream's window, the predicate is evaluated for that pair. If the predicate is true, the operator emits a tuple containing the calculated results and any other fields of interest. Note that a tuple may be matched more than once. Tuples are dropped from their respective windows when no more tuples that could join with them can arrive on the other input stream, or when a tuple whose ordering value is beyond their join range enters the opposite window. (The other option for Join by is tuples.)

      • Join Range:

        • First Stream Ordering Field - 60

        • First Stream Ordering Field + 60

        In this sample, we set the range values to 60 seconds. That is, tuples are considered matched when their "Ordering" fields are within 60 seconds of each other.

    3. On the Output Settings tab, selected the explicitly specified fields option, and added:

      Output Field Name: DeltaPricePS Expression: PricePS_R - PricePS_C

      Output Field Name: PricePS_R Expression: PricePS_R

      Output Field Name: PricePS_C Expression: PricePS_C

      Output Field Name: Time_R Expression: Time_R

      This operator outputs the four fields above. Note that new fields may be added as functions of the other fields (as done, for example, by DeltaPricePS).

  8. Created an output stream:

    1. Dragged an output stream from the palette to the canvas.

    2. On the General tab, Name: JoinOut

    3. Connected the Join operator to the JoinOut output stream.
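
The window behavior described for the Value-based Settings tab can be sketched in Python as follows. This is one possible reading of that description, not StreamBase's actual implementation: the class JoinWindows and its arrive method are hypothetical, the join range is assumed to be inclusive, and each stream's ordering values are assumed to arrive in non-decreasing order, which is what makes it safe to drop old tuples.

```python
from collections import deque

JOIN_RANGE = 60  # matches the sample's "Ordering Field - 60 / + 60" setting


class JoinWindows:
    """Two value-based windows, one per input stream (illustrative only)."""

    def __init__(self, join_range=JOIN_RANGE):
        self.join_range = join_range
        self.windows = {"first": deque(), "second": deque()}

    def arrive(self, side, ordering_value, payload):
        """Record a tuple on `side`; return payloads in range on the other side."""
        other = "second" if side == "first" else "first"
        other_window = self.windows[other]

        # Eviction: tuples older than (ordering_value - range) can never join
        # with this or any later tuple on this side, so drop them.
        while other_window and other_window[0][0] < ordering_value - self.join_range:
            other_window.popleft()

        # Candidates still need the range check, because the other stream
        # may already be ahead by more than the join range.
        candidates = [
            p for v, p in other_window
            if abs(v - ordering_value) <= self.join_range
        ]
        self.windows[side].append((ordering_value, payload))
        return candidates


# Ordering values from the sample session (Time_R on first, Time_C on second).
w = JoinWindows()
w.arrive("first", 10, "R intc 23.0")
w.arrive("second", 20, "C intc 23.5")
w.arrive("first", 30, "R intc 24.5")
w.arrive("second", 40, "C amat 34.5")
w.arrive("second", 50, "C intc 26.5")
w.arrive("first", 60, "R amat 32.5")
last = w.arrive("second", 100, "C intc 27.5")
print(last)                     # candidates still in range of time 100
print(list(w.windows["first"])) # what remains in the first window
```

In this reading, the tuple at time 100 evicts the first-stream tuples at times 10 and 30, leaving only the amat tuple at time 60 as a candidate. The predicate then rejects that pair because the symbols differ, which is consistent with the sample producing no output for the last tuple.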
