Contents
This sample uses two Heartbeat operators to prevent a Merge operator from starving when either of its inputs stop receiving tuples. Without the Heartbeat operators, the Merge operator would stop outputting tuples as soon as either of its two input streams is inactive, even if it continued to receive tuples on the other input stream.
This sample also demonstrates how to flag the non-heartbeat tuples on an input stream, so that the heartbeat tuples can be filtered out later on.
In StreamBase Studio, import this sample with the following steps:
-
From the top menu, click → .
-
Select operator from the Applications list.
-
Click OK.
StreamBase Studio creates a single project for the operator samples.
By default, the sample files are installed in:
-
On Windows:
C:\Program Files\StreamBase Systems\StreamBase.n.m\sample\operator -
On UNIX:
/opt/streambase/sample/operator
When you load the sample into StreamBase Studio, Studio copies the
sample project's files to your Studio workspace. StreamBase Systems
recommends that you use the workspace copy of the sample, especially on UNIX, where
you may not have write access to /opt/streambase. In
the default installation, the path to this sample in your Studio workspace is:
UNIX: ~/streambase-studio-n.m-workspace/sample_operator Windows XP: C:\Documents and Settings\username\My Documents\StreamBase Studion.mWorkspace\ sample_operator Windows Vista: C:\Users\username\Documents\StreamBase Studion.mWorkspace\ sample_operator
-
In the Package Explorer, double-click to open the
HeartbeatMerge.sbappapplication. Make sure the application is the currently active tab in the EventFlow Editor. -
Click the
Run button. This opens the
SB Test/Debug perspective and starts the application.
-
In the Application Output view, select the
MergedFeedsstream. No output is displayed at this point, but the Output View is prepared to receive output. This view will eventually show the output of the application. -
In the Manual Input view, select the
Feed1input stream. -
Enter
IBMfor symbol,83.17for price, and2005-11-02 12:00:00for time. -
Click Send Data. You should not see any output in the Output View yet. The tuple you just sent is currently buffered in the Merge operator.
-
Select the
Feed2input stream. -
Enter
MSFTfor symbol,24.89for price, and2005-11-02 12:00:30for time. -
Click Send Data. You should now see the IBM tuple in Output View.
-
Wait a few moments. Within 30 seconds, look for the MSFT tuple to appear in the Output View. If this application had not made use of Heartbeat operators, the MSFT tuple would have remained inside the Merge operator until
Feed1received a tuple with a timestamp greater than or equal to 2005-11-02 12:00:30. Instead, the Heartbeat operators continually emit such tuples, thereby preventing the Merge operator from getting stuck. You can see this for yourself by selecting theFeed1WithHeartbeatsoutput stream in the Output View. -
When done, press F9 or click the
Stop Running Application button.
This section describes how to run the sample in UNIX terminal windows or Windows command prompt windows. On Windows, be sure to use the StreamBase Command Prompt from the Start menu as described in the Test/Debug Guide, not the default command prompt.
-
Open three terminal windows on UNIX, or three StreamBase Command Prompts on Windows. In each window, navigate to the directory where the sample is installed, or to your workspace copy of the sample, as described above.
-
In window 1, type:
sbd HeartbeatAggregate.sbappThe window shows
notice[StreamBaseServer] listening on port 10000. -
In window 2, type:
sbc dequeue TicCountsNo output is displayed at this point, but the dequeuer is prepared to receive output of the application.
-
In window 3, type:
echo IBM,84.17,2005-11-02 12:00:00 | sbc enqueue Feed1You should not see any output in the dequeuer window yet. The tuple you just sent is currently buffered in the Merge operator.
-
In window 3, type:
echo MSFT,24.89,2005-11-02 12:00:30 | sbc enqueue Feed2Wait a few moments. Within 30 seconds, look for the MSFT tuple to appear in the dequeuer window. If this application had not made use of Heartbeat operators, the MSFT tuple would have remained stuck inside the Merge operator until Feed1 received a tuple with a timestamp greater than or equal to 2005-11-02 12:00:30. Instead, the Heartbeat operators continually emit such tuples, thereby preventing the Merge operator from getting stuck. You can see this for yourself by dequeuing the Feed1WithHeartbeats output stream.
-
In window 2, type: Ctrl-Z (Windows) or Ctrl-D (UNIX) to exit the sbc session.
-
In window 3, type the following command to terminate the server and dequeuer:
sbadmin shutdown
-
Launched StreamBase Studio
-
Created (or subsequently used) the
sample_operatorproject. -
From the top menu, in the SB Authoring perspective, selected → → . Selected the
sample_operatorproject and enteredHeartbeatMergefor the diagram name. -
Added two input streams:
-
Dragged an input stream from the palette to the canvas.
-
Clicked the new input stream to open its properties.
-
On the Input Stream Properties view General tab, added Name:
Feed1 -
On the Edit Schema tab, added:
-
Field Name:
symbol,Type:string, Size:4 -
Field Name:
price, Type:double, Size:8 -
Field Name:
time, Type:timestamp, Size:8
-
-
On the canvas, copied and pasted
Feed1to create the second input stream below it. -
Selected the new input stream. On the Properties view's General tab, changed the input stream name to
Feed2.
-
-
Added two Map operators:
-
Dragged a Map operator from the palette to the canvas.
-
Dragged a connector from the
Feed1input stream to the Map operator. -
Clicked the new Map operator. On the Map Properties view General tab, added Name:
TagDataTuples1 -
On the Output Settings tab:
-
Selected the
All input fields with specified changesoption. -
Clicked the button and added the following
Output Fieldsrow:Action:
Add, Field Name:isDataTuple, Expression:true
-
-
Copied and pasted
TagDataTuples1to create the second Map operator below it. -
Dragged a connector from the
Feed2input stream to theTagDataTuples2Map operator. -
Selected the new Map operator. On the Properties view General tab, changed the operator name to
TagDataTuples2.
-
-
Added two Heartbeat operators:
-
Dragged a Heartbeat operator from the palette to the canvas.
-
Connected the
TagDataTuples1Map operator to the new Heartbeat operator. -
Clicked the Heartbeat operator. On the Properties view General tab, added Name:
Heartbeat1 -
On the Output Settings tab, added:
-
Tuple output interval:
0.5(seconds) -
Input tuple timestamp field:
time -
Maximum delay:
2(seconds)
-
-
Copied and pasted
Heartbeat1to create the second Heartbeat operator below it. -
Connected the
TagDataTuples2Map operator to theHeartbeat2operator. -
Selected the new Heartbeat operator. On the Properties view's General tab, changed the operator name to
Heartbeat2.
-
-
Added a Merge operator:
-
Dragged a Merge operator from the palette to the canvas. Placed it after the Heartbeat2 operator.
-
Connected the
Heartbeat1operator to the top input (left) port of the Merge operator. -
Connected the
Heartbeat2operator to the bottom input port of the Merge operator. -
Selected the Merge operator. On the Properties view's Merge Settings tab, added
Field by which to merge tuples:time.
-
-
Added a Filter operator:
-
Dragged a Filter operator from the palette to the canvas, after the Merge1 operator.
-
Connected the
Merge1operator to the Filter operator. -
Selected the Filter operator. On the Properties view's General tab, added Name:
DropHeartbeatTuples. -
On the Predicate Settings tab, clicked the button and added the following
Predicatesrow:Output Port:
1, Predicate:isDataTuple
-
-
Added two output streams:
-
Dragged an output stream from the palette to the canvas, placing it after the
Heartbeat1operator. -
Drew a second connection from the
Heartbeat1operator to the output stream (adding to the existing connection fromHeartbeat1toMerge1). -
Selected the new output stream. On the Properties view's General tab, added Name:
Feed1WithHeartbeats -
Dragged a second output stream from the palette to the canvas, placing it after the
DropHeartbeatTuplesFilter operator. -
Connected the Filter operator to the second output stream.
-
Selected the new output stream. On the Properties view's General tab, added Name:
MergedFeeds
-
