This topic explains how the Gather operator works and the actions you can take on its Properties view.
The Gather operator receives input from two or more streams and concatenates tuples that share the same key value. The Gather operator is often used when a tuple needs to be split into multiple branches of an application, and then rejoined later on for further processing.
The Gather operator releases a new tuple, on a single output stream, that can be a function of any of the input tuple fields. By default, Gather buffers input tuples for each value of the key field until that value is seen on each of the input streams; only then is the output tuple released. Matching tuples can arrive in any order (input need not be synchronized), and tuples are emitted in the order that they are fully matched. You can change the default behavior by setting a timeout interval.
Avoid duplicate keys or use them with caution, because they can have unexpected results. If two or more tuples with duplicate keys are received on the same stream before the first is emitted, the second tuple replaces the first in the buffer (unless timeout is enabled, in which case, the incoming tuple is ignored).
Also, keep in mind when designing Gather operations that unmatched input tuples remain buffered at run time. Gather operations with large numbers of unmatched tuples can cause the buffer size to become excessive.
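The default matching behavior described above can be sketched in a few lines of Python. This is only an illustrative model, not StreamBase code: the class name GatherSketch, its method names, and the combine callback are hypothetical, and tuples are reduced to plain values.

```python
class GatherSketch:
    """Toy model of Gather's default (no-timeout) matching; not StreamBase code."""

    def __init__(self, num_ports, combine):
        self.num_ports = num_ports
        self.combine = combine   # builds the output from the matched tuples
        self.buffers = {}        # key -> {port number: buffered tuple}

    def enqueue(self, port, key, tup):
        """Buffer one tuple; return an output once every port has seen the key."""
        slots = self.buffers.setdefault(key, {})
        slots[port] = tup        # a duplicate key on the same port replaces the old tuple
        if len(slots) < self.num_ports:
            return None          # still waiting on at least one port
        del self.buffers[key]    # matched tuples leave the buffer
        return self.combine([slots[p] for p in range(1, self.num_ports + 1)])

    def buffered_count(self):
        """Number of unmatched tuples currently held."""
        return sum(len(slots) for slots in self.buffers.values())


gather = GatherSketch(2, combine=sum)
first = gather.enqueue(1, "k1", 10)    # nothing on port 2 yet
second = gather.enqueue(1, "k2", 20)   # different key, still unmatched
third = gather.enqueue(2, "k1", 5)     # key k1 has now been seen on both ports
print(first, second, third, gather.buffered_count())
```

In this model the k2 tuple stays in the buffer until a matching tuple arrives on port 2, which is how large numbers of unmatched keys make the buffer grow.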
This topic describes how to set Gather operator properties in the Properties View, and traces the execution of a gather operation.
Name: Use this field to specify or change the component's name, which must be unique in the application. The name must contain only alphabetic characters, numbers, and underscores, and no hyphens or other special characters. The first character must be alphabetic or an underscore.
Enable Error Output Port: Select this check box to add an Error Port to this component. In the EventFlow canvas, the Error Port shows as a red output port, always the last port for the component. See Using Error Ports and Error Streams to learn about Error Ports.
Description: Optionally enter text to briefly describe the component's purpose and function. In the EventFlow canvas, you can see the description by pressing Ctrl while the component's tooltip is displayed.
In the Gather Settings tab, specify the number of incoming streams that will participate in the gather, and the field by which to gather.
You can limit buffering by selecting the Enable timeout option. Specify a timeout interval and a field on which gathered tuples should be sorted. The field can be any data type that is orderable and differenceable. The timestamp data type is typically used. The timeout interval is measured in units appropriate for the specified field's data type. If you specify an int or double field, the timeout interval is measured in the same units as that field's values. If you specify a timestamp field, the timeout interval is measured in seconds.
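As a rough illustration of what "orderable and differenceable" means for the timeout field, the following Python sketch computes the elapsed amount between two field values. The helper names elapsed and timed_out are hypothetical, and Python's datetime stands in for the timestamp data type.

```python
from datetime import datetime


def elapsed(earlier, later):
    """Difference between two values of the sort field, in timeout-interval units."""
    delta = later - earlier
    # Timestamps subtract to a timedelta, which is converted to seconds;
    # int and double values subtract to a plain number of the field's own units.
    return delta.total_seconds() if hasattr(delta, "total_seconds") else delta


def timed_out(earlier, later, interval):
    """True when the gap between the two values exceeds the timeout interval."""
    return elapsed(earlier, later) > interval


t0 = datetime(2024, 1, 1, 9, 30, 0)
t1 = datetime(2024, 1, 1, 9, 30, 11)
print(timed_out(t0, t1, 10))    # timestamp field: interval is in seconds
print(timed_out(100, 105, 10))  # int field: interval is in the field's units
```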
The timeout limits how long Gather continues to buffer tuples. Instead of waiting indefinitely until the key value has been seen on all the input streams, Gather first establishes the current time by listening for a tuple on each input port. It then waits until either the required gather conditions are satisfied, or until a timeout occurs, whichever occurs first. A timeout occurs when a tuple arrives whose time (compared to the initial time established earlier) exceeds the specified timeout period. When the time is updated this way, the Gather operator emits a tuple. The missing input tuples' fields are set to null when the output expressions are evaluated.
Enabling timeout also changes the way duplicate tuples are handled. Without timeout, if a tuple arrives whose key field matches an existing tuple on the same stream, the new tuple replaces the existing tuple. With timeout enabled, the opposite happens: the existing tuple remains and the new one is lost.
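The following Python sketch models one possible reading of the timeout behavior. It is not StreamBase code, and it rests on a loudly stated assumption: the operator's current time is taken to be the minimum of the latest time values seen on each port, and a buffered key times out when that current time exceeds its earliest buffered time by more than the interval. The class name, method names, and the order in which gathered and timed-out tuples are returned are all hypothetical.

```python
class TimeoutGatherSketch:
    """Toy model of Gather with timeout enabled; not StreamBase code."""

    def __init__(self, num_ports, timeout):
        self.num_ports = num_ports
        self.timeout = timeout
        self.latest = {}    # port -> latest time value seen on that port
        self.buffers = {}   # key -> {port: (time, tuple)}

    def _emit(self, key):
        slots = self.buffers.pop(key)
        # Ports that never supplied this key contribute None (null) fields.
        return key, [slots[p][1] if p in slots else None
                     for p in range(1, self.num_ports + 1)]

    def enqueue(self, port, key, time, tup):
        """Buffer one tuple and return the list of outputs it releases."""
        slots = self.buffers.setdefault(key, {})
        slots.setdefault(port, (time, tup))   # duplicate key: the existing tuple stays
        self.latest[port] = max(time, self.latest.get(port, time))
        out = []
        if len(slots) == self.num_ports:
            out.append(self._emit(key))       # normal gather
        if len(self.latest) == self.num_ports:
            now = min(self.latest.values())   # ASSUMPTION: clock follows the slowest port
            expired = [k for k, s in self.buffers.items()
                       if now - min(t for t, _ in s.values()) > self.timeout]
            out.extend(self._emit(k) for k in expired)
        return out


g = TimeoutGatherSketch(2, timeout=10)
step1 = g.enqueue(2, "A", 0, "a2")    # establishes the time on port 2
step2 = g.enqueue(1, "B", 11, "b1")   # port 2 is still at time 0, so nothing expires
step3 = g.enqueue(2, "B", 12, "b2")   # gathers B, and A times out with a null field
print(step1, step2, step3)
```

Under this assumption, a tuple with a late time value on one port does not trigger a timeout by itself; the timeout fires only after the other port's time has also advanced.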
If you enable Timeout, you can also enable another option, Output arrival time of last tuple. This option adds a field named last_time to the output schema. This field is populated at runtime with the latest time at which an input tuple was received corresponding to that output tuple. The last_time field is always sorted across output tuples, so that it can reliably be passed to Aggregate operators. To ensure this sorted condition, Gather keeps a full timeout interval's worth of data buffered at all times.
This means that the timeout is not guaranteed to occur when the specified time elapses, as you might expect. Instead, the timeout occurs only when the time field on all the input ports has advanced by the specified interval, ensuring that tuples can be sorted correctly.
The Output Settings tab allows you to specify the schema of the operator's output tuple. Output tuples are released when the Gather operator has seen a tuple with a specific key value on each of the streams.
The Output Settings tab contains one Input n Fields grid for each input port specified on the Gather Settings tab, plus an Additional Expressions grid. In practice, Gather operators always have at least two grids, Input 1 Fields and Input 2 Fields, corresponding to input ports 1 and 2.
Field grids and the Additional Expressions grid operate the same way they do in the Output Settings tabs for the Map and Query operators. That is, the changes you specify in this tab are applied in top-down order in two ways: top to bottom in the order of grids in the tab, and top to bottom in the order of field expressions in each grid. As a result, the output tuple is assembled in the following way:
Fields in the incoming tuple on port 1, if any are specified.
Fields in the incoming tuple on port 2, if any are specified.
Fields in the incoming tuple on ports 3 through n, if any are specified.
Any additions, subtractions, or reorderings for input tuple fields in grid and port number order.
Any additions, subtractions, or reorderings for any of the above fields as determined by expressions in the Additional Expressions grid.
Use the field grids as described for the Map operator in Using Field Grids. Differences for the Gather operator are:
When an Input n Fields grid is open, the Prefix field is not blank, but is filled in for you with input1_ for Input 1 Fields, input2_ for Input 2 Fields, and so on. You can change the prefix or remove it, as your application requires. StreamBase Systems recommends leaving the default prefixes in place, or using similar prefixes, to help distinguish same-named fields on incoming streams.
When referencing fields in expression cells, use input1. as a qualifier for fields in input port 1's tuple, use input2. to qualify fields in input port 2's tuple, and so on.
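The assembly order and default prefixes can be illustrated with a small Python sketch. It is a simplified model rather than the product's implementation: the function assemble_output, its parameters, and the TotalVolume field name are hypothetical, and tuples are represented as dictionaries.

```python
def assemble_output(inputs, selected, prefixes, additional):
    """Build one output tuple in grid order, then apply additional expressions.

    inputs     -- one dict per input port (None if that port's tuple is missing)
    selected   -- per port, the list of field names passed through
    prefixes   -- per port, the prefix applied to passed-through field names
    additional -- list of (name, function of inputs), evaluated last
    """
    out = {}
    for tup, names, prefix in zip(inputs, selected, prefixes):
        for name in names:
            out[prefix + name] = None if tup is None else tup[name]
    for name, expr in additional:
        out[name] = expr(inputs)
    return out


port1 = {"Symbol": "AAPL", "Volume": 100}
port2 = {"Symbol": "AAPL", "Volume": 50}
out = assemble_output(
    inputs=[port1, port2],
    selected=[["Symbol"], []],             # only Symbol from port 1, nothing from port 2
    prefixes=["input1_", "input2_"],       # default prefixes keep same-named fields apart
    additional=[("TotalVolume", lambda ins: ins[0]["Volume"] + ins[1]["Volume"])],
)
print(out)
```

The lambda plays the role of an expression such as input1.Volume + input2.Volume, where the qualifier selects the port whose tuple supplies the field.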
In the following figure, the Gather operator's Output Settings tab specifies the following structure for the schema of the tuple to be emitted from the operator:
From input port 1, select only the Symbol field, and prefix its name with
From input port 2, select no fields.
Append an additional boolean field named
The effect of these settings is reflected in the Input Streams and Output Streams sub-tabs on the right side of the Properties view:
Use the Concurrency tab to specify parallel regions for this instance of this component, or multiplicity options, or both. The Concurrency tab settings are described in Concurrency Options, and dispatch styles are described in Dispatch Styles.
Concurrency settings are not suitable for every application, and using these settings requires a thorough analysis of your application. For details, see Execution Order and Concurrency, which includes important guidelines for using the concurrency options.
Consider a simple application with a Gather operator that has two inputs. Both input streams receive tuples containing a stock symbol and a volume of shares. The inputs can be asynchronous: tuples may arrive at different rates on each stream. Using the stock symbol field as the key, the Gather operator sums the volume of matching stocks from the two input streams, and emits a tuple listing the stock's symbol and its summed volume. The timeout option is not enabled. Let's trace a series of tuples flowing into the Gather operator in this application.
In the diagrams below, each step depicts a new tuple being enqueued to the Gather operator, and any output that results. The contents of the two operator buffers are shown after each tuple arrives and after any tuples are released.
A tuple enters the Gather operator on the first input port and is stored in the buffer:
A second tuple enters on the first port. Nothing is emitted by the Gather operator because both buffered tuples entered on the same input port, and because the symbols don't match:
A tuple enters on the second port. This time, the conditions for the gather operation are met: the key field is matched by tuples on both input ports. In the gather operation, the volumes of the AAPL tuples are summed, the result is emitted in an output tuple, and the matching tuples are released from the buffer.
This step introduces the problem that can be caused by duplicate keys on the same stream. The new tuple's key field (IBM) matches the key field of an existing tuple on the same port. As you should expect, there is no output because the match did not occur on both ports. But notice in the figure that the new tuple replaces the existing tuple. In the next step, we will see how this affects the Gather result.
A tuple arriving on the second port matches a tuple on the first port, and Gather releases the result. Notice that the value of the output tuple is the sum of the last two IBM volumes (289 and 350). The earlier IBM volume of 300 is not used: it is effectively lost in this Gather operation.
To summarize this example:
No output is emitted until step 3, when the same key has been seen on both input ports. The output is the sum of the AAPL volumes that arrived on the Gather operator's first and second input ports.
Steps 4 and 5 show the potential side effect of duplicate keys on the same stream.
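The five steps of this example can be replayed with a short Python sketch. The IBM volumes (300, 289, 350) come from the walkthrough; the AAPL volumes, the order of the first two tuples, and the assignment of 289 and 350 to ports 1 and 2 are assumptions made for illustration, as is the function name run_gather.

```python
def run_gather(events, num_ports=2):
    """Replay (port, symbol, volume) events; return (step, symbol, total) outputs."""
    buffers = {}    # symbol -> {port: volume}
    outputs = []
    for step, (port, symbol, volume) in enumerate(events, start=1):
        slots = buffers.setdefault(symbol, {})
        slots[port] = volume                 # same key, same port: replaces the old tuple
        if len(slots) == num_ports:          # key seen on every port
            outputs.append((step, symbol, sum(slots.values())))
            del buffers[symbol]
    return outputs


events = [
    (1, "IBM", 300),    # step 1: buffered on port 1
    (1, "AAPL", 100),   # step 2: hypothetical volume, no match yet
    (2, "AAPL", 50),    # step 3: hypothetical volume, AAPL gathered
    (1, "IBM", 289),    # step 4: replaces the IBM 300 tuple
    (2, "IBM", 350),    # step 5: IBM gathered from 289 and 350
]
outputs = run_gather(events)
print(outputs)
```

The IBM total is 639 rather than 650 because the volume of 300 was overwritten in step 4 before a match arrived.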
This example traces the execution of a Gather operator that has the timeout option set. The input schema has three fields: a stock symbol, its price, and its timestamp. The Gather operator has two input ports. Each accepts stocks with different values (this might be accomplished by using a Filter operator upstream from the Gather operator). The first input port receives stocks with values equal to or greater than 200; the second port receives stocks worth less than 200. As in the first example, this operation gathers on the symbol field. Timeout is enabled with a timeout interval of 10 seconds, ordered by the timestamp field. Notice that the Output arrival time of last tuple option is not selected.
As in the first example, each step shows a new tuple being enqueued to the Gather operator, and any output that results. In addition, the tuples in these figures have a timestamp field at the end.
A tuple enters the Gather operator on the second input port and is stored in the buffer. This arrival establishes the start time on the second port:
A tuple enters the Gather operator on the first input port and is stored in the buffer. Notice that this tuple arrives 11 seconds after the first tuple. Thus, the specified timeout interval has elapsed without a gather occurring:
The third tuple arrives on the second port, and the Gather operator emits two output tuples:
Why are two tuples emitted?
The input tuple on Port 1 triggers a gather because its key field (IBM) matches the key field on a tuple in the Port 2 buffer.
When the gather occurs, the timeout interval has already elapsed (the difference between the time values of the first and second tuples is greater than 10 seconds). This is why a second tuple is emitted with the missing gather fields set to null.
If the third input had not matched on the key field, only the timeout tuple with the null fields would have been emitted.
This example traces the execution of a Gather operator that has both the timeout and Output arrival time of last tuple options set. With these options, we are interested not only in the gather result, but also in the arrival time of the last tuple. Otherwise, the application in this example is identical to the one in Example 2: Gather with Timeout. Each step of this example shows a new tuple being enqueued to the Gather operator, and any output that results.
A tuple enters the Gather operator on the first input port and is stored in the buffer. This tuple establishes the start time on the first port:
A tuple enters the Gather operator on the second input port and is stored in the buffer. Notice that this tuple meets the normal criteria for a gather to occur: the symbols are matched on both input ports, and both tuples arrived within the timeout interval.
Why is no tuple emitted? The last_time value cannot be established until the timeout period expires, and the time is then updated on all of the Gather operator's input ports. So at this point, more input must arrive before a gather operation can occur.
A third tuple arrives on the second port. Notice that the elapsed time since the last gather tuple (15 seconds) exceeds the timeout interval. This establishes the elapsed time on the second port for purposes of the timeout:
When the fourth tuple arrives on the first port, Gather is able to establish the elapsed time since the last gather event. The operator emits a tuple with the gather result and the time value for the second tuple, which was the last tuple in the gather operation: